// word_freq_shards_basic.cc revision ecd7048b
10ab2e892SShuo Chen/* sort word by frequency, sharding version. 20ab2e892SShuo Chen 30ab2e892SShuo Chen 1. read input file, shard to N files: 40ab2e892SShuo Chen word 50ab2e892SShuo Chen 2. assume each shard file fits in memory, read each shard file, count words and sort by count, then write to N count files: 60ab2e892SShuo Chen count \t word 70ab2e892SShuo Chen 3. merge N count files using heap. 80ab2e892SShuo Chen 90ab2e892SShuo ChenLimits: each shard must fit in memory. 100ab2e892SShuo Chen*/ 110ab2e892SShuo Chen 120ab2e892SShuo Chen#include <assert.h> 130ab2e892SShuo Chen 144136e585SShuo Chen#include "absl/container/flat_hash_map.h" 152a129a12SShuo Chen#include "absl/strings/str_format.h" 162a129a12SShuo Chen#include "muduo/base/Logging.h" 17a251380aSShuo Chen#include "muduo/base/ThreadPool.h" 184136e585SShuo Chen 190ab2e892SShuo Chen#include <algorithm> 20270b6cceSShuo Chen//#include <fstream> 21270b6cceSShuo Chen//#include <iostream> 220ab2e892SShuo Chen#include <memory> 230ab2e892SShuo Chen#include <string> 240ab2e892SShuo Chen#include <unordered_map> 250ab2e892SShuo Chen#include <vector> 260ab2e892SShuo Chen 272a129a12SShuo Chen#include <boost/program_options.hpp> 282a129a12SShuo Chen 290ab2e892SShuo Chen#include <fcntl.h> 300ab2e892SShuo Chen#include <string.h> 310ab2e892SShuo Chen#include <sys/mman.h> 324136e585SShuo Chen#include <sys/stat.h> 330ab2e892SShuo Chen#include <sys/time.h> 344136e585SShuo Chen#include <sys/times.h> 350ab2e892SShuo Chen#include <unistd.h> 360ab2e892SShuo Chen 370ab2e892SShuo Chenusing std::string; 380ab2e892SShuo Chenusing std::string_view; 390ab2e892SShuo Chenusing std::vector; 400ab2e892SShuo Chenusing std::unique_ptr; 410ab2e892SShuo Chen 42a6693141SShuo Chenconst int kBufferSize = 128 * 1024; 43a6693141SShuo Chen 440ab2e892SShuo Chenint kShards = 10; 452a129a12SShuo Chenbool verbose = false, keep = false; 46a6693141SShuo Chenconst char* shard_dir = "."; 470ab2e892SShuo Chen 480ab2e892SShuo Cheninline double now() 
490ab2e892SShuo Chen{ 500ab2e892SShuo Chen struct timeval tv = { 0, 0 }; 510ab2e892SShuo Chen gettimeofday(&tv, nullptr); 520ab2e892SShuo Chen return tv.tv_sec + tv.tv_usec / 1000000.0; 530ab2e892SShuo Chen} 540ab2e892SShuo Chen 554136e585SShuo Chenstruct CpuTime 564136e585SShuo Chen{ 574136e585SShuo Chen double userSeconds = 0.0; 584136e585SShuo Chen double systemSeconds = 0.0; 594136e585SShuo Chen 604136e585SShuo Chen double total() const { return userSeconds + systemSeconds; } 614136e585SShuo Chen}; 624136e585SShuo Chen 634136e585SShuo Chenconst int g_clockTicks = static_cast<int>(::sysconf(_SC_CLK_TCK)); 644136e585SShuo Chen 654136e585SShuo ChenCpuTime cpuTime() 664136e585SShuo Chen{ 674136e585SShuo Chen CpuTime t; 684136e585SShuo Chen struct tms tms; 694136e585SShuo Chen if (::times(&tms) >= 0) 704136e585SShuo Chen { 714136e585SShuo Chen const double hz = static_cast<double>(g_clockTicks); 724136e585SShuo Chen t.userSeconds = static_cast<double>(tms.tms_utime) / hz; 734136e585SShuo Chen t.systemSeconds = static_cast<double>(tms.tms_stime) / hz; 744136e585SShuo Chen } 754136e585SShuo Chen return t; 764136e585SShuo Chen} 774136e585SShuo Chen 784136e585SShuo Chenclass Timer 794136e585SShuo Chen{ 804136e585SShuo Chen public: 814136e585SShuo Chen Timer() 824136e585SShuo Chen : start_(now()), 834136e585SShuo Chen start_cpu_(cpuTime()) 844136e585SShuo Chen { 854136e585SShuo Chen } 864136e585SShuo Chen 872a129a12SShuo Chen string report(int64_t bytes) const 884136e585SShuo Chen { 894136e585SShuo Chen CpuTime end_cpu(cpuTime()); 904136e585SShuo Chen double end = now(); 91a6693141SShuo Chen return absl::StrFormat("%.2fs real %.2fs cpu %.2f MiB/s %ld bytes", 922a129a12SShuo Chen end - start_, end_cpu.total() - start_cpu_.total(), 932a129a12SShuo Chen bytes / (end - start_) / 1024 / 1024, bytes); 944136e585SShuo Chen } 954136e585SShuo Chen private: 964136e585SShuo Chen const double start_ = 0; 974136e585SShuo Chen const CpuTime start_cpu_; 984136e585SShuo Chen}; 
994136e585SShuo Chen 1000ab2e892SShuo Chenclass OutputFile // : boost::noncopyable 1010ab2e892SShuo Chen{ 1020ab2e892SShuo Chen public: 1030ab2e892SShuo Chen explicit OutputFile(const string& filename) 104270b6cceSShuo Chen : file_(::fopen(filename.c_str(), "w")) 1050ab2e892SShuo Chen { 1060ab2e892SShuo Chen assert(file_); 1070ab2e892SShuo Chen ::setbuffer(file_, buffer_, sizeof buffer_); 1080ab2e892SShuo Chen } 1090ab2e892SShuo Chen 1100ab2e892SShuo Chen ~OutputFile() 1110ab2e892SShuo Chen { 1120ab2e892SShuo Chen close(); 1130ab2e892SShuo Chen } 1140ab2e892SShuo Chen 115270b6cceSShuo Chen void write(string_view s) 116270b6cceSShuo Chen { 117270b6cceSShuo Chen ::fwrite(s.data(), 1, s.size(), file_); 118270b6cceSShuo Chen } 119270b6cceSShuo Chen 120270b6cceSShuo Chen void appendRecord(string_view s) 1210ab2e892SShuo Chen { 1220ab2e892SShuo Chen assert(s.size() < 255); 1230ab2e892SShuo Chen uint8_t len = s.size(); 1240ab2e892SShuo Chen ::fwrite(&len, 1, sizeof len, file_); 1250ab2e892SShuo Chen ::fwrite(s.data(), 1, len, file_); 1260ab2e892SShuo Chen ++items_; 1270ab2e892SShuo Chen } 1280ab2e892SShuo Chen 1290ab2e892SShuo Chen void flush() 1300ab2e892SShuo Chen { 1310ab2e892SShuo Chen ::fflush(file_); 1320ab2e892SShuo Chen } 1330ab2e892SShuo Chen 1340ab2e892SShuo Chen void close() 1350ab2e892SShuo Chen { 1360ab2e892SShuo Chen if (file_) 1370ab2e892SShuo Chen ::fclose(file_); 1380ab2e892SShuo Chen file_ = nullptr; 1390ab2e892SShuo Chen } 1400ab2e892SShuo Chen 1410ab2e892SShuo Chen int64_t tell() 1420ab2e892SShuo Chen { 1430ab2e892SShuo Chen return ::ftell(file_); 1440ab2e892SShuo Chen } 1450ab2e892SShuo Chen 1460ab2e892SShuo Chen int fd() 1470ab2e892SShuo Chen { 1480ab2e892SShuo Chen return ::fileno(file_); 1490ab2e892SShuo Chen } 1500ab2e892SShuo Chen 1510ab2e892SShuo Chen size_t items() 1520ab2e892SShuo Chen { 1530ab2e892SShuo Chen return items_; 1540ab2e892SShuo Chen } 1550ab2e892SShuo Chen 1560ab2e892SShuo Chen private: 1570ab2e892SShuo Chen FILE* file_; 
158a6693141SShuo Chen char buffer_[kBufferSize]; 1590ab2e892SShuo Chen size_t items_ = 0; 1600ab2e892SShuo Chen 1610ab2e892SShuo Chen OutputFile(const OutputFile&) = delete; 1620ab2e892SShuo Chen void operator=(const OutputFile&) = delete; 1630ab2e892SShuo Chen}; 1640ab2e892SShuo Chen 165270b6cceSShuo Chenclass InputFile 166270b6cceSShuo Chen{ 167270b6cceSShuo Chen public: 168270b6cceSShuo Chen explicit InputFile(const char* filename) 169270b6cceSShuo Chen : file_(::fopen(filename, "r")) 170270b6cceSShuo Chen { 171270b6cceSShuo Chen assert(file_); 172270b6cceSShuo Chen ::setbuffer(file_, buffer_, sizeof buffer_); 173270b6cceSShuo Chen } 174270b6cceSShuo Chen 175270b6cceSShuo Chen ~InputFile() 176270b6cceSShuo Chen { 177270b6cceSShuo Chen close(); 178270b6cceSShuo Chen } 179270b6cceSShuo Chen 180270b6cceSShuo Chen void close() 181270b6cceSShuo Chen { 182270b6cceSShuo Chen if (file_) 183270b6cceSShuo Chen ::fclose(file_); 184270b6cceSShuo Chen file_ = nullptr; 185270b6cceSShuo Chen } 186270b6cceSShuo Chen 187270b6cceSShuo Chen bool getline(string* output) 188270b6cceSShuo Chen { 189270b6cceSShuo Chen char buf[1024] = ""; 190270b6cceSShuo Chen if (fgets(buf, sizeof buf, file_)) 191270b6cceSShuo Chen { 192270b6cceSShuo Chen size_t len = strlen(buf); 193270b6cceSShuo Chen if (len > 0 && buf[len-1] == '\n') 194270b6cceSShuo Chen { 195270b6cceSShuo Chen buf[len-1] = '\0'; 196270b6cceSShuo Chen len--; 197270b6cceSShuo Chen } 198270b6cceSShuo Chen output->assign(buf, len); 199270b6cceSShuo Chen return true; 200270b6cceSShuo Chen } 201270b6cceSShuo Chen return false; 202270b6cceSShuo Chen } 203270b6cceSShuo Chen 204270b6cceSShuo Chen 205270b6cceSShuo Chen private: 206270b6cceSShuo Chen FILE* file_; 207270b6cceSShuo Chen char buffer_[32 * 1024 * 1024]; 208270b6cceSShuo Chen 209270b6cceSShuo Chen InputFile(const InputFile&) = delete; 210270b6cceSShuo Chen void operator=(const InputFile&) = delete; 211270b6cceSShuo Chen}; 212270b6cceSShuo Chen 2130ab2e892SShuo Chenclass Sharder 
// : boost::noncopyable 2140ab2e892SShuo Chen{ 2150ab2e892SShuo Chen public: 2160ab2e892SShuo Chen Sharder() 2170ab2e892SShuo Chen : files_(kShards) 2180ab2e892SShuo Chen { 2190ab2e892SShuo Chen for (int i = 0; i < kShards; ++i) 2200ab2e892SShuo Chen { 2210ab2e892SShuo Chen char name[256]; 222a6693141SShuo Chen snprintf(name, sizeof name, "%s/shard-%05d-of-%05d", shard_dir, i, kShards); 2230ab2e892SShuo Chen files_[i].reset(new OutputFile(name)); 2240ab2e892SShuo Chen } 2250ab2e892SShuo Chen assert(files_.size() == static_cast<size_t>(kShards)); 2260ab2e892SShuo Chen } 2270ab2e892SShuo Chen 2280ab2e892SShuo Chen void output(string_view word) 2290ab2e892SShuo Chen { 2300ab2e892SShuo Chen size_t shard = hash(word) % files_.size(); 231270b6cceSShuo Chen files_[shard]->appendRecord(word); 2320ab2e892SShuo Chen } 2330ab2e892SShuo Chen 2340ab2e892SShuo Chen void finish() 2350ab2e892SShuo Chen { 2364136e585SShuo Chen int shard = 0; 2374136e585SShuo Chen for (const auto& file : files_) 2380ab2e892SShuo Chen { 2392a129a12SShuo Chen // if (verbose) 2404136e585SShuo Chen printf(" shard %d: %ld bytes, %ld items\n", shard, file->tell(), file->items()); 2414136e585SShuo Chen ++shard; 2424136e585SShuo Chen file->close(); 2430ab2e892SShuo Chen } 2440ab2e892SShuo Chen } 2450ab2e892SShuo Chen 2460ab2e892SShuo Chen private: 2470ab2e892SShuo Chen std::hash<string_view> hash; 2480ab2e892SShuo Chen vector<unique_ptr<OutputFile>> files_; 2490ab2e892SShuo Chen}; 2500ab2e892SShuo Chen 2514136e585SShuo Chenint64_t shard_(int argc, char* argv[]) 2520ab2e892SShuo Chen{ 2530ab2e892SShuo Chen Sharder sharder; 2544136e585SShuo Chen Timer timer; 2554136e585SShuo Chen int64_t total = 0; 2562a129a12SShuo Chen for (int i = optind; i < argc; ++i) 2570ab2e892SShuo Chen { 2582a129a12SShuo Chen LOG_INFO << "Processing input file " << argv[i]; 2590ab2e892SShuo Chen double t = now(); 2600ab2e892SShuo Chen char line[1024]; 2610ab2e892SShuo Chen FILE* fp = fopen(argv[i], "r"); 262a6693141SShuo Chen char 
buffer[kBufferSize]; 2630ab2e892SShuo Chen ::setbuffer(fp, buffer, sizeof buffer); 2640ab2e892SShuo Chen while (fgets(line, sizeof line, fp)) 2650ab2e892SShuo Chen { 2660ab2e892SShuo Chen size_t len = strlen(line); 2670ab2e892SShuo Chen if (len > 0 && line[len-1] == '\n') 268270b6cceSShuo Chen { 2690ab2e892SShuo Chen line[len-1] = '\0'; 270270b6cceSShuo Chen len--; 271270b6cceSShuo Chen } 272270b6cceSShuo Chen sharder.output(string_view(line, len)); 2730ab2e892SShuo Chen } 2744136e585SShuo Chen size_t len = ftell(fp); 2750ab2e892SShuo Chen fclose(fp); 2764136e585SShuo Chen total += len; 2770ab2e892SShuo Chen double sec = now() - t; 2782a129a12SShuo Chen LOG_INFO << "Done file " << argv[i] << absl::StrFormat(" %.3f sec %.2f MiB/s", sec, len / sec / 1024 / 1024); 2790ab2e892SShuo Chen } 2800ab2e892SShuo Chen sharder.finish(); 2812a129a12SShuo Chen LOG_INFO << "Sharding done " << timer.report(total); 2824136e585SShuo Chen return total; 2830ab2e892SShuo Chen} 2840ab2e892SShuo Chen 2850ab2e892SShuo Chen// ======= count_shards ======= 2860ab2e892SShuo Chen 287ecd7048bSShuo Chenvoid count_shard(int shard, int fd, size_t len) 2880ab2e892SShuo Chen{ 289ecd7048bSShuo Chen Timer timer; 290ecd7048bSShuo Chen 2910ab2e892SShuo Chen double t = now(); 2922a129a12SShuo Chen LOG_INFO << absl::StrFormat("counting shard %d: input file size %ld", shard, len); 2934136e585SShuo Chen { 2940ab2e892SShuo Chen void* mapped = mmap(NULL, len, PROT_READ, MAP_PRIVATE, fd, 0); 2950ab2e892SShuo Chen assert(mapped != MAP_FAILED); 2960ab2e892SShuo Chen const uint8_t* const start = static_cast<const uint8_t*>(mapped); 2970ab2e892SShuo Chen const uint8_t* const end = start + len; 2980ab2e892SShuo Chen 2994136e585SShuo Chen // std::unordered_map<string_view, uint64_t> items; 3004136e585SShuo Chen absl::flat_hash_map<string_view, uint64_t> items; 3012a129a12SShuo Chen int64_t count = 0; 3020ab2e892SShuo Chen for (const uint8_t* p = start; p < end;) 3030ab2e892SShuo Chen { 3040ab2e892SShuo Chen 
string_view s((const char*)p+1, *p); 3050ab2e892SShuo Chen items[s]++; 3060ab2e892SShuo Chen p += 1 + *p; 3072a129a12SShuo Chen ++count; 3080ab2e892SShuo Chen } 309270b6cceSShuo Chen LOG_INFO << "items " << count << " unique " << items.size(); 3102a129a12SShuo Chen if (verbose) 3110ab2e892SShuo Chen printf(" count %.3f sec %ld items\n", now() - t, items.size()); 3120ab2e892SShuo Chen 3130ab2e892SShuo Chen t = now(); 3140ab2e892SShuo Chen vector<std::pair<size_t, string_view>> counts; 3150ab2e892SShuo Chen for (const auto& it : items) 3160ab2e892SShuo Chen { 3170ab2e892SShuo Chen if (it.second > 1) 3180ab2e892SShuo Chen counts.push_back(make_pair(it.second, it.first)); 3190ab2e892SShuo Chen } 3202a129a12SShuo Chen if (verbose) 3210ab2e892SShuo Chen printf(" select %.3f sec %ld\n", now() - t, counts.size()); 3220ab2e892SShuo Chen 3230ab2e892SShuo Chen t = now(); 3240ab2e892SShuo Chen std::sort(counts.begin(), counts.end()); 3252a129a12SShuo Chen if (verbose) 3260ab2e892SShuo Chen printf(" sort %.3f sec\n", now() - t); 3270ab2e892SShuo Chen 3280ab2e892SShuo Chen t = now(); 3290ab2e892SShuo Chen { 330ecd7048bSShuo Chen char buf[256]; 331ecd7048bSShuo Chen snprintf(buf, sizeof buf, "count-%05d-of-%05d", shard, kShards); 332ecd7048bSShuo Chen OutputFile output(buf); 333ecd7048bSShuo Chen 3344136e585SShuo Chen for (auto it = counts.rbegin(); it != counts.rend(); ++it) 3350ab2e892SShuo Chen { 336270b6cceSShuo Chen string s(it->second); 337ecd7048bSShuo Chen output.write(absl::StrFormat("%d\t%s\n", it->first, s)); // FIXME %s with string_view doesn't work in C++17 338270b6cceSShuo Chen /* 339270b6cceSShuo Chen char buf[1024]; 340270b6cceSShuo Chen snprintf(buf, sizeof buf, "%zd\t%s\n", 341270b6cceSShuo Chen out.write(buf); 342270b6cceSShuo Chen */ 3434136e585SShuo Chen } 344270b6cceSShuo Chen 3454136e585SShuo Chen for (const auto& it : items) 3464136e585SShuo Chen { 3474136e585SShuo Chen if (it.second == 1) 3484136e585SShuo Chen { 349270b6cceSShuo Chen string s(it.first); 
350270b6cceSShuo Chen // FIXME: bug of absl? 351270b6cceSShuo Chen // out.write(absl::StrCat("1\t", s, "\n")); 352ecd7048bSShuo Chen output.write(absl::StrFormat("1\t%s\n", s)); 3534136e585SShuo Chen } 3540ab2e892SShuo Chen } 3550ab2e892SShuo Chen } 3562a129a12SShuo Chen //if (verbose) 3572a129a12SShuo Chen //printf(" output %.3f sec %lu\n", now() - t, st.st_size); 3580ab2e892SShuo Chen 3590ab2e892SShuo Chen if (munmap(mapped, len)) 3600ab2e892SShuo Chen perror("munmap"); 3614136e585SShuo Chen } 362ecd7048bSShuo Chen ::close(fd); 363ecd7048bSShuo Chen LOG_INFO << "shard " << shard << " done " << timer.report(len); 3640ab2e892SShuo Chen} 3650ab2e892SShuo Chen 3660ab2e892SShuo Chenvoid count_shards() 3670ab2e892SShuo Chen{ 3684136e585SShuo Chen Timer timer; 3694136e585SShuo Chen int64_t total = 0; 370a251380aSShuo Chen muduo::ThreadPool threadPool; 371a251380aSShuo Chen threadPool.setMaxQueueSize(10); 372ecd7048bSShuo Chen threadPool.start(4); 3730ab2e892SShuo Chen for (int shard = 0; shard < kShards; ++shard) 3740ab2e892SShuo Chen { 3750ab2e892SShuo Chen char buf[256]; 376a6693141SShuo Chen snprintf(buf, sizeof buf, "%s/shard-%05d-of-%05d", shard_dir, shard, kShards); 3770ab2e892SShuo Chen int fd = open(buf, O_RDONLY); 378ecd7048bSShuo Chen assert(fd >= 0); 3792a129a12SShuo Chen if (!keep) 380ecd7048bSShuo Chen ::unlink(buf); 3812a129a12SShuo Chen 382ecd7048bSShuo Chen struct stat st; 383ecd7048bSShuo Chen if (::fstat(fd, &st) == 0) 384ecd7048bSShuo Chen { 385ecd7048bSShuo Chen size_t len = st.st_size; 386ecd7048bSShuo Chen total += len; 387ecd7048bSShuo Chen threadPool.run([shard, fd, len]{ count_shard(shard, fd, len); }); 388ecd7048bSShuo Chen } 389a251380aSShuo Chen } 390a251380aSShuo Chen while (threadPool.queueSize() > 0) 391a251380aSShuo Chen { 392ecd7048bSShuo Chen LOG_INFO << "Waiting for ThreadPool " << threadPool.queueSize(); 393ecd7048bSShuo Chen muduo::CurrentThread::sleepUsec(1000*1000); 3940ab2e892SShuo Chen } 395a251380aSShuo Chen threadPool.stop(); 
396270b6cceSShuo Chen LOG_INFO << "Counting done "<< timer.report(total); 3970ab2e892SShuo Chen} 3980ab2e892SShuo Chen 3990ab2e892SShuo Chen// ======= merge ======= 4000ab2e892SShuo Chen 4010ab2e892SShuo Chenclass Source // copyable 4020ab2e892SShuo Chen{ 4030ab2e892SShuo Chen public: 404270b6cceSShuo Chen explicit Source(InputFile* in) 4050ab2e892SShuo Chen : in_(in), 4060ab2e892SShuo Chen count_(0), 4070ab2e892SShuo Chen word_() 4080ab2e892SShuo Chen { 4090ab2e892SShuo Chen } 4100ab2e892SShuo Chen 4110ab2e892SShuo Chen bool next() 4120ab2e892SShuo Chen { 4130ab2e892SShuo Chen string line; 414270b6cceSShuo Chen if (in_->getline(&line)) 4150ab2e892SShuo Chen { 4160ab2e892SShuo Chen size_t tab = line.find('\t'); 4170ab2e892SShuo Chen if (tab != string::npos) 4180ab2e892SShuo Chen { 4190ab2e892SShuo Chen count_ = strtol(line.c_str(), NULL, 10); 4200ab2e892SShuo Chen if (count_ > 0) 4210ab2e892SShuo Chen { 4220ab2e892SShuo Chen word_ = line.substr(tab+1); 4230ab2e892SShuo Chen return true; 4240ab2e892SShuo Chen } 4250ab2e892SShuo Chen } 4260ab2e892SShuo Chen } 4270ab2e892SShuo Chen return false; 4280ab2e892SShuo Chen } 4290ab2e892SShuo Chen 4300ab2e892SShuo Chen bool operator<(const Source& rhs) const 4310ab2e892SShuo Chen { 4320ab2e892SShuo Chen return count_ < rhs.count_; 4330ab2e892SShuo Chen } 4340ab2e892SShuo Chen 435270b6cceSShuo Chen void outputTo(OutputFile* out) const 4360ab2e892SShuo Chen { 437270b6cceSShuo Chen out->write(absl::StrFormat("%d\t%s\n", count_, word_)); 4380ab2e892SShuo Chen } 4390ab2e892SShuo Chen 4400ab2e892SShuo Chen private: 441270b6cceSShuo Chen InputFile* in_; // not owned 4420ab2e892SShuo Chen int64_t count_; 4430ab2e892SShuo Chen string word_; 4440ab2e892SShuo Chen}; 4450ab2e892SShuo Chen 4462a129a12SShuo Chenint64_t merge(const char* output) 4470ab2e892SShuo Chen{ 4484136e585SShuo Chen Timer timer; 449270b6cceSShuo Chen vector<unique_ptr<InputFile>> inputs; 4500ab2e892SShuo Chen vector<Source> keys; 4510ab2e892SShuo Chen 
4524136e585SShuo Chen int64_t total = 0; 4530ab2e892SShuo Chen for (int i = 0; i < kShards; ++i) 4540ab2e892SShuo Chen { 4550ab2e892SShuo Chen char buf[256]; 4560ab2e892SShuo Chen snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, kShards); 4574136e585SShuo Chen struct stat st; 458a6693141SShuo Chen if (::stat(buf, &st) == 0) 4590ab2e892SShuo Chen { 460a6693141SShuo Chen total += st.st_size; 461270b6cceSShuo Chen inputs.push_back(std::make_unique<InputFile>(buf)); 462a6693141SShuo Chen Source rec(inputs.back().get()); 463a6693141SShuo Chen if (rec.next()) 464a6693141SShuo Chen { 465a6693141SShuo Chen keys.push_back(rec); 466a6693141SShuo Chen } 467a6693141SShuo Chen if (!keep) 468a6693141SShuo Chen ::unlink(buf); 469a6693141SShuo Chen } 470a6693141SShuo Chen else 471a6693141SShuo Chen { 472a6693141SShuo Chen perror("Unable to stat file:"); 4730ab2e892SShuo Chen } 4740ab2e892SShuo Chen } 4752a129a12SShuo Chen LOG_INFO << "merging " << inputs.size() << " files of " << total << " bytes in total"; 4760ab2e892SShuo Chen 4774136e585SShuo Chen { 478270b6cceSShuo Chen OutputFile out(output); 4790ab2e892SShuo Chen std::make_heap(keys.begin(), keys.end()); 4800ab2e892SShuo Chen while (!keys.empty()) 4810ab2e892SShuo Chen { 4820ab2e892SShuo Chen std::pop_heap(keys.begin(), keys.end()); 483270b6cceSShuo Chen keys.back().outputTo(&out); 4840ab2e892SShuo Chen 4850ab2e892SShuo Chen if (keys.back().next()) 4860ab2e892SShuo Chen { 4870ab2e892SShuo Chen std::push_heap(keys.begin(), keys.end()); 4880ab2e892SShuo Chen } 4890ab2e892SShuo Chen else 4900ab2e892SShuo Chen { 4910ab2e892SShuo Chen keys.pop_back(); 4920ab2e892SShuo Chen } 4930ab2e892SShuo Chen } 4944136e585SShuo Chen } 495a251380aSShuo Chen LOG_INFO << "Merging done " << timer.report(total); 4962a129a12SShuo Chen return total; 4970ab2e892SShuo Chen} 4980ab2e892SShuo Chen 4990ab2e892SShuo Chenint main(int argc, char* argv[]) 5000ab2e892SShuo Chen{ 5010ab2e892SShuo Chen /* 5020ab2e892SShuo Chen int fd = 
open("shard-00000-of-00010", O_RDONLY); 5030ab2e892SShuo Chen double t = now(); 5044136e585SShuo Chen int64_t len = count_shard(0, fd); 5054136e585SShuo Chen double sec = now() - t; 5064136e585SShuo Chen printf("count_shard %.3f sec %.2f MB/s\n", sec, len / sec / 1e6); 5074136e585SShuo Chen */ 5080ab2e892SShuo Chen 5092a129a12SShuo Chen int opt; 510a6693141SShuo Chen const char* output = "output"; 511a6693141SShuo Chen while ((opt = getopt(argc, argv, "ko:s:t:v")) != -1) 5122a129a12SShuo Chen { 5132a129a12SShuo Chen switch (opt) 5142a129a12SShuo Chen { 5152a129a12SShuo Chen case 'k': 5162a129a12SShuo Chen keep = true; 5172a129a12SShuo Chen break; 518a6693141SShuo Chen case 'o': 519a6693141SShuo Chen output = optarg; 520a6693141SShuo Chen break; 5212a129a12SShuo Chen case 's': 5222a129a12SShuo Chen kShards = atoi(optarg); 5232a129a12SShuo Chen break; 524a6693141SShuo Chen case 't': 525a6693141SShuo Chen shard_dir = optarg; 526a6693141SShuo Chen break; 5272a129a12SShuo Chen case 'v': 5282a129a12SShuo Chen verbose = true; 5292a129a12SShuo Chen break; 5302a129a12SShuo Chen } 5312a129a12SShuo Chen } 5322a129a12SShuo Chen 5334136e585SShuo Chen Timer timer; 534a251380aSShuo Chen LOG_INFO << argc - optind << " input files, " << kShards << " shards, " 535a251380aSShuo Chen << "output " << output <<" , temp " << shard_dir; 536a6693141SShuo Chen int64_t input = 0; 537a6693141SShuo Chen input = shard_(argc, argv); 5380ab2e892SShuo Chen count_shards(); 539a6693141SShuo Chen int64_t output_size = merge(output); 540a6693141SShuo Chen LOG_INFO << "All done " << timer.report(input) << " output " << output_size; 5410ab2e892SShuo Chen} 542