word_freq_shards_basic.cc revision a251380a
/* Sort words by frequency, sharding version.

  1. Read the input files and shard the words into N files:
       word
  2. Assuming each shard file fits in memory, read each shard file, count the words and sort by count, then write N count files:
       count \t word
  3. Merge the N count files using a heap.

Limits: each shard must fit in memory.
*/
110ab2e892SShuo Chen
#include <assert.h>

#include "absl/container/flat_hash_map.h"
#include "absl/strings/str_format.h"
#include "muduo/base/Logging.h"
#include "muduo/base/ThreadPool.h"

#include <algorithm>
//#include <fstream>
//#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include <boost/program_options.hpp>

#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/times.h>
#include <unistd.h>
360ab2e892SShuo Chen
using std::string;
using std::string_view;
using std::unique_ptr;
using std::vector;

// Size in bytes of the stdio buffer given to each OutputFile and to each
// input file read while sharding.
const int kBufferSize = 1024 * 128;

// Run-time options; main() overrides them from the command line.
int kShards = 10;             // -s: number of shard files / count files
bool verbose = false;         // -v: print per-step timings while counting
bool keep = false;            // -k: do not unlink intermediate files
const char* shard_dir = ".";  // -t: directory where shard files are written
470ab2e892SShuo Chen
// Current wall-clock time from gettimeofday(), as fractional seconds.
inline double now()
{
  struct timeval current = { 0, 0 };
  gettimeofday(&current, nullptr);
  const double fraction = current.tv_usec / 1000000.0;
  return current.tv_sec + fraction;
}
540ab2e892SShuo Chen
// User-mode and system-mode CPU time, both expressed in seconds.
struct CpuTime
{
  double userSeconds = 0.0;
  double systemSeconds = 0.0;

