word_freq_shards_basic.cc revision ecd7048b
/* Sort words by frequency, sharding version.

  1. Read the input files and shard the words into N files, one record per word:
       word
  2. Assuming each shard file fits in memory, read each shard file, count its words, sort them by count, then write N count files, one line per word:
       count \t word
  3. Merge the N count files using a heap.

Limits: each shard must fit in memory.
*/
110ab2e892SShuo Chen
#include <assert.h>

#include "absl/container/flat_hash_map.h"
#include "absl/strings/str_format.h"
#include "muduo/base/Logging.h"
#include "muduo/base/ThreadPool.h"

#include <algorithm>
//#include <fstream>
//#include <iostream>
#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>

#include <boost/program_options.hpp>

#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/times.h>
#include <unistd.h>
360ab2e892SShuo Chen
using std::string;
using std::string_view;
using std::unique_ptr;
using std::vector;

// Size of the private stdio buffer handed to each shard / count file.
constexpr int kBufferSize = 128 * 1024;

// Run-time options, overwritten from the command line in main().
int kShards = 10;             // number of shard files (-s)
bool verbose = false;         // print per-step timings while counting (-v)
bool keep = false;            // do not unlink intermediate files (-k)
const char* shard_dir = ".";  // directory that receives shard files (-t)
470ab2e892SShuo Chen
// Current wall-clock time as seconds since the Unix epoch, with microsecond
// resolution (from gettimeofday).
inline double now()
{
  struct timeval tv = { 0, 0 };
  ::gettimeofday(&tv, nullptr);
  const double whole = static_cast<double>(tv.tv_sec);
  const double fraction = static_cast<double>(tv.tv_usec) / 1000000.0;
  return whole + fraction;
}
540ab2e892SShuo Chen
// User and system CPU time consumed by this process, in seconds.
struct CpuTime
{
  double userSeconds = 0.0;
  double systemSeconds = 0.0;

  double total() const { return userSeconds + systemSeconds; }
};

// Clock ticks per second, used to convert times(2) results to seconds.
// sysconf() returns -1 on failure; cpuTime() treats a non-positive value as unknown.
const int g_clockTicks = static_cast<int>(::sysconf(_SC_CLK_TCK));

