word_freq_shards_basic.cc revision 4136e585
/* Sort words by frequency, sharding version.

  1. Read the input files and shard the words into N shard files:
       word
  2. Assuming each shard file fits in memory, read each shard file, count the words and sort them by count, then write N count files:
       count \t word
  3. Merge the N count files using a heap.

Limits: each shard must fit in memory.
*/
110ab2e892SShuo Chen
#include <assert.h>

#include "absl/container/flat_hash_map.h"

#include <algorithm>
#include <fstream>
#include <iostream>
#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>

#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/times.h>
#include <unistd.h>
310ab2e892SShuo Chen
320ab2e892SShuo Chenusing std::string;
330ab2e892SShuo Chenusing std::string_view;
340ab2e892SShuo Chenusing std::vector;
350ab2e892SShuo Chenusing std::unique_ptr;
360ab2e892SShuo Chen
// Number of shard files the input is split into.  The same value is used
// for the number of intermediate "count-*" files and appears in the names
// of both kinds of file.
int kShards = 10;
380ab2e892SShuo Chen
// Returns the time reported by gettimeofday() as fractional seconds:
// whole seconds plus the microsecond part scaled down by one million.
inline double now()
{
  struct timeval current = { 0, 0 };
  gettimeofday(&current, nullptr);
  const double fraction = current.tv_usec / 1000000.0;
  return current.tv_sec + fraction;
}
450ab2e892SShuo Chen
// CPU time split into user and system parts, both in seconds.
struct CpuTime
{
  double userSeconds = 0.0;
  double systemSeconds = 0.0;

  // Sum of the user and system parts.
  double total() const { return userSeconds + systemSeconds; }
};

// Clock ticks per second as reported by sysconf(_SC_CLK_TCK); the fields of
// struct tms are divided by this value to obtain seconds.
const int g_clockTicks = static_cast<int>(::sysconf(_SC_CLK_TCK));

// Samples the CPU time via times() and converts it to seconds.
//
// Returns a zero-initialised CpuTime when times() fails or when the tick
// rate is not positive (sysconf() reports failure as -1).
CpuTime cpuTime()
{
  CpuTime t;
  if (g_clockTicks <= 0)
  {
    // Dividing by a non-positive tick rate would yield inf or negative times.
    return t;
  }

  struct tms tms;
  // times() signals failure with (clock_t)-1.  Any other value, including a
  // negative one after the tick counter wraps, is a successful call, so a
  // ">= 0" test is not a valid error check.
  if (::times(&tms) != static_cast<clock_t>(-1))
  {
    const double hz = static_cast<double>(g_clockTicks);
    t.userSeconds = static_cast<double>(tms.tms_utime) / hz;
    t.systemSeconds = static_cast<double>(tms.tms_stime) / hz;
  }
  return t;
}
684136e585SShuo Chen
694136e585SShuo Chenclass Timer
704136e585SShuo Chen{
714136e585SShuo Chen public:
724136e585SShuo Chen  Timer()
734136e585SShuo Chen    : start_(now()),
744136e585SShuo Chen      start_cpu_(cpuTime())
754136e585SShuo Chen  {
764136e585SShuo Chen  }
774136e585SShuo Chen
784136e585SShuo Chen  void report(int64_t bytes) const
794136e585SShuo Chen  {
804136e585SShuo Chen    CpuTime end_cpu(cpuTime());
814136e585SShuo Chen    double end = now();
824136e585SShuo Chen    printf("%.3f real  %.3f cpu  %.2f MiB/s  %ld bytes\n",
834136e585SShuo Chen           end - start_, end_cpu.total() - start_cpu_.total(),
844136e585SShuo Chen           bytes / (end - start_) / 1024 / 1024, bytes);
854136e585SShuo Chen  }
864136e585SShuo Chen private:
874136e585SShuo Chen  const double start_ = 0;
884136e585SShuo Chen  const CpuTime start_cpu_;
894136e585SShuo Chen};
904136e585SShuo Chen
// Buffered writer of length-prefixed records: each record is one length
// byte followed by that many payload bytes.  The file is created (or
// truncated) on construction and closed on destruction.  Not copyable.
class OutputFile // : boost::noncopyable
{
 public:
  // Longest payload a record can hold, because the length prefix is a
  // single byte.
  static constexpr size_t kMaxItemLength = 255;

