// word_freq_shards_basic.cc revision 270b6cce
/* Sort words by frequency, sharding version.

  1. Read the input files and shard the words into N files, one record per word:
       word
  2. Assuming each shard file fits in memory, read each shard file, count the
     words and sort them by count, then write N count files with lines of:
       count \t word
  3. Merge the N count files using a heap.

Limits: each shard must fit in memory.
*/
110ab2e892SShuo Chen
#include <assert.h>

#include "absl/container/flat_hash_map.h"
#include "absl/strings/str_format.h"
#include "muduo/base/Logging.h"

#include <algorithm>
//#include <fstream>
//#include <iostream>
#include <functional>
#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>

#include <boost/program_options.hpp>

#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/times.h>
#include <unistd.h>
350ab2e892SShuo Chen
using std::string;
using std::string_view;
using std::unique_ptr;
using std::vector;

// Size in bytes of the stdio buffers handed to setbuffer() in this file.
constexpr int kBufferSize = 128 * 1024;

// Number of shard files; main() overwrites it from the -s option.
int kShards = 10;
// Set by -v: print per-phase details while counting a shard.
bool verbose = false;
// Set by -k: keep the intermediate shard/count files instead of unlinking them.
bool keep = false;
// Directory that holds the shard files; main() overwrites it from the -t option.
const char* shard_dir = ".";
460ab2e892SShuo Chen
// Returns the current time reported by gettimeofday() as fractional seconds
// (whole seconds plus the microsecond part).
inline double now()
{
  struct timeval current = { 0, 0 };
  gettimeofday(&current, nullptr);
  const double fraction = current.tv_usec / 1000000.0;
  return current.tv_sec + fraction;
}
530ab2e892SShuo Chen
// CPU time split into its user-mode and system-mode parts, in seconds.
struct CpuTime
{
  double userSeconds = 0.0;
  double systemSeconds = 0.0;