// Returns the CPU time consumed so far by the calling process.
// Both fields stay 0.0 if times(2) fails or the tick rate is unknown.
CpuTime cpuTime()
{
  CpuTime t;
  struct tms tms;
  // POSIX reports failure as (clock_t)-1. The former ">= 0" test is always true
  // when clock_t is unsigned, and the elapsed-tick return value may overflow the
  // range of clock_t even on success.
  if (::times(&tms) != static_cast<clock_t>(-1) && g_clockTicks > 0)
  {
    const double hz = static_cast<double>(g_clockTicks);
    t.userSeconds = static_cast<double>(tms.tms_utime) / hz;
    t.systemSeconds = static_cast<double>(tms.tms_stime) / hz;
  }
  return t;
}
774136e585SShuo Chen
784136e585SShuo Chenclass Timer
794136e585SShuo Chen{
804136e585SShuo Chen public:
814136e585SShuo Chen  Timer()
824136e585SShuo Chen    : start_(now()),
834136e585SShuo Chen      start_cpu_(cpuTime())
844136e585SShuo Chen  {
854136e585SShuo Chen  }
864136e585SShuo Chen
872a129a12SShuo Chen  string report(int64_t bytes) const
884136e585SShuo Chen  {
894136e585SShuo Chen    CpuTime end_cpu(cpuTime());
904136e585SShuo Chen    double end = now();
91a6693141SShuo Chen    return absl::StrFormat("%.2fs real  %.2fs cpu  %.2f MiB/s  %ld bytes",
922a129a12SShuo Chen                           end - start_, end_cpu.total() - start_cpu_.total(),
932a129a12SShuo Chen                           bytes / (end - start_) / 1024 / 1024, bytes);
944136e585SShuo Chen  }
954136e585SShuo Chen private:
964136e585SShuo Chen  const double start_ = 0;
974136e585SShuo Chen  const CpuTime start_cpu_;
984136e585SShuo Chen};
994136e585SShuo Chen
1000ab2e892SShuo Chenclass OutputFile // : boost::noncopyable
1010ab2e892SShuo Chen{
1020ab2e892SShuo Chen public:
1030ab2e892SShuo Chen  explicit OutputFile(const string& filename)
104270b6cceSShuo Chen    : file_(::fopen(filename.c_str(), "w"))
1050ab2e892SShuo Chen  {
1060ab2e892SShuo Chen    assert(file_);
1070ab2e892SShuo Chen    ::setbuffer(file_, buffer_, sizeof buffer_);
1080ab2e892SShuo Chen  }
1090ab2e892SShuo Chen
1100ab2e892SShuo Chen  ~OutputFile()
1110ab2e892SShuo Chen  {
1120ab2e892SShuo Chen    close();
1130ab2e892SShuo Chen  }
1140ab2e892SShuo Chen
115270b6cceSShuo Chen  void write(string_view s)
116270b6cceSShuo Chen  {
117270b6cceSShuo Chen    ::fwrite(s.data(), 1, s.size(), file_);
118270b6cceSShuo Chen  }
119270b6cceSShuo Chen
120270b6cceSShuo Chen  void appendRecord(string_view s)
1210ab2e892SShuo Chen  {
1220ab2e892SShuo Chen    assert(s.size() < 255);
1230ab2e892SShuo Chen    uint8_t len = s.size();
1240ab2e892SShuo Chen    ::fwrite(&len, 1, sizeof len, file_);
1250ab2e892SShuo Chen    ::fwrite(s.data(), 1, len, file_);
1260ab2e892SShuo Chen    ++items_;
1270ab2e892SShuo Chen  }
1280ab2e892SShuo Chen
1290ab2e892SShuo Chen  void flush()
1300ab2e892SShuo Chen  {
1310ab2e892SShuo Chen    ::fflush(file_);
1320ab2e892SShuo Chen  }
1330ab2e892SShuo Chen
1340ab2e892SShuo Chen  void close()
1350ab2e892SShuo Chen  {
1360ab2e892SShuo Chen    if (file_)
1370ab2e892SShuo Chen      ::fclose(file_);
1380ab2e892SShuo Chen    file_ = nullptr;
1390ab2e892SShuo Chen  }
1400ab2e892SShuo Chen
1410ab2e892SShuo Chen  int64_t tell()
1420ab2e892SShuo Chen  {
1430ab2e892SShuo Chen    return ::ftell(file_);
1440ab2e892SShuo Chen  }
1450ab2e892SShuo Chen
1460ab2e892SShuo Chen  int fd()
1470ab2e892SShuo Chen  {
1480ab2e892SShuo Chen    return ::fileno(file_);
1490ab2e892SShuo Chen  }
1500ab2e892SShuo Chen
1510ab2e892SShuo Chen  size_t items()
1520ab2e892SShuo Chen  {
1530ab2e892SShuo Chen    return items_;
1540ab2e892SShuo Chen  }
1550ab2e892SShuo Chen
1560ab2e892SShuo Chen private:
1570ab2e892SShuo Chen  FILE* file_;
158a6693141SShuo Chen  char buffer_[kBufferSize];
1590ab2e892SShuo Chen  size_t items_ = 0;
1600ab2e892SShuo Chen
1610ab2e892SShuo Chen  OutputFile(const OutputFile&) = delete;
1620ab2e892SShuo Chen  void operator=(const OutputFile&) = delete;
1630ab2e892SShuo Chen};
1640ab2e892SShuo Chen
// Read-only stdio file with a large private read buffer, consumed line by line.
// The buffer is a 32 MiB data member, so instances belong on the heap
// (merge() allocates them with std::make_unique). Not copyable.
class InputFile
{
 public:
  static constexpr size_t kReadBufferSize = 32 * 1024 * 1024;

  // Opens `filename` for reading; aborts the process on failure.
  explicit InputFile(const char* filename)
    : file_(::fopen(filename, "r"))
  {
    if (file_ == nullptr)
    {
      // An assert() disappears under NDEBUG and would leave a null FILE* behind.
      ::perror(filename);
      ::abort();
    }
    ::setbuffer(file_, buffer_, sizeof buffer_);
  }

