10ab2e892SShuo Chen/* sort word by frequency, sorting while counting version.
2df3173cbSShuo Chen
3df3173cbSShuo Chen   1. read input files, do counting, when count map > 10M keys, output to segment files
4df3173cbSShuo Chen      word \t count  -- sorted by word
5df3173cbSShuo Chen   2. read all segment files, do merging & counting, when count map > 10M keys, output to count files, each word goes to one count file only.
6df3173cbSShuo Chen      count \t word  -- sorted by count
7df3173cbSShuo Chen   3. read all count files, do merging and output
8df3173cbSShuo Chen*/
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <map>
#include <memory>
#include <unordered_map>
#include <vector>

#ifdef STD_STRING
#warning "STD STRING"
#include <string>
using std::string;
#else
#include <ext/vstring.h>
typedef __gnu_cxx::__sso_string string;
#endif

#include <sys/time.h>
#include <unistd.h>
287e3924f8SShuo Chen
// Largest number of distinct keys an in-memory count map may hold before
// its contents are flushed to a file (10 million).
const size_t kMaxSize = 10000000;
307e3924f8SShuo Chen
// Returns the current wall-clock time from gettimeofday() as seconds,
// with the microsecond part folded into the fraction.
inline double now()
{
  struct timeval current = { 0, 0 };
  gettimeofday(&current, nullptr);
  const double fraction = current.tv_usec / 1000000.0;
  return current.tv_sec + fraction;
}
370ab2e892SShuo Chen
387e3924f8SShuo Chenint input(int argc, char* argv[])
397e3924f8SShuo Chen{
407e3924f8SShuo Chen  int count = 0;
410ab2e892SShuo Chen  double t = now();
427e3924f8SShuo Chen  for (int i = 1; i < argc; ++i)
437e3924f8SShuo Chen  {
447e3924f8SShuo Chen    std::cout << "  processing input file " << argv[i] << std::endl;
457e3924f8SShuo Chen    std::map<string, int64_t> counts;
467e3924f8SShuo Chen    std::ifstream in(argv[i]);
477e3924f8SShuo Chen    while (in && !in.eof())
487e3924f8SShuo Chen    {
490ab2e892SShuo Chen      double tt = now();
507e3924f8SShuo Chen      counts.clear();
517e3924f8SShuo Chen      string word;
527e3924f8SShuo Chen      while (in >> word)
537e3924f8SShuo Chen      {
547e3924f8SShuo Chen        counts[word]++;
557e3924f8SShuo Chen        if (counts.size() > kMaxSize)
567e3924f8SShuo Chen        {
570ab2e892SShuo Chen          std::cout << "  split " << now() - tt << " sec" << std::endl;
587e3924f8SShuo Chen          break;
597e3924f8SShuo Chen        }
607e3924f8SShuo Chen      }
617e3924f8SShuo Chen
620ab2e892SShuo Chen      tt = now();
637e3924f8SShuo Chen      char buf[256];
647e3924f8SShuo Chen      snprintf(buf, sizeof buf, "segment-%05d", count);
657e3924f8SShuo Chen      std::ofstream out(buf);
667e3924f8SShuo Chen      ++count;
677e3924f8SShuo Chen      for (const auto& kv : counts)
687e3924f8SShuo Chen      {
697e3924f8SShuo Chen        out << kv.first << '\t' << kv.second << '\n';
707e3924f8SShuo Chen      }
710ab2e892SShuo Chen      std::cout << "  writing " << buf << " " << now() - tt << " sec" << std::endl;
727e3924f8SShuo Chen    }
737e3924f8SShuo Chen  }
740ab2e892SShuo Chen  std::cout << "reading done " << count << " segments " << now() - t << " sec" << std::endl;
757e3924f8SShuo Chen  return count;
767e3924f8SShuo Chen}
777e3924f8SShuo Chen
787e3924f8SShuo Chen// ======= combine =======
797e3924f8SShuo Chen
// Cursor over a stream of "word \t count" lines.  It exposes the most
// recently parsed record through the public members |word| and |count|.
// The stream is borrowed, not owned: it must outlive this object and every
// copy of it.
class Segment  // copyable
{
 public:
  string word;        // word of the last record accepted by next()
  int64_t count = 0;  // count of the last record accepted by next()

  explicit Segment(std::istream* in)
    : in_(in)
  {
  }

  // Reads one line and parses it as "word \t count".
  // Returns true and updates |word| / |count| on success.  Returns false,
  // leaving both members untouched, when no line could be read or the line
  // contains no tab.
  bool next()
  {
    string line;
    if (!getline(*in_, line))
    {
      return false;
    }

    const size_t tab = line.find('\t');
    if (tab == string::npos)
    {
      return false;
    }

    word.assign(line, 0, tab);
    // strtoll rather than strtol: |count| is 64-bit, while long is only
    // 32 bits wide on some platforms.
    count = std::strtoll(line.c_str() + tab + 1, nullptr, 10);
    return true;
  }