  // Sum of the user and system parts.
  double total() const
  {
    const double sum = userSeconds + systemSeconds;
    return sum;
  }
};
614136e585SShuo Chen
624136e585SShuo Chenconst int g_clockTicks = static_cast<int>(::sysconf(_SC_CLK_TCK));
634136e585SShuo Chen
644136e585SShuo ChenCpuTime cpuTime()
654136e585SShuo Chen{
664136e585SShuo Chen  CpuTime t;
674136e585SShuo Chen  struct tms tms;
684136e585SShuo Chen  if (::times(&tms) >= 0)
694136e585SShuo Chen  {
704136e585SShuo Chen    const double hz = static_cast<double>(g_clockTicks);
714136e585SShuo Chen    t.userSeconds = static_cast<double>(tms.tms_utime) / hz;
724136e585SShuo Chen    t.systemSeconds = static_cast<double>(tms.tms_stime) / hz;
734136e585SShuo Chen  }
744136e585SShuo Chen  return t;
754136e585SShuo Chen}
764136e585SShuo Chen
774136e585SShuo Chenclass Timer
784136e585SShuo Chen{
794136e585SShuo Chen public:
804136e585SShuo Chen  Timer()
814136e585SShuo Chen    : start_(now()),
824136e585SShuo Chen      start_cpu_(cpuTime())
834136e585SShuo Chen  {
844136e585SShuo Chen  }
854136e585SShuo Chen
862a129a12SShuo Chen  string report(int64_t bytes) const
874136e585SShuo Chen  {
884136e585SShuo Chen    CpuTime end_cpu(cpuTime());
894136e585SShuo Chen    double end = now();
90a6693141SShuo Chen    return absl::StrFormat("%.2fs real  %.2fs cpu  %.2f MiB/s  %ld bytes",
912a129a12SShuo Chen                           end - start_, end_cpu.total() - start_cpu_.total(),
922a129a12SShuo Chen                           bytes / (end - start_) / 1024 / 1024, bytes);
934136e585SShuo Chen  }
944136e585SShuo Chen private:
954136e585SShuo Chen  const double start_ = 0;
964136e585SShuo Chen  const CpuTime start_cpu_;
974136e585SShuo Chen};
984136e585SShuo Chen
990ab2e892SShuo Chenclass OutputFile // : boost::noncopyable
1000ab2e892SShuo Chen{
1010ab2e892SShuo Chen public:
1020ab2e892SShuo Chen  explicit OutputFile(const string& filename)
103270b6cceSShuo Chen    : file_(::fopen(filename.c_str(), "w"))
1040ab2e892SShuo Chen  {
1050ab2e892SShuo Chen    assert(file_);
1060ab2e892SShuo Chen    ::setbuffer(file_, buffer_, sizeof buffer_);
1070ab2e892SShuo Chen  }
1080ab2e892SShuo Chen
1090ab2e892SShuo Chen  ~OutputFile()
1100ab2e892SShuo Chen  {
1110ab2e892SShuo Chen    close();
1120ab2e892SShuo Chen  }
1130ab2e892SShuo Chen
114270b6cceSShuo Chen  void write(string_view s)
115270b6cceSShuo Chen  {
116270b6cceSShuo Chen    ::fwrite(s.data(), 1, s.size(), file_);
117270b6cceSShuo Chen  }
118270b6cceSShuo Chen
119270b6cceSShuo Chen  void appendRecord(string_view s)
1200ab2e892SShuo Chen  {
1210ab2e892SShuo Chen    assert(s.size() < 255);
1220ab2e892SShuo Chen    uint8_t len = s.size();
1230ab2e892SShuo Chen    ::fwrite(&len, 1, sizeof len, file_);
1240ab2e892SShuo Chen    ::fwrite(s.data(), 1, len, file_);
1250ab2e892SShuo Chen    ++items_;
1260ab2e892SShuo Chen  }
1270ab2e892SShuo Chen
1280ab2e892SShuo Chen  void flush()
1290ab2e892SShuo Chen  {
1300ab2e892SShuo Chen    ::fflush(file_);
1310ab2e892SShuo Chen  }
1320ab2e892SShuo Chen
1330ab2e892SShuo Chen  void close()
1340ab2e892SShuo Chen  {
1350ab2e892SShuo Chen    if (file_)
1360ab2e892SShuo Chen      ::fclose(file_);
1370ab2e892SShuo Chen    file_ = nullptr;
1380ab2e892SShuo Chen  }
1390ab2e892SShuo Chen
1400ab2e892SShuo Chen  int64_t tell()
1410ab2e892SShuo Chen  {
1420ab2e892SShuo Chen    return ::ftell(file_);
1430ab2e892SShuo Chen  }
1440ab2e892SShuo Chen
1450ab2e892SShuo Chen  int fd()
1460ab2e892SShuo Chen  {
1470ab2e892SShuo Chen    return ::fileno(file_);
1480ab2e892SShuo Chen  }
1490ab2e892SShuo Chen
1500ab2e892SShuo Chen  size_t items()
1510ab2e892SShuo Chen  {
1520ab2e892SShuo Chen    return items_;
1530ab2e892SShuo Chen  }
1540ab2e892SShuo Chen
1550ab2e892SShuo Chen private:
1560ab2e892SShuo Chen  FILE* file_;
157a6693141SShuo Chen  char buffer_[kBufferSize];
1580ab2e892SShuo Chen  size_t items_ = 0;
1590ab2e892SShuo Chen
1600ab2e892SShuo Chen  OutputFile(const OutputFile&) = delete;
1610ab2e892SShuo Chen  void operator=(const OutputFile&) = delete;
1620ab2e892SShuo Chen};
1630ab2e892SShuo Chen
// Read-only text file on top of a stdio stream that uses a 32 MiB buffer
// stored inside this object (so instances should live on the heap).
// The stream is closed by close() or by the destructor.  Not copyable.
class InputFile
{
 public:
  // Opens filename with mode "r".  Aborts with a diagnostic when the file
  // cannot be opened: the previous assert() vanished under NDEBUG and let a
  // null stream reach setbuffer().
  explicit InputFile(const char* filename)
    : file_(::fopen(filename, "r"))
  {
    if (file_ == nullptr)
    {
      ::perror(filename);
      ::abort();
    }
    ::setbuffer(file_, buffer_, sizeof buffer_);
  }