  // Combined user + system time in seconds.
  double total() const
  {
    const double sum = userSeconds + systemSeconds;
    return sum;
  }
};
624136e585SShuo Chen
634136e585SShuo Chenconst int g_clockTicks = static_cast<int>(::sysconf(_SC_CLK_TCK));
644136e585SShuo Chen
654136e585SShuo ChenCpuTime cpuTime()
664136e585SShuo Chen{
674136e585SShuo Chen  CpuTime t;
684136e585SShuo Chen  struct tms tms;
694136e585SShuo Chen  if (::times(&tms) >= 0)
704136e585SShuo Chen  {
714136e585SShuo Chen    const double hz = static_cast<double>(g_clockTicks);
724136e585SShuo Chen    t.userSeconds = static_cast<double>(tms.tms_utime) / hz;
734136e585SShuo Chen    t.systemSeconds = static_cast<double>(tms.tms_stime) / hz;
744136e585SShuo Chen  }
754136e585SShuo Chen  return t;
764136e585SShuo Chen}
774136e585SShuo Chen
784136e585SShuo Chenclass Timer
794136e585SShuo Chen{
804136e585SShuo Chen public:
814136e585SShuo Chen  Timer()
824136e585SShuo Chen    : start_(now()),
834136e585SShuo Chen      start_cpu_(cpuTime())
844136e585SShuo Chen  {
854136e585SShuo Chen  }
864136e585SShuo Chen
872a129a12SShuo Chen  string report(int64_t bytes) const
884136e585SShuo Chen  {
894136e585SShuo Chen    CpuTime end_cpu(cpuTime());
904136e585SShuo Chen    double end = now();
91a6693141SShuo Chen    return absl::StrFormat("%.2fs real  %.2fs cpu  %.2f MiB/s  %ld bytes",
922a129a12SShuo Chen                           end - start_, end_cpu.total() - start_cpu_.total(),
932a129a12SShuo Chen                           bytes / (end - start_) / 1024 / 1024, bytes);
944136e585SShuo Chen  }
954136e585SShuo Chen private:
964136e585SShuo Chen  const double start_ = 0;
974136e585SShuo Chen  const CpuTime start_cpu_;
984136e585SShuo Chen};
994136e585SShuo Chen
1000ab2e892SShuo Chenclass OutputFile // : boost::noncopyable
1010ab2e892SShuo Chen{
1020ab2e892SShuo Chen public:
1030ab2e892SShuo Chen  explicit OutputFile(const string& filename)
104270b6cceSShuo Chen    : file_(::fopen(filename.c_str(), "w"))
1050ab2e892SShuo Chen  {
1060ab2e892SShuo Chen    assert(file_);
1070ab2e892SShuo Chen    ::setbuffer(file_, buffer_, sizeof buffer_);
1080ab2e892SShuo Chen  }
1090ab2e892SShuo Chen
1100ab2e892SShuo Chen  ~OutputFile()
1110ab2e892SShuo Chen  {
1120ab2e892SShuo Chen    close();
1130ab2e892SShuo Chen  }
1140ab2e892SShuo Chen
115270b6cceSShuo Chen  void write(string_view s)
116270b6cceSShuo Chen  {
117270b6cceSShuo Chen    ::fwrite(s.data(), 1, s.size(), file_);
118270b6cceSShuo Chen  }
119270b6cceSShuo Chen
120270b6cceSShuo Chen  void appendRecord(string_view s)
1210ab2e892SShuo Chen  {
1220ab2e892SShuo Chen    assert(s.size() < 255);
1230ab2e892SShuo Chen    uint8_t len = s.size();
1240ab2e892SShuo Chen    ::fwrite(&len, 1, sizeof len, file_);
1250ab2e892SShuo Chen    ::fwrite(s.data(), 1, len, file_);
1260ab2e892SShuo Chen    ++items_;
1270ab2e892SShuo Chen  }
1280ab2e892SShuo Chen
1290ab2e892SShuo Chen  void flush()
1300ab2e892SShuo Chen  {
1310ab2e892SShuo Chen    ::fflush(file_);
1320ab2e892SShuo Chen  }
1330ab2e892SShuo Chen
1340ab2e892SShuo Chen  void close()
1350ab2e892SShuo Chen  {
1360ab2e892SShuo Chen    if (file_)
1370ab2e892SShuo Chen      ::fclose(file_);
1380ab2e892SShuo Chen    file_ = nullptr;
1390ab2e892SShuo Chen  }
1400ab2e892SShuo Chen
1410ab2e892SShuo Chen  int64_t tell()
1420ab2e892SShuo Chen  {
1430ab2e892SShuo Chen    return ::ftell(file_);
1440ab2e892SShuo Chen  }
1450ab2e892SShuo Chen
1460ab2e892SShuo Chen  int fd()
1470ab2e892SShuo Chen  {
1480ab2e892SShuo Chen    return ::fileno(file_);
1490ab2e892SShuo Chen  }
1500ab2e892SShuo Chen
1510ab2e892SShuo Chen  size_t items()
1520ab2e892SShuo Chen  {
1530ab2e892SShuo Chen    return items_;
1540ab2e892SShuo Chen  }
1550ab2e892SShuo Chen
1560ab2e892SShuo Chen private:
1570ab2e892SShuo Chen  FILE* file_;
158a6693141SShuo Chen  char buffer_[kBufferSize];
1590ab2e892SShuo Chen  size_t items_ = 0;
1600ab2e892SShuo Chen
1610ab2e892SShuo Chen  OutputFile(const OutputFile&) = delete;
1620ab2e892SShuo Chen  void operator=(const OutputFile&) = delete;
1630ab2e892SShuo Chen};
1640ab2e892SShuo Chen
// Buffered, read-only wrapper around a stdio FILE that is consumed line by
// line.
//
// NOTE: every object embeds a 32 MiB stdio buffer, so instances should be
// allocated on the heap rather than on the stack.  Not copyable.
class InputFile
{
 public:
  // Opens `filename` with mode "r".  Aborts the program with a diagnostic if
  // the file cannot be opened.
  explicit InputFile(const char* filename)
    : file_(::fopen(filename, "r"))
  {
    // Checked unconditionally: with assert() alone, an NDEBUG build would go
    // on to hand a null FILE* to setbuffer() and fgets().
    if (file_ == nullptr)
    {
      perror(filename);
      abort();
    }
    ::setbuffer(file_, buffer_, sizeof buffer_);
  }

  ~InputFile()
  {
    close();
  }