  // Opens `filename` for writing.  Prints the reason and aborts if the file
  // cannot be opened, in every build mode.
  explicit OutputFile(const std::string& filename)
    : file_(::fopen(filename.c_str(), "w+"))
  {
    if (!file_)
    {
      ::perror(filename.c_str());
      ::abort();
    }
    // Full buffering through the member buffer; standard equivalent of the
    // BSD setbuffer() call.
    ::setvbuf(file_, buffer_, _IOFBF, sizeof buffer_);
  }

  ~OutputFile()
  {
    close();
  }

  // Appends one record holding `s`.  Payloads longer than kMaxItemLength
  // are cut to their first kMaxItemLength bytes so that the length byte
  // always matches the bytes that follow it.  Does nothing after close().
  void append(std::string_view s)
  {
    if (!file_)
      return;
    const size_t n = s.size() < kMaxItemLength ? s.size() : kMaxItemLength;
    const uint8_t len = static_cast<uint8_t>(n);
    ::fwrite(&len, 1, sizeof len, file_);
    ::fwrite(s.data(), 1, len, file_);
    ++items_;
  }

  /*
  void append(uint64_t x)
  {
    // FIXME: htobe64(x);
    ::fwrite(&x, 1, sizeof x, file_);
  }
  */

  // Pushes buffered data to the file; no-op after close().
  void flush()
  {
    if (file_)
      ::fflush(file_);
  }

  // Closes the file; safe to call more than once.  fclose() also flushes
  // the buffer, so a failure here is how a deferred write error shows up.
  void close()
  {
    if (file_ && ::fclose(file_) != 0)
      ::perror("fclose");
    file_ = nullptr;
  }

  // Current position in the file, or -1 after close().
  int64_t tell()
  {
    return file_ ? ::ftell(file_) : -1;
  }

  // Underlying file descriptor, or -1 after close().
  int fd()
  {
    return file_ ? ::fileno(file_) : -1;
  }

  // Number of records appended so far.
  size_t items() const
  {
    return items_;
  }

 private:
  FILE* file_;
  char buffer_[64 * 1024];
  size_t items_ = 0;

