word_freq_shards_basic.cc revision 0ab2e892
/* Sort words by frequency, sharding version.

  1. Read the input files line by line (one word per line) and shard the words
     by hash into N files. Each record is a 1-byte length followed by the
     word's bytes, so a word is limited to what that 1-byte length can encode.
  2. Assuming each shard file fits in memory, read each shard file, count the
     words, sort them by count, then write N count files, one line per word:
       count \t word
  3. Merge the N count files using a heap.

Limits: each shard must fit in memory.
*/
110ab2e892SShuo Chen
#include <assert.h>

#include <algorithm>
#include <fstream>
#include <iostream>
#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>

#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <unistd.h>

using std::string;
using std::string_view;
using std::vector;
using std::unique_ptr;
320ab2e892SShuo Chen
// Number of shard files the input is split into. Every phase builds its file
// names ("shard-NNNNN-of-NNNNN", "count-NNNNN-of-NNNNN") from this value.
// Declared non-const, so it may be changed before sharding begins.
int kShards{10};
340ab2e892SShuo Chen
// Returns the current wall-clock time in seconds since the Unix epoch, with
// the microsecond resolution reported by gettimeofday().
inline double now()
{
  struct timeval current = {};
  ::gettimeofday(&current, nullptr);
  const double whole_seconds = static_cast<double>(current.tv_sec);
  const double fraction = current.tv_usec / 1000000.0;
  return whole_seconds + fraction;
}
410ab2e892SShuo Chen
// Buffered, append-only writer for one shard file.
//
// Each record is a 1-byte length followed by that many bytes of payload, so a
// record holds at most 255 bytes. The file is closed on destruction (or by an
// explicit close()); instances are non-copyable.
class OutputFile // : boost::noncopyable
{
 public:
  // Opens |filename| for writing, creating or truncating it, with a 64 KiB
  // stdio buffer. Aborts with a diagnostic if the file cannot be opened; this
  // check is not compiled out under NDEBUG, unlike the previous assert().
  explicit OutputFile(const std::string& filename)
    : file_(::fopen(filename.c_str(), "w+"))
  {
    if (file_ == nullptr)
    {
      ::perror(filename.c_str());
      ::abort();
    }
    ::setbuffer(file_, buffer_, sizeof buffer_);
  }

  ~OutputFile()
  {
    close();
  }

  OutputFile(const OutputFile&) = delete;
  OutputFile& operator=(const OutputFile&) = delete;

  // Appends one length-prefixed record. |s| must be at most 255 bytes, the
  // largest length a uint8_t prefix can encode. Longer input is a caller bug:
  // it trips the assert in debug builds and is truncated to 255 bytes under
  // NDEBUG, so the file stays well-formed.
  void append(std::string_view s)
  {
    assert(s.size() <= UINT8_MAX);
    const uint8_t len =
        static_cast<uint8_t>(std::min<size_t>(s.size(), UINT8_MAX));
    ::fwrite(&len, 1, sizeof len, file_);
    ::fwrite(s.data(), 1, len, file_);
    ++items_;
  }

  // Pushes buffered bytes to the operating system.
  void flush()
  {
    ::fflush(file_);
  }

  // Flushes and closes the file. Reports a failing fclose() (e.g. a deferred
  // write error). Safe to call more than once.
  void close()
  {
    if (file_ != nullptr && ::fclose(file_) != 0)
      ::perror("fclose");
    file_ = nullptr;
  }

  // Current write offset in bytes, including data still in the stdio buffer.
  int64_t tell() const
  {
    return ::ftell(file_);
  }

  // Underlying file descriptor.
  int fd() const
  {
    return ::fileno(file_);
  }

  // Number of records appended so far.
  size_t items() const
  {
    return items_;
  }

