10ab2e892SShuo Chen/* sort word by frequency, sharding version.
20ab2e892SShuo Chen
30ab2e892SShuo Chen  1. read input file, shard to N files:
40ab2e892SShuo Chen       word
50ab2e892SShuo Chen  2. assume each shard file fits in memory, read each shard file, count words and sort by count, then write to N count files:
60ab2e892SShuo Chen       count \t word
70ab2e892SShuo Chen  3. merge N count files using heap.
80ab2e892SShuo Chen
90ab2e892SShuo ChenLimits: each shard must fit in memory.
100ab2e892SShuo Chen*/
110ab2e892SShuo Chen
120ab2e892SShuo Chen#include <assert.h>
130ab2e892SShuo Chen
1485147189SShuo Chen#include "file.h"
15da39c979SShuo Chen#include "merge.h"
1685147189SShuo Chen#include "timer.h"
1785147189SShuo Chen
184136e585SShuo Chen#include "absl/container/flat_hash_map.h"
192cf09315SShuo Chen#include "absl/hash/hash.h"
202a129a12SShuo Chen#include "absl/strings/str_format.h"
212a129a12SShuo Chen#include "muduo/base/Logging.h"
22a251380aSShuo Chen#include "muduo/base/ThreadPool.h"
234136e585SShuo Chen
240ab2e892SShuo Chen#include <algorithm>
250ab2e892SShuo Chen#include <memory>
260ab2e892SShuo Chen#include <string>
270ab2e892SShuo Chen#include <unordered_map>
280ab2e892SShuo Chen#include <vector>
290ab2e892SShuo Chen
300ab2e892SShuo Chen#include <fcntl.h>
310ab2e892SShuo Chen#include <string.h>
320ab2e892SShuo Chen#include <sys/mman.h>
334136e585SShuo Chen#include <sys/stat.h>
340ab2e892SShuo Chen#include <unistd.h>
350ab2e892SShuo Chen
362cf09315SShuo Chenusing absl::string_view;
370ab2e892SShuo Chenusing std::string;
380ab2e892SShuo Chenusing std::vector;
390ab2e892SShuo Chenusing std::unique_ptr;
400ab2e892SShuo Chen
4185147189SShuo Chenint kShards = 10, kThreads = 4;
4285147189SShuo Chenbool g_verbose = false, g_keep = false;
43a6693141SShuo Chenconst char* shard_dir = ".";
4485147189SShuo Chenconst char* g_output = "output";
45270b6cceSShuo Chen
460ab2e892SShuo Chenclass Sharder // : boost::noncopyable
470ab2e892SShuo Chen{
480ab2e892SShuo Chen public:
490ab2e892SShuo Chen  Sharder()
500ab2e892SShuo Chen    : files_(kShards)
510ab2e892SShuo Chen  {
520ab2e892SShuo Chen    for (int i = 0; i < kShards; ++i)
530ab2e892SShuo Chen    {
540ab2e892SShuo Chen      char name[256];
55a6693141SShuo Chen      snprintf(name, sizeof name, "%s/shard-%05d-of-%05d", shard_dir, i, kShards);
560ab2e892SShuo Chen      files_[i].reset(new OutputFile(name));
570ab2e892SShuo Chen    }
580ab2e892SShuo Chen    assert(files_.size() == static_cast<size_t>(kShards));
590ab2e892SShuo Chen  }
600ab2e892SShuo Chen
610ab2e892SShuo Chen  void output(string_view word)
620ab2e892SShuo Chen  {
630ab2e892SShuo Chen    size_t shard = hash(word) % files_.size();
64270b6cceSShuo Chen    files_[shard]->appendRecord(word);
650ab2e892SShuo Chen  }
660ab2e892SShuo Chen
670ab2e892SShuo Chen  void finish()
680ab2e892SShuo Chen  {
694136e585SShuo Chen    int shard = 0;
704136e585SShuo Chen    for (const auto& file : files_)
710ab2e892SShuo Chen    {
7285147189SShuo Chen      // if (g_verbose)
734136e585SShuo Chen      printf("  shard %d: %ld bytes, %ld items\n", shard, file->tell(), file->items());
744136e585SShuo Chen      ++shard;
754136e585SShuo Chen      file->close();
760ab2e892SShuo Chen    }
770ab2e892SShuo Chen  }
780ab2e892SShuo Chen
790ab2e892SShuo Chen private:
802cf09315SShuo Chen  absl::Hash<string_view> hash;
810ab2e892SShuo Chen  vector<unique_ptr<OutputFile>> files_;
820ab2e892SShuo Chen};
830ab2e892SShuo Chen
844136e585SShuo Chenint64_t shard_(int argc, char* argv[])
850ab2e892SShuo Chen{
860ab2e892SShuo Chen  Sharder sharder;
874136e585SShuo Chen  Timer timer;
884136e585SShuo Chen  int64_t total = 0;
892a129a12SShuo Chen  for (int i = optind; i < argc; ++i)
900ab2e892SShuo Chen  {
912a129a12SShuo Chen    LOG_INFO << "Processing input file " << argv[i];
9285147189SShuo Chen    double t = Timer::now();
9385147189SShuo Chen    string line;
9485147189SShuo Chen    InputFile input(argv[i]);
9585147189SShuo Chen    while (input.getline(&line))
960ab2e892SShuo Chen    {
9785147189SShuo Chen      sharder.output(line);
980ab2e892SShuo Chen    }
9985147189SShuo Chen    size_t len = input.tell();
1004136e585SShuo Chen    total += len;
10185147189SShuo Chen    double sec = Timer::now() - t;
1022a129a12SShuo Chen    LOG_INFO << "Done file " << argv[i] << absl::StrFormat(" %.3f sec %.2f MiB/s", sec, len / sec / 1024 / 1024);
1030ab2e892SShuo Chen  }
1040ab2e892SShuo Chen  sharder.finish();
1052a129a12SShuo Chen  LOG_INFO << "Sharding done " << timer.report(total);
1064136e585SShuo Chen  return total;
1070ab2e892SShuo Chen}
1080ab2e892SShuo Chen
1090ab2e892SShuo Chen// ======= count_shards =======
1100ab2e892SShuo Chen
111ecd7048bSShuo Chenvoid count_shard(int shard, int fd, size_t len)
1120ab2e892SShuo Chen{
113ecd7048bSShuo Chen  Timer timer;
114ecd7048bSShuo Chen
11585147189SShuo Chen  double t = Timer::now();
1162a129a12SShuo Chen  LOG_INFO << absl::StrFormat("counting shard %d: input file size %ld", shard, len);
1174136e585SShuo Chen  {
1180ab2e892SShuo Chen  void* mapped = mmap(NULL, len, PROT_READ, MAP_PRIVATE, fd, 0);
1190ab2e892SShuo Chen  assert(mapped != MAP_FAILED);
1200ab2e892SShuo Chen  const uint8_t* const start = static_cast<const uint8_t*>(mapped);
1210ab2e892SShuo Chen  const uint8_t* const end = start + len;
1220ab2e892SShuo Chen
1234136e585SShuo Chen  // std::unordered_map<string_view, uint64_t> items;
1244136e585SShuo Chen  absl::flat_hash_map<string_view, uint64_t> items;
1252a129a12SShuo Chen  int64_t count = 0;
1260ab2e892SShuo Chen  for (const uint8_t* p = start; p < end;)
1270ab2e892SShuo Chen  {
1280ab2e892SShuo Chen    string_view s((const char*)p+1, *p);
1290ab2e892SShuo Chen    items[s]++;
1300ab2e892SShuo Chen    p += 1 + *p;
1312a129a12SShuo Chen    ++count;
1320ab2e892SShuo Chen  }
133270b6cceSShuo Chen  LOG_INFO << "items " << count << " unique " << items.size();
13485147189SShuo Chen  if (g_verbose)
13585147189SShuo Chen  printf("  count %.3f sec %ld items\n", Timer::now() - t, items.size());
1360ab2e892SShuo Chen
13785147189SShuo Chen  t = Timer::now();
1380ab2e892SShuo Chen  vector<std::pair<size_t, string_view>> counts;
1390ab2e892SShuo Chen  for (const auto& it : items)
1400ab2e892SShuo Chen  {
1410ab2e892SShuo Chen    if (it.second > 1)
1422cf09315SShuo Chen      counts.push_back(std::make_pair(it.second, it.first));
1430ab2e892SShuo Chen  }
14485147189SShuo Chen  if (g_verbose)
14585147189SShuo Chen  printf("  select %.3f sec %ld\n", Timer::now() - t, counts.size());
1460ab2e892SShuo Chen
14785147189SShuo Chen  t = Timer::now();
1480ab2e892SShuo Chen  std::sort(counts.begin(), counts.end());
14985147189SShuo Chen  if (g_verbose)
15085147189SShuo Chen  printf("  sort %.3f sec\n", Timer::now() - t);
1510ab2e892SShuo Chen
15285147189SShuo Chen  t = Timer::now();
153c377920eSShuo Chen  int64_t out_len = 0;
1540ab2e892SShuo Chen  {
155ecd7048bSShuo Chen    char buf[256];
156da39c979SShuo Chen    snprintf(buf, sizeof buf, "count-%05d", shard);
157ecd7048bSShuo Chen    OutputFile output(buf);
158ecd7048bSShuo Chen
1594136e585SShuo Chen    for (auto it = counts.rbegin(); it != counts.rend(); ++it)
1600ab2e892SShuo Chen    {
161c377920eSShuo Chen      output.write(absl::StrFormat("%d\t%s\n", it->first, it->second));
1624136e585SShuo Chen    }
163270b6cceSShuo Chen
1644136e585SShuo Chen    for (const auto& it : items)
1654136e585SShuo Chen    {
1664136e585SShuo Chen      if (it.second == 1)
1674136e585SShuo Chen      {
168c377920eSShuo Chen        output.write(absl::StrFormat("1\t%s\n", it.first));
1694136e585SShuo Chen      }
1700ab2e892SShuo Chen    }
171c377920eSShuo Chen    out_len = output.tell();
1720ab2e892SShuo Chen  }
173c377920eSShuo Chen  if (g_verbose)
174c377920eSShuo Chen  printf("  output %.3f sec %lu\n", Timer::now() - t, out_len);
1750ab2e892SShuo Chen
1760ab2e892SShuo Chen  if (munmap(mapped, len))
1770ab2e892SShuo Chen    perror("munmap");
1784136e585SShuo Chen  }
179ecd7048bSShuo Chen  ::close(fd);
180ecd7048bSShuo Chen  LOG_INFO << "shard " << shard << " done " << timer.report(len);
1810ab2e892SShuo Chen}
1820ab2e892SShuo Chen
18385147189SShuo Chenvoid count_shards(int shards)
1840ab2e892SShuo Chen{
18585147189SShuo Chen  assert(shards <= kShards);
1864136e585SShuo Chen  Timer timer;
1874136e585SShuo Chen  int64_t total = 0;
188a251380aSShuo Chen  muduo::ThreadPool threadPool;
18985147189SShuo Chen  threadPool.setMaxQueueSize(2*kThreads);
19085147189SShuo Chen  threadPool.start(kThreads);
19185147189SShuo Chen
19285147189SShuo Chen  for (int shard = 0; shard < shards; ++shard)
1930ab2e892SShuo Chen  {
1940ab2e892SShuo Chen    char buf[256];
195a6693141SShuo Chen    snprintf(buf, sizeof buf, "%s/shard-%05d-of-%05d", shard_dir, shard, kShards);
1960ab2e892SShuo Chen    int fd = open(buf, O_RDONLY);
197ecd7048bSShuo Chen    assert(fd >= 0);
19885147189SShuo Chen    if (!g_keep)
199ecd7048bSShuo Chen      ::unlink(buf);
2002a129a12SShuo Chen
201ecd7048bSShuo Chen    struct stat st;
202ecd7048bSShuo Chen    if (::fstat(fd, &st) == 0)
203ecd7048bSShuo Chen    {
204ecd7048bSShuo Chen      size_t len = st.st_size;
205ecd7048bSShuo Chen      total += len;
206ecd7048bSShuo Chen      threadPool.run([shard, fd, len]{ count_shard(shard, fd, len); });
207ecd7048bSShuo Chen    }
208a251380aSShuo Chen  }
209a251380aSShuo Chen  while (threadPool.queueSize() > 0)
210a251380aSShuo Chen  {
21185147189SShuo Chen    LOG_DEBUG << "waiting for ThreadPool " << threadPool.queueSize();
212ecd7048bSShuo Chen    muduo::CurrentThread::sleepUsec(1000*1000);
2130ab2e892SShuo Chen  }
214a251380aSShuo Chen  threadPool.stop();
215270b6cceSShuo Chen  LOG_INFO << "Counting done "<< timer.report(total);
2160ab2e892SShuo Chen}
2170ab2e892SShuo Chen
2180ab2e892SShuo Chen// ======= merge =======
2190ab2e892SShuo Chen
2200ab2e892SShuo Chenint main(int argc, char* argv[])
2210ab2e892SShuo Chen{
2220ab2e892SShuo Chen  /*
2230ab2e892SShuo Chen  int fd = open("shard-00000-of-00010", O_RDONLY);
22485147189SShuo Chen  double t = Timer::now();
2254136e585SShuo Chen  int64_t len = count_shard(0, fd);
22685147189SShuo Chen  double sec = Timer::now() - t;
2274136e585SShuo Chen  printf("count_shard %.3f sec %.2f MB/s\n", sec, len / sec / 1e6);
2284136e585SShuo Chen  */
2293e607da5SShuo Chen  setlocale(LC_NUMERIC, "");
2300ab2e892SShuo Chen
2312a129a12SShuo Chen  int opt;
23285147189SShuo Chen  int count_only = 0;
233da39c979SShuo Chen  int merge_only = 0;
234da39c979SShuo Chen  while ((opt = getopt(argc, argv, "c:km:o:p:s:t:v")) != -1)
2352a129a12SShuo Chen  {
2362a129a12SShuo Chen    switch (opt)
2372a129a12SShuo Chen    {
23885147189SShuo Chen      case 'c':
23985147189SShuo Chen        count_only = atoi(optarg);
24085147189SShuo Chen        break;
2412a129a12SShuo Chen      case 'k':
24285147189SShuo Chen        g_keep = true;
24385147189SShuo Chen        break;
24485147189SShuo Chen      case 'm':
245da39c979SShuo Chen        merge_only = atoi(optarg);
2462a129a12SShuo Chen        break;
247a6693141SShuo Chen      case 'o':
24885147189SShuo Chen        g_output = optarg;
24985147189SShuo Chen        break;
25085147189SShuo Chen      case 'p':  // Path for temp shard files
25185147189SShuo Chen        shard_dir = optarg;
252a6693141SShuo Chen        break;
2532a129a12SShuo Chen      case 's':
2542a129a12SShuo Chen        kShards = atoi(optarg);
2552a129a12SShuo Chen        break;
256a6693141SShuo Chen      case 't':
25785147189SShuo Chen        kThreads = atoi(optarg);
258a6693141SShuo Chen        break;
2592a129a12SShuo Chen      case 'v':
26085147189SShuo Chen        g_verbose = true;
2612a129a12SShuo Chen        break;
2622a129a12SShuo Chen    }
2632a129a12SShuo Chen  }
2642a129a12SShuo Chen
26585147189SShuo Chen  if (count_only > 0 || merge_only)
26685147189SShuo Chen  {
26785147189SShuo Chen    g_keep = true;
2683a0488b5SShuo Chen    //g_verbose = true;
26985147189SShuo Chen    count_only = std::min(count_only, kShards);
27085147189SShuo Chen
27185147189SShuo Chen    if (count_only > 0)
27285147189SShuo Chen    {
27385147189SShuo Chen      count_shards(count_only);
27485147189SShuo Chen    }
27585147189SShuo Chen
276da39c979SShuo Chen    if (merge_only > 0)
27785147189SShuo Chen    {
278da39c979SShuo Chen      merge(merge_only);
27985147189SShuo Chen    }
28085147189SShuo Chen  }
28185147189SShuo Chen  else
28285147189SShuo Chen  {
28385147189SShuo Chen    // Run all three steps
28485147189SShuo Chen    Timer timer;
28585147189SShuo Chen    LOG_INFO << argc - optind << " input files, " << kShards << " shards, "
28685147189SShuo Chen             << "output " << g_output <<" , temp " << shard_dir;
28785147189SShuo Chen    int64_t input = 0;
28885147189SShuo Chen    input = shard_(argc, argv);
28985147189SShuo Chen    count_shards(kShards);
290da39c979SShuo Chen    int64_t output_size = merge(kShards);
29185147189SShuo Chen    LOG_INFO << "All done " << timer.report(input) << " output " << output_size;
29285147189SShuo Chen  }
2930ab2e892SShuo Chen}
294