word_freq_shards_basic.cc revision 270b6cce
10ab2e892SShuo Chen/* sort word by frequency, sharding version. 20ab2e892SShuo Chen 30ab2e892SShuo Chen 1. read input file, shard to N files: 40ab2e892SShuo Chen word 50ab2e892SShuo Chen 2. assume each shard file fits in memory, read each shard file, count words and sort by count, then write to N count files: 60ab2e892SShuo Chen count \t word 70ab2e892SShuo Chen 3. merge N count files using heap. 80ab2e892SShuo Chen 90ab2e892SShuo ChenLimits: each shard must fit in memory. 100ab2e892SShuo Chen*/ 110ab2e892SShuo Chen 120ab2e892SShuo Chen#include <assert.h> 130ab2e892SShuo Chen 144136e585SShuo Chen#include "absl/container/flat_hash_map.h" 152a129a12SShuo Chen#include "absl/strings/str_format.h" 162a129a12SShuo Chen#include "muduo/base/Logging.h" 174136e585SShuo Chen 180ab2e892SShuo Chen#include <algorithm> 19270b6cceSShuo Chen//#include <fstream> 20270b6cceSShuo Chen//#include <iostream> 210ab2e892SShuo Chen#include <memory> 220ab2e892SShuo Chen#include <string> 230ab2e892SShuo Chen#include <unordered_map> 240ab2e892SShuo Chen#include <vector> 250ab2e892SShuo Chen 262a129a12SShuo Chen#include <boost/program_options.hpp> 272a129a12SShuo Chen 280ab2e892SShuo Chen#include <fcntl.h> 290ab2e892SShuo Chen#include <string.h> 300ab2e892SShuo Chen#include <sys/mman.h> 314136e585SShuo Chen#include <sys/stat.h> 320ab2e892SShuo Chen#include <sys/time.h> 334136e585SShuo Chen#include <sys/times.h> 340ab2e892SShuo Chen#include <unistd.h> 350ab2e892SShuo Chen 360ab2e892SShuo Chenusing std::string; 370ab2e892SShuo Chenusing std::string_view; 380ab2e892SShuo Chenusing std::vector; 390ab2e892SShuo Chenusing std::unique_ptr; 400ab2e892SShuo Chen 41a6693141SShuo Chenconst int kBufferSize = 128 * 1024; 42a6693141SShuo Chen 430ab2e892SShuo Chenint kShards = 10; 442a129a12SShuo Chenbool verbose = false, keep = false; 45a6693141SShuo Chenconst char* shard_dir = "."; 460ab2e892SShuo Chen 470ab2e892SShuo Cheninline double now() 480ab2e892SShuo Chen{ 490ab2e892SShuo Chen struct timeval tv = { 0, 0 }; 500ab2e892SShuo Chen gettimeofday(&tv, nullptr); 510ab2e892SShuo Chen return tv.tv_sec + tv.tv_usec / 1000000.0; 520ab2e892SShuo Chen} 530ab2e892SShuo Chen 544136e585SShuo Chenstruct CpuTime 554136e585SShuo Chen{ 564136e585SShuo Chen double userSeconds = 0.0; 574136e585SShuo Chen double systemSeconds = 0.0; 584136e585SShuo Chen 594136e585SShuo Chen double total() const { return userSeconds + systemSeconds; } 604136e585SShuo Chen}; 614136e585SShuo Chen 624136e585SShuo Chenconst int g_clockTicks = static_cast<int>(::sysconf(_SC_CLK_TCK)); 634136e585SShuo Chen 644136e585SShuo ChenCpuTime cpuTime() 654136e585SShuo Chen{ 664136e585SShuo Chen CpuTime t; 674136e585SShuo Chen struct tms tms; 684136e585SShuo Chen if (::times(&tms) >= 0) 694136e585SShuo Chen { 704136e585SShuo Chen const double hz = static_cast<double>(g_clockTicks); 714136e585SShuo Chen t.userSeconds = static_cast<double>(tms.tms_utime) / hz; 724136e585SShuo Chen t.systemSeconds = static_cast<double>(tms.tms_stime) / hz; 734136e585SShuo Chen } 744136e585SShuo Chen return t; 754136e585SShuo Chen} 764136e585SShuo Chen 774136e585SShuo Chenclass Timer 784136e585SShuo Chen{ 794136e585SShuo Chen public: 804136e585SShuo Chen Timer() 814136e585SShuo Chen : start_(now()), 824136e585SShuo Chen start_cpu_(cpuTime()) 834136e585SShuo Chen { 844136e585SShuo Chen } 854136e585SShuo Chen 862a129a12SShuo Chen string report(int64_t bytes) const 874136e585SShuo Chen { 884136e585SShuo Chen CpuTime end_cpu(cpuTime()); 894136e585SShuo Chen double end = now(); 90a6693141SShuo Chen return absl::StrFormat("%.2fs real %.2fs cpu %.2f MiB/s %ld bytes", 912a129a12SShuo Chen end - start_, end_cpu.total() - start_cpu_.total(), 922a129a12SShuo Chen bytes / (end - start_) / 1024 / 1024, bytes); 934136e585SShuo Chen } 944136e585SShuo Chen private: 954136e585SShuo Chen const double start_ = 0; 964136e585SShuo Chen const CpuTime start_cpu_; 974136e585SShuo Chen}; 984136e585SShuo Chen 990ab2e892SShuo Chenclass OutputFile // : boost::noncopyable 1000ab2e892SShuo Chen{ 1010ab2e892SShuo Chen public: 1020ab2e892SShuo Chen explicit OutputFile(const string& filename) 103270b6cceSShuo Chen : file_(::fopen(filename.c_str(), "w")) 1040ab2e892SShuo Chen { 1050ab2e892SShuo Chen assert(file_); 1060ab2e892SShuo Chen ::setbuffer(file_, buffer_, sizeof buffer_); 1070ab2e892SShuo Chen } 1080ab2e892SShuo Chen 1090ab2e892SShuo Chen ~OutputFile() 1100ab2e892SShuo Chen { 1110ab2e892SShuo Chen close(); 1120ab2e892SShuo Chen } 1130ab2e892SShuo Chen 114270b6cceSShuo Chen void write(string_view s) 115270b6cceSShuo Chen { 116270b6cceSShuo Chen ::fwrite(s.data(), 1, s.size(), file_); 117270b6cceSShuo Chen } 118270b6cceSShuo Chen 119270b6cceSShuo Chen void appendRecord(string_view s) 1200ab2e892SShuo Chen { 1210ab2e892SShuo Chen assert(s.size() < 255); 1220ab2e892SShuo Chen uint8_t len = s.size(); 1230ab2e892SShuo Chen ::fwrite(&len, 1, sizeof len, file_); 1240ab2e892SShuo Chen ::fwrite(s.data(), 1, len, file_); 1250ab2e892SShuo Chen ++items_; 1260ab2e892SShuo Chen } 1270ab2e892SShuo Chen 1280ab2e892SShuo Chen void flush() 1290ab2e892SShuo Chen { 1300ab2e892SShuo Chen ::fflush(file_); 1310ab2e892SShuo Chen } 1320ab2e892SShuo Chen 1330ab2e892SShuo Chen void close() 1340ab2e892SShuo Chen { 1350ab2e892SShuo Chen if (file_) 1360ab2e892SShuo Chen ::fclose(file_); 1370ab2e892SShuo Chen file_ = nullptr; 1380ab2e892SShuo Chen } 1390ab2e892SShuo Chen 1400ab2e892SShuo Chen int64_t tell() 1410ab2e892SShuo Chen { 1420ab2e892SShuo Chen return ::ftell(file_); 1430ab2e892SShuo Chen } 1440ab2e892SShuo Chen 1450ab2e892SShuo Chen int fd() 1460ab2e892SShuo Chen { 1470ab2e892SShuo Chen return ::fileno(file_); 1480ab2e892SShuo Chen } 1490ab2e892SShuo Chen 1500ab2e892SShuo Chen size_t items() 1510ab2e892SShuo Chen { 1520ab2e892SShuo Chen return items_; 1530ab2e892SShuo Chen } 1540ab2e892SShuo Chen 1550ab2e892SShuo Chen private: 1560ab2e892SShuo Chen FILE* file_; 157a6693141SShuo Chen char buffer_[kBufferSize]; 1580ab2e892SShuo Chen size_t items_ = 0; 1590ab2e892SShuo Chen 1600ab2e892SShuo Chen OutputFile(const OutputFile&) = delete; 1610ab2e892SShuo Chen void operator=(const OutputFile&) = delete; 1620ab2e892SShuo Chen}; 1630ab2e892SShuo Chen 164270b6cceSShuo Chenclass InputFile 165270b6cceSShuo Chen{ 166270b6cceSShuo Chen public: 167270b6cceSShuo Chen explicit InputFile(const char* filename) 168270b6cceSShuo Chen : file_(::fopen(filename, "r")) 169270b6cceSShuo Chen { 170270b6cceSShuo Chen assert(file_); 171270b6cceSShuo Chen ::setbuffer(file_, buffer_, sizeof buffer_); 172270b6cceSShuo Chen } 173270b6cceSShuo Chen 174270b6cceSShuo Chen ~InputFile() 175270b6cceSShuo Chen { 176270b6cceSShuo Chen close(); 177270b6cceSShuo Chen } 178270b6cceSShuo Chen 179270b6cceSShuo Chen void close() 180270b6cceSShuo Chen { 181270b6cceSShuo Chen if (file_) 182270b6cceSShuo Chen ::fclose(file_); 183270b6cceSShuo Chen file_ = nullptr; 184270b6cceSShuo Chen } 185270b6cceSShuo Chen 186270b6cceSShuo Chen bool getline(string* output) 187270b6cceSShuo Chen { 188270b6cceSShuo Chen char buf[1024] = ""; 189270b6cceSShuo Chen if (fgets(buf, sizeof buf, file_)) 190270b6cceSShuo Chen { 191270b6cceSShuo Chen size_t len = strlen(buf); 192270b6cceSShuo Chen if (len > 0 && buf[len-1] == '\n') 193270b6cceSShuo Chen { 194270b6cceSShuo Chen buf[len-1] = '\0'; 195270b6cceSShuo Chen len--; 196270b6cceSShuo Chen } 197270b6cceSShuo Chen output->assign(buf, len); 198270b6cceSShuo Chen return true; 199270b6cceSShuo Chen } 200270b6cceSShuo Chen return false; 201270b6cceSShuo Chen } 202270b6cceSShuo Chen 203270b6cceSShuo Chen 204270b6cceSShuo Chen private: 205270b6cceSShuo Chen FILE* file_; 206270b6cceSShuo Chen char buffer_[32 * 1024 * 1024]; 207270b6cceSShuo Chen 208270b6cceSShuo Chen InputFile(const InputFile&) = delete; 209270b6cceSShuo Chen void operator=(const InputFile&) = delete; 210270b6cceSShuo Chen}; 211270b6cceSShuo Chen 2120ab2e892SShuo Chenclass Sharder // : boost::noncopyable 2130ab2e892SShuo Chen{ 2140ab2e892SShuo Chen public: 2150ab2e892SShuo Chen Sharder() 2160ab2e892SShuo Chen : files_(kShards) 2170ab2e892SShuo Chen { 2180ab2e892SShuo Chen for (int i = 0; i < kShards; ++i) 2190ab2e892SShuo Chen { 2200ab2e892SShuo Chen char name[256]; 221a6693141SShuo Chen snprintf(name, sizeof name, "%s/shard-%05d-of-%05d", shard_dir, i, kShards); 2220ab2e892SShuo Chen files_[i].reset(new OutputFile(name)); 2230ab2e892SShuo Chen } 2240ab2e892SShuo Chen assert(files_.size() == static_cast<size_t>(kShards)); 2250ab2e892SShuo Chen } 2260ab2e892SShuo Chen 2270ab2e892SShuo Chen void output(string_view word) 2280ab2e892SShuo Chen { 2290ab2e892SShuo Chen size_t shard = hash(word) % files_.size(); 230270b6cceSShuo Chen files_[shard]->appendRecord(word); 2310ab2e892SShuo Chen } 2320ab2e892SShuo Chen 2330ab2e892SShuo Chen void finish() 2340ab2e892SShuo Chen { 2354136e585SShuo Chen int shard = 0; 2364136e585SShuo Chen for (const auto& file : files_) 2370ab2e892SShuo Chen { 2382a129a12SShuo Chen // if (verbose) 2394136e585SShuo Chen printf(" shard %d: %ld bytes, %ld items\n", shard, file->tell(), file->items()); 2404136e585SShuo Chen ++shard; 2414136e585SShuo Chen file->close(); 2420ab2e892SShuo Chen } 2430ab2e892SShuo Chen } 2440ab2e892SShuo Chen 2450ab2e892SShuo Chen private: 2460ab2e892SShuo Chen std::hash<string_view> hash; 2470ab2e892SShuo Chen vector<unique_ptr<OutputFile>> files_; 2480ab2e892SShuo Chen}; 2490ab2e892SShuo Chen 2504136e585SShuo Chenint64_t shard_(int argc, char* argv[]) 2510ab2e892SShuo Chen{ 2520ab2e892SShuo Chen Sharder sharder; 2534136e585SShuo Chen Timer timer; 2544136e585SShuo Chen int64_t total = 0; 2552a129a12SShuo Chen for (int i = optind; i < argc; ++i) 2560ab2e892SShuo Chen { 2572a129a12SShuo Chen LOG_INFO << "Processing input file " << argv[i]; 2580ab2e892SShuo Chen double t = now(); 2590ab2e892SShuo Chen char line[1024]; 2600ab2e892SShuo Chen FILE* fp = fopen(argv[i], "r"); 261a6693141SShuo Chen char buffer[kBufferSize]; 2620ab2e892SShuo Chen ::setbuffer(fp, buffer, sizeof buffer); 2630ab2e892SShuo Chen while (fgets(line, sizeof line, fp)) 2640ab2e892SShuo Chen { 2650ab2e892SShuo Chen size_t len = strlen(line); 2660ab2e892SShuo Chen if (len > 0 && line[len-1] == '\n') 267270b6cceSShuo Chen { 2680ab2e892SShuo Chen line[len-1] = '\0'; 269270b6cceSShuo Chen len--; 270270b6cceSShuo Chen } 271270b6cceSShuo Chen sharder.output(string_view(line, len)); 2720ab2e892SShuo Chen } 2734136e585SShuo Chen size_t len = ftell(fp); 2740ab2e892SShuo Chen fclose(fp); 2754136e585SShuo Chen total += len; 2760ab2e892SShuo Chen double sec = now() - t; 2772a129a12SShuo Chen LOG_INFO << "Done file " << argv[i] << absl::StrFormat(" %.3f sec %.2f MiB/s", sec, len / sec / 1024 / 1024); 2780ab2e892SShuo Chen } 2790ab2e892SShuo Chen sharder.finish(); 2802a129a12SShuo Chen LOG_INFO << "Sharding done " << timer.report(total); 2814136e585SShuo Chen return total; 2820ab2e892SShuo Chen} 2830ab2e892SShuo Chen 2840ab2e892SShuo Chen// ======= count_shards ======= 2850ab2e892SShuo Chen 2862a129a12SShuo Chenint64_t count_shard(int shard, int fd, const char* output) 2870ab2e892SShuo Chen{ 2880ab2e892SShuo Chen const int64_t len = lseek(fd, 0, SEEK_END); 2890ab2e892SShuo Chen lseek(fd, 0, SEEK_SET); 2900ab2e892SShuo Chen double t = now(); 2912a129a12SShuo Chen LOG_INFO << absl::StrFormat("counting shard %d: input file size %ld", shard, len); 2924136e585SShuo Chen { 2930ab2e892SShuo Chen void* mapped = mmap(NULL, len, PROT_READ, MAP_PRIVATE, fd, 0); 2940ab2e892SShuo Chen assert(mapped != MAP_FAILED); 2950ab2e892SShuo Chen const uint8_t* const start = static_cast<const uint8_t*>(mapped); 2960ab2e892SShuo Chen const uint8_t* const end = start + len; 2970ab2e892SShuo Chen 2984136e585SShuo Chen // std::unordered_map<string_view, uint64_t> items; 2994136e585SShuo Chen absl::flat_hash_map<string_view, uint64_t> items; 3002a129a12SShuo Chen int64_t count = 0; 3010ab2e892SShuo Chen for (const uint8_t* p = start; p < end;) 3020ab2e892SShuo Chen { 3030ab2e892SShuo Chen string_view s((const char*)p+1, *p); 3040ab2e892SShuo Chen items[s]++; 3050ab2e892SShuo Chen p += 1 + *p; 3062a129a12SShuo Chen ++count; 3070ab2e892SShuo Chen } 308270b6cceSShuo Chen LOG_INFO << "items " << count << " unique " << items.size(); 3092a129a12SShuo Chen if (verbose) 3100ab2e892SShuo Chen printf(" count %.3f sec %ld items\n", now() - t, items.size()); 3110ab2e892SShuo Chen 3120ab2e892SShuo Chen t = now(); 3130ab2e892SShuo Chen vector<std::pair<size_t, string_view>> counts; 3140ab2e892SShuo Chen for (const auto& it : items) 3150ab2e892SShuo Chen { 3160ab2e892SShuo Chen if (it.second > 1) 3170ab2e892SShuo Chen counts.push_back(make_pair(it.second, it.first)); 3180ab2e892SShuo Chen } 3192a129a12SShuo Chen if (verbose) 3200ab2e892SShuo Chen printf(" select %.3f sec %ld\n", now() - t, counts.size()); 3210ab2e892SShuo Chen 3220ab2e892SShuo Chen t = now(); 3230ab2e892SShuo Chen std::sort(counts.begin(), counts.end()); 3242a129a12SShuo Chen if (verbose) 3250ab2e892SShuo Chen printf(" sort %.3f sec\n", now() - t); 3260ab2e892SShuo Chen 3270ab2e892SShuo Chen t = now(); 3280ab2e892SShuo Chen { 329270b6cceSShuo Chen OutputFile out(output); 3304136e585SShuo Chen for (auto it = counts.rbegin(); it != counts.rend(); ++it) 3310ab2e892SShuo Chen { 332270b6cceSShuo Chen string s(it->second); 333270b6cceSShuo Chen out.write(absl::StrFormat("%d\t%s\n", it->first, s)); // FIXME %s with string_view doesn't work in C++17 334270b6cceSShuo Chen /* 335270b6cceSShuo Chen char buf[1024]; 336270b6cceSShuo Chen snprintf(buf, sizeof buf, "%zd\t%s\n", 337270b6cceSShuo Chen out.write(buf); 338270b6cceSShuo Chen */ 3394136e585SShuo Chen } 340270b6cceSShuo Chen 3414136e585SShuo Chen for (const auto& it : items) 3424136e585SShuo Chen { 3434136e585SShuo Chen if (it.second == 1) 3444136e585SShuo Chen { 345270b6cceSShuo Chen string s(it.first); 346270b6cceSShuo Chen // FIXME: bug of absl? 347270b6cceSShuo Chen // out.write(absl::StrCat("1\t", s, "\n")); 348270b6cceSShuo Chen out.write(absl::StrFormat("1\t%s\n", s)); 349270b6cceSShuo Chen 3504136e585SShuo Chen } 3510ab2e892SShuo Chen } 3520ab2e892SShuo Chen } 3532a129a12SShuo Chen //if (verbose) 3542a129a12SShuo Chen //printf(" output %.3f sec %lu\n", now() - t, st.st_size); 3550ab2e892SShuo Chen 3564136e585SShuo Chen t = now(); 3570ab2e892SShuo Chen if (munmap(mapped, len)) 3580ab2e892SShuo Chen perror("munmap"); 3594136e585SShuo Chen } 3602a129a12SShuo Chen // printf(" destruct %.3f sec\n", now() - t); 3614136e585SShuo Chen return len; 3620ab2e892SShuo Chen} 3630ab2e892SShuo Chen 3640ab2e892SShuo Chenvoid count_shards() 3650ab2e892SShuo Chen{ 3664136e585SShuo Chen Timer timer; 3674136e585SShuo Chen int64_t total = 0; 3680ab2e892SShuo Chen for (int shard = 0; shard < kShards; ++shard) 3690ab2e892SShuo Chen { 3702a129a12SShuo Chen Timer timer; 3710ab2e892SShuo Chen char buf[256]; 372a6693141SShuo Chen snprintf(buf, sizeof buf, "%s/shard-%05d-of-%05d", shard_dir, shard, kShards); 3730ab2e892SShuo Chen int fd = open(buf, O_RDONLY); 3742a129a12SShuo Chen if (!keep) 3750ab2e892SShuo Chen ::unlink(buf); 3762a129a12SShuo Chen 3772a129a12SShuo Chen snprintf(buf, sizeof buf, "count-%05d-of-%05d", shard, kShards); 3782a129a12SShuo Chen int64_t len = count_shard(shard, fd, buf); 3792a129a12SShuo Chen ::close(fd); 3804136e585SShuo Chen total += len; 3812a129a12SShuo Chen struct stat st; 3822a129a12SShuo Chen ::stat(buf, &st); 3832a129a12SShuo Chen LOG_INFO << "shard " << shard << " done " << timer.report(len) << " output " << st.st_size; 3840ab2e892SShuo Chen } 385270b6cceSShuo Chen LOG_INFO << "Counting done "<< timer.report(total); 3860ab2e892SShuo Chen} 3870ab2e892SShuo Chen 3880ab2e892SShuo Chen// ======= merge ======= 3890ab2e892SShuo Chen 3900ab2e892SShuo Chenclass Source // copyable 3910ab2e892SShuo Chen{ 3920ab2e892SShuo Chen public: 393270b6cceSShuo Chen explicit Source(InputFile* in) 3940ab2e892SShuo Chen : in_(in), 3950ab2e892SShuo Chen count_(0), 3960ab2e892SShuo Chen word_() 3970ab2e892SShuo Chen { 3980ab2e892SShuo Chen } 3990ab2e892SShuo Chen 4000ab2e892SShuo Chen bool next() 4010ab2e892SShuo Chen { 4020ab2e892SShuo Chen string line; 403270b6cceSShuo Chen if (in_->getline(&line)) 4040ab2e892SShuo Chen { 4050ab2e892SShuo Chen size_t tab = line.find('\t'); 4060ab2e892SShuo Chen if (tab != string::npos) 4070ab2e892SShuo Chen { 4080ab2e892SShuo Chen count_ = strtol(line.c_str(), NULL, 10); 4090ab2e892SShuo Chen if (count_ > 0) 4100ab2e892SShuo Chen { 4110ab2e892SShuo Chen word_ = line.substr(tab+1); 4120ab2e892SShuo Chen return true; 4130ab2e892SShuo Chen } 4140ab2e892SShuo Chen } 4150ab2e892SShuo Chen } 4160ab2e892SShuo Chen return false; 4170ab2e892SShuo Chen } 4180ab2e892SShuo Chen 4190ab2e892SShuo Chen bool operator<(const Source& rhs) const 4200ab2e892SShuo Chen { 4210ab2e892SShuo Chen return count_ < rhs.count_; 4220ab2e892SShuo Chen } 4230ab2e892SShuo Chen 424270b6cceSShuo Chen void outputTo(OutputFile* out) const 4250ab2e892SShuo Chen { 426270b6cceSShuo Chen out->write(absl::StrFormat("%d\t%s\n", count_, word_)); 4270ab2e892SShuo Chen } 4280ab2e892SShuo Chen 4290ab2e892SShuo Chen private: 430270b6cceSShuo Chen InputFile* in_; // not owned 4310ab2e892SShuo Chen int64_t count_; 4320ab2e892SShuo Chen string word_; 4330ab2e892SShuo Chen}; 4340ab2e892SShuo Chen 4352a129a12SShuo Chenint64_t merge(const char* output) 4360ab2e892SShuo Chen{ 4374136e585SShuo Chen Timer timer; 438270b6cceSShuo Chen vector<unique_ptr<InputFile>> inputs; 4390ab2e892SShuo Chen vector<Source> keys; 4400ab2e892SShuo Chen 4414136e585SShuo Chen int64_t total = 0; 4420ab2e892SShuo Chen for (int i = 0; i < kShards; ++i) 4430ab2e892SShuo Chen { 4440ab2e892SShuo Chen char buf[256]; 4450ab2e892SShuo Chen snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, kShards); 4464136e585SShuo Chen struct stat st; 447a6693141SShuo Chen if (::stat(buf, &st) == 0) 4480ab2e892SShuo Chen { 449a6693141SShuo Chen total += st.st_size; 450270b6cceSShuo Chen inputs.push_back(std::make_unique<InputFile>(buf)); 451a6693141SShuo Chen Source rec(inputs.back().get()); 452a6693141SShuo Chen if (rec.next()) 453a6693141SShuo Chen { 454a6693141SShuo Chen keys.push_back(rec); 455a6693141SShuo Chen } 456a6693141SShuo Chen if (!keep) 457a6693141SShuo Chen ::unlink(buf); 458a6693141SShuo Chen } 459a6693141SShuo Chen else 460a6693141SShuo Chen { 461a6693141SShuo Chen perror("Unable to stat file:"); 4620ab2e892SShuo Chen } 4630ab2e892SShuo Chen } 4642a129a12SShuo Chen LOG_INFO << "merging " << inputs.size() << " files of " << total << " bytes in total"; 4650ab2e892SShuo Chen 4664136e585SShuo Chen { 467270b6cceSShuo Chen OutputFile out(output); 4680ab2e892SShuo Chen std::make_heap(keys.begin(), keys.end()); 4690ab2e892SShuo Chen while (!keys.empty()) 4700ab2e892SShuo Chen { 4710ab2e892SShuo Chen std::pop_heap(keys.begin(), keys.end()); 472270b6cceSShuo Chen keys.back().outputTo(&out); 4730ab2e892SShuo Chen 4740ab2e892SShuo Chen if (keys.back().next()) 4750ab2e892SShuo Chen { 4760ab2e892SShuo Chen std::push_heap(keys.begin(), keys.end()); 4770ab2e892SShuo Chen } 4780ab2e892SShuo Chen else 4790ab2e892SShuo Chen { 4800ab2e892SShuo Chen keys.pop_back(); 4810ab2e892SShuo Chen } 4820ab2e892SShuo Chen } 4834136e585SShuo Chen } 4842a129a12SShuo Chen LOG_INFO << "merging done " << timer.report(total); 4852a129a12SShuo Chen return total; 4860ab2e892SShuo Chen} 4870ab2e892SShuo Chen 4880ab2e892SShuo Chenint main(int argc, char* argv[]) 4890ab2e892SShuo Chen{ 4900ab2e892SShuo Chen /* 4910ab2e892SShuo Chen int fd = open("shard-00000-of-00010", O_RDONLY); 4920ab2e892SShuo Chen double t = now(); 4934136e585SShuo Chen int64_t len = count_shard(0, fd); 4944136e585SShuo Chen double sec = now() - t; 4954136e585SShuo Chen printf("count_shard %.3f sec %.2f MB/s\n", sec, len / sec / 1e6); 4964136e585SShuo Chen */ 4970ab2e892SShuo Chen 4982a129a12SShuo Chen int opt; 499a6693141SShuo Chen const char* output = "output"; 500a6693141SShuo Chen while ((opt = getopt(argc, argv, "ko:s:t:v")) != -1) 5012a129a12SShuo Chen { 5022a129a12SShuo Chen switch (opt) 5032a129a12SShuo Chen { 5042a129a12SShuo Chen case 'k': 5052a129a12SShuo Chen keep = true; 5062a129a12SShuo Chen break; 507a6693141SShuo Chen case 'o': 508a6693141SShuo Chen output = optarg; 509a6693141SShuo Chen break; 5102a129a12SShuo Chen case 's': 5112a129a12SShuo Chen kShards = atoi(optarg); 5122a129a12SShuo Chen break; 513a6693141SShuo Chen case 't': 514a6693141SShuo Chen shard_dir = optarg; 515a6693141SShuo Chen break; 5162a129a12SShuo Chen case 'v': 5172a129a12SShuo Chen verbose = true; 5182a129a12SShuo Chen break; 5192a129a12SShuo Chen } 5202a129a12SShuo Chen } 5212a129a12SShuo Chen 5224136e585SShuo Chen Timer timer; 523a6693141SShuo Chen LOG_INFO << argc - optind << " input files, " << kShards << " shards, output " << output <<" , temp " << shard_dir; 524a6693141SShuo Chen int64_t input = 0; 525a6693141SShuo Chen input = shard_(argc, argv); 5260ab2e892SShuo Chen count_shards(); 527a6693141SShuo Chen int64_t output_size = merge(output); 528a6693141SShuo Chen LOG_INFO << "All done " << timer.report(input) << " output " << output_size; 5290ab2e892SShuo Chen} 530