  ~InputFile()
  {
    close();
  }

  // Closes the stream if it is still open; safe to call more than once.
  void close()
  {
    if (file_)
      ::fclose(file_);
    file_ = nullptr;
  }

  // Reads the next line into *output without its trailing '\n'.
  // Returns false, leaving *output untouched, when nothing more can be read.
  // Lines longer than the internal 1 KiB chunk are reassembled instead of
  // being handed back in pieces.
  bool getline(std::string* output)
  {
    char chunk[1024] = "";
    if (::fgets(chunk, sizeof chunk, file_) == nullptr)
      return false;

    output->clear();
    for (;;)
    {
      const size_t len = ::strlen(chunk);
      if (len > 0 && chunk[len-1] == '\n')
      {
        output->append(chunk, len - 1);
        break;
      }
      output->append(chunk, len);
      // A chunk that did not fill the buffer and has no newline is the
      // unterminated last line of the file: nothing more belongs to it.
      if (len + 1 < sizeof chunk)
        break;
      if (::fgets(chunk, sizeof chunk, file_) == nullptr)
        break;
    }
    return true;
  }

 private:
  FILE* file_;
  char buffer_[32 * 1024 * 1024];

  InputFile(const InputFile&) = delete;
  void operator=(const InputFile&) = delete;
};
211270b6cceSShuo Chen
2120ab2e892SShuo Chenclass Sharder // : boost::noncopyable
2130ab2e892SShuo Chen{
2140ab2e892SShuo Chen public:
2150ab2e892SShuo Chen  Sharder()
2160ab2e892SShuo Chen    : files_(kShards)
2170ab2e892SShuo Chen  {
2180ab2e892SShuo Chen    for (int i = 0; i < kShards; ++i)
2190ab2e892SShuo Chen    {
2200ab2e892SShuo Chen      char name[256];
221a6693141SShuo Chen      snprintf(name, sizeof name, "%s/shard-%05d-of-%05d", shard_dir, i, kShards);
2220ab2e892SShuo Chen      files_[i].reset(new OutputFile(name));
2230ab2e892SShuo Chen    }
2240ab2e892SShuo Chen    assert(files_.size() == static_cast<size_t>(kShards));
2250ab2e892SShuo Chen  }
2260ab2e892SShuo Chen
2270ab2e892SShuo Chen  void output(string_view word)
2280ab2e892SShuo Chen  {
2290ab2e892SShuo Chen    size_t shard = hash(word) % files_.size();
230270b6cceSShuo Chen    files_[shard]->appendRecord(word);
2310ab2e892SShuo Chen  }
2320ab2e892SShuo Chen
2330ab2e892SShuo Chen  void finish()
2340ab2e892SShuo Chen  {
2354136e585SShuo Chen    int shard = 0;
2364136e585SShuo Chen    for (const auto& file : files_)
2370ab2e892SShuo Chen    {
2382a129a12SShuo Chen      // if (verbose)
2394136e585SShuo Chen      printf("  shard %d: %ld bytes, %ld items\n", shard, file->tell(), file->items());
2404136e585SShuo Chen      ++shard;
2414136e585SShuo Chen      file->close();
2420ab2e892SShuo Chen    }
2430ab2e892SShuo Chen  }
2440ab2e892SShuo Chen
2450ab2e892SShuo Chen private:
2460ab2e892SShuo Chen  std::hash<string_view> hash;
2470ab2e892SShuo Chen  vector<unique_ptr<OutputFile>> files_;
2480ab2e892SShuo Chen};
2490ab2e892SShuo Chen
2504136e585SShuo Chenint64_t shard_(int argc, char* argv[])
2510ab2e892SShuo Chen{
2520ab2e892SShuo Chen  Sharder sharder;
2534136e585SShuo Chen  Timer timer;
2544136e585SShuo Chen  int64_t total = 0;
2552a129a12SShuo Chen  for (int i = optind; i < argc; ++i)
2560ab2e892SShuo Chen  {
2572a129a12SShuo Chen    LOG_INFO << "Processing input file " << argv[i];
2580ab2e892SShuo Chen    double t = now();
2590ab2e892SShuo Chen    char line[1024];
2600ab2e892SShuo Chen    FILE* fp = fopen(argv[i], "r");
261a6693141SShuo Chen    char buffer[kBufferSize];
2620ab2e892SShuo Chen    ::setbuffer(fp, buffer, sizeof buffer);
2630ab2e892SShuo Chen    while (fgets(line, sizeof line, fp))
2640ab2e892SShuo Chen    {
2650ab2e892SShuo Chen      size_t len = strlen(line);
2660ab2e892SShuo Chen      if (len > 0 && line[len-1] == '\n')
267270b6cceSShuo Chen      {
2680ab2e892SShuo Chen        line[len-1] = '\0';
269270b6cceSShuo Chen        len--;
270270b6cceSShuo Chen      }
271270b6cceSShuo Chen      sharder.output(string_view(line, len));
2720ab2e892SShuo Chen    }
2734136e585SShuo Chen    size_t len = ftell(fp);
2740ab2e892SShuo Chen    fclose(fp);
2754136e585SShuo Chen    total += len;
2760ab2e892SShuo Chen    double sec = now() - t;
2772a129a12SShuo Chen    LOG_INFO << "Done file " << argv[i] << absl::StrFormat(" %.3f sec %.2f MiB/s", sec, len / sec / 1024 / 1024);
2780ab2e892SShuo Chen  }
2790ab2e892SShuo Chen  sharder.finish();
2802a129a12SShuo Chen  LOG_INFO << "Sharding done " << timer.report(total);
2814136e585SShuo Chen  return total;
2820ab2e892SShuo Chen}
2830ab2e892SShuo Chen
2840ab2e892SShuo Chen// ======= count_shards =======
2850ab2e892SShuo Chen
2862a129a12SShuo Chenint64_t count_shard(int shard, int fd, const char* output)
2870ab2e892SShuo Chen{
2880ab2e892SShuo Chen  const int64_t len = lseek(fd, 0, SEEK_END);
2890ab2e892SShuo Chen  lseek(fd, 0, SEEK_SET);
2900ab2e892SShuo Chen  double t = now();
2912a129a12SShuo Chen  LOG_INFO << absl::StrFormat("counting shard %d: input file size %ld", shard, len);
2924136e585SShuo Chen  {
2930ab2e892SShuo Chen  void* mapped = mmap(NULL, len, PROT_READ, MAP_PRIVATE, fd, 0);
2940ab2e892SShuo Chen  assert(mapped != MAP_FAILED);
2950ab2e892SShuo Chen  const uint8_t* const start = static_cast<const uint8_t*>(mapped);
2960ab2e892SShuo Chen  const uint8_t* const end = start + len;
2970ab2e892SShuo Chen
2984136e585SShuo Chen  // std::unordered_map<string_view, uint64_t> items;
2994136e585SShuo Chen  absl::flat_hash_map<string_view, uint64_t> items;
3002a129a12SShuo Chen  int64_t count = 0;
3010ab2e892SShuo Chen  for (const uint8_t* p = start; p < end;)
3020ab2e892SShuo Chen  {
3030ab2e892SShuo Chen    string_view s((const char*)p+1, *p);
3040ab2e892SShuo Chen    items[s]++;
3050ab2e892SShuo Chen    p += 1 + *p;
3062a129a12SShuo Chen    ++count;
3070ab2e892SShuo Chen  }
308270b6cceSShuo Chen  LOG_INFO << "items " << count << " unique " << items.size();
3092a129a12SShuo Chen  if (verbose)
3100ab2e892SShuo Chen  printf("  count %.3f sec %ld items\n", now() - t, items.size());
3110ab2e892SShuo Chen
3120ab2e892SShuo Chen  t = now();
3130ab2e892SShuo Chen  vector<std::pair<size_t, string_view>> counts;
3140ab2e892SShuo Chen  for (const auto& it : items)
3150ab2e892SShuo Chen  {
3160ab2e892SShuo Chen    if (it.second > 1)
3170ab2e892SShuo Chen      counts.push_back(make_pair(it.second, it.first));
3180ab2e892SShuo Chen  }
3192a129a12SShuo Chen  if (verbose)
3200ab2e892SShuo Chen  printf("  select %.3f sec %ld\n", now() - t, counts.size());
3210ab2e892SShuo Chen
3220ab2e892SShuo Chen  t = now();
3230ab2e892SShuo Chen  std::sort(counts.begin(), counts.end());
3242a129a12SShuo Chen  if (verbose)
3250ab2e892SShuo Chen  printf("  sort %.3f sec\n", now() - t);
3260ab2e892SShuo Chen
3270ab2e892SShuo Chen  t = now();
3280ab2e892SShuo Chen  {
329270b6cceSShuo Chen    OutputFile out(output);
3304136e585SShuo Chen    for (auto it = counts.rbegin(); it != counts.rend(); ++it)
3310ab2e892SShuo Chen    {
332270b6cceSShuo Chen      string s(it->second);
333270b6cceSShuo Chen      out.write(absl::StrFormat("%d\t%s\n", it->first, s));  // FIXME %s with string_view doesn't work in C++17
334270b6cceSShuo Chen      /*
335270b6cceSShuo Chen      char buf[1024];
336270b6cceSShuo Chen      snprintf(buf, sizeof buf, "%zd\t%s\n",
337270b6cceSShuo Chen      out.write(buf);
338270b6cceSShuo Chen      */
3394136e585SShuo Chen    }
340270b6cceSShuo Chen
3414136e585SShuo Chen    for (const auto& it : items)
3424136e585SShuo Chen    {
3434136e585SShuo Chen      if (it.second == 1)
3444136e585SShuo Chen      {
345270b6cceSShuo Chen        string s(it.first);
346270b6cceSShuo Chen        // FIXME: bug of absl?
347270b6cceSShuo Chen        // out.write(absl::StrCat("1\t", s, "\n"));
348270b6cceSShuo Chen        out.write(absl::StrFormat("1\t%s\n", s));
349270b6cceSShuo Chen
3504136e585SShuo Chen      }
3510ab2e892SShuo Chen    }
3520ab2e892SShuo Chen  }
3532a129a12SShuo Chen  //if (verbose)
3542a129a12SShuo Chen  //printf("  output %.3f sec %lu\n", now() - t, st.st_size);
3550ab2e892SShuo Chen
3564136e585SShuo Chen  t = now();
3570ab2e892SShuo Chen  if (munmap(mapped, len))
3580ab2e892SShuo Chen    perror("munmap");
3594136e585SShuo Chen  }
3602a129a12SShuo Chen  // printf("  destruct %.3f sec\n", now() - t);
3614136e585SShuo Chen  return len;
3620ab2e892SShuo Chen}
3630ab2e892SShuo Chen
3640ab2e892SShuo Chenvoid count_shards()
3650ab2e892SShuo Chen{
3664136e585SShuo Chen  Timer timer;
3674136e585SShuo Chen  int64_t total = 0;
3680ab2e892SShuo Chen  for (int shard = 0; shard < kShards; ++shard)
3690ab2e892SShuo Chen  {
3702a129a12SShuo Chen    Timer timer;
3710ab2e892SShuo Chen    char buf[256];
372a6693141SShuo Chen    snprintf(buf, sizeof buf, "%s/shard-%05d-of-%05d", shard_dir, shard, kShards);
3730ab2e892SShuo Chen    int fd = open(buf, O_RDONLY);
3742a129a12SShuo Chen    if (!keep)
3750ab2e892SShuo Chen    ::unlink(buf);
3762a129a12SShuo Chen
3772a129a12SShuo Chen    snprintf(buf, sizeof buf, "count-%05d-of-%05d", shard, kShards);
3782a129a12SShuo Chen    int64_t len = count_shard(shard, fd, buf);
3792a129a12SShuo Chen    ::close(fd);
3804136e585SShuo Chen    total += len;
3812a129a12SShuo Chen    struct stat st;
3822a129a12SShuo Chen    ::stat(buf, &st);
3832a129a12SShuo Chen    LOG_INFO << "shard " << shard << " done " << timer.report(len) << " output " << st.st_size;
3840ab2e892SShuo Chen  }
385270b6cceSShuo Chen  LOG_INFO << "Counting done "<< timer.report(total);
3860ab2e892SShuo Chen}
3870ab2e892SShuo Chen
3880ab2e892SShuo Chen// ======= merge =======
3890ab2e892SShuo Chen
3900ab2e892SShuo Chenclass Source  // copyable
3910ab2e892SShuo Chen{
3920ab2e892SShuo Chen public:
393270b6cceSShuo Chen  explicit Source(InputFile* in)
3940ab2e892SShuo Chen    : in_(in),
3950ab2e892SShuo Chen      count_(0),
3960ab2e892SShuo Chen      word_()
3970ab2e892SShuo Chen  {
3980ab2e892SShuo Chen  }
3990ab2e892SShuo Chen
4000ab2e892SShuo Chen  bool next()
4010ab2e892SShuo Chen  {
4020ab2e892SShuo Chen    string line;
403270b6cceSShuo Chen    if (in_->getline(&line))
4040ab2e892SShuo Chen    {
4050ab2e892SShuo Chen      size_t tab = line.find('\t');
4060ab2e892SShuo Chen      if (tab != string::npos)
4070ab2e892SShuo Chen      {
4080ab2e892SShuo Chen        count_ = strtol(line.c_str(), NULL, 10);
4090ab2e892SShuo Chen        if (count_ > 0)
4100ab2e892SShuo Chen        {
4110ab2e892SShuo Chen          word_ = line.substr(tab+1);
4120ab2e892SShuo Chen          return true;
4130ab2e892SShuo Chen        }
4140ab2e892SShuo Chen      }
4150ab2e892SShuo Chen    }
4160ab2e892SShuo Chen    return false;
4170ab2e892SShuo Chen  }
4180ab2e892SShuo Chen
4190ab2e892SShuo Chen  bool operator<(const Source& rhs) const
4200ab2e892SShuo Chen  {
4210ab2e892SShuo Chen    return count_ < rhs.count_;
4220ab2e892SShuo Chen  }
4230ab2e892SShuo Chen
424270b6cceSShuo Chen  void outputTo(OutputFile* out) const
4250ab2e892SShuo Chen  {
426270b6cceSShuo Chen    out->write(absl::StrFormat("%d\t%s\n", count_, word_));
4270ab2e892SShuo Chen  }
4280ab2e892SShuo Chen
4290ab2e892SShuo Chen private:
430270b6cceSShuo Chen  InputFile* in_;  // not owned
4310ab2e892SShuo Chen  int64_t count_;
4320ab2e892SShuo Chen  string word_;
4330ab2e892SShuo Chen};
4340ab2e892SShuo Chen
4352a129a12SShuo Chenint64_t merge(const char* output)
4360ab2e892SShuo Chen{
4374136e585SShuo Chen  Timer timer;
438270b6cceSShuo Chen  vector<unique_ptr<InputFile>> inputs;
4390ab2e892SShuo Chen  vector<Source> keys;
4400ab2e892SShuo Chen
4414136e585SShuo Chen  int64_t total = 0;
4420ab2e892SShuo Chen  for (int i = 0; i < kShards; ++i)
4430ab2e892SShuo Chen  {
4440ab2e892SShuo Chen    char buf[256];
4450ab2e892SShuo Chen    snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, kShards);
4464136e585SShuo Chen    struct stat st;
447a6693141SShuo Chen    if (::stat(buf, &st) == 0)
4480ab2e892SShuo Chen    {
449a6693141SShuo Chen      total += st.st_size;
450270b6cceSShuo Chen      inputs.push_back(std::make_unique<InputFile>(buf));
451a6693141SShuo Chen      Source rec(inputs.back().get());
452a6693141SShuo Chen      if (rec.next())
453a6693141SShuo Chen      {
454a6693141SShuo Chen        keys.push_back(rec);
455a6693141SShuo Chen      }
456a6693141SShuo Chen      if (!keep)
457a6693141SShuo Chen        ::unlink(buf);
458a6693141SShuo Chen    }
459a6693141SShuo Chen    else
460a6693141SShuo Chen    {
461a6693141SShuo Chen      perror("Unable to stat file:");
4620ab2e892SShuo Chen    }
4630ab2e892SShuo Chen  }
4642a129a12SShuo Chen  LOG_INFO << "merging " << inputs.size() << " files of " << total << " bytes in total";
4650ab2e892SShuo Chen
4664136e585SShuo Chen  {
467270b6cceSShuo Chen  OutputFile out(output);
4680ab2e892SShuo Chen  std::make_heap(keys.begin(), keys.end());
4690ab2e892SShuo Chen  while (!keys.empty())
4700ab2e892SShuo Chen  {
4710ab2e892SShuo Chen    std::pop_heap(keys.begin(), keys.end());
472270b6cceSShuo Chen    keys.back().outputTo(&out);
4730ab2e892SShuo Chen
4740ab2e892SShuo Chen    if (keys.back().next())
4750ab2e892SShuo Chen    {
4760ab2e892SShuo Chen      std::push_heap(keys.begin(), keys.end());
4770ab2e892SShuo Chen    }
4780ab2e892SShuo Chen    else
4790ab2e892SShuo Chen    {
4800ab2e892SShuo Chen      keys.pop_back();
4810ab2e892SShuo Chen    }
4820ab2e892SShuo Chen  }
4834136e585SShuo Chen  }
4842a129a12SShuo Chen  LOG_INFO << "merging done " << timer.report(total);
4852a129a12SShuo Chen  return total;
4860ab2e892SShuo Chen}
4870ab2e892SShuo Chen
4880ab2e892SShuo Chenint main(int argc, char* argv[])
4890ab2e892SShuo Chen{
4900ab2e892SShuo Chen  /*
4910ab2e892SShuo Chen  int fd = open("shard-00000-of-00010", O_RDONLY);
4920ab2e892SShuo Chen  double t = now();
4934136e585SShuo Chen  int64_t len = count_shard(0, fd);
4944136e585SShuo Chen  double sec = now() - t;
4954136e585SShuo Chen  printf("count_shard %.3f sec %.2f MB/s\n", sec, len / sec / 1e6);
4964136e585SShuo Chen  */
4970ab2e892SShuo Chen
4982a129a12SShuo Chen  int opt;
499a6693141SShuo Chen  const char* output = "output";
500a6693141SShuo Chen  while ((opt = getopt(argc, argv, "ko:s:t:v")) != -1)
5012a129a12SShuo Chen  {
5022a129a12SShuo Chen    switch (opt)
5032a129a12SShuo Chen    {
5042a129a12SShuo Chen      case 'k':
5052a129a12SShuo Chen        keep = true;
5062a129a12SShuo Chen        break;
507a6693141SShuo Chen      case 'o':
508a6693141SShuo Chen        output = optarg;
509a6693141SShuo Chen        break;
5102a129a12SShuo Chen      case 's':
5112a129a12SShuo Chen        kShards = atoi(optarg);
5122a129a12SShuo Chen        break;
513a6693141SShuo Chen      case 't':
514a6693141SShuo Chen        shard_dir = optarg;
515a6693141SShuo Chen        break;
5162a129a12SShuo Chen      case 'v':
5172a129a12SShuo Chen        verbose = true;
5182a129a12SShuo Chen        break;
5192a129a12SShuo Chen    }
5202a129a12SShuo Chen  }
5212a129a12SShuo Chen
5224136e585SShuo Chen  Timer timer;
523a6693141SShuo Chen  LOG_INFO << argc - optind << " input files, " << kShards << " shards, output " << output <<" , temp " << shard_dir;
524a6693141SShuo Chen  int64_t input = 0;
525a6693141SShuo Chen  input = shard_(argc, argv);
5260ab2e892SShuo Chen  count_shards();
527a6693141SShuo Chen  int64_t output_size = merge(output);
528a6693141SShuo Chen  LOG_INFO << "All done " << timer.report(input) << " output " << output_size;
5290ab2e892SShuo Chen}
530