  // Closes the file if it is still open; safe to call more than once.
  void close()
  {
    if (file_)
      ::fclose(file_);
    file_ = nullptr;
  }

  // Reads the next line into *output, without its trailing '\n'.
  // Lines of any length are returned whole.  Returns false, leaving *output
  // untouched, when nothing more can be read.
  bool getline(std::string* output)
  {
    char buf[1024] = "";
    if (!fgets(buf, sizeof buf, file_))
      return false;

    output->clear();
    for (;;)
    {
      const size_t len = strlen(buf);
      if (len > 0 && buf[len-1] == '\n')
      {
        output->append(buf, len - 1);
        return true;
      }
      output->append(buf, len);
      // No newline in this chunk: either the line is longer than buf (keep
      // reading) or it is the last line of a file lacking a final newline.
      if (!fgets(buf, sizeof buf, file_))
        return true;
    }
  }

 private:
  FILE* file_;
  char buffer_[32 * 1024 * 1024];  // stdio buffer installed with setbuffer()

  InputFile(const InputFile&) = delete;
  void operator=(const InputFile&) = delete;
};
212270b6cceSShuo Chen
2130ab2e892SShuo Chenclass Sharder // : boost::noncopyable
2140ab2e892SShuo Chen{
2150ab2e892SShuo Chen public:
2160ab2e892SShuo Chen  Sharder()
2170ab2e892SShuo Chen    : files_(kShards)
2180ab2e892SShuo Chen  {
2190ab2e892SShuo Chen    for (int i = 0; i < kShards; ++i)
2200ab2e892SShuo Chen    {
2210ab2e892SShuo Chen      char name[256];
222a6693141SShuo Chen      snprintf(name, sizeof name, "%s/shard-%05d-of-%05d", shard_dir, i, kShards);
2230ab2e892SShuo Chen      files_[i].reset(new OutputFile(name));
2240ab2e892SShuo Chen    }
2250ab2e892SShuo Chen    assert(files_.size() == static_cast<size_t>(kShards));
2260ab2e892SShuo Chen  }
2270ab2e892SShuo Chen
2280ab2e892SShuo Chen  void output(string_view word)
2290ab2e892SShuo Chen  {
2300ab2e892SShuo Chen    size_t shard = hash(word) % files_.size();
231270b6cceSShuo Chen    files_[shard]->appendRecord(word);
2320ab2e892SShuo Chen  }
2330ab2e892SShuo Chen
2340ab2e892SShuo Chen  void finish()
2350ab2e892SShuo Chen  {
2364136e585SShuo Chen    int shard = 0;
2374136e585SShuo Chen    for (const auto& file : files_)
2380ab2e892SShuo Chen    {
2392a129a12SShuo Chen      // if (verbose)
2404136e585SShuo Chen      printf("  shard %d: %ld bytes, %ld items\n", shard, file->tell(), file->items());
2414136e585SShuo Chen      ++shard;
2424136e585SShuo Chen      file->close();
2430ab2e892SShuo Chen    }
2440ab2e892SShuo Chen  }
2450ab2e892SShuo Chen
2460ab2e892SShuo Chen private:
2470ab2e892SShuo Chen  std::hash<string_view> hash;
2480ab2e892SShuo Chen  vector<unique_ptr<OutputFile>> files_;
2490ab2e892SShuo Chen};
2500ab2e892SShuo Chen
2514136e585SShuo Chenint64_t shard_(int argc, char* argv[])
2520ab2e892SShuo Chen{
2530ab2e892SShuo Chen  Sharder sharder;
2544136e585SShuo Chen  Timer timer;
2554136e585SShuo Chen  int64_t total = 0;
2562a129a12SShuo Chen  for (int i = optind; i < argc; ++i)
2570ab2e892SShuo Chen  {
2582a129a12SShuo Chen    LOG_INFO << "Processing input file " << argv[i];
2590ab2e892SShuo Chen    double t = now();
2600ab2e892SShuo Chen    char line[1024];
2610ab2e892SShuo Chen    FILE* fp = fopen(argv[i], "r");
262a6693141SShuo Chen    char buffer[kBufferSize];
2630ab2e892SShuo Chen    ::setbuffer(fp, buffer, sizeof buffer);
2640ab2e892SShuo Chen    while (fgets(line, sizeof line, fp))
2650ab2e892SShuo Chen    {
2660ab2e892SShuo Chen      size_t len = strlen(line);
2670ab2e892SShuo Chen      if (len > 0 && line[len-1] == '\n')
268270b6cceSShuo Chen      {
2690ab2e892SShuo Chen        line[len-1] = '\0';
270270b6cceSShuo Chen        len--;
271270b6cceSShuo Chen      }
272270b6cceSShuo Chen      sharder.output(string_view(line, len));
2730ab2e892SShuo Chen    }
2744136e585SShuo Chen    size_t len = ftell(fp);
2750ab2e892SShuo Chen    fclose(fp);
2764136e585SShuo Chen    total += len;
2770ab2e892SShuo Chen    double sec = now() - t;
2782a129a12SShuo Chen    LOG_INFO << "Done file " << argv[i] << absl::StrFormat(" %.3f sec %.2f MiB/s", sec, len / sec / 1024 / 1024);
2790ab2e892SShuo Chen  }
2800ab2e892SShuo Chen  sharder.finish();
2812a129a12SShuo Chen  LOG_INFO << "Sharding done " << timer.report(total);
2824136e585SShuo Chen  return total;
2830ab2e892SShuo Chen}
2840ab2e892SShuo Chen
2850ab2e892SShuo Chen// ======= count_shards =======
2860ab2e892SShuo Chen
287a251380aSShuo Chenint64_t count_shard(int shard, int fd, OutputFile* output)
2880ab2e892SShuo Chen{
2890ab2e892SShuo Chen  const int64_t len = lseek(fd, 0, SEEK_END);
2900ab2e892SShuo Chen  lseek(fd, 0, SEEK_SET);
2910ab2e892SShuo Chen  double t = now();
2922a129a12SShuo Chen  LOG_INFO << absl::StrFormat("counting shard %d: input file size %ld", shard, len);
2934136e585SShuo Chen  {
2940ab2e892SShuo Chen  void* mapped = mmap(NULL, len, PROT_READ, MAP_PRIVATE, fd, 0);
2950ab2e892SShuo Chen  assert(mapped != MAP_FAILED);
2960ab2e892SShuo Chen  const uint8_t* const start = static_cast<const uint8_t*>(mapped);
2970ab2e892SShuo Chen  const uint8_t* const end = start + len;
2980ab2e892SShuo Chen
2994136e585SShuo Chen  // std::unordered_map<string_view, uint64_t> items;
3004136e585SShuo Chen  absl::flat_hash_map<string_view, uint64_t> items;
3012a129a12SShuo Chen  int64_t count = 0;
3020ab2e892SShuo Chen  for (const uint8_t* p = start; p < end;)
3030ab2e892SShuo Chen  {
3040ab2e892SShuo Chen    string_view s((const char*)p+1, *p);
3050ab2e892SShuo Chen    items[s]++;
3060ab2e892SShuo Chen    p += 1 + *p;
3072a129a12SShuo Chen    ++count;
3080ab2e892SShuo Chen  }
309270b6cceSShuo Chen  LOG_INFO << "items " << count << " unique " << items.size();
3102a129a12SShuo Chen  if (verbose)
3110ab2e892SShuo Chen  printf("  count %.3f sec %ld items\n", now() - t, items.size());
3120ab2e892SShuo Chen
3130ab2e892SShuo Chen  t = now();
3140ab2e892SShuo Chen  vector<std::pair<size_t, string_view>> counts;
3150ab2e892SShuo Chen  for (const auto& it : items)
3160ab2e892SShuo Chen  {
3170ab2e892SShuo Chen    if (it.second > 1)
3180ab2e892SShuo Chen      counts.push_back(make_pair(it.second, it.first));
3190ab2e892SShuo Chen  }
3202a129a12SShuo Chen  if (verbose)
3210ab2e892SShuo Chen  printf("  select %.3f sec %ld\n", now() - t, counts.size());
3220ab2e892SShuo Chen
3230ab2e892SShuo Chen  t = now();
3240ab2e892SShuo Chen  std::sort(counts.begin(), counts.end());
3252a129a12SShuo Chen  if (verbose)
3260ab2e892SShuo Chen  printf("  sort %.3f sec\n", now() - t);
3270ab2e892SShuo Chen
3280ab2e892SShuo Chen  t = now();
3290ab2e892SShuo Chen  {
3304136e585SShuo Chen    for (auto it = counts.rbegin(); it != counts.rend(); ++it)
3310ab2e892SShuo Chen    {
332270b6cceSShuo Chen      string s(it->second);
333a251380aSShuo Chen      output->write(absl::StrFormat("%d\t%s\n", it->first, s));  // FIXME %s with string_view doesn't work in C++17
334270b6cceSShuo Chen      /*
335270b6cceSShuo Chen      char buf[1024];
336270b6cceSShuo Chen      snprintf(buf, sizeof buf, "%zd\t%s\n",
337270b6cceSShuo Chen      out.write(buf);
338270b6cceSShuo Chen      */
3394136e585SShuo Chen    }
340270b6cceSShuo Chen
3414136e585SShuo Chen    for (const auto& it : items)
3424136e585SShuo Chen    {
3434136e585SShuo Chen      if (it.second == 1)
3444136e585SShuo Chen      {
345270b6cceSShuo Chen        string s(it.first);
346270b6cceSShuo Chen        // FIXME: bug of absl?
347270b6cceSShuo Chen        // out.write(absl::StrCat("1\t", s, "\n"));
348a251380aSShuo Chen        output->write(absl::StrFormat("1\t%s\n", s));
3494136e585SShuo Chen      }
3500ab2e892SShuo Chen    }
3510ab2e892SShuo Chen  }
3522a129a12SShuo Chen  //if (verbose)
3532a129a12SShuo Chen  //printf("  output %.3f sec %lu\n", now() - t, st.st_size);
3540ab2e892SShuo Chen
3554136e585SShuo Chen  t = now();
3560ab2e892SShuo Chen  if (munmap(mapped, len))
3570ab2e892SShuo Chen    perror("munmap");
3584136e585SShuo Chen  }
3592a129a12SShuo Chen  // printf("  destruct %.3f sec\n", now() - t);
3604136e585SShuo Chen  return len;
3610ab2e892SShuo Chen}
3620ab2e892SShuo Chen
3630ab2e892SShuo Chenvoid count_shards()
3640ab2e892SShuo Chen{
3654136e585SShuo Chen  Timer timer;
3664136e585SShuo Chen  int64_t total = 0;
367a251380aSShuo Chen  muduo::ThreadPool threadPool;
368a251380aSShuo Chen  threadPool.setMaxQueueSize(10);
369a251380aSShuo Chen  threadPool.start(1);
3700ab2e892SShuo Chen  for (int shard = 0; shard < kShards; ++shard)
3710ab2e892SShuo Chen  {
3722a129a12SShuo Chen    Timer timer;
3730ab2e892SShuo Chen    char buf[256];
374a6693141SShuo Chen    snprintf(buf, sizeof buf, "%s/shard-%05d-of-%05d", shard_dir, shard, kShards);
3750ab2e892SShuo Chen    int fd = open(buf, O_RDONLY);
3762a129a12SShuo Chen    if (!keep)
3770ab2e892SShuo Chen    ::unlink(buf);
3782a129a12SShuo Chen
3792a129a12SShuo Chen    snprintf(buf, sizeof buf, "count-%05d-of-%05d", shard, kShards);
380a251380aSShuo Chen    std::shared_ptr<OutputFile> output(new OutputFile(buf));
381a251380aSShuo Chen    int64_t len = count_shard(shard, fd, output.get());
3822a129a12SShuo Chen    ::close(fd);
3834136e585SShuo Chen    total += len;
384a251380aSShuo Chen    threadPool.run([x = std::move(output)]{});
385a251380aSShuo Chen    LOG_INFO << "shard " << shard << " done " << timer.report(len);
386a251380aSShuo Chen  }
387a251380aSShuo Chen  while (threadPool.queueSize() > 0)
388a251380aSShuo Chen  {
389a251380aSShuo Chen    LOG_INFO << "Waiting for ThreadPool";
390a251380aSShuo Chen    muduo::CurrentThread::sleepUsec(100*1000);
3910ab2e892SShuo Chen  }
392a251380aSShuo Chen  threadPool.stop();
393270b6cceSShuo Chen  LOG_INFO << "Counting done "<< timer.report(total);
3940ab2e892SShuo Chen}
3950ab2e892SShuo Chen
3960ab2e892SShuo Chen// ======= merge =======
3970ab2e892SShuo Chen
3980ab2e892SShuo Chenclass Source  // copyable
3990ab2e892SShuo Chen{
4000ab2e892SShuo Chen public:
401270b6cceSShuo Chen  explicit Source(InputFile* in)
4020ab2e892SShuo Chen    : in_(in),
4030ab2e892SShuo Chen      count_(0),
4040ab2e892SShuo Chen      word_()
4050ab2e892SShuo Chen  {
4060ab2e892SShuo Chen  }
4070ab2e892SShuo Chen
4080ab2e892SShuo Chen  bool next()
4090ab2e892SShuo Chen  {
4100ab2e892SShuo Chen    string line;
411270b6cceSShuo Chen    if (in_->getline(&line))
4120ab2e892SShuo Chen    {
4130ab2e892SShuo Chen      size_t tab = line.find('\t');
4140ab2e892SShuo Chen      if (tab != string::npos)
4150ab2e892SShuo Chen      {
4160ab2e892SShuo Chen        count_ = strtol(line.c_str(), NULL, 10);
4170ab2e892SShuo Chen        if (count_ > 0)
4180ab2e892SShuo Chen        {
4190ab2e892SShuo Chen          word_ = line.substr(tab+1);
4200ab2e892SShuo Chen          return true;
4210ab2e892SShuo Chen        }
4220ab2e892SShuo Chen      }
4230ab2e892SShuo Chen    }
4240ab2e892SShuo Chen    return false;
4250ab2e892SShuo Chen  }
4260ab2e892SShuo Chen
4270ab2e892SShuo Chen  bool operator<(const Source& rhs) const
4280ab2e892SShuo Chen  {
4290ab2e892SShuo Chen    return count_ < rhs.count_;
4300ab2e892SShuo Chen  }
4310ab2e892SShuo Chen
432270b6cceSShuo Chen  void outputTo(OutputFile* out) const
4330ab2e892SShuo Chen  {
434270b6cceSShuo Chen    out->write(absl::StrFormat("%d\t%s\n", count_, word_));
4350ab2e892SShuo Chen  }
4360ab2e892SShuo Chen
4370ab2e892SShuo Chen private:
438270b6cceSShuo Chen  InputFile* in_;  // not owned
4390ab2e892SShuo Chen  int64_t count_;
4400ab2e892SShuo Chen  string word_;
4410ab2e892SShuo Chen};
4420ab2e892SShuo Chen
4432a129a12SShuo Chenint64_t merge(const char* output)
4440ab2e892SShuo Chen{
4454136e585SShuo Chen  Timer timer;
446270b6cceSShuo Chen  vector<unique_ptr<InputFile>> inputs;
4470ab2e892SShuo Chen  vector<Source> keys;
4480ab2e892SShuo Chen
4494136e585SShuo Chen  int64_t total = 0;
4500ab2e892SShuo Chen  for (int i = 0; i < kShards; ++i)
4510ab2e892SShuo Chen  {
4520ab2e892SShuo Chen    char buf[256];
4530ab2e892SShuo Chen    snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, kShards);
4544136e585SShuo Chen    struct stat st;
455a6693141SShuo Chen    if (::stat(buf, &st) == 0)
4560ab2e892SShuo Chen    {
457a6693141SShuo Chen      total += st.st_size;
458270b6cceSShuo Chen      inputs.push_back(std::make_unique<InputFile>(buf));
459a6693141SShuo Chen      Source rec(inputs.back().get());
460a6693141SShuo Chen      if (rec.next())
461a6693141SShuo Chen      {
462a6693141SShuo Chen        keys.push_back(rec);
463a6693141SShuo Chen      }
464a6693141SShuo Chen      if (!keep)
465a6693141SShuo Chen        ::unlink(buf);
466a6693141SShuo Chen    }
467a6693141SShuo Chen    else
468a6693141SShuo Chen    {
469a6693141SShuo Chen      perror("Unable to stat file:");
4700ab2e892SShuo Chen    }
4710ab2e892SShuo Chen  }
4722a129a12SShuo Chen  LOG_INFO << "merging " << inputs.size() << " files of " << total << " bytes in total";
4730ab2e892SShuo Chen
4744136e585SShuo Chen  {
475270b6cceSShuo Chen  OutputFile out(output);
4760ab2e892SShuo Chen  std::make_heap(keys.begin(), keys.end());
4770ab2e892SShuo Chen  while (!keys.empty())
4780ab2e892SShuo Chen  {
4790ab2e892SShuo Chen    std::pop_heap(keys.begin(), keys.end());
480270b6cceSShuo Chen    keys.back().outputTo(&out);
4810ab2e892SShuo Chen
4820ab2e892SShuo Chen    if (keys.back().next())
4830ab2e892SShuo Chen    {
4840ab2e892SShuo Chen      std::push_heap(keys.begin(), keys.end());
4850ab2e892SShuo Chen    }
4860ab2e892SShuo Chen    else
4870ab2e892SShuo Chen    {
4880ab2e892SShuo Chen      keys.pop_back();
4890ab2e892SShuo Chen    }
4900ab2e892SShuo Chen  }
4914136e585SShuo Chen  }
492a251380aSShuo Chen  LOG_INFO << "Merging done " << timer.report(total);
4932a129a12SShuo Chen  return total;
4940ab2e892SShuo Chen}
4950ab2e892SShuo Chen
4960ab2e892SShuo Chenint main(int argc, char* argv[])
4970ab2e892SShuo Chen{
4980ab2e892SShuo Chen  /*
4990ab2e892SShuo Chen  int fd = open("shard-00000-of-00010", O_RDONLY);
5000ab2e892SShuo Chen  double t = now();
5014136e585SShuo Chen  int64_t len = count_shard(0, fd);
5024136e585SShuo Chen  double sec = now() - t;
5034136e585SShuo Chen  printf("count_shard %.3f sec %.2f MB/s\n", sec, len / sec / 1e6);
5044136e585SShuo Chen  */
5050ab2e892SShuo Chen
5062a129a12SShuo Chen  int opt;
507a6693141SShuo Chen  const char* output = "output";
508a6693141SShuo Chen  while ((opt = getopt(argc, argv, "ko:s:t:v")) != -1)
5092a129a12SShuo Chen  {
5102a129a12SShuo Chen    switch (opt)
5112a129a12SShuo Chen    {
5122a129a12SShuo Chen      case 'k':
5132a129a12SShuo Chen        keep = true;
5142a129a12SShuo Chen        break;
515a6693141SShuo Chen      case 'o':
516a6693141SShuo Chen        output = optarg;
517a6693141SShuo Chen        break;
5182a129a12SShuo Chen      case 's':
5192a129a12SShuo Chen        kShards = atoi(optarg);
5202a129a12SShuo Chen        break;
521a6693141SShuo Chen      case 't':
522a6693141SShuo Chen        shard_dir = optarg;
523a6693141SShuo Chen        break;
5242a129a12SShuo Chen      case 'v':
5252a129a12SShuo Chen        verbose = true;
5262a129a12SShuo Chen        break;
5272a129a12SShuo Chen    }
5282a129a12SShuo Chen  }
5292a129a12SShuo Chen
5304136e585SShuo Chen  Timer timer;
531a251380aSShuo Chen  LOG_INFO << argc - optind << " input files, " << kShards << " shards, "
532a251380aSShuo Chen      << "output " << output <<" , temp " << shard_dir;
533a6693141SShuo Chen  int64_t input = 0;
534a6693141SShuo Chen  input = shard_(argc, argv);
5350ab2e892SShuo Chen  count_shards();
536a6693141SShuo Chen  int64_t output_size = merge(output);
537a6693141SShuo Chen  LOG_INFO << "All done " << timer.report(input) << " output " << output_size;
5380ab2e892SShuo Chen}
539