word_freq_shards_basic.cc revision 3a0488b5
10ab2e892SShuo Chen/* sort word by frequency, sharding version.
20ab2e892SShuo Chen
  1. read the input files (one word per line) and shard the words by hash into
     N files of records: one length byte followed by the word bytes
50ab2e892SShuo Chen  2. assume each shard file fits in memory, read each shard file, count words and sort by count, then write to N count files:
60ab2e892SShuo Chen       count \t word
70ab2e892SShuo Chen  3. merge N count files using heap.
80ab2e892SShuo Chen
90ab2e892SShuo ChenLimits: each shard must fit in memory.
100ab2e892SShuo Chen*/
110ab2e892SShuo Chen
#include <assert.h>

#include "file.h"
#include "timer.h"

#include "absl/container/flat_hash_map.h"
#include "absl/hash/hash.h"
#include "absl/strings/str_format.h"
#include "muduo/base/Logging.h"
#include "muduo/base/ThreadPool.h"

#include <algorithm>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include <fcntl.h>
#include <locale.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
340ab2e892SShuo Chen
352cf09315SShuo Chenusing absl::string_view;
360ab2e892SShuo Chenusing std::string;
370ab2e892SShuo Chenusing std::vector;
380ab2e892SShuo Chenusing std::unique_ptr;
390ab2e892SShuo Chen
// Process-wide settings, overridden by command-line flags in main().
int kShards = 10;                 // -s: number of shard files and count files
int kThreads = 4;                 // -t: worker threads used by count_shards()
bool g_verbose = false;           // -v: print per-phase timings in count_shard()
bool g_keep = false;              // -k: keep intermediate shard and count files
const char* shard_dir = ".";      // -p: directory holding the temporary shard files
const char* g_output = "output";  // -o: path of the final merged output
44270b6cceSShuo Chen
450ab2e892SShuo Chenclass Sharder // : boost::noncopyable
460ab2e892SShuo Chen{
470ab2e892SShuo Chen public:
480ab2e892SShuo Chen  Sharder()
490ab2e892SShuo Chen    : files_(kShards)
500ab2e892SShuo Chen  {
510ab2e892SShuo Chen    for (int i = 0; i < kShards; ++i)
520ab2e892SShuo Chen    {
530ab2e892SShuo Chen      char name[256];
54a6693141SShuo Chen      snprintf(name, sizeof name, "%s/shard-%05d-of-%05d", shard_dir, i, kShards);
550ab2e892SShuo Chen      files_[i].reset(new OutputFile(name));
560ab2e892SShuo Chen    }
570ab2e892SShuo Chen    assert(files_.size() == static_cast<size_t>(kShards));
580ab2e892SShuo Chen  }
590ab2e892SShuo Chen
600ab2e892SShuo Chen  void output(string_view word)
610ab2e892SShuo Chen  {
620ab2e892SShuo Chen    size_t shard = hash(word) % files_.size();
63270b6cceSShuo Chen    files_[shard]->appendRecord(word);
640ab2e892SShuo Chen  }
650ab2e892SShuo Chen
660ab2e892SShuo Chen  void finish()
670ab2e892SShuo Chen  {
684136e585SShuo Chen    int shard = 0;
694136e585SShuo Chen    for (const auto& file : files_)
700ab2e892SShuo Chen    {
7185147189SShuo Chen      // if (g_verbose)
724136e585SShuo Chen      printf("  shard %d: %ld bytes, %ld items\n", shard, file->tell(), file->items());
734136e585SShuo Chen      ++shard;
744136e585SShuo Chen      file->close();
750ab2e892SShuo Chen    }
760ab2e892SShuo Chen  }
770ab2e892SShuo Chen
780ab2e892SShuo Chen private:
792cf09315SShuo Chen  absl::Hash<string_view> hash;
800ab2e892SShuo Chen  vector<unique_ptr<OutputFile>> files_;
810ab2e892SShuo Chen};
820ab2e892SShuo Chen
834136e585SShuo Chenint64_t shard_(int argc, char* argv[])
840ab2e892SShuo Chen{
850ab2e892SShuo Chen  Sharder sharder;
864136e585SShuo Chen  Timer timer;
874136e585SShuo Chen  int64_t total = 0;
882a129a12SShuo Chen  for (int i = optind; i < argc; ++i)
890ab2e892SShuo Chen  {
902a129a12SShuo Chen    LOG_INFO << "Processing input file " << argv[i];
9185147189SShuo Chen    double t = Timer::now();
9285147189SShuo Chen    string line;
9385147189SShuo Chen    InputFile input(argv[i]);
9485147189SShuo Chen    while (input.getline(&line))
950ab2e892SShuo Chen    {
9685147189SShuo Chen      sharder.output(line);
970ab2e892SShuo Chen    }
9885147189SShuo Chen    size_t len = input.tell();
994136e585SShuo Chen    total += len;
10085147189SShuo Chen    double sec = Timer::now() - t;
1012a129a12SShuo Chen    LOG_INFO << "Done file " << argv[i] << absl::StrFormat(" %.3f sec %.2f MiB/s", sec, len / sec / 1024 / 1024);
1020ab2e892SShuo Chen  }
1030ab2e892SShuo Chen  sharder.finish();
1042a129a12SShuo Chen  LOG_INFO << "Sharding done " << timer.report(total);
1054136e585SShuo Chen  return total;
1060ab2e892SShuo Chen}
1070ab2e892SShuo Chen
1080ab2e892SShuo Chen// ======= count_shards =======
1090ab2e892SShuo Chen
110ecd7048bSShuo Chenvoid count_shard(int shard, int fd, size_t len)
1110ab2e892SShuo Chen{
112ecd7048bSShuo Chen  Timer timer;
113ecd7048bSShuo Chen
11485147189SShuo Chen  double t = Timer::now();
1152a129a12SShuo Chen  LOG_INFO << absl::StrFormat("counting shard %d: input file size %ld", shard, len);
1164136e585SShuo Chen  {
1170ab2e892SShuo Chen  void* mapped = mmap(NULL, len, PROT_READ, MAP_PRIVATE, fd, 0);
1180ab2e892SShuo Chen  assert(mapped != MAP_FAILED);
1190ab2e892SShuo Chen  const uint8_t* const start = static_cast<const uint8_t*>(mapped);
1200ab2e892SShuo Chen  const uint8_t* const end = start + len;
1210ab2e892SShuo Chen
1224136e585SShuo Chen  // std::unordered_map<string_view, uint64_t> items;
1234136e585SShuo Chen  absl::flat_hash_map<string_view, uint64_t> items;
1242a129a12SShuo Chen  int64_t count = 0;
1250ab2e892SShuo Chen  for (const uint8_t* p = start; p < end;)
1260ab2e892SShuo Chen  {
1270ab2e892SShuo Chen    string_view s((const char*)p+1, *p);
1280ab2e892SShuo Chen    items[s]++;
1290ab2e892SShuo Chen    p += 1 + *p;
1302a129a12SShuo Chen    ++count;
1310ab2e892SShuo Chen  }
132270b6cceSShuo Chen  LOG_INFO << "items " << count << " unique " << items.size();
13385147189SShuo Chen  if (g_verbose)
13485147189SShuo Chen  printf("  count %.3f sec %ld items\n", Timer::now() - t, items.size());
1350ab2e892SShuo Chen
13685147189SShuo Chen  t = Timer::now();
1370ab2e892SShuo Chen  vector<std::pair<size_t, string_view>> counts;
1380ab2e892SShuo Chen  for (const auto& it : items)
1390ab2e892SShuo Chen  {
1400ab2e892SShuo Chen    if (it.second > 1)
1412cf09315SShuo Chen      counts.push_back(std::make_pair(it.second, it.first));
1420ab2e892SShuo Chen  }
14385147189SShuo Chen  if (g_verbose)
14485147189SShuo Chen  printf("  select %.3f sec %ld\n", Timer::now() - t, counts.size());
1450ab2e892SShuo Chen
14685147189SShuo Chen  t = Timer::now();
1470ab2e892SShuo Chen  std::sort(counts.begin(), counts.end());
14885147189SShuo Chen  if (g_verbose)
14985147189SShuo Chen  printf("  sort %.3f sec\n", Timer::now() - t);
1500ab2e892SShuo Chen
15185147189SShuo Chen  t = Timer::now();
152c377920eSShuo Chen  int64_t out_len = 0;
1530ab2e892SShuo Chen  {
154ecd7048bSShuo Chen    char buf[256];
155ecd7048bSShuo Chen    snprintf(buf, sizeof buf, "count-%05d-of-%05d", shard, kShards);
156ecd7048bSShuo Chen    OutputFile output(buf);
157ecd7048bSShuo Chen
1584136e585SShuo Chen    for (auto it = counts.rbegin(); it != counts.rend(); ++it)
1590ab2e892SShuo Chen    {
160c377920eSShuo Chen      output.write(absl::StrFormat("%d\t%s\n", it->first, it->second));
1614136e585SShuo Chen    }
162270b6cceSShuo Chen
1634136e585SShuo Chen    for (const auto& it : items)
1644136e585SShuo Chen    {
1654136e585SShuo Chen      if (it.second == 1)
1664136e585SShuo Chen      {
167c377920eSShuo Chen        output.write(absl::StrFormat("1\t%s\n", it.first));
1684136e585SShuo Chen      }
1690ab2e892SShuo Chen    }
170c377920eSShuo Chen    out_len = output.tell();
1710ab2e892SShuo Chen  }
172c377920eSShuo Chen  if (g_verbose)
173c377920eSShuo Chen  printf("  output %.3f sec %lu\n", Timer::now() - t, out_len);
1740ab2e892SShuo Chen
1750ab2e892SShuo Chen  if (munmap(mapped, len))
1760ab2e892SShuo Chen    perror("munmap");
1774136e585SShuo Chen  }
178ecd7048bSShuo Chen  ::close(fd);
179ecd7048bSShuo Chen  LOG_INFO << "shard " << shard << " done " << timer.report(len);
1800ab2e892SShuo Chen}
1810ab2e892SShuo Chen
18285147189SShuo Chenvoid count_shards(int shards)
1830ab2e892SShuo Chen{
18485147189SShuo Chen  assert(shards <= kShards);
1854136e585SShuo Chen  Timer timer;
1864136e585SShuo Chen  int64_t total = 0;
187a251380aSShuo Chen  muduo::ThreadPool threadPool;
18885147189SShuo Chen  threadPool.setMaxQueueSize(2*kThreads);
18985147189SShuo Chen  threadPool.start(kThreads);
19085147189SShuo Chen
19185147189SShuo Chen  for (int shard = 0; shard < shards; ++shard)
1920ab2e892SShuo Chen  {
1930ab2e892SShuo Chen    char buf[256];
194a6693141SShuo Chen    snprintf(buf, sizeof buf, "%s/shard-%05d-of-%05d", shard_dir, shard, kShards);
1950ab2e892SShuo Chen    int fd = open(buf, O_RDONLY);
196ecd7048bSShuo Chen    assert(fd >= 0);
19785147189SShuo Chen    if (!g_keep)
198ecd7048bSShuo Chen      ::unlink(buf);
1992a129a12SShuo Chen
200ecd7048bSShuo Chen    struct stat st;
201ecd7048bSShuo Chen    if (::fstat(fd, &st) == 0)
202ecd7048bSShuo Chen    {
203ecd7048bSShuo Chen      size_t len = st.st_size;
204ecd7048bSShuo Chen      total += len;
205ecd7048bSShuo Chen      threadPool.run([shard, fd, len]{ count_shard(shard, fd, len); });
206ecd7048bSShuo Chen    }
207a251380aSShuo Chen  }
208a251380aSShuo Chen  while (threadPool.queueSize() > 0)
209a251380aSShuo Chen  {
21085147189SShuo Chen    LOG_DEBUG << "waiting for ThreadPool " << threadPool.queueSize();
211ecd7048bSShuo Chen    muduo::CurrentThread::sleepUsec(1000*1000);
2120ab2e892SShuo Chen  }
213a251380aSShuo Chen  threadPool.stop();
214270b6cceSShuo Chen  LOG_INFO << "Counting done "<< timer.report(total);
2150ab2e892SShuo Chen}
2160ab2e892SShuo Chen
2170ab2e892SShuo Chen// ======= merge =======
2180ab2e892SShuo Chen
2193a0488b5SShuo Chenclass TextInput
2200ab2e892SShuo Chen{
2210ab2e892SShuo Chen public:
2223a0488b5SShuo Chen  explicit TextInput(const char* filename, int buffer_size = 8 * 1024 * 1024)
2233a0488b5SShuo Chen    : fd_(::open(filename, O_RDONLY)),
2243a0488b5SShuo Chen      buffer_size_(buffer_size),
2253a0488b5SShuo Chen      block_(new Block)
2260ab2e892SShuo Chen  {
2273a0488b5SShuo Chen    assert(fd_ >= 0);
2283a0488b5SShuo Chen    block_->data.reset(new char[buffer_size_]);
2293a0488b5SShuo Chen    refill();
2300ab2e892SShuo Chen  }
2310ab2e892SShuo Chen
2323a0488b5SShuo Chen  ~TextInput()
2330ab2e892SShuo Chen  {
2343a0488b5SShuo Chen    ::close(fd_);
2353a0488b5SShuo Chen  }
2363a0488b5SShuo Chen
2373a0488b5SShuo Chen  absl::string_view line() const { return line_; }
2383a0488b5SShuo Chen
2393a0488b5SShuo Chen  bool next(int64_t* count)
2403a0488b5SShuo Chen  {
2413a0488b5SShuo Chen    // EOF
2423a0488b5SShuo Chen    if (block_->records.empty())
2433a0488b5SShuo Chen    {
2443a0488b5SShuo Chen      return false;
2453a0488b5SShuo Chen    }
2463a0488b5SShuo Chen
2473a0488b5SShuo Chen    if (index_ < block_->records.size())
2483a0488b5SShuo Chen    {
2493a0488b5SShuo Chen      const Record& rec = block_->records[index_];
2503a0488b5SShuo Chen      *count = rec.count;
2513a0488b5SShuo Chen      line_ = absl::string_view(block_->data.get() + rec.offset, rec.len);
2523a0488b5SShuo Chen      ++index_;
2533a0488b5SShuo Chen      return true;
2543a0488b5SShuo Chen    }
2553a0488b5SShuo Chen    else
2563a0488b5SShuo Chen    {
2573a0488b5SShuo Chen      refill();
2583a0488b5SShuo Chen      index_ = 0;
2593a0488b5SShuo Chen      return next(count);
2603a0488b5SShuo Chen    }
2613a0488b5SShuo Chen  }
2623a0488b5SShuo Chen
2633a0488b5SShuo Chen private:
2643a0488b5SShuo Chen
2653a0488b5SShuo Chen  struct Record
2663a0488b5SShuo Chen  {
2673a0488b5SShuo Chen    int64_t count = 0;
2683a0488b5SShuo Chen    int32_t offset = 0, len = 0;
2693a0488b5SShuo Chen  };
2703a0488b5SShuo Chen
2713a0488b5SShuo Chen  struct Block
2723a0488b5SShuo Chen  {
2733a0488b5SShuo Chen    std::unique_ptr<char[]> data;
2743a0488b5SShuo Chen    std::vector<Record> records;
2753a0488b5SShuo Chen  };
2763a0488b5SShuo Chen
2773a0488b5SShuo Chen  void refill()
2783a0488b5SShuo Chen  {
2793a0488b5SShuo Chen    block_->records.clear();
2803a0488b5SShuo Chen    char* data = block_->data.get();
2813a0488b5SShuo Chen    ssize_t nr = ::pread(fd_, data, buffer_size_, pos_);
2823a0488b5SShuo Chen    if (nr > 0)
2830ab2e892SShuo Chen    {
2843a0488b5SShuo Chen      char* start = data;
2853a0488b5SShuo Chen      size_t len = nr;
2863a0488b5SShuo Chen      char* nl = static_cast<char*>(::memchr(start, '\n', len));
2873a0488b5SShuo Chen      while (nl)
2880ab2e892SShuo Chen      {
2893a0488b5SShuo Chen        Record rec;
2903a0488b5SShuo Chen        rec.count = strtol(start, NULL, 10);
2913a0488b5SShuo Chen        rec.offset = start - data;
2923a0488b5SShuo Chen        rec.len = nl - start + 1;
2933a0488b5SShuo Chen        block_->records.push_back(rec);
2943a0488b5SShuo Chen        start = nl+1;
2953a0488b5SShuo Chen        len -= rec.len;
2963a0488b5SShuo Chen        nl = static_cast<char*>(::memchr(start, '\n', len));
2970ab2e892SShuo Chen      }
2983a0488b5SShuo Chen      pos_ += start - data;
2990ab2e892SShuo Chen    }
3000ab2e892SShuo Chen  }
3010ab2e892SShuo Chen
3023a0488b5SShuo Chen  const int fd_;
3033a0488b5SShuo Chen  const int buffer_size_;
3043a0488b5SShuo Chen  int64_t pos_ = 0;  // file position
3053a0488b5SShuo Chen  size_t index_ = 0; // index into block_
3063a0488b5SShuo Chen  std::unique_ptr<Block> block_;
3073a0488b5SShuo Chen  absl::string_view line_;
3083a0488b5SShuo Chen
3093a0488b5SShuo Chen  TextInput(const TextInput&) = delete;
3103a0488b5SShuo Chen  void operator=(const TextInput&) = delete;
3113a0488b5SShuo Chen};
3123a0488b5SShuo Chen
3133a0488b5SShuo Chenclass Source  // copyable
3143a0488b5SShuo Chen{
3153a0488b5SShuo Chen public:
3163a0488b5SShuo Chen  explicit Source(TextInput* in)
3173a0488b5SShuo Chen    : input_(in)
3180ab2e892SShuo Chen  {
3190ab2e892SShuo Chen  }
3200ab2e892SShuo Chen
3213a0488b5SShuo Chen  bool next()
3220ab2e892SShuo Chen  {
3233a0488b5SShuo Chen    return input_->next(&count_);
3240ab2e892SShuo Chen  }
3250ab2e892SShuo Chen
3263a0488b5SShuo Chen  bool operator<(const Source& rhs) const
32785147189SShuo Chen  {
3283a0488b5SShuo Chen    return count_ < rhs.count_;
32985147189SShuo Chen  }
33085147189SShuo Chen
3313a0488b5SShuo Chen  absl::string_view line() const { return input_->line(); }
3323a0488b5SShuo Chen
3330ab2e892SShuo Chen private:
3343a0488b5SShuo Chen  TextInput* input_;  // not owned
3353a0488b5SShuo Chen  int64_t count_ = 0;
3360ab2e892SShuo Chen};
3370ab2e892SShuo Chen
33885147189SShuo Chenint64_t merge()
3390ab2e892SShuo Chen{
3404136e585SShuo Chen  Timer timer;
3413a0488b5SShuo Chen  vector<unique_ptr<TextInput>> inputs;
3420ab2e892SShuo Chen  vector<Source> keys;
3430ab2e892SShuo Chen
3444136e585SShuo Chen  int64_t total = 0;
3450ab2e892SShuo Chen  for (int i = 0; i < kShards; ++i)
3460ab2e892SShuo Chen  {
3470ab2e892SShuo Chen    char buf[256];
3480ab2e892SShuo Chen    snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, kShards);
3494136e585SShuo Chen    struct stat st;
350a6693141SShuo Chen    if (::stat(buf, &st) == 0)
3510ab2e892SShuo Chen    {
352a6693141SShuo Chen      total += st.st_size;
35385147189SShuo Chen      // TODO: select buffer size based on kShards.
3543a0488b5SShuo Chen      inputs.push_back(std::make_unique<TextInput>(buf));
355a6693141SShuo Chen      Source rec(inputs.back().get());
356a6693141SShuo Chen      if (rec.next())
357a6693141SShuo Chen      {
358a6693141SShuo Chen        keys.push_back(rec);
359a6693141SShuo Chen      }
36085147189SShuo Chen      if (!g_keep)
361a6693141SShuo Chen        ::unlink(buf);
362a6693141SShuo Chen    }
363a6693141SShuo Chen    else
364a6693141SShuo Chen    {
365a6693141SShuo Chen      perror("Unable to stat file:");
3660ab2e892SShuo Chen    }
3670ab2e892SShuo Chen  }
3682a129a12SShuo Chen  LOG_INFO << "merging " << inputs.size() << " files of " << total << " bytes in total";
3690ab2e892SShuo Chen
3703a0488b5SShuo Chen  int64_t lines = 0;
3714136e585SShuo Chen  {
37285147189SShuo Chen  OutputFile out(g_output);
37385147189SShuo Chen
3740ab2e892SShuo Chen  std::make_heap(keys.begin(), keys.end());
3750ab2e892SShuo Chen  while (!keys.empty())
3760ab2e892SShuo Chen  {
3770ab2e892SShuo Chen    std::pop_heap(keys.begin(), keys.end());
3783a0488b5SShuo Chen    out.write(keys.back().line());
3793a0488b5SShuo Chen    ++lines;
3800ab2e892SShuo Chen
3810ab2e892SShuo Chen    if (keys.back().next())
3820ab2e892SShuo Chen    {
3830ab2e892SShuo Chen      std::push_heap(keys.begin(), keys.end());
3840ab2e892SShuo Chen    }
3850ab2e892SShuo Chen    else
3860ab2e892SShuo Chen    {
3870ab2e892SShuo Chen      keys.pop_back();
3880ab2e892SShuo Chen    }
3890ab2e892SShuo Chen  }
3903a0488b5SShuo Chen
3914136e585SShuo Chen  }
3923a0488b5SShuo Chen  LOG_INFO << "Merging done " << timer.report(total) << " lines " << lines;
3932a129a12SShuo Chen  return total;
3940ab2e892SShuo Chen}
3950ab2e892SShuo Chen
3960ab2e892SShuo Chenint main(int argc, char* argv[])
3970ab2e892SShuo Chen{
3980ab2e892SShuo Chen  /*
3990ab2e892SShuo Chen  int fd = open("shard-00000-of-00010", O_RDONLY);
40085147189SShuo Chen  double t = Timer::now();
4014136e585SShuo Chen  int64_t len = count_shard(0, fd);
40285147189SShuo Chen  double sec = Timer::now() - t;
4034136e585SShuo Chen  printf("count_shard %.3f sec %.2f MB/s\n", sec, len / sec / 1e6);
4044136e585SShuo Chen  */
4053e607da5SShuo Chen  setlocale(LC_NUMERIC, "");
4060ab2e892SShuo Chen
4072a129a12SShuo Chen  int opt;
40885147189SShuo Chen  int count_only = 0;
40985147189SShuo Chen  bool merge_only = false;
41085147189SShuo Chen  while ((opt = getopt(argc, argv, "c:kmo:p:s:t:v")) != -1)
4112a129a12SShuo Chen  {
4122a129a12SShuo Chen    switch (opt)
4132a129a12SShuo Chen    {
41485147189SShuo Chen      case 'c':
41585147189SShuo Chen        count_only = atoi(optarg);
41685147189SShuo Chen        break;
4172a129a12SShuo Chen      case 'k':
41885147189SShuo Chen        g_keep = true;
41985147189SShuo Chen        break;
42085147189SShuo Chen      case 'm':
42185147189SShuo Chen        merge_only = true;
4222a129a12SShuo Chen        break;
423a6693141SShuo Chen      case 'o':
42485147189SShuo Chen        g_output = optarg;
42585147189SShuo Chen        break;
42685147189SShuo Chen      case 'p':  // Path for temp shard files
42785147189SShuo Chen        shard_dir = optarg;
428a6693141SShuo Chen        break;
4292a129a12SShuo Chen      case 's':
4302a129a12SShuo Chen        kShards = atoi(optarg);
4312a129a12SShuo Chen        break;
432a6693141SShuo Chen      case 't':
43385147189SShuo Chen        kThreads = atoi(optarg);
434a6693141SShuo Chen        break;
4352a129a12SShuo Chen      case 'v':
43685147189SShuo Chen        g_verbose = true;
4372a129a12SShuo Chen        break;
4382a129a12SShuo Chen    }
4392a129a12SShuo Chen  }
4402a129a12SShuo Chen
44185147189SShuo Chen  if (count_only > 0 || merge_only)
44285147189SShuo Chen  {
44385147189SShuo Chen    g_keep = true;
4443a0488b5SShuo Chen    //g_verbose = true;
44585147189SShuo Chen    count_only = std::min(count_only, kShards);
44685147189SShuo Chen
44785147189SShuo Chen    if (count_only > 0)
44885147189SShuo Chen    {
44985147189SShuo Chen      count_shards(count_only);
45085147189SShuo Chen    }
45185147189SShuo Chen
45285147189SShuo Chen    if (merge_only)
45385147189SShuo Chen    {
45485147189SShuo Chen      merge();
45585147189SShuo Chen    }
45685147189SShuo Chen  }
45785147189SShuo Chen  else
45885147189SShuo Chen  {
45985147189SShuo Chen    // Run all three steps
46085147189SShuo Chen    Timer timer;
46185147189SShuo Chen    LOG_INFO << argc - optind << " input files, " << kShards << " shards, "
46285147189SShuo Chen             << "output " << g_output <<" , temp " << shard_dir;
46385147189SShuo Chen    int64_t input = 0;
46485147189SShuo Chen    input = shard_(argc, argv);
46585147189SShuo Chen    count_shards(kShards);
46685147189SShuo Chen    int64_t output_size = merge();
46785147189SShuo Chen    LOG_INFO << "All done " << timer.report(input) << " output " << output_size;
46885147189SShuo Chen  }
4690ab2e892SShuo Chen}
470