  ~InputFile()
  {
    close();
  }

  InputFile(const InputFile&) = delete;
  void operator=(const InputFile&) = delete;

  // Closes the file; safe to call more than once.
  void close()
  {
    if (file_)
      ::fclose(file_);
    file_ = nullptr;
  }

  // Reads the next line into *output, without its trailing '\n'.
  // Lines of any length are returned whole: fgets() is called repeatedly until
  // a newline or EOF. (A single 1024-byte fgets used to split long lines.)
  // Returns false, leaving *output untouched, when no characters could be read.
  bool getline(std::string* output)
  {
    std::string line;
    char chunk[1024];
    bool got_any = false;
    while (::fgets(chunk, sizeof chunk, file_) != nullptr)
    {
      got_any = true;
      const size_t len = ::strlen(chunk);
      const bool end_of_line = len > 0 && chunk[len - 1] == '\n';
      line.append(chunk, end_of_line ? len - 1 : len);
      if (end_of_line)
        break;
    }
    if (got_any)
      output->swap(line);
    return got_any;
  }

 private:
  FILE* file_;
  char buffer_[kReadBufferSize];
};
212270b6cceSShuo Chen
2130ab2e892SShuo Chenclass Sharder // : boost::noncopyable
2140ab2e892SShuo Chen{
2150ab2e892SShuo Chen public:
2160ab2e892SShuo Chen  Sharder()
2170ab2e892SShuo Chen    : files_(kShards)
2180ab2e892SShuo Chen  {
2190ab2e892SShuo Chen    for (int i = 0; i < kShards; ++i)
2200ab2e892SShuo Chen    {
2210ab2e892SShuo Chen      char name[256];
222a6693141SShuo Chen      snprintf(name, sizeof name, "%s/shard-%05d-of-%05d", shard_dir, i, kShards);
2230ab2e892SShuo Chen      files_[i].reset(new OutputFile(name));
2240ab2e892SShuo Chen    }
2250ab2e892SShuo Chen    assert(files_.size() == static_cast<size_t>(kShards));
2260ab2e892SShuo Chen  }
2270ab2e892SShuo Chen
2280ab2e892SShuo Chen  void output(string_view word)
2290ab2e892SShuo Chen  {
2300ab2e892SShuo Chen    size_t shard = hash(word) % files_.size();
231270b6cceSShuo Chen    files_[shard]->appendRecord(word);
2320ab2e892SShuo Chen  }
2330ab2e892SShuo Chen
2340ab2e892SShuo Chen  void finish()
2350ab2e892SShuo Chen  {
2364136e585SShuo Chen    int shard = 0;
2374136e585SShuo Chen    for (const auto& file : files_)
2380ab2e892SShuo Chen    {
2392a129a12SShuo Chen      // if (verbose)
2404136e585SShuo Chen      printf("  shard %d: %ld bytes, %ld items\n", shard, file->tell(), file->items());
2414136e585SShuo Chen      ++shard;
2424136e585SShuo Chen      file->close();
2430ab2e892SShuo Chen    }
2440ab2e892SShuo Chen  }
2450ab2e892SShuo Chen
2460ab2e892SShuo Chen private:
2470ab2e892SShuo Chen  std::hash<string_view> hash;
2480ab2e892SShuo Chen  vector<unique_ptr<OutputFile>> files_;
2490ab2e892SShuo Chen};
2500ab2e892SShuo Chen
2514136e585SShuo Chenint64_t shard_(int argc, char* argv[])
2520ab2e892SShuo Chen{
2530ab2e892SShuo Chen  Sharder sharder;
2544136e585SShuo Chen  Timer timer;
2554136e585SShuo Chen  int64_t total = 0;
2562a129a12SShuo Chen  for (int i = optind; i < argc; ++i)
2570ab2e892SShuo Chen  {
2582a129a12SShuo Chen    LOG_INFO << "Processing input file " << argv[i];
2590ab2e892SShuo Chen    double t = now();
2600ab2e892SShuo Chen    char line[1024];
2610ab2e892SShuo Chen    FILE* fp = fopen(argv[i], "r");
262a6693141SShuo Chen    char buffer[kBufferSize];
2630ab2e892SShuo Chen    ::setbuffer(fp, buffer, sizeof buffer);
2640ab2e892SShuo Chen    while (fgets(line, sizeof line, fp))
2650ab2e892SShuo Chen    {
2660ab2e892SShuo Chen      size_t len = strlen(line);
2670ab2e892SShuo Chen      if (len > 0 && line[len-1] == '\n')
268270b6cceSShuo Chen      {
2690ab2e892SShuo Chen        line[len-1] = '\0';
270270b6cceSShuo Chen        len--;
271270b6cceSShuo Chen      }
272270b6cceSShuo Chen      sharder.output(string_view(line, len));
2730ab2e892SShuo Chen    }
2744136e585SShuo Chen    size_t len = ftell(fp);
2750ab2e892SShuo Chen    fclose(fp);
2764136e585SShuo Chen    total += len;
2770ab2e892SShuo Chen    double sec = now() - t;
2782a129a12SShuo Chen    LOG_INFO << "Done file " << argv[i] << absl::StrFormat(" %.3f sec %.2f MiB/s", sec, len / sec / 1024 / 1024);
2790ab2e892SShuo Chen  }
2800ab2e892SShuo Chen  sharder.finish();
2812a129a12SShuo Chen  LOG_INFO << "Sharding done " << timer.report(total);
2824136e585SShuo Chen  return total;
2830ab2e892SShuo Chen}
2840ab2e892SShuo Chen
2850ab2e892SShuo Chen// ======= count_shards =======
2860ab2e892SShuo Chen
287ecd7048bSShuo Chenvoid count_shard(int shard, int fd, size_t len)
2880ab2e892SShuo Chen{
289ecd7048bSShuo Chen  Timer timer;
290ecd7048bSShuo Chen
2910ab2e892SShuo Chen  double t = now();
2922a129a12SShuo Chen  LOG_INFO << absl::StrFormat("counting shard %d: input file size %ld", shard, len);
2934136e585SShuo Chen  {
2940ab2e892SShuo Chen  void* mapped = mmap(NULL, len, PROT_READ, MAP_PRIVATE, fd, 0);
2950ab2e892SShuo Chen  assert(mapped != MAP_FAILED);
2960ab2e892SShuo Chen  const uint8_t* const start = static_cast<const uint8_t*>(mapped);
2970ab2e892SShuo Chen  const uint8_t* const end = start + len;
2980ab2e892SShuo Chen
2994136e585SShuo Chen  // std::unordered_map<string_view, uint64_t> items;
3004136e585SShuo Chen  absl::flat_hash_map<string_view, uint64_t> items;
3012a129a12SShuo Chen  int64_t count = 0;
3020ab2e892SShuo Chen  for (const uint8_t* p = start; p < end;)
3030ab2e892SShuo Chen  {
3040ab2e892SShuo Chen    string_view s((const char*)p+1, *p);
3050ab2e892SShuo Chen    items[s]++;
3060ab2e892SShuo Chen    p += 1 + *p;
3072a129a12SShuo Chen    ++count;
3080ab2e892SShuo Chen  }
309270b6cceSShuo Chen  LOG_INFO << "items " << count << " unique " << items.size();
3102a129a12SShuo Chen  if (verbose)
3110ab2e892SShuo Chen  printf("  count %.3f sec %ld items\n", now() - t, items.size());
3120ab2e892SShuo Chen
3130ab2e892SShuo Chen  t = now();
3140ab2e892SShuo Chen  vector<std::pair<size_t, string_view>> counts;
3150ab2e892SShuo Chen  for (const auto& it : items)
3160ab2e892SShuo Chen  {
3170ab2e892SShuo Chen    if (it.second > 1)
3180ab2e892SShuo Chen      counts.push_back(make_pair(it.second, it.first));
3190ab2e892SShuo Chen  }
3202a129a12SShuo Chen  if (verbose)
3210ab2e892SShuo Chen  printf("  select %.3f sec %ld\n", now() - t, counts.size());
3220ab2e892SShuo Chen
3230ab2e892SShuo Chen  t = now();
3240ab2e892SShuo Chen  std::sort(counts.begin(), counts.end());
3252a129a12SShuo Chen  if (verbose)
3260ab2e892SShuo Chen  printf("  sort %.3f sec\n", now() - t);
3270ab2e892SShuo Chen
3280ab2e892SShuo Chen  t = now();
3290ab2e892SShuo Chen  {
330ecd7048bSShuo Chen    char buf[256];
331ecd7048bSShuo Chen    snprintf(buf, sizeof buf, "count-%05d-of-%05d", shard, kShards);
332ecd7048bSShuo Chen    OutputFile output(buf);
333ecd7048bSShuo Chen
3344136e585SShuo Chen    for (auto it = counts.rbegin(); it != counts.rend(); ++it)
3350ab2e892SShuo Chen    {
336270b6cceSShuo Chen      string s(it->second);
337ecd7048bSShuo Chen      output.write(absl::StrFormat("%d\t%s\n", it->first, s));  // FIXME %s with string_view doesn't work in C++17
338270b6cceSShuo Chen      /*
339270b6cceSShuo Chen      char buf[1024];
340270b6cceSShuo Chen      snprintf(buf, sizeof buf, "%zd\t%s\n",
341270b6cceSShuo Chen      out.write(buf);
342270b6cceSShuo Chen      */
3434136e585SShuo Chen    }
344270b6cceSShuo Chen
3454136e585SShuo Chen    for (const auto& it : items)
3464136e585SShuo Chen    {
3474136e585SShuo Chen      if (it.second == 1)
3484136e585SShuo Chen      {
349270b6cceSShuo Chen        string s(it.first);
350270b6cceSShuo Chen        // FIXME: bug of absl?
351270b6cceSShuo Chen        // out.write(absl::StrCat("1\t", s, "\n"));
352ecd7048bSShuo Chen        output.write(absl::StrFormat("1\t%s\n", s));
3534136e585SShuo Chen      }
3540ab2e892SShuo Chen    }
3550ab2e892SShuo Chen  }
3562a129a12SShuo Chen  //if (verbose)
3572a129a12SShuo Chen  //printf("  output %.3f sec %lu\n", now() - t, st.st_size);
3580ab2e892SShuo Chen
3590ab2e892SShuo Chen  if (munmap(mapped, len))
3600ab2e892SShuo Chen    perror("munmap");
3614136e585SShuo Chen  }
362ecd7048bSShuo Chen  ::close(fd);
363ecd7048bSShuo Chen  LOG_INFO << "shard " << shard << " done " << timer.report(len);
3640ab2e892SShuo Chen}
3650ab2e892SShuo Chen
3660ab2e892SShuo Chenvoid count_shards()
3670ab2e892SShuo Chen{
3684136e585SShuo Chen  Timer timer;
3694136e585SShuo Chen  int64_t total = 0;
370a251380aSShuo Chen  muduo::ThreadPool threadPool;
371a251380aSShuo Chen  threadPool.setMaxQueueSize(10);
372ecd7048bSShuo Chen  threadPool.start(4);
3730ab2e892SShuo Chen  for (int shard = 0; shard < kShards; ++shard)
3740ab2e892SShuo Chen  {
3750ab2e892SShuo Chen    char buf[256];
376a6693141SShuo Chen    snprintf(buf, sizeof buf, "%s/shard-%05d-of-%05d", shard_dir, shard, kShards);
3770ab2e892SShuo Chen    int fd = open(buf, O_RDONLY);
378ecd7048bSShuo Chen    assert(fd >= 0);
3792a129a12SShuo Chen    if (!keep)
380ecd7048bSShuo Chen      ::unlink(buf);
3812a129a12SShuo Chen
382ecd7048bSShuo Chen    struct stat st;
383ecd7048bSShuo Chen    if (::fstat(fd, &st) == 0)
384ecd7048bSShuo Chen    {
385ecd7048bSShuo Chen      size_t len = st.st_size;
386ecd7048bSShuo Chen      total += len;
387ecd7048bSShuo Chen      threadPool.run([shard, fd, len]{ count_shard(shard, fd, len); });
388ecd7048bSShuo Chen    }
389a251380aSShuo Chen  }
390a251380aSShuo Chen  while (threadPool.queueSize() > 0)
391a251380aSShuo Chen  {
392ecd7048bSShuo Chen    LOG_INFO << "Waiting for ThreadPool " << threadPool.queueSize();
393ecd7048bSShuo Chen    muduo::CurrentThread::sleepUsec(1000*1000);
3940ab2e892SShuo Chen  }
395a251380aSShuo Chen  threadPool.stop();
396270b6cceSShuo Chen  LOG_INFO << "Counting done "<< timer.report(total);
3970ab2e892SShuo Chen}
3980ab2e892SShuo Chen
3990ab2e892SShuo Chen// ======= merge =======
4000ab2e892SShuo Chen
4010ab2e892SShuo Chenclass Source  // copyable
4020ab2e892SShuo Chen{
4030ab2e892SShuo Chen public:
404270b6cceSShuo Chen  explicit Source(InputFile* in)
4050ab2e892SShuo Chen    : in_(in),
4060ab2e892SShuo Chen      count_(0),
4070ab2e892SShuo Chen      word_()
4080ab2e892SShuo Chen  {
4090ab2e892SShuo Chen  }
4100ab2e892SShuo Chen
4110ab2e892SShuo Chen  bool next()
4120ab2e892SShuo Chen  {
4130ab2e892SShuo Chen    string line;
414270b6cceSShuo Chen    if (in_->getline(&line))
4150ab2e892SShuo Chen    {
4160ab2e892SShuo Chen      size_t tab = line.find('\t');
4170ab2e892SShuo Chen      if (tab != string::npos)
4180ab2e892SShuo Chen      {
4190ab2e892SShuo Chen        count_ = strtol(line.c_str(), NULL, 10);
4200ab2e892SShuo Chen        if (count_ > 0)
4210ab2e892SShuo Chen        {
4220ab2e892SShuo Chen          word_ = line.substr(tab+1);
4230ab2e892SShuo Chen          return true;
4240ab2e892SShuo Chen        }
4250ab2e892SShuo Chen      }
4260ab2e892SShuo Chen    }
4270ab2e892SShuo Chen    return false;
4280ab2e892SShuo Chen  }
4290ab2e892SShuo Chen
4300ab2e892SShuo Chen  bool operator<(const Source& rhs) const
4310ab2e892SShuo Chen  {
4320ab2e892SShuo Chen    return count_ < rhs.count_;
4330ab2e892SShuo Chen  }
4340ab2e892SShuo Chen
435270b6cceSShuo Chen  void outputTo(OutputFile* out) const
4360ab2e892SShuo Chen  {
437270b6cceSShuo Chen    out->write(absl::StrFormat("%d\t%s\n", count_, word_));
4380ab2e892SShuo Chen  }
4390ab2e892SShuo Chen
4400ab2e892SShuo Chen private:
441270b6cceSShuo Chen  InputFile* in_;  // not owned
4420ab2e892SShuo Chen  int64_t count_;
4430ab2e892SShuo Chen  string word_;
4440ab2e892SShuo Chen};
4450ab2e892SShuo Chen
4462a129a12SShuo Chenint64_t merge(const char* output)
4470ab2e892SShuo Chen{
4484136e585SShuo Chen  Timer timer;
449270b6cceSShuo Chen  vector<unique_ptr<InputFile>> inputs;
4500ab2e892SShuo Chen  vector<Source> keys;
4510ab2e892SShuo Chen
4524136e585SShuo Chen  int64_t total = 0;
4530ab2e892SShuo Chen  for (int i = 0; i < kShards; ++i)
4540ab2e892SShuo Chen  {
4550ab2e892SShuo Chen    char buf[256];
4560ab2e892SShuo Chen    snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, kShards);
4574136e585SShuo Chen    struct stat st;
458a6693141SShuo Chen    if (::stat(buf, &st) == 0)
4590ab2e892SShuo Chen    {
460a6693141SShuo Chen      total += st.st_size;
461270b6cceSShuo Chen      inputs.push_back(std::make_unique<InputFile>(buf));
462a6693141SShuo Chen      Source rec(inputs.back().get());
463a6693141SShuo Chen      if (rec.next())
464a6693141SShuo Chen      {
465a6693141SShuo Chen        keys.push_back(rec);
466a6693141SShuo Chen      }
467a6693141SShuo Chen      if (!keep)
468a6693141SShuo Chen        ::unlink(buf);
469a6693141SShuo Chen    }
470a6693141SShuo Chen    else
471a6693141SShuo Chen    {
472a6693141SShuo Chen      perror("Unable to stat file:");
4730ab2e892SShuo Chen    }
4740ab2e892SShuo Chen  }
4752a129a12SShuo Chen  LOG_INFO << "merging " << inputs.size() << " files of " << total << " bytes in total";
4760ab2e892SShuo Chen
4774136e585SShuo Chen  {
478270b6cceSShuo Chen  OutputFile out(output);
4790ab2e892SShuo Chen  std::make_heap(keys.begin(), keys.end());
4800ab2e892SShuo Chen  while (!keys.empty())
4810ab2e892SShuo Chen  {
4820ab2e892SShuo Chen    std::pop_heap(keys.begin(), keys.end());
483270b6cceSShuo Chen    keys.back().outputTo(&out);
4840ab2e892SShuo Chen
4850ab2e892SShuo Chen    if (keys.back().next())
4860ab2e892SShuo Chen    {
4870ab2e892SShuo Chen      std::push_heap(keys.begin(), keys.end());
4880ab2e892SShuo Chen    }
4890ab2e892SShuo Chen    else
4900ab2e892SShuo Chen    {
4910ab2e892SShuo Chen      keys.pop_back();
4920ab2e892SShuo Chen    }
4930ab2e892SShuo Chen  }
4944136e585SShuo Chen  }
495a251380aSShuo Chen  LOG_INFO << "Merging done " << timer.report(total);
4962a129a12SShuo Chen  return total;
4970ab2e892SShuo Chen}
4980ab2e892SShuo Chen
4990ab2e892SShuo Chenint main(int argc, char* argv[])
5000ab2e892SShuo Chen{
5010ab2e892SShuo Chen  /*
5020ab2e892SShuo Chen  int fd = open("shard-00000-of-00010", O_RDONLY);
5030ab2e892SShuo Chen  double t = now();
5044136e585SShuo Chen  int64_t len = count_shard(0, fd);
5054136e585SShuo Chen  double sec = now() - t;
5064136e585SShuo Chen  printf("count_shard %.3f sec %.2f MB/s\n", sec, len / sec / 1e6);
5074136e585SShuo Chen  */
5080ab2e892SShuo Chen
5092a129a12SShuo Chen  int opt;
510a6693141SShuo Chen  const char* output = "output";
511a6693141SShuo Chen  while ((opt = getopt(argc, argv, "ko:s:t:v")) != -1)
5122a129a12SShuo Chen  {
5132a129a12SShuo Chen    switch (opt)
5142a129a12SShuo Chen    {
5152a129a12SShuo Chen      case 'k':
5162a129a12SShuo Chen        keep = true;
5172a129a12SShuo Chen        break;
518a6693141SShuo Chen      case 'o':
519a6693141SShuo Chen        output = optarg;
520a6693141SShuo Chen        break;
5212a129a12SShuo Chen      case 's':
5222a129a12SShuo Chen        kShards = atoi(optarg);
5232a129a12SShuo Chen        break;
524a6693141SShuo Chen      case 't':
525a6693141SShuo Chen        shard_dir = optarg;
526a6693141SShuo Chen        break;
5272a129a12SShuo Chen      case 'v':
5282a129a12SShuo Chen        verbose = true;
5292a129a12SShuo Chen        break;
5302a129a12SShuo Chen    }
5312a129a12SShuo Chen  }
5322a129a12SShuo Chen
5334136e585SShuo Chen  Timer timer;
534a251380aSShuo Chen  LOG_INFO << argc - optind << " input files, " << kShards << " shards, "
535a251380aSShuo Chen      << "output " << output <<" , temp " << shard_dir;
536a6693141SShuo Chen  int64_t input = 0;
537a6693141SShuo Chen  input = shard_(argc, argv);
5380ab2e892SShuo Chen  count_shards();
539a6693141SShuo Chen  int64_t output_size = merge(output);
540a6693141SShuo Chen  LOG_INFO << "All done " << timer.report(input) << " output " << output_size;
5410ab2e892SShuo Chen}
542