// word_freq_shards_basic.cc (revision 3e607da5)
/* Sort words by frequency, sharding version.

  1. Read the input files and shard them into N files, one record per word:
       word
  2. Assuming each shard file fits in memory, read each shard file, count the
     words and sort them by count, then write N count files:
       count \t word
  3. Merge the N count files using a heap.

Limits: each shard must fit in memory.
*/
110ab2e892SShuo Chen
#include <assert.h>

#include "file.h"
#include "timer.h"

#include "absl/container/flat_hash_map.h"
#include "absl/strings/str_format.h"
#include "muduo/base/BoundedBlockingQueue.h"
#include "muduo/base/Logging.h"
#include "muduo/base/ThreadPool.h"

#include <algorithm>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include <boost/program_options.hpp>

#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
360ab2e892SShuo Chen
370ab2e892SShuo Chenusing std::string;
380ab2e892SShuo Chenusing std::string_view;
390ab2e892SShuo Chenusing std::vector;
400ab2e892SShuo Chenusing std::unique_ptr;
410ab2e892SShuo Chen
// Run-time settings; main() overrides them from the command line.
int kShards = 10;                 // -s: number of shard / count files
int kThreads = 4;                 // -t: threads started for the counting step
bool g_verbose = false;           // -v: print per-step timings while counting
bool g_keep = false;              // -k: do not unlink intermediate files
const char* shard_dir = ".";      // -p: directory holding the shard files
const char* g_output = "output";  // -o: name of the merged output file
46270b6cceSShuo Chen
470ab2e892SShuo Chenclass Sharder // : boost::noncopyable
480ab2e892SShuo Chen{
490ab2e892SShuo Chen public:
500ab2e892SShuo Chen  Sharder()
510ab2e892SShuo Chen    : files_(kShards)
520ab2e892SShuo Chen  {
530ab2e892SShuo Chen    for (int i = 0; i < kShards; ++i)
540ab2e892SShuo Chen    {
550ab2e892SShuo Chen      char name[256];
56a6693141SShuo Chen      snprintf(name, sizeof name, "%s/shard-%05d-of-%05d", shard_dir, i, kShards);
570ab2e892SShuo Chen      files_[i].reset(new OutputFile(name));
580ab2e892SShuo Chen    }
590ab2e892SShuo Chen    assert(files_.size() == static_cast<size_t>(kShards));
600ab2e892SShuo Chen  }
610ab2e892SShuo Chen
620ab2e892SShuo Chen  void output(string_view word)
630ab2e892SShuo Chen  {
640ab2e892SShuo Chen    size_t shard = hash(word) % files_.size();
65270b6cceSShuo Chen    files_[shard]->appendRecord(word);
660ab2e892SShuo Chen  }
670ab2e892SShuo Chen
680ab2e892SShuo Chen  void finish()
690ab2e892SShuo Chen  {
704136e585SShuo Chen    int shard = 0;
714136e585SShuo Chen    for (const auto& file : files_)
720ab2e892SShuo Chen    {
7385147189SShuo Chen      // if (g_verbose)
744136e585SShuo Chen      printf("  shard %d: %ld bytes, %ld items\n", shard, file->tell(), file->items());
754136e585SShuo Chen      ++shard;
764136e585SShuo Chen      file->close();
770ab2e892SShuo Chen    }
780ab2e892SShuo Chen  }
790ab2e892SShuo Chen
800ab2e892SShuo Chen private:
810ab2e892SShuo Chen  std::hash<string_view> hash;
820ab2e892SShuo Chen  vector<unique_ptr<OutputFile>> files_;
830ab2e892SShuo Chen};
840ab2e892SShuo Chen
854136e585SShuo Chenint64_t shard_(int argc, char* argv[])
860ab2e892SShuo Chen{
870ab2e892SShuo Chen  Sharder sharder;
884136e585SShuo Chen  Timer timer;
894136e585SShuo Chen  int64_t total = 0;
902a129a12SShuo Chen  for (int i = optind; i < argc; ++i)
910ab2e892SShuo Chen  {
922a129a12SShuo Chen    LOG_INFO << "Processing input file " << argv[i];
9385147189SShuo Chen    double t = Timer::now();
9485147189SShuo Chen    string line;
9585147189SShuo Chen    InputFile input(argv[i]);
9685147189SShuo Chen    while (input.getline(&line))
970ab2e892SShuo Chen    {
9885147189SShuo Chen      sharder.output(line);
990ab2e892SShuo Chen    }
10085147189SShuo Chen    size_t len = input.tell();
1014136e585SShuo Chen    total += len;
10285147189SShuo Chen    double sec = Timer::now() - t;
1032a129a12SShuo Chen    LOG_INFO << "Done file " << argv[i] << absl::StrFormat(" %.3f sec %.2f MiB/s", sec, len / sec / 1024 / 1024);
1040ab2e892SShuo Chen  }
1050ab2e892SShuo Chen  sharder.finish();
1062a129a12SShuo Chen  LOG_INFO << "Sharding done " << timer.report(total);
1074136e585SShuo Chen  return total;
1080ab2e892SShuo Chen}
1090ab2e892SShuo Chen
// ======= count_shards =======
1110ab2e892SShuo Chen
112ecd7048bSShuo Chenvoid count_shard(int shard, int fd, size_t len)
1130ab2e892SShuo Chen{
114ecd7048bSShuo Chen  Timer timer;
115ecd7048bSShuo Chen
11685147189SShuo Chen  double t = Timer::now();
1172a129a12SShuo Chen  LOG_INFO << absl::StrFormat("counting shard %d: input file size %ld", shard, len);
1184136e585SShuo Chen  {
1190ab2e892SShuo Chen  void* mapped = mmap(NULL, len, PROT_READ, MAP_PRIVATE, fd, 0);
1200ab2e892SShuo Chen  assert(mapped != MAP_FAILED);
1210ab2e892SShuo Chen  const uint8_t* const start = static_cast<const uint8_t*>(mapped);
1220ab2e892SShuo Chen  const uint8_t* const end = start + len;
1230ab2e892SShuo Chen
1244136e585SShuo Chen  // std::unordered_map<string_view, uint64_t> items;
1254136e585SShuo Chen  absl::flat_hash_map<string_view, uint64_t> items;
1262a129a12SShuo Chen  int64_t count = 0;
1270ab2e892SShuo Chen  for (const uint8_t* p = start; p < end;)
1280ab2e892SShuo Chen  {
1290ab2e892SShuo Chen    string_view s((const char*)p+1, *p);
1300ab2e892SShuo Chen    items[s]++;
1310ab2e892SShuo Chen    p += 1 + *p;
1322a129a12SShuo Chen    ++count;
1330ab2e892SShuo Chen  }
134270b6cceSShuo Chen  LOG_INFO << "items " << count << " unique " << items.size();
13585147189SShuo Chen  if (g_verbose)
13685147189SShuo Chen  printf("  count %.3f sec %ld items\n", Timer::now() - t, items.size());
1370ab2e892SShuo Chen
13885147189SShuo Chen  t = Timer::now();
1390ab2e892SShuo Chen  vector<std::pair<size_t, string_view>> counts;
1400ab2e892SShuo Chen  for (const auto& it : items)
1410ab2e892SShuo Chen  {
1420ab2e892SShuo Chen    if (it.second > 1)
1430ab2e892SShuo Chen      counts.push_back(make_pair(it.second, it.first));
1440ab2e892SShuo Chen  }
14585147189SShuo Chen  if (g_verbose)
14685147189SShuo Chen  printf("  select %.3f sec %ld\n", Timer::now() - t, counts.size());
1470ab2e892SShuo Chen
14885147189SShuo Chen  t = Timer::now();
1490ab2e892SShuo Chen  std::sort(counts.begin(), counts.end());
15085147189SShuo Chen  if (g_verbose)
15185147189SShuo Chen  printf("  sort %.3f sec\n", Timer::now() - t);
1520ab2e892SShuo Chen
15385147189SShuo Chen  t = Timer::now();
1540ab2e892SShuo Chen  {
155ecd7048bSShuo Chen    char buf[256];
156ecd7048bSShuo Chen    snprintf(buf, sizeof buf, "count-%05d-of-%05d", shard, kShards);
157ecd7048bSShuo Chen    OutputFile output(buf);
158ecd7048bSShuo Chen
1594136e585SShuo Chen    for (auto it = counts.rbegin(); it != counts.rend(); ++it)
1600ab2e892SShuo Chen    {
161270b6cceSShuo Chen      string s(it->second);
162ecd7048bSShuo Chen      output.write(absl::StrFormat("%d\t%s\n", it->first, s));  // FIXME %s with string_view doesn't work in C++17
163270b6cceSShuo Chen      /*
164270b6cceSShuo Chen      char buf[1024];
165270b6cceSShuo Chen      snprintf(buf, sizeof buf, "%zd\t%s\n",
166270b6cceSShuo Chen      out.write(buf);
167270b6cceSShuo Chen      */
1684136e585SShuo Chen    }
169270b6cceSShuo Chen
1704136e585SShuo Chen    for (const auto& it : items)
1714136e585SShuo Chen    {
1724136e585SShuo Chen      if (it.second == 1)
1734136e585SShuo Chen      {
174270b6cceSShuo Chen        string s(it.first);
175270b6cceSShuo Chen        // FIXME: bug of absl?
176270b6cceSShuo Chen        // out.write(absl::StrCat("1\t", s, "\n"));
177ecd7048bSShuo Chen        output.write(absl::StrFormat("1\t%s\n", s));
1784136e585SShuo Chen      }
1790ab2e892SShuo Chen    }
1800ab2e892SShuo Chen  }
18185147189SShuo Chen  //if (g_verbose)
18285147189SShuo Chen  //printf("  output %.3f sec %lu\n", Timer::now() - t, st.st_size);
1830ab2e892SShuo Chen
1840ab2e892SShuo Chen  if (munmap(mapped, len))
1850ab2e892SShuo Chen    perror("munmap");
1864136e585SShuo Chen  }
187ecd7048bSShuo Chen  ::close(fd);
188ecd7048bSShuo Chen  LOG_INFO << "shard " << shard << " done " << timer.report(len);
1890ab2e892SShuo Chen}
1900ab2e892SShuo Chen
19185147189SShuo Chenvoid count_shards(int shards)
1920ab2e892SShuo Chen{
19385147189SShuo Chen  assert(shards <= kShards);
1944136e585SShuo Chen  Timer timer;
1954136e585SShuo Chen  int64_t total = 0;
196a251380aSShuo Chen  muduo::ThreadPool threadPool;
19785147189SShuo Chen  threadPool.setMaxQueueSize(2*kThreads);
19885147189SShuo Chen  threadPool.start(kThreads);
19985147189SShuo Chen
20085147189SShuo Chen  for (int shard = 0; shard < shards; ++shard)
2010ab2e892SShuo Chen  {
2020ab2e892SShuo Chen    char buf[256];
203a6693141SShuo Chen    snprintf(buf, sizeof buf, "%s/shard-%05d-of-%05d", shard_dir, shard, kShards);
2040ab2e892SShuo Chen    int fd = open(buf, O_RDONLY);
205ecd7048bSShuo Chen    assert(fd >= 0);
20685147189SShuo Chen    if (!g_keep)
207ecd7048bSShuo Chen      ::unlink(buf);
2082a129a12SShuo Chen
209ecd7048bSShuo Chen    struct stat st;
210ecd7048bSShuo Chen    if (::fstat(fd, &st) == 0)
211ecd7048bSShuo Chen    {
212ecd7048bSShuo Chen      size_t len = st.st_size;
213ecd7048bSShuo Chen      total += len;
214ecd7048bSShuo Chen      threadPool.run([shard, fd, len]{ count_shard(shard, fd, len); });
215ecd7048bSShuo Chen    }
216a251380aSShuo Chen  }
217a251380aSShuo Chen  while (threadPool.queueSize() > 0)
218a251380aSShuo Chen  {
21985147189SShuo Chen    LOG_DEBUG << "waiting for ThreadPool " << threadPool.queueSize();
220ecd7048bSShuo Chen    muduo::CurrentThread::sleepUsec(1000*1000);
2210ab2e892SShuo Chen  }
222a251380aSShuo Chen  threadPool.stop();
223270b6cceSShuo Chen  LOG_INFO << "Counting done "<< timer.report(total);
2240ab2e892SShuo Chen}
2250ab2e892SShuo Chen
// ======= merge =======
2270ab2e892SShuo Chen
2280ab2e892SShuo Chenclass Source  // copyable
2290ab2e892SShuo Chen{
2300ab2e892SShuo Chen public:
231270b6cceSShuo Chen  explicit Source(InputFile* in)
2320ab2e892SShuo Chen    : in_(in),
2330ab2e892SShuo Chen      count_(0),
2340ab2e892SShuo Chen      word_()
2350ab2e892SShuo Chen  {
2360ab2e892SShuo Chen  }
2370ab2e892SShuo Chen
2380ab2e892SShuo Chen  bool next()
2390ab2e892SShuo Chen  {
2400ab2e892SShuo Chen    string line;
241270b6cceSShuo Chen    if (in_->getline(&line))
2420ab2e892SShuo Chen    {
2430ab2e892SShuo Chen      size_t tab = line.find('\t');
2440ab2e892SShuo Chen      if (tab != string::npos)
2450ab2e892SShuo Chen      {
2460ab2e892SShuo Chen        count_ = strtol(line.c_str(), NULL, 10);
2470ab2e892SShuo Chen        if (count_ > 0)
2480ab2e892SShuo Chen        {
2490ab2e892SShuo Chen          word_ = line.substr(tab+1);
2500ab2e892SShuo Chen          return true;
2510ab2e892SShuo Chen        }
2520ab2e892SShuo Chen      }
2530ab2e892SShuo Chen    }
2540ab2e892SShuo Chen    return false;
2550ab2e892SShuo Chen  }
2560ab2e892SShuo Chen
2570ab2e892SShuo Chen  bool operator<(const Source& rhs) const
2580ab2e892SShuo Chen  {
2590ab2e892SShuo Chen    return count_ < rhs.count_;
2600ab2e892SShuo Chen  }
2610ab2e892SShuo Chen
262270b6cceSShuo Chen  void outputTo(OutputFile* out) const
2630ab2e892SShuo Chen  {
26485147189SShuo Chen    //char buf[1024];
26585147189SShuo Chen    //snprintf(buf, sizeof buf, "%ld\t%s\n", count_, word_.c_str());
26685147189SShuo Chen    //out->write(buf);
267270b6cceSShuo Chen    out->write(absl::StrFormat("%d\t%s\n", count_, word_));
2680ab2e892SShuo Chen  }
2690ab2e892SShuo Chen
27085147189SShuo Chen  std::pair<int64_t, string> item()
27185147189SShuo Chen  {
27285147189SShuo Chen    return make_pair(count_, std::move(word_));
27385147189SShuo Chen  }
27485147189SShuo Chen
2750ab2e892SShuo Chen private:
276270b6cceSShuo Chen  InputFile* in_;  // not owned
2770ab2e892SShuo Chen  int64_t count_;
2780ab2e892SShuo Chen  string word_;
2790ab2e892SShuo Chen};
2800ab2e892SShuo Chen
28185147189SShuo Chenint64_t merge()
2820ab2e892SShuo Chen{
2834136e585SShuo Chen  Timer timer;
284270b6cceSShuo Chen  vector<unique_ptr<InputFile>> inputs;
2850ab2e892SShuo Chen  vector<Source> keys;
2860ab2e892SShuo Chen
2874136e585SShuo Chen  int64_t total = 0;
2880ab2e892SShuo Chen  for (int i = 0; i < kShards; ++i)
2890ab2e892SShuo Chen  {
2900ab2e892SShuo Chen    char buf[256];
2910ab2e892SShuo Chen    snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, kShards);
2924136e585SShuo Chen    struct stat st;
293a6693141SShuo Chen    if (::stat(buf, &st) == 0)
2940ab2e892SShuo Chen    {
295a6693141SShuo Chen      total += st.st_size;
29685147189SShuo Chen      // TODO: select buffer size based on kShards.
29785147189SShuo Chen      inputs.push_back(std::make_unique<InputFile>(buf, 32 * 1024 * 1024));
298a6693141SShuo Chen      Source rec(inputs.back().get());
299a6693141SShuo Chen      if (rec.next())
300a6693141SShuo Chen      {
301a6693141SShuo Chen        keys.push_back(rec);
302a6693141SShuo Chen      }
30385147189SShuo Chen      if (!g_keep)
304a6693141SShuo Chen        ::unlink(buf);
305a6693141SShuo Chen    }
306a6693141SShuo Chen    else
307a6693141SShuo Chen    {
308a6693141SShuo Chen      perror("Unable to stat file:");
3090ab2e892SShuo Chen    }
3100ab2e892SShuo Chen  }
3112a129a12SShuo Chen  LOG_INFO << "merging " << inputs.size() << " files of " << total << " bytes in total";
3120ab2e892SShuo Chen
3134136e585SShuo Chen  {
31485147189SShuo Chen  OutputFile out(g_output);
31585147189SShuo Chen  /*
31685147189SShuo Chen  muduo::BoundedBlockingQueue<vector<std::pair<int64_t, string>>> queue(1024);
31785147189SShuo Chen  muduo::Thread thr([&queue] {
31885147189SShuo Chen    OutputFile out(g_output);
31985147189SShuo Chen    while (true) {
32085147189SShuo Chen      auto vec = queue.take();
32185147189SShuo Chen      if (vec.size() == 0)
32285147189SShuo Chen        break;
32385147189SShuo Chen      for (const auto& x : vec)
32485147189SShuo Chen        out.write(absl::StrFormat("%d\t%s\n", x.first, x.second));
32585147189SShuo Chen    }
32685147189SShuo Chen  });
32785147189SShuo Chen  thr.start();
32885147189SShuo Chen
32985147189SShuo Chen  vector<std::pair<int64_t, string>> batch;
33085147189SShuo Chen  */
3310ab2e892SShuo Chen  std::make_heap(keys.begin(), keys.end());
3320ab2e892SShuo Chen  while (!keys.empty())
3330ab2e892SShuo Chen  {
3340ab2e892SShuo Chen    std::pop_heap(keys.begin(), keys.end());
335270b6cceSShuo Chen    keys.back().outputTo(&out);
33685147189SShuo Chen    /*
33785147189SShuo Chen    batch.push_back(std::move(keys.back().item()));
33885147189SShuo Chen    if (batch.size() >= 10*1024*1024)
33985147189SShuo Chen    {
34085147189SShuo Chen      queue.put(std::move(batch));
34185147189SShuo Chen      batch.clear();
34285147189SShuo Chen    }
34385147189SShuo Chen    */
3440ab2e892SShuo Chen
3450ab2e892SShuo Chen    if (keys.back().next())
3460ab2e892SShuo Chen    {
3470ab2e892SShuo Chen      std::push_heap(keys.begin(), keys.end());
3480ab2e892SShuo Chen    }
3490ab2e892SShuo Chen    else
3500ab2e892SShuo Chen    {
3510ab2e892SShuo Chen      keys.pop_back();
3520ab2e892SShuo Chen    }
3530ab2e892SShuo Chen  }
35485147189SShuo Chen  /*
35585147189SShuo Chen  queue.put(batch);
35685147189SShuo Chen  batch.clear();
35785147189SShuo Chen  queue.put(batch);
35885147189SShuo Chen  thr.join();
35985147189SShuo Chen  */
3604136e585SShuo Chen  }
361a251380aSShuo Chen  LOG_INFO << "Merging done " << timer.report(total);
3622a129a12SShuo Chen  return total;
3630ab2e892SShuo Chen}
3640ab2e892SShuo Chen
3650ab2e892SShuo Chenint main(int argc, char* argv[])
3660ab2e892SShuo Chen{
3670ab2e892SShuo Chen  /*
3680ab2e892SShuo Chen  int fd = open("shard-00000-of-00010", O_RDONLY);
36985147189SShuo Chen  double t = Timer::now();
3704136e585SShuo Chen  int64_t len = count_shard(0, fd);
37185147189SShuo Chen  double sec = Timer::now() - t;
3724136e585SShuo Chen  printf("count_shard %.3f sec %.2f MB/s\n", sec, len / sec / 1e6);
3734136e585SShuo Chen  */
3743e607da5SShuo Chen  setlocale(LC_NUMERIC, "");
3750ab2e892SShuo Chen
3762a129a12SShuo Chen  int opt;
37785147189SShuo Chen  int count_only = 0;
37885147189SShuo Chen  bool merge_only = false;
37985147189SShuo Chen  while ((opt = getopt(argc, argv, "c:kmo:p:s:t:v")) != -1)
3802a129a12SShuo Chen  {
3812a129a12SShuo Chen    switch (opt)
3822a129a12SShuo Chen    {
38385147189SShuo Chen      case 'c':
38485147189SShuo Chen        count_only = atoi(optarg);
38585147189SShuo Chen        break;
3862a129a12SShuo Chen      case 'k':
38785147189SShuo Chen        g_keep = true;
38885147189SShuo Chen        break;
38985147189SShuo Chen      case 'm':
39085147189SShuo Chen        merge_only = true;
3912a129a12SShuo Chen        break;
392a6693141SShuo Chen      case 'o':
39385147189SShuo Chen        g_output = optarg;
39485147189SShuo Chen        break;
39585147189SShuo Chen      case 'p':  // Path for temp shard files
39685147189SShuo Chen        shard_dir = optarg;
397a6693141SShuo Chen        break;
3982a129a12SShuo Chen      case 's':
3992a129a12SShuo Chen        kShards = atoi(optarg);
4002a129a12SShuo Chen        break;
401a6693141SShuo Chen      case 't':
40285147189SShuo Chen        kThreads = atoi(optarg);
403a6693141SShuo Chen        break;
4042a129a12SShuo Chen      case 'v':
40585147189SShuo Chen        g_verbose = true;
4062a129a12SShuo Chen        break;
4072a129a12SShuo Chen    }
4082a129a12SShuo Chen  }
4092a129a12SShuo Chen
41085147189SShuo Chen  if (count_only > 0 || merge_only)
41185147189SShuo Chen  {
41285147189SShuo Chen    g_keep = true;
41385147189SShuo Chen    g_verbose = true;
41485147189SShuo Chen    count_only = std::min(count_only, kShards);
41585147189SShuo Chen
41685147189SShuo Chen    if (count_only > 0)
41785147189SShuo Chen    {
41885147189SShuo Chen      count_shards(count_only);
41985147189SShuo Chen    }
42085147189SShuo Chen
42185147189SShuo Chen    if (merge_only)
42285147189SShuo Chen    {
42385147189SShuo Chen      merge();
42485147189SShuo Chen    }
42585147189SShuo Chen  }
42685147189SShuo Chen  else
42785147189SShuo Chen  {
42885147189SShuo Chen    // Run all three steps
42985147189SShuo Chen    Timer timer;
43085147189SShuo Chen    LOG_INFO << argc - optind << " input files, " << kShards << " shards, "
43185147189SShuo Chen             << "output " << g_output <<" , temp " << shard_dir;
43285147189SShuo Chen    int64_t input = 0;
43385147189SShuo Chen    input = shard_(argc, argv);
43485147189SShuo Chen    count_shards(kShards);
43585147189SShuo Chen    int64_t output_size = merge();
43685147189SShuo Chen    LOG_INFO << "All done " << timer.report(input) << " output " << output_size;
43785147189SShuo Chen  }
4380ab2e892SShuo Chen}
439