  // Deliberately inverted: the standard heap algorithms keep the "largest"
  // element on top, so with this ordering the top is the smallest word.
  bool operator<(const Segment& rhs) const
  {
    return word > rhs.word;
  }

 private:
  std::istream* in_;  // not owned
};
1157e3924f8SShuo Chen
1167e3924f8SShuo Chenvoid output(int i, const std::unordered_map<string, int64_t>& counts)
1177e3924f8SShuo Chen{
1180ab2e892SShuo Chen  double t = now();
1197e3924f8SShuo Chen  std::vector<std::pair<int64_t, string>> freq;
1207e3924f8SShuo Chen  for (const auto& entry : counts)
1217e3924f8SShuo Chen  {
1227e3924f8SShuo Chen    freq.push_back(make_pair(entry.second, entry.first));
1237e3924f8SShuo Chen  }
1247e3924f8SShuo Chen  std::sort(freq.begin(), freq.end());
1250ab2e892SShuo Chen  std::cout << "  sorting " << now() - t << " sec" << std::endl;
1267e3924f8SShuo Chen
1270ab2e892SShuo Chen  t = now();
1287e3924f8SShuo Chen  char buf[256];
1297e3924f8SShuo Chen  snprintf(buf, sizeof buf, "count-%05d", i);
1307e3924f8SShuo Chen  std::ofstream out(buf);
1317e3924f8SShuo Chen  for (auto it = freq.rbegin(); it != freq.rend(); ++it)
1327e3924f8SShuo Chen  {
1337e3924f8SShuo Chen    out << it->first << '\t' << it->second << '\n';
1347e3924f8SShuo Chen  }
1350ab2e892SShuo Chen  std::cout << "  writing " << buf << " " << now() - t << " sec" << std::endl;
1367e3924f8SShuo Chen}
1377e3924f8SShuo Chen
1387e3924f8SShuo Chenint combine(int count)
1397e3924f8SShuo Chen{
1407e3924f8SShuo Chen  std::vector<std::unique_ptr<std::ifstream>> inputs;
1417e3924f8SShuo Chen  std::vector<Segment> keys;
1427e3924f8SShuo Chen
1430ab2e892SShuo Chen  double t = now();
1440ab2e892SShuo Chen
1457e3924f8SShuo Chen  for (int i = 0; i < count; ++i)
1467e3924f8SShuo Chen  {
1477e3924f8SShuo Chen    char buf[256];
1487e3924f8SShuo Chen    snprintf(buf, sizeof buf, "segment-%05d", i);
1497e3924f8SShuo Chen    inputs.emplace_back(new std::ifstream(buf));
1507e3924f8SShuo Chen    Segment rec(inputs.back().get());
1517e3924f8SShuo Chen    if (rec.next())
1527e3924f8SShuo Chen    {
1537e3924f8SShuo Chen      keys.push_back(rec);
1547e3924f8SShuo Chen    }
1557e3924f8SShuo Chen    ::unlink(buf);
1567e3924f8SShuo Chen  }
1577e3924f8SShuo Chen
1587e3924f8SShuo Chen  // std::cout << keys.size() << '\n';
1597e3924f8SShuo Chen  int m = 0;
1607e3924f8SShuo Chen  string last;
1617e3924f8SShuo Chen  std::unordered_map<string, int64_t> counts;
1627e3924f8SShuo Chen  std::make_heap(keys.begin(), keys.end());
1630ab2e892SShuo Chen
1640ab2e892SShuo Chen  double tt = now();
1657e3924f8SShuo Chen  while (!keys.empty())
1667e3924f8SShuo Chen  {
1677e3924f8SShuo Chen    std::pop_heap(keys.begin(), keys.end());
1687e3924f8SShuo Chen
1697e3924f8SShuo Chen    if (keys.back().word != last)
1707e3924f8SShuo Chen    {
1717e3924f8SShuo Chen      last = keys.back().word;
1727e3924f8SShuo Chen      if (counts.size() > kMaxSize)
1737e3924f8SShuo Chen      {
1740ab2e892SShuo Chen        std::cout << "    split " << now() - tt << " sec" << std::endl;
1757e3924f8SShuo Chen        output(m++, counts);
1760ab2e892SShuo Chen        tt = now();
1777e3924f8SShuo Chen        counts.clear();
1787e3924f8SShuo Chen      }
1797e3924f8SShuo Chen    }
1807e3924f8SShuo Chen
1817e3924f8SShuo Chen    counts[keys.back().word] += keys.back().count;
1827e3924f8SShuo Chen
1837e3924f8SShuo Chen    if (keys.back().next())
1847e3924f8SShuo Chen    {
1857e3924f8SShuo Chen      std::push_heap(keys.begin(), keys.end());
1867e3924f8SShuo Chen    }
1877e3924f8SShuo Chen    else
1887e3924f8SShuo Chen    {
1897e3924f8SShuo Chen      keys.pop_back();
1907e3924f8SShuo Chen    }
1917e3924f8SShuo Chen  }
1927e3924f8SShuo Chen
1937e3924f8SShuo Chen  if (!counts.empty())
1947e3924f8SShuo Chen  {
1950ab2e892SShuo Chen    std::cout << "    split " << now() - tt << " sec" << std::endl;
1967e3924f8SShuo Chen    output(m++, counts);
1977e3924f8SShuo Chen  }
1980ab2e892SShuo Chen  std::cout << "combining done " << m << " count files " << now() - t << " sec" << std::endl;
1997e3924f8SShuo Chen  return m;
2007e3924f8SShuo Chen}
2017e3924f8SShuo Chen
2027e3924f8SShuo Chen// ======= merge  =======
2037e3924f8SShuo Chen
// Cursor over a stream of "count \t word" lines.  It holds the most recently
// parsed record.  The stream is borrowed, not owned: it must outlive this
// object and every copy of it.
class Source  // copyable
{
 public:
  explicit Source(std::istream* in)
    : in_(in)
  {
  }