  OutputFile(const OutputFile&) = delete;
  void operator=(const OutputFile&) = delete;
};
1580ab2e892SShuo Chen
1590ab2e892SShuo Chenclass Sharder // : boost::noncopyable
1600ab2e892SShuo Chen{
1610ab2e892SShuo Chen public:
1620ab2e892SShuo Chen  Sharder()
1630ab2e892SShuo Chen    : files_(kShards)
1640ab2e892SShuo Chen  {
1650ab2e892SShuo Chen    for (int i = 0; i < kShards; ++i)
1660ab2e892SShuo Chen    {
1670ab2e892SShuo Chen      char name[256];
1680ab2e892SShuo Chen      snprintf(name, sizeof name, "shard-%05d-of-%05d", i, kShards);
1690ab2e892SShuo Chen      files_[i].reset(new OutputFile(name));
1700ab2e892SShuo Chen    }
1710ab2e892SShuo Chen    assert(files_.size() == static_cast<size_t>(kShards));
1720ab2e892SShuo Chen  }
1730ab2e892SShuo Chen
1740ab2e892SShuo Chen  void output(string_view word)
1750ab2e892SShuo Chen  {
1760ab2e892SShuo Chen    size_t shard = hash(word) % files_.size();
1770ab2e892SShuo Chen    files_[shard]->append(word);
1780ab2e892SShuo Chen  }
1790ab2e892SShuo Chen
1800ab2e892SShuo Chen  void finish()
1810ab2e892SShuo Chen  {
1824136e585SShuo Chen    int shard = 0;
1834136e585SShuo Chen    for (const auto& file : files_)
1840ab2e892SShuo Chen    {
1854136e585SShuo Chen      printf("  shard %d: %ld bytes, %ld items\n", shard, file->tell(), file->items());
1864136e585SShuo Chen      ++shard;
1874136e585SShuo Chen      file->close();
1880ab2e892SShuo Chen    }
1890ab2e892SShuo Chen  }
1900ab2e892SShuo Chen
1910ab2e892SShuo Chen private:
1920ab2e892SShuo Chen  std::hash<string_view> hash;
1930ab2e892SShuo Chen  vector<unique_ptr<OutputFile>> files_;
1940ab2e892SShuo Chen};
1950ab2e892SShuo Chen
1964136e585SShuo Chenint64_t shard_(int argc, char* argv[])
1970ab2e892SShuo Chen{
1980ab2e892SShuo Chen  Sharder sharder;
1994136e585SShuo Chen  Timer timer;
2004136e585SShuo Chen  int64_t total = 0;
2010ab2e892SShuo Chen  for (int i = 1; i < argc; ++i)
2020ab2e892SShuo Chen  {
2030ab2e892SShuo Chen    std::cout << "  processing input file " << argv[i] << std::endl;
2040ab2e892SShuo Chen    double t = now();
2050ab2e892SShuo Chen    char line[1024];
2060ab2e892SShuo Chen    FILE* fp = fopen(argv[i], "r");
2070ab2e892SShuo Chen    char buffer[65536];
2080ab2e892SShuo Chen    ::setbuffer(fp, buffer, sizeof buffer);
2090ab2e892SShuo Chen    while (fgets(line, sizeof line, fp))
2100ab2e892SShuo Chen    {
2110ab2e892SShuo Chen      size_t len = strlen(line);
2120ab2e892SShuo Chen      if (len > 0 && line[len-1] == '\n')
2130ab2e892SShuo Chen        line[len-1] = '\0';
2140ab2e892SShuo Chen      sharder.output(line);
2150ab2e892SShuo Chen    }
2164136e585SShuo Chen    size_t len = ftell(fp);
2170ab2e892SShuo Chen    fclose(fp);
2184136e585SShuo Chen    total += len;
2190ab2e892SShuo Chen    double sec = now() - t;
2204136e585SShuo Chen    printf("%.3f sec %.2f MiB/s\n", sec, len / sec / 1024 / 1024);
2210ab2e892SShuo Chen  }
2220ab2e892SShuo Chen  sharder.finish();
2234136e585SShuo Chen  printf("sharding done ");
2244136e585SShuo Chen  timer.report(total);
2254136e585SShuo Chen  return total;
2260ab2e892SShuo Chen}
2270ab2e892SShuo Chen
2280ab2e892SShuo Chen// ======= count_shards =======
2290ab2e892SShuo Chen
2304136e585SShuo Chenint64_t count_shard(int shard, int fd)
2310ab2e892SShuo Chen{
2320ab2e892SShuo Chen  const int64_t len = lseek(fd, 0, SEEK_END);
2330ab2e892SShuo Chen  lseek(fd, 0, SEEK_SET);
2340ab2e892SShuo Chen  double t = now();
2350ab2e892SShuo Chen  printf("shard %d: file size %ld\n", shard, len);
2364136e585SShuo Chen  {
2370ab2e892SShuo Chen  void* mapped = mmap(NULL, len, PROT_READ, MAP_PRIVATE, fd, 0);
2380ab2e892SShuo Chen  assert(mapped != MAP_FAILED);
2390ab2e892SShuo Chen  const uint8_t* const start = static_cast<const uint8_t*>(mapped);
2400ab2e892SShuo Chen  const uint8_t* const end = start + len;
2410ab2e892SShuo Chen
2424136e585SShuo Chen  // std::unordered_map<string_view, uint64_t> items;
2434136e585SShuo Chen  absl::flat_hash_map<string_view, uint64_t> items;
2440ab2e892SShuo Chen  for (const uint8_t* p = start; p < end;)
2450ab2e892SShuo Chen  {
2460ab2e892SShuo Chen    string_view s((const char*)p+1, *p);
2470ab2e892SShuo Chen    items[s]++;
2480ab2e892SShuo Chen    p += 1 + *p;
2490ab2e892SShuo Chen  }
2500ab2e892SShuo Chen  printf("  count %.3f sec %ld items\n", now() - t, items.size());
2510ab2e892SShuo Chen
2520ab2e892SShuo Chen  t = now();
2530ab2e892SShuo Chen  vector<std::pair<size_t, string_view>> counts;
2540ab2e892SShuo Chen  for (const auto& it : items)
2550ab2e892SShuo Chen  {
2560ab2e892SShuo Chen    if (it.second > 1)
2570ab2e892SShuo Chen      counts.push_back(make_pair(it.second, it.first));
2580ab2e892SShuo Chen  }
2590ab2e892SShuo Chen  printf("  select %.3f sec %ld\n", now() - t, counts.size());
2600ab2e892SShuo Chen
2610ab2e892SShuo Chen  t = now();
2620ab2e892SShuo Chen  std::sort(counts.begin(), counts.end());
2630ab2e892SShuo Chen  printf("  sort %.3f sec\n", now() - t);
2640ab2e892SShuo Chen
2650ab2e892SShuo Chen  t = now();
2660ab2e892SShuo Chen  {
2674136e585SShuo Chen    char buf[256];
2684136e585SShuo Chen    snprintf(buf, sizeof buf, "count-%05d-of-%05d", shard, kShards);
2694136e585SShuo Chen    std::ofstream out(buf);
2704136e585SShuo Chen    for (auto it = counts.rbegin(); it != counts.rend(); ++it)
2710ab2e892SShuo Chen    {
2724136e585SShuo Chen      out << it->first << '\t' << it->second << '\n';
2734136e585SShuo Chen    }
2744136e585SShuo Chen    for (const auto& it : items)
2754136e585SShuo Chen    {
2764136e585SShuo Chen      if (it.second == 1)
2774136e585SShuo Chen      {
2784136e585SShuo Chen        out << "1\t" << it.first << '\n';
2794136e585SShuo Chen      }
2800ab2e892SShuo Chen    }
2810ab2e892SShuo Chen  }
2820ab2e892SShuo Chen  printf("  output %.3f sec\n", now() - t);
2830ab2e892SShuo Chen
2844136e585SShuo Chen  t = now();
2850ab2e892SShuo Chen  if (munmap(mapped, len))
2860ab2e892SShuo Chen    perror("munmap");
2874136e585SShuo Chen  }
2884136e585SShuo Chen  printf("  destruct %.3f sec\n", now() - t);
2894136e585SShuo Chen  return len;
2900ab2e892SShuo Chen}
2910ab2e892SShuo Chen
2920ab2e892SShuo Chenvoid count_shards()
2930ab2e892SShuo Chen{
2944136e585SShuo Chen  Timer timer;
2954136e585SShuo Chen  int64_t total = 0;
2960ab2e892SShuo Chen  for (int shard = 0; shard < kShards; ++shard)
2970ab2e892SShuo Chen  {
2980ab2e892SShuo Chen    char buf[256];
2990ab2e892SShuo Chen    snprintf(buf, sizeof buf, "shard-%05d-of-%05d", shard, kShards);
3000ab2e892SShuo Chen    int fd = open(buf, O_RDONLY);
3014136e585SShuo Chen    double t = now();
3024136e585SShuo Chen    int64_t len = count_shard(shard, fd);
3030ab2e892SShuo Chen    ::close(fd);
3040ab2e892SShuo Chen    ::unlink(buf);
3054136e585SShuo Chen    total += len;
3064136e585SShuo Chen    printf("shard %d: %.2f MiB/s\n", shard, len / (now() - t) / 1024 / 1024);
3070ab2e892SShuo Chen  }
3084136e585SShuo Chen  printf("count done ");
3094136e585SShuo Chen  timer.report(total);
3100ab2e892SShuo Chen}
3110ab2e892SShuo Chen
3120ab2e892SShuo Chen// ======= merge =======
3130ab2e892SShuo Chen
// Cursor over a stream of "count \t word" lines.  It holds the most
// recently parsed record and orders itself by that record's count, which
// lets it serve as a heap element.  Copyable; the stream is not owned.
class Source  // copyable
{
 public:
  //   in  stream to read from; must outlive this object and its copies
  explicit Source(std::istream* in)
    : in_(in),
      count_(0),
      word_()
  {
  }