 private:
  FILE* file_;
  char buffer_[64 * 1024];  // must outlive file_; fclose() runs in ~OutputFile
  size_t items_ = 0;
};
1090ab2e892SShuo Chen
1100ab2e892SShuo Chenclass Sharder // : boost::noncopyable
1110ab2e892SShuo Chen{
1120ab2e892SShuo Chen public:
1130ab2e892SShuo Chen  Sharder()
1140ab2e892SShuo Chen    : files_(kShards)
1150ab2e892SShuo Chen  {
1160ab2e892SShuo Chen    for (int i = 0; i < kShards; ++i)
1170ab2e892SShuo Chen    {
1180ab2e892SShuo Chen      char name[256];
1190ab2e892SShuo Chen      snprintf(name, sizeof name, "shard-%05d-of-%05d", i, kShards);
1200ab2e892SShuo Chen      files_[i].reset(new OutputFile(name));
1210ab2e892SShuo Chen    }
1220ab2e892SShuo Chen    assert(files_.size() == static_cast<size_t>(kShards));
1230ab2e892SShuo Chen  }
1240ab2e892SShuo Chen
1250ab2e892SShuo Chen  void output(string_view word)
1260ab2e892SShuo Chen  {
1270ab2e892SShuo Chen    size_t shard = hash(word) % files_.size();
1280ab2e892SShuo Chen    files_[shard]->append(word);
1290ab2e892SShuo Chen  }
1300ab2e892SShuo Chen
1310ab2e892SShuo Chen  void finish()
1320ab2e892SShuo Chen  {
1330ab2e892SShuo Chen    for (int i = 0; i < files_.size(); ++i)
1340ab2e892SShuo Chen    {
1350ab2e892SShuo Chen      printf("shard %d: %ld bytes, %ld items\n", i, files_[i]->tell(), files_[i]->items());
1360ab2e892SShuo Chen    }
1370ab2e892SShuo Chen  }
1380ab2e892SShuo Chen
1390ab2e892SShuo Chen private:
1400ab2e892SShuo Chen  std::hash<string_view> hash;
1410ab2e892SShuo Chen  vector<unique_ptr<OutputFile>> files_;
1420ab2e892SShuo Chen};
1430ab2e892SShuo Chen
1440ab2e892SShuo Chenvoid shard(int argc, char* argv[])
1450ab2e892SShuo Chen{
1460ab2e892SShuo Chen  Sharder sharder;
1470ab2e892SShuo Chen  double t = now();
1480ab2e892SShuo Chen  for (int i = 1; i < argc; ++i)
1490ab2e892SShuo Chen  {
1500ab2e892SShuo Chen    std::cout << "  processing input file " << argv[i] << std::endl;
1510ab2e892SShuo Chen    double t = now();
1520ab2e892SShuo Chen    char line[1024];
1530ab2e892SShuo Chen    FILE* fp = fopen(argv[i], "r");
1540ab2e892SShuo Chen    char buffer[65536];
1550ab2e892SShuo Chen    ::setbuffer(fp, buffer, sizeof buffer);
1560ab2e892SShuo Chen    while (fgets(line, sizeof line, fp))
1570ab2e892SShuo Chen    {
1580ab2e892SShuo Chen      size_t len = strlen(line);
1590ab2e892SShuo Chen      if (len > 0 && line[len-1] == '\n')
1600ab2e892SShuo Chen        line[len-1] = '\0';
1610ab2e892SShuo Chen      sharder.output(line);
1620ab2e892SShuo Chen    }
1630ab2e892SShuo Chen    size_t total = ftell(fp);
1640ab2e892SShuo Chen    fclose(fp);
1650ab2e892SShuo Chen    double sec = now() - t;
1660ab2e892SShuo Chen    printf("%.3f sec %.2f MB/s\n", sec, total / sec / 1000 / 1000);
1670ab2e892SShuo Chen  }
1680ab2e892SShuo Chen  sharder.finish();
1690ab2e892SShuo Chen  std::cout << "shuffling done " << now() - t << " sec" << std::endl;
1700ab2e892SShuo Chen}
1710ab2e892SShuo Chen
1720ab2e892SShuo Chen// ======= count_shards =======
1730ab2e892SShuo Chen
1740ab2e892SShuo Chenvoid count_shard(int shard, int fd)
1750ab2e892SShuo Chen{
1760ab2e892SShuo Chen  const int64_t len = lseek(fd, 0, SEEK_END);
1770ab2e892SShuo Chen  lseek(fd, 0, SEEK_SET);
1780ab2e892SShuo Chen  double t = now();
1790ab2e892SShuo Chen  printf("shard %d: file size %ld\n", shard, len);
1800ab2e892SShuo Chen  void* mapped = mmap(NULL, len, PROT_READ, MAP_PRIVATE, fd, 0);
1810ab2e892SShuo Chen  assert(mapped != MAP_FAILED);
1820ab2e892SShuo Chen  const uint8_t* const start = static_cast<const uint8_t*>(mapped);
1830ab2e892SShuo Chen  const uint8_t* const end = start + len;
1840ab2e892SShuo Chen
1850ab2e892SShuo Chen  std::unordered_map<string_view, uint64_t> items;
1860ab2e892SShuo Chen  for (const uint8_t* p = start; p < end;)
1870ab2e892SShuo Chen  {
1880ab2e892SShuo Chen    string_view s((const char*)p+1, *p);
1890ab2e892SShuo Chen    items[s]++;
1900ab2e892SShuo Chen    p += 1 + *p;
1910ab2e892SShuo Chen  }
1920ab2e892SShuo Chen  printf("  count %.3f sec %ld items\n", now() - t, items.size());
1930ab2e892SShuo Chen
1940ab2e892SShuo Chen  t = now();
1950ab2e892SShuo Chen  vector<std::pair<size_t, string_view>> counts;
1960ab2e892SShuo Chen  for (const auto& it : items)
1970ab2e892SShuo Chen  {
1980ab2e892SShuo Chen    if (it.second > 1)
1990ab2e892SShuo Chen      counts.push_back(make_pair(it.second, it.first));
2000ab2e892SShuo Chen  }
2010ab2e892SShuo Chen  printf("  select %.3f sec %ld\n", now() - t, counts.size());
2020ab2e892SShuo Chen
2030ab2e892SShuo Chen  t = now();
2040ab2e892SShuo Chen  std::sort(counts.begin(), counts.end());
2050ab2e892SShuo Chen  printf("  sort %.3f sec\n", now() - t);
2060ab2e892SShuo Chen
2070ab2e892SShuo Chen  t = now();
2080ab2e892SShuo Chen  char buf[256];
2090ab2e892SShuo Chen  snprintf(buf, sizeof buf, "count-%05d-of-%05d", shard, kShards);
2100ab2e892SShuo Chen  std::ofstream out(buf);
2110ab2e892SShuo Chen  for (auto it = counts.rbegin(); it != counts.rend(); ++it)
2120ab2e892SShuo Chen  {
2130ab2e892SShuo Chen    out << it->first << '\t' << it->second << '\n';
2140ab2e892SShuo Chen  }
2150ab2e892SShuo Chen  for (const auto& it : items)
2160ab2e892SShuo Chen  {
2170ab2e892SShuo Chen    if (it.second == 1)
2180ab2e892SShuo Chen    {
2190ab2e892SShuo Chen      out << "1\t" << it.first << '\n';
2200ab2e892SShuo Chen    }
2210ab2e892SShuo Chen  }
2220ab2e892SShuo Chen  printf("  output %.3f sec\n", now() - t);
2230ab2e892SShuo Chen
2240ab2e892SShuo Chen  if (munmap(mapped, len))
2250ab2e892SShuo Chen    perror("munmap");
2260ab2e892SShuo Chen}
2270ab2e892SShuo Chen
2280ab2e892SShuo Chenvoid count_shards()
2290ab2e892SShuo Chen{
2300ab2e892SShuo Chen  double t = now();
2310ab2e892SShuo Chen  for (int shard = 0; shard < kShards; ++shard)
2320ab2e892SShuo Chen  {
2330ab2e892SShuo Chen    char buf[256];
2340ab2e892SShuo Chen    snprintf(buf, sizeof buf, "shard-%05d-of-%05d", shard, kShards);
2350ab2e892SShuo Chen    int fd = open(buf, O_RDONLY);
2360ab2e892SShuo Chen    count_shard(shard, fd);
2370ab2e892SShuo Chen    ::close(fd);
2380ab2e892SShuo Chen    ::unlink(buf);
2390ab2e892SShuo Chen  }
2400ab2e892SShuo Chen  std::cout << "count done " << now() - t << " sec\n";
2410ab2e892SShuo Chen}
2420ab2e892SShuo Chen
2430ab2e892SShuo Chen// ======= merge =======
2440ab2e892SShuo Chen
// One input of the k-way merge. Holds the most recent "count\tword" record
// read from its stream. Orders by count only, so a std heap of Sources puts
// the largest count on top.
class Source  // copyable
{
 public:
  // |in| is borrowed, not owned; it must outlive this Source.
  explicit Source(std::istream* in)
    : in_(in)
  {
  }

