word_freq_shards_basic.cc revision 3a0488b5
10ab2e892SShuo Chen/* sort word by frequency, sharding version. 20ab2e892SShuo Chen 30ab2e892SShuo Chen 1. read input file, shard to N files: 40ab2e892SShuo Chen word 50ab2e892SShuo Chen 2. assume each shard file fits in memory, read each shard file, count words and sort by count, then write to N count files: 60ab2e892SShuo Chen count \t word 70ab2e892SShuo Chen 3. merge N count files using heap. 80ab2e892SShuo Chen 90ab2e892SShuo ChenLimits: each shard must fit in memory. 100ab2e892SShuo Chen*/ 110ab2e892SShuo Chen 120ab2e892SShuo Chen#include <assert.h> 130ab2e892SShuo Chen 1485147189SShuo Chen#include "file.h" 1585147189SShuo Chen#include "timer.h" 1685147189SShuo Chen 174136e585SShuo Chen#include "absl/container/flat_hash_map.h" 182cf09315SShuo Chen#include "absl/hash/hash.h" 192a129a12SShuo Chen#include "absl/strings/str_format.h" 202a129a12SShuo Chen#include "muduo/base/Logging.h" 21a251380aSShuo Chen#include "muduo/base/ThreadPool.h" 224136e585SShuo Chen 230ab2e892SShuo Chen#include <algorithm> 240ab2e892SShuo Chen#include <memory> 250ab2e892SShuo Chen#include <string> 260ab2e892SShuo Chen#include <unordered_map> 270ab2e892SShuo Chen#include <vector> 280ab2e892SShuo Chen 290ab2e892SShuo Chen#include <fcntl.h> 300ab2e892SShuo Chen#include <string.h> 310ab2e892SShuo Chen#include <sys/mman.h> 324136e585SShuo Chen#include <sys/stat.h> 330ab2e892SShuo Chen#include <unistd.h> 340ab2e892SShuo Chen 352cf09315SShuo Chenusing absl::string_view; 360ab2e892SShuo Chenusing std::string; 370ab2e892SShuo Chenusing std::vector; 380ab2e892SShuo Chenusing std::unique_ptr; 390ab2e892SShuo Chen 4085147189SShuo Chenint kShards = 10, kThreads = 4; 4185147189SShuo Chenbool g_verbose = false, g_keep = false; 42a6693141SShuo Chenconst char* shard_dir = "."; 4385147189SShuo Chenconst char* g_output = "output"; 44270b6cceSShuo Chen 450ab2e892SShuo Chenclass Sharder // : boost::noncopyable 460ab2e892SShuo Chen{ 470ab2e892SShuo Chen public: 480ab2e892SShuo Chen Sharder() 490ab2e892SShuo Chen : files_(kShards) 500ab2e892SShuo Chen { 510ab2e892SShuo Chen for (int i = 0; i < kShards; ++i) 520ab2e892SShuo Chen { 530ab2e892SShuo Chen char name[256]; 54a6693141SShuo Chen snprintf(name, sizeof name, "%s/shard-%05d-of-%05d", shard_dir, i, kShards); 550ab2e892SShuo Chen files_[i].reset(new OutputFile(name)); 560ab2e892SShuo Chen } 570ab2e892SShuo Chen assert(files_.size() == static_cast<size_t>(kShards)); 580ab2e892SShuo Chen } 590ab2e892SShuo Chen 600ab2e892SShuo Chen void output(string_view word) 610ab2e892SShuo Chen { 620ab2e892SShuo Chen size_t shard = hash(word) % files_.size(); 63270b6cceSShuo Chen files_[shard]->appendRecord(word); 640ab2e892SShuo Chen } 650ab2e892SShuo Chen 660ab2e892SShuo Chen void finish() 670ab2e892SShuo Chen { 684136e585SShuo Chen int shard = 0; 694136e585SShuo Chen for (const auto& file : files_) 700ab2e892SShuo Chen { 7185147189SShuo Chen // if (g_verbose) 724136e585SShuo Chen printf(" shard %d: %ld bytes, %ld items\n", shard, file->tell(), file->items()); 734136e585SShuo Chen ++shard; 744136e585SShuo Chen file->close(); 750ab2e892SShuo Chen } 760ab2e892SShuo Chen } 770ab2e892SShuo Chen 780ab2e892SShuo Chen private: 792cf09315SShuo Chen absl::Hash<string_view> hash; 800ab2e892SShuo Chen vector<unique_ptr<OutputFile>> files_; 810ab2e892SShuo Chen}; 820ab2e892SShuo Chen 834136e585SShuo Chenint64_t shard_(int argc, char* argv[]) 840ab2e892SShuo Chen{ 850ab2e892SShuo Chen Sharder sharder; 864136e585SShuo Chen Timer timer; 874136e585SShuo Chen int64_t total = 0; 882a129a12SShuo Chen for (int i = optind; i < argc; ++i) 890ab2e892SShuo Chen { 902a129a12SShuo Chen LOG_INFO << "Processing input file " << argv[i]; 9185147189SShuo Chen double t = Timer::now(); 9285147189SShuo Chen string line; 9385147189SShuo Chen InputFile input(argv[i]); 9485147189SShuo Chen while (input.getline(&line)) 950ab2e892SShuo Chen { 9685147189SShuo Chen sharder.output(line); 970ab2e892SShuo Chen } 9885147189SShuo Chen size_t len = input.tell(); 994136e585SShuo Chen total += len; 10085147189SShuo Chen double sec = Timer::now() - t; 1012a129a12SShuo Chen LOG_INFO << "Done file " << argv[i] << absl::StrFormat(" %.3f sec %.2f MiB/s", sec, len / sec / 1024 / 1024); 1020ab2e892SShuo Chen } 1030ab2e892SShuo Chen sharder.finish(); 1042a129a12SShuo Chen LOG_INFO << "Sharding done " << timer.report(total); 1054136e585SShuo Chen return total; 1060ab2e892SShuo Chen} 1070ab2e892SShuo Chen 1080ab2e892SShuo Chen// ======= count_shards ======= 1090ab2e892SShuo Chen 110ecd7048bSShuo Chenvoid count_shard(int shard, int fd, size_t len) 1110ab2e892SShuo Chen{ 112ecd7048bSShuo Chen Timer timer; 113ecd7048bSShuo Chen 11485147189SShuo Chen double t = Timer::now(); 1152a129a12SShuo Chen LOG_INFO << absl::StrFormat("counting shard %d: input file size %ld", shard, len); 1164136e585SShuo Chen { 1170ab2e892SShuo Chen void* mapped = mmap(NULL, len, PROT_READ, MAP_PRIVATE, fd, 0); 1180ab2e892SShuo Chen assert(mapped != MAP_FAILED); 1190ab2e892SShuo Chen const uint8_t* const start = static_cast<const uint8_t*>(mapped); 1200ab2e892SShuo Chen const uint8_t* const end = start + len; 1210ab2e892SShuo Chen 1224136e585SShuo Chen // std::unordered_map<string_view, uint64_t> items; 1234136e585SShuo Chen absl::flat_hash_map<string_view, uint64_t> items; 1242a129a12SShuo Chen int64_t count = 0; 1250ab2e892SShuo Chen for (const uint8_t* p = start; p < end;) 1260ab2e892SShuo Chen { 1270ab2e892SShuo Chen string_view s((const char*)p+1, *p); 1280ab2e892SShuo Chen items[s]++; 1290ab2e892SShuo Chen p += 1 + *p; 1302a129a12SShuo Chen ++count; 1310ab2e892SShuo Chen } 132270b6cceSShuo Chen LOG_INFO << "items " << count << " unique " << items.size(); 13385147189SShuo Chen if (g_verbose) 13485147189SShuo Chen printf(" count %.3f sec %ld items\n", Timer::now() - t, items.size()); 1350ab2e892SShuo Chen 13685147189SShuo Chen t = Timer::now(); 1370ab2e892SShuo Chen vector<std::pair<size_t, string_view>> counts; 1380ab2e892SShuo Chen for (const auto& it : items) 1390ab2e892SShuo Chen { 1400ab2e892SShuo Chen if (it.second > 1) 1412cf09315SShuo Chen counts.push_back(std::make_pair(it.second, it.first)); 1420ab2e892SShuo Chen } 14385147189SShuo Chen if (g_verbose) 14485147189SShuo Chen printf(" select %.3f sec %ld\n", Timer::now() - t, counts.size()); 1450ab2e892SShuo Chen 14685147189SShuo Chen t = Timer::now(); 1470ab2e892SShuo Chen std::sort(counts.begin(), counts.end()); 14885147189SShuo Chen if (g_verbose) 14985147189SShuo Chen printf(" sort %.3f sec\n", Timer::now() - t); 1500ab2e892SShuo Chen 15185147189SShuo Chen t = Timer::now(); 152c377920eSShuo Chen int64_t out_len = 0; 1530ab2e892SShuo Chen { 154ecd7048bSShuo Chen char buf[256]; 155ecd7048bSShuo Chen snprintf(buf, sizeof buf, "count-%05d-of-%05d", shard, kShards); 156ecd7048bSShuo Chen OutputFile output(buf); 157ecd7048bSShuo Chen 1584136e585SShuo Chen for (auto it = counts.rbegin(); it != counts.rend(); ++it) 1590ab2e892SShuo Chen { 160c377920eSShuo Chen output.write(absl::StrFormat("%d\t%s\n", it->first, it->second)); 1614136e585SShuo Chen } 162270b6cceSShuo Chen 1634136e585SShuo Chen for (const auto& it : items) 1644136e585SShuo Chen { 1654136e585SShuo Chen if (it.second == 1) 1664136e585SShuo Chen { 167c377920eSShuo Chen output.write(absl::StrFormat("1\t%s\n", it.first)); 1684136e585SShuo Chen } 1690ab2e892SShuo Chen } 170c377920eSShuo Chen out_len = output.tell(); 1710ab2e892SShuo Chen } 172c377920eSShuo Chen if (g_verbose) 173c377920eSShuo Chen printf(" output %.3f sec %lu\n", Timer::now() - t, out_len); 1740ab2e892SShuo Chen 1750ab2e892SShuo Chen if (munmap(mapped, len)) 1760ab2e892SShuo Chen perror("munmap"); 1774136e585SShuo Chen } 178ecd7048bSShuo Chen ::close(fd); 179ecd7048bSShuo Chen LOG_INFO << "shard " << shard << " done " << timer.report(len); 1800ab2e892SShuo Chen} 1810ab2e892SShuo Chen 18285147189SShuo Chenvoid count_shards(int shards) 1830ab2e892SShuo Chen{ 18485147189SShuo Chen assert(shards <= kShards); 1854136e585SShuo Chen Timer timer; 1864136e585SShuo Chen int64_t total = 0; 187a251380aSShuo Chen muduo::ThreadPool threadPool; 18885147189SShuo Chen threadPool.setMaxQueueSize(2*kThreads); 18985147189SShuo Chen threadPool.start(kThreads); 19085147189SShuo Chen 19185147189SShuo Chen for (int shard = 0; shard < shards; ++shard) 1920ab2e892SShuo Chen { 1930ab2e892SShuo Chen char buf[256]; 194a6693141SShuo Chen snprintf(buf, sizeof buf, "%s/shard-%05d-of-%05d", shard_dir, shard, kShards); 1950ab2e892SShuo Chen int fd = open(buf, O_RDONLY); 196ecd7048bSShuo Chen assert(fd >= 0); 19785147189SShuo Chen if (!g_keep) 198ecd7048bSShuo Chen ::unlink(buf); 1992a129a12SShuo Chen 200ecd7048bSShuo Chen struct stat st; 201ecd7048bSShuo Chen if (::fstat(fd, &st) == 0) 202ecd7048bSShuo Chen { 203ecd7048bSShuo Chen size_t len = st.st_size; 204ecd7048bSShuo Chen total += len; 205ecd7048bSShuo Chen threadPool.run([shard, fd, len]{ count_shard(shard, fd, len); }); 206ecd7048bSShuo Chen } 207a251380aSShuo Chen } 208a251380aSShuo Chen while (threadPool.queueSize() > 0) 209a251380aSShuo Chen { 21085147189SShuo Chen LOG_DEBUG << "waiting for ThreadPool " << threadPool.queueSize(); 211ecd7048bSShuo Chen muduo::CurrentThread::sleepUsec(1000*1000); 2120ab2e892SShuo Chen } 213a251380aSShuo Chen threadPool.stop(); 214270b6cceSShuo Chen LOG_INFO << "Counting done "<< timer.report(total); 2150ab2e892SShuo Chen} 2160ab2e892SShuo Chen 2170ab2e892SShuo Chen// ======= merge ======= 2180ab2e892SShuo Chen 2193a0488b5SShuo Chenclass TextInput 2200ab2e892SShuo Chen{ 2210ab2e892SShuo Chen public: 2223a0488b5SShuo Chen explicit TextInput(const char* filename, int buffer_size = 8 * 1024 * 1024) 2233a0488b5SShuo Chen : fd_(::open(filename, O_RDONLY)), 2243a0488b5SShuo Chen buffer_size_(buffer_size), 2253a0488b5SShuo Chen block_(new Block) 2260ab2e892SShuo Chen { 2273a0488b5SShuo Chen assert(fd_ >= 0); 2283a0488b5SShuo Chen block_->data.reset(new char[buffer_size_]); 2293a0488b5SShuo Chen refill(); 2300ab2e892SShuo Chen } 2310ab2e892SShuo Chen 2323a0488b5SShuo Chen ~TextInput() 2330ab2e892SShuo Chen { 2343a0488b5SShuo Chen ::close(fd_); 2353a0488b5SShuo Chen } 2363a0488b5SShuo Chen 2373a0488b5SShuo Chen absl::string_view line() const { return line_; } 2383a0488b5SShuo Chen 2393a0488b5SShuo Chen bool next(int64_t* count) 2403a0488b5SShuo Chen { 2413a0488b5SShuo Chen // EOF 2423a0488b5SShuo Chen if (block_->records.empty()) 2433a0488b5SShuo Chen { 2443a0488b5SShuo Chen return false; 2453a0488b5SShuo Chen } 2463a0488b5SShuo Chen 2473a0488b5SShuo Chen if (index_ < block_->records.size()) 2483a0488b5SShuo Chen { 2493a0488b5SShuo Chen const Record& rec = block_->records[index_]; 2503a0488b5SShuo Chen *count = rec.count; 2513a0488b5SShuo Chen line_ = absl::string_view(block_->data.get() + rec.offset, rec.len); 2523a0488b5SShuo Chen ++index_; 2533a0488b5SShuo Chen return true; 2543a0488b5SShuo Chen } 2553a0488b5SShuo Chen else 2563a0488b5SShuo Chen { 2573a0488b5SShuo Chen refill(); 2583a0488b5SShuo Chen index_ = 0; 2593a0488b5SShuo Chen return next(count); 2603a0488b5SShuo Chen } 2613a0488b5SShuo Chen } 2623a0488b5SShuo Chen 2633a0488b5SShuo Chen private: 2643a0488b5SShuo Chen 2653a0488b5SShuo Chen struct Record 2663a0488b5SShuo Chen { 2673a0488b5SShuo Chen int64_t count = 0; 2683a0488b5SShuo Chen int32_t offset = 0, len = 0; 2693a0488b5SShuo Chen }; 2703a0488b5SShuo Chen 2713a0488b5SShuo Chen struct Block 2723a0488b5SShuo Chen { 2733a0488b5SShuo Chen std::unique_ptr<char[]> data; 2743a0488b5SShuo Chen std::vector<Record> records; 2753a0488b5SShuo Chen }; 2763a0488b5SShuo Chen 2773a0488b5SShuo Chen void refill() 2783a0488b5SShuo Chen { 2793a0488b5SShuo Chen block_->records.clear(); 2803a0488b5SShuo Chen char* data = block_->data.get(); 2813a0488b5SShuo Chen ssize_t nr = ::pread(fd_, data, buffer_size_, pos_); 2823a0488b5SShuo Chen if (nr > 0) 2830ab2e892SShuo Chen { 2843a0488b5SShuo Chen char* start = data; 2853a0488b5SShuo Chen size_t len = nr; 2863a0488b5SShuo Chen char* nl = static_cast<char*>(::memchr(start, '\n', len)); 2873a0488b5SShuo Chen while (nl) 2880ab2e892SShuo Chen { 2893a0488b5SShuo Chen Record rec; 2903a0488b5SShuo Chen rec.count = strtol(start, NULL, 10); 2913a0488b5SShuo Chen rec.offset = start - data; 2923a0488b5SShuo Chen rec.len = nl - start + 1; 2933a0488b5SShuo Chen block_->records.push_back(rec); 2943a0488b5SShuo Chen start = nl+1; 2953a0488b5SShuo Chen len -= rec.len; 2963a0488b5SShuo Chen nl = static_cast<char*>(::memchr(start, '\n', len)); 2970ab2e892SShuo Chen } 2983a0488b5SShuo Chen pos_ += start - data; 2990ab2e892SShuo Chen } 3000ab2e892SShuo Chen } 3010ab2e892SShuo Chen 3023a0488b5SShuo Chen const int fd_; 3033a0488b5SShuo Chen const int buffer_size_; 3043a0488b5SShuo Chen int64_t pos_ = 0; // file position 3053a0488b5SShuo Chen size_t index_ = 0; // index into block_ 3063a0488b5SShuo Chen std::unique_ptr<Block> block_; 3073a0488b5SShuo Chen absl::string_view line_; 3083a0488b5SShuo Chen 3093a0488b5SShuo Chen TextInput(const TextInput&) = delete; 3103a0488b5SShuo Chen void operator=(const TextInput&) = delete; 3113a0488b5SShuo Chen}; 3123a0488b5SShuo Chen 3133a0488b5SShuo Chenclass Source // copyable 3143a0488b5SShuo Chen{ 3153a0488b5SShuo Chen public: 3163a0488b5SShuo Chen explicit Source(TextInput* in) 3173a0488b5SShuo Chen : input_(in) 3180ab2e892SShuo Chen { 3190ab2e892SShuo Chen } 3200ab2e892SShuo Chen 3213a0488b5SShuo Chen bool next() 3220ab2e892SShuo Chen { 3233a0488b5SShuo Chen return input_->next(&count_); 3240ab2e892SShuo Chen } 3250ab2e892SShuo Chen 3263a0488b5SShuo Chen bool operator<(const Source& rhs) const 32785147189SShuo Chen { 3283a0488b5SShuo Chen return count_ < rhs.count_; 32985147189SShuo Chen } 33085147189SShuo Chen 3313a0488b5SShuo Chen absl::string_view line() const { return input_->line(); } 3323a0488b5SShuo Chen 3330ab2e892SShuo Chen private: 3343a0488b5SShuo Chen TextInput* input_; // not owned 3353a0488b5SShuo Chen int64_t count_ = 0; 3360ab2e892SShuo Chen}; 3370ab2e892SShuo Chen 33885147189SShuo Chenint64_t merge() 3390ab2e892SShuo Chen{ 3404136e585SShuo Chen Timer timer; 3413a0488b5SShuo Chen vector<unique_ptr<TextInput>> inputs; 3420ab2e892SShuo Chen vector<Source> keys; 3430ab2e892SShuo Chen 3444136e585SShuo Chen int64_t total = 0; 3450ab2e892SShuo Chen for (int i = 0; i < kShards; ++i) 3460ab2e892SShuo Chen { 3470ab2e892SShuo Chen char buf[256]; 3480ab2e892SShuo Chen snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, kShards); 3494136e585SShuo Chen struct stat st; 350a6693141SShuo Chen if (::stat(buf, &st) == 0) 3510ab2e892SShuo Chen { 352a6693141SShuo Chen total += st.st_size; 35385147189SShuo Chen // TODO: select buffer size based on kShards. 3543a0488b5SShuo Chen inputs.push_back(std::make_unique<TextInput>(buf)); 355a6693141SShuo Chen Source rec(inputs.back().get()); 356a6693141SShuo Chen if (rec.next()) 357a6693141SShuo Chen { 358a6693141SShuo Chen keys.push_back(rec); 359a6693141SShuo Chen } 36085147189SShuo Chen if (!g_keep) 361a6693141SShuo Chen ::unlink(buf); 362a6693141SShuo Chen } 363a6693141SShuo Chen else 364a6693141SShuo Chen { 365a6693141SShuo Chen perror("Unable to stat file:"); 3660ab2e892SShuo Chen } 3670ab2e892SShuo Chen } 3682a129a12SShuo Chen LOG_INFO << "merging " << inputs.size() << " files of " << total << " bytes in total"; 3690ab2e892SShuo Chen 3703a0488b5SShuo Chen int64_t lines = 0; 3714136e585SShuo Chen { 37285147189SShuo Chen OutputFile out(g_output); 37385147189SShuo Chen 3740ab2e892SShuo Chen std::make_heap(keys.begin(), keys.end()); 3750ab2e892SShuo Chen while (!keys.empty()) 3760ab2e892SShuo Chen { 3770ab2e892SShuo Chen std::pop_heap(keys.begin(), keys.end()); 3783a0488b5SShuo Chen out.write(keys.back().line()); 3793a0488b5SShuo Chen ++lines; 3800ab2e892SShuo Chen 3810ab2e892SShuo Chen if (keys.back().next()) 3820ab2e892SShuo Chen { 3830ab2e892SShuo Chen std::push_heap(keys.begin(), keys.end()); 3840ab2e892SShuo Chen } 3850ab2e892SShuo Chen else 3860ab2e892SShuo Chen { 3870ab2e892SShuo Chen keys.pop_back(); 3880ab2e892SShuo Chen } 3890ab2e892SShuo Chen } 3903a0488b5SShuo Chen 3914136e585SShuo Chen } 3923a0488b5SShuo Chen LOG_INFO << "Merging done " << timer.report(total) << " lines " << lines; 3932a129a12SShuo Chen return total; 3940ab2e892SShuo Chen} 3950ab2e892SShuo Chen 3960ab2e892SShuo Chenint main(int argc, char* argv[]) 3970ab2e892SShuo Chen{ 3980ab2e892SShuo Chen /* 3990ab2e892SShuo Chen int fd = open("shard-00000-of-00010", O_RDONLY); 40085147189SShuo Chen double t = Timer::now(); 4014136e585SShuo Chen int64_t len = count_shard(0, fd); 40285147189SShuo Chen double sec = Timer::now() - t; 4034136e585SShuo Chen printf("count_shard %.3f sec %.2f MB/s\n", sec, len / sec / 1e6); 4044136e585SShuo Chen */ 4053e607da5SShuo Chen setlocale(LC_NUMERIC, ""); 4060ab2e892SShuo Chen 4072a129a12SShuo Chen int opt; 40885147189SShuo Chen int count_only = 0; 40985147189SShuo Chen bool merge_only = false; 41085147189SShuo Chen while ((opt = getopt(argc, argv, "c:kmo:p:s:t:v")) != -1) 4112a129a12SShuo Chen { 4122a129a12SShuo Chen switch (opt) 4132a129a12SShuo Chen { 41485147189SShuo Chen case 'c': 41585147189SShuo Chen count_only = atoi(optarg); 41685147189SShuo Chen break; 4172a129a12SShuo Chen case 'k': 41885147189SShuo Chen g_keep = true; 41985147189SShuo Chen break; 42085147189SShuo Chen case 'm': 42185147189SShuo Chen merge_only = true; 4222a129a12SShuo Chen break; 423a6693141SShuo Chen case 'o': 42485147189SShuo Chen g_output = optarg; 42585147189SShuo Chen break; 42685147189SShuo Chen case 'p': // Path for temp shard files 42785147189SShuo Chen shard_dir = optarg; 428a6693141SShuo Chen break; 4292a129a12SShuo Chen case 's': 4302a129a12SShuo Chen kShards = atoi(optarg); 4312a129a12SShuo Chen break; 432a6693141SShuo Chen case 't': 43385147189SShuo Chen kThreads = atoi(optarg); 434a6693141SShuo Chen break; 4352a129a12SShuo Chen case 'v': 43685147189SShuo Chen g_verbose = true; 4372a129a12SShuo Chen break; 4382a129a12SShuo Chen } 4392a129a12SShuo Chen } 4402a129a12SShuo Chen 44185147189SShuo Chen if (count_only > 0 || merge_only) 44285147189SShuo Chen { 44385147189SShuo Chen g_keep = true; 4443a0488b5SShuo Chen //g_verbose = true; 44585147189SShuo Chen count_only = std::min(count_only, kShards); 44685147189SShuo Chen 44785147189SShuo Chen if (count_only > 0) 44885147189SShuo Chen { 44985147189SShuo Chen count_shards(count_only); 45085147189SShuo Chen } 45185147189SShuo Chen 45285147189SShuo Chen if (merge_only) 45385147189SShuo Chen { 45485147189SShuo Chen merge(); 45585147189SShuo Chen } 45685147189SShuo Chen } 45785147189SShuo Chen else 45885147189SShuo Chen { 45985147189SShuo Chen // Run all three steps 46085147189SShuo Chen Timer timer; 46185147189SShuo Chen LOG_INFO << argc - optind << " input files, " << kShards << " shards, " 46285147189SShuo Chen << "output " << g_output <<" , temp " << shard_dir; 46385147189SShuo Chen int64_t input = 0; 46485147189SShuo Chen input = shard_(argc, argv); 46585147189SShuo Chen count_shards(kShards); 46685147189SShuo Chen int64_t output_size = merge(); 46785147189SShuo Chen LOG_INFO << "All done " << timer.report(input) << " output " << output_size; 46885147189SShuo Chen } 4690ab2e892SShuo Chen} 470