  // Reads and parses the next line.
  //
  // Returns true when a record with a positive count was loaded.  Returns
  // false at end of input, on a line without a tab, or when the parsed
  // count is not positive.
  bool next()
  {
    std::string line;
    if (!std::getline(*in_, line))
      return false;

    const size_t tab = line.find('\t');
    if (tab == std::string::npos)
      return false;

    // strtoll rather than strtol: count_ is 64 bits wide, while `long` is
    // only 32 bits on some platforms and would clamp large counts.
    count_ = strtoll(line.c_str(), nullptr, 10);
    if (count_ <= 0)
      return false;

    // Everything after the first tab is the word.
    word_.assign(line, tab + 1, std::string::npos);
    return true;
  }

  // Orders by count only.
  bool operator<(const Source& rhs) const
  {
    return count_ < rhs.count_;
  }

  // Writes the current record as "count \t word \n".
  void outputTo(std::ostream& out) const
  {
    out << count_ << '\t' << word_ << '\n';
  }

 private:
  std::istream* in_;
  int64_t count_;
  std::string word_;
};
3580ab2e892SShuo Chen
3590ab2e892SShuo Chenvoid merge()
3600ab2e892SShuo Chen{
3614136e585SShuo Chen  Timer timer;
3620ab2e892SShuo Chen  vector<unique_ptr<std::ifstream>> inputs;
3630ab2e892SShuo Chen  vector<Source> keys;
3640ab2e892SShuo Chen
3654136e585SShuo Chen  int64_t total = 0;
3660ab2e892SShuo Chen  for (int i = 0; i < kShards; ++i)
3670ab2e892SShuo Chen  {
3680ab2e892SShuo Chen    char buf[256];
3690ab2e892SShuo Chen    snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, kShards);
3704136e585SShuo Chen    struct stat st;
3714136e585SShuo Chen    ::stat(buf, &st);
3724136e585SShuo Chen    total += st.st_size;
3730ab2e892SShuo Chen    inputs.emplace_back(new std::ifstream(buf));
3740ab2e892SShuo Chen    Source rec(inputs.back().get());
3750ab2e892SShuo Chen    if (rec.next())
3760ab2e892SShuo Chen    {
3770ab2e892SShuo Chen      keys.push_back(rec);
3780ab2e892SShuo Chen    }
3790ab2e892SShuo Chen    ::unlink(buf);
3800ab2e892SShuo Chen  }
3810ab2e892SShuo Chen
3824136e585SShuo Chen  {
3830ab2e892SShuo Chen  std::ofstream out("output");
3840ab2e892SShuo Chen  std::make_heap(keys.begin(), keys.end());
3850ab2e892SShuo Chen  while (!keys.empty())
3860ab2e892SShuo Chen  {
3870ab2e892SShuo Chen    std::pop_heap(keys.begin(), keys.end());
3880ab2e892SShuo Chen    keys.back().outputTo(out);
3890ab2e892SShuo Chen
3900ab2e892SShuo Chen    if (keys.back().next())
3910ab2e892SShuo Chen    {
3920ab2e892SShuo Chen      std::push_heap(keys.begin(), keys.end());
3930ab2e892SShuo Chen    }
3940ab2e892SShuo Chen    else
3950ab2e892SShuo Chen    {
3960ab2e892SShuo Chen      keys.pop_back();
3970ab2e892SShuo Chen    }
3980ab2e892SShuo Chen  }
3994136e585SShuo Chen  }
4004136e585SShuo Chen  printf("merging done ");
4014136e585SShuo Chen  timer.report(total);
4020ab2e892SShuo Chen}
4030ab2e892SShuo Chen
4040ab2e892SShuo Chenint main(int argc, char* argv[])
4050ab2e892SShuo Chen{
4060ab2e892SShuo Chen  /*
4070ab2e892SShuo Chen  int fd = open("shard-00000-of-00010", O_RDONLY);
4080ab2e892SShuo Chen  double t = now();
4094136e585SShuo Chen  int64_t len = count_shard(0, fd);
4104136e585SShuo Chen  double sec = now() - t;
4114136e585SShuo Chen  printf("count_shard %.3f sec %.2f MB/s\n", sec, len / sec / 1e6);
4124136e585SShuo Chen  */
4130ab2e892SShuo Chen
4144136e585SShuo Chen  Timer timer;
4154136e585SShuo Chen  int64_t total = shard_(argc, argv);
4160ab2e892SShuo Chen  count_shards();
4170ab2e892SShuo Chen  merge();
4184136e585SShuo Chen  printf("All done ");
4194136e585SShuo Chen  timer.report(total);
4200ab2e892SShuo Chen}
421