  // Reads one line and parses it as "count \t word".
  // Returns true when the line contains a tab and its leading number is
  // positive; the word is everything after the first tab.  Returns false
  // when no line could be read, the line has no tab, or the count is not
  // positive.
  bool next()
  {
    string line;
    if (!getline(*in_, line))
    {
      return false;
    }

    const size_t tab = line.find('\t');
    if (tab == string::npos)
    {
      return false;
    }

    // strtoll rather than strtol: |count_| is 64-bit, while long is only
    // 32 bits wide on some platforms.
    count_ = std::strtoll(line.c_str(), nullptr, 10);
    if (count_ <= 0)
    {
      return false;
    }

    word_.assign(line, tab + 1, string::npos);
    return true;
  }

  // Orders by count only, so the standard heap algorithms keep the record
  // with the largest count on top.
  bool operator<(const Source& rhs) const
  {
    return count_ < rhs.count_;
  }

  // Writes the current record to |out| as "count \t word \n".
  void outputTo(std::ostream& out) const
  {
    out << count_ << '\t' << word_ << '\n';
  }

 private:
  std::istream* in_;  // not owned
  int64_t count_ = 0;
  string word_;
};
2467e3924f8SShuo Chen
2477e3924f8SShuo Chenvoid merge(int m)
2487e3924f8SShuo Chen{
2497e3924f8SShuo Chen  std::vector<std::unique_ptr<std::ifstream>> inputs;
2507e3924f8SShuo Chen  std::vector<Source> keys;
2517e3924f8SShuo Chen
2520ab2e892SShuo Chen  double t = now();
2537e3924f8SShuo Chen  for (int i = 0; i < m; ++i)
2547e3924f8SShuo Chen  {
2557e3924f8SShuo Chen    char buf[256];
2567e3924f8SShuo Chen    snprintf(buf, sizeof buf, "count-%05d", i);
2577e3924f8SShuo Chen    inputs.emplace_back(new std::ifstream(buf));
2587e3924f8SShuo Chen    Source rec(inputs.back().get());
2597e3924f8SShuo Chen    if (rec.next())
2607e3924f8SShuo Chen    {
2617e3924f8SShuo Chen      keys.push_back(rec);
2627e3924f8SShuo Chen    }
2637e3924f8SShuo Chen    ::unlink(buf);
2647e3924f8SShuo Chen  }
2657e3924f8SShuo Chen
2667e3924f8SShuo Chen  std::ofstream out("output");
2677e3924f8SShuo Chen  std::make_heap(keys.begin(), keys.end());
2687e3924f8SShuo Chen  while (!keys.empty())
2697e3924f8SShuo Chen  {
2707e3924f8SShuo Chen    std::pop_heap(keys.begin(), keys.end());
2717e3924f8SShuo Chen    keys.back().outputTo(out);
2727e3924f8SShuo Chen
2737e3924f8SShuo Chen    if (keys.back().next())
2747e3924f8SShuo Chen    {
2757e3924f8SShuo Chen      std::push_heap(keys.begin(), keys.end());
2767e3924f8SShuo Chen    }
2777e3924f8SShuo Chen    else
2787e3924f8SShuo Chen    {
2797e3924f8SShuo Chen      keys.pop_back();
2807e3924f8SShuo Chen    }
2817e3924f8SShuo Chen  }
2820ab2e892SShuo Chen  std::cout << "merging done " << now() - t << " sec\n";
2837e3924f8SShuo Chen}
2847e3924f8SShuo Chen
2857e3924f8SShuo Chenint main(int argc, char* argv[])
2867e3924f8SShuo Chen{
2877e3924f8SShuo Chen  int count = input(argc, argv);
2887e3924f8SShuo Chen  int m = combine(count);
289df3173cbSShuo Chen  merge(m);
2907e3924f8SShuo Chen}
291