  // Advances to the next record. Returns false at end of input, and also at
  // the first line that has no tab or whose leading count is not positive.
  bool next()
  {
    std::string record;
    if (!std::getline(*in_, record))
      return false;

    const size_t separator = record.find('\t');
    if (separator == std::string::npos)
      return false;

    count_ = std::strtol(record.c_str(), nullptr, 10);
    if (count_ <= 0)
      return false;

    word_ = record.substr(separator + 1);
    return true;
  }

  bool operator<(const Source& other) const
  {
    return count_ < other.count_;
  }

  // Writes the current record back out as "count\tword\n".
  void outputTo(std::ostream& out) const
  {
    out << count_ << '\t' << word_ << '\n';
  }

 private:
  std::istream* in_;
  int64_t count_ = 0;
  std::string word_;
};
2890ab2e892SShuo Chen
2900ab2e892SShuo Chenvoid merge()
2910ab2e892SShuo Chen{
2920ab2e892SShuo Chen  vector<unique_ptr<std::ifstream>> inputs;
2930ab2e892SShuo Chen  vector<Source> keys;
2940ab2e892SShuo Chen
2950ab2e892SShuo Chen  double t = now();
2960ab2e892SShuo Chen  for (int i = 0; i < kShards; ++i)
2970ab2e892SShuo Chen  {
2980ab2e892SShuo Chen    char buf[256];
2990ab2e892SShuo Chen    snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, kShards);
3000ab2e892SShuo Chen    inputs.emplace_back(new std::ifstream(buf));
3010ab2e892SShuo Chen    Source rec(inputs.back().get());
3020ab2e892SShuo Chen    if (rec.next())
3030ab2e892SShuo Chen    {
3040ab2e892SShuo Chen      keys.push_back(rec);
3050ab2e892SShuo Chen    }
3060ab2e892SShuo Chen    ::unlink(buf);
3070ab2e892SShuo Chen  }
3080ab2e892SShuo Chen
3090ab2e892SShuo Chen  std::ofstream out("output");
3100ab2e892SShuo Chen  std::make_heap(keys.begin(), keys.end());
3110ab2e892SShuo Chen  while (!keys.empty())
3120ab2e892SShuo Chen  {
3130ab2e892SShuo Chen    std::pop_heap(keys.begin(), keys.end());
3140ab2e892SShuo Chen    keys.back().outputTo(out);
3150ab2e892SShuo Chen
3160ab2e892SShuo Chen    if (keys.back().next())
3170ab2e892SShuo Chen    {
3180ab2e892SShuo Chen      std::push_heap(keys.begin(), keys.end());
3190ab2e892SShuo Chen    }
3200ab2e892SShuo Chen    else
3210ab2e892SShuo Chen    {
3220ab2e892SShuo Chen      keys.pop_back();
3230ab2e892SShuo Chen    }
3240ab2e892SShuo Chen  }
3250ab2e892SShuo Chen  std::cout << "merging done " << now() - t << " sec\n";
3260ab2e892SShuo Chen}
3270ab2e892SShuo Chen
3280ab2e892SShuo Chenint main(int argc, char* argv[])
3290ab2e892SShuo Chen{
3300ab2e892SShuo Chen  /*
3310ab2e892SShuo Chen  kShards = 9;
3320ab2e892SShuo Chen  int fd = open("shard-00000-of-00010", O_RDONLY);
3330ab2e892SShuo Chen  double t = now();
3340ab2e892SShuo Chen  sort_shard(0, fd, 1074462684, 100030936);
3350ab2e892SShuo Chen  printf("sort_shard %.3f sec\n", now() - t);
3360ab2e892SShuo Chen  t = now();
3370ab2e892SShuo Chen  count_shard(1, fd);
3380ab2e892SShuo Chen  printf("count_shard %.3f sec\n", now() - t);
3390ab2e892SShuo Chen  /*/
3400ab2e892SShuo Chen
3410ab2e892SShuo Chen  shard(argc, argv);
3420ab2e892SShuo Chen  count_shards();
3430ab2e892SShuo Chen  merge();
3440ab2e892SShuo Chen  //*/
3450ab2e892SShuo Chen}
346