word_freq_shards_basic.cc revision 2a129a12
10ab2e892SShuo Chen/* sort word by frequency, sharding version. 20ab2e892SShuo Chen 30ab2e892SShuo Chen 1. read input file, shard to N files: 40ab2e892SShuo Chen word 50ab2e892SShuo Chen 2. assume each shard file fits in memory, read each shard file, count words and sort by count, then write to N count files: 60ab2e892SShuo Chen count \t word 70ab2e892SShuo Chen 3. merge N count files using heap. 80ab2e892SShuo Chen 90ab2e892SShuo ChenLimits: each shard must fit in memory. 100ab2e892SShuo Chen*/ 110ab2e892SShuo Chen 120ab2e892SShuo Chen#include <assert.h> 130ab2e892SShuo Chen 144136e585SShuo Chen#include "absl/container/flat_hash_map.h" 152a129a12SShuo Chen#include "absl/strings/str_format.h" 162a129a12SShuo Chen#include "muduo/base/Logging.h" 174136e585SShuo Chen 180ab2e892SShuo Chen#include <algorithm> 190ab2e892SShuo Chen#include <fstream> 200ab2e892SShuo Chen#include <iostream> 210ab2e892SShuo Chen#include <memory> 220ab2e892SShuo Chen#include <string> 230ab2e892SShuo Chen#include <unordered_map> 240ab2e892SShuo Chen#include <vector> 250ab2e892SShuo Chen 262a129a12SShuo Chen#include <boost/program_options.hpp> 272a129a12SShuo Chen 280ab2e892SShuo Chen#include <fcntl.h> 290ab2e892SShuo Chen#include <string.h> 300ab2e892SShuo Chen#include <sys/mman.h> 314136e585SShuo Chen#include <sys/stat.h> 320ab2e892SShuo Chen#include <sys/time.h> 334136e585SShuo Chen#include <sys/times.h> 340ab2e892SShuo Chen#include <unistd.h> 350ab2e892SShuo Chen 360ab2e892SShuo Chenusing std::string; 370ab2e892SShuo Chenusing std::string_view; 380ab2e892SShuo Chenusing std::vector; 390ab2e892SShuo Chenusing std::unique_ptr; 400ab2e892SShuo Chen 410ab2e892SShuo Chenint kShards = 10; 422a129a12SShuo Chenbool verbose = false, keep = false; 430ab2e892SShuo Chen 440ab2e892SShuo Cheninline double now() 450ab2e892SShuo Chen{ 460ab2e892SShuo Chen struct timeval tv = { 0, 0 }; 470ab2e892SShuo Chen gettimeofday(&tv, nullptr); 480ab2e892SShuo Chen return tv.tv_sec + tv.tv_usec / 1000000.0; 490ab2e892SShuo Chen} 500ab2e892SShuo Chen 514136e585SShuo Chenstruct CpuTime 524136e585SShuo Chen{ 534136e585SShuo Chen double userSeconds = 0.0; 544136e585SShuo Chen double systemSeconds = 0.0; 554136e585SShuo Chen 564136e585SShuo Chen double total() const { return userSeconds + systemSeconds; } 574136e585SShuo Chen}; 584136e585SShuo Chen 594136e585SShuo Chenconst int g_clockTicks = static_cast<int>(::sysconf(_SC_CLK_TCK)); 604136e585SShuo Chen 614136e585SShuo ChenCpuTime cpuTime() 624136e585SShuo Chen{ 634136e585SShuo Chen CpuTime t; 644136e585SShuo Chen struct tms tms; 654136e585SShuo Chen if (::times(&tms) >= 0) 664136e585SShuo Chen { 674136e585SShuo Chen const double hz = static_cast<double>(g_clockTicks); 684136e585SShuo Chen t.userSeconds = static_cast<double>(tms.tms_utime) / hz; 694136e585SShuo Chen t.systemSeconds = static_cast<double>(tms.tms_stime) / hz; 704136e585SShuo Chen } 714136e585SShuo Chen return t; 724136e585SShuo Chen} 734136e585SShuo Chen 744136e585SShuo Chenclass Timer 754136e585SShuo Chen{ 764136e585SShuo Chen public: 774136e585SShuo Chen Timer() 784136e585SShuo Chen : start_(now()), 794136e585SShuo Chen start_cpu_(cpuTime()) 804136e585SShuo Chen { 814136e585SShuo Chen } 824136e585SShuo Chen 832a129a12SShuo Chen string report(int64_t bytes) const 844136e585SShuo Chen { 854136e585SShuo Chen CpuTime end_cpu(cpuTime()); 864136e585SShuo Chen double end = now(); 872a129a12SShuo Chen return absl::StrFormat("%.3f real %.3f cpu %.2f MiB/s %ld bytes", 882a129a12SShuo Chen end - start_, end_cpu.total() - start_cpu_.total(), 892a129a12SShuo Chen bytes / (end - start_) / 1024 / 1024, bytes); 904136e585SShuo Chen } 914136e585SShuo Chen private: 924136e585SShuo Chen const double start_ = 0; 934136e585SShuo Chen const CpuTime start_cpu_; 944136e585SShuo Chen}; 954136e585SShuo Chen 960ab2e892SShuo Chenclass OutputFile // : boost::noncopyable 970ab2e892SShuo Chen{ 980ab2e892SShuo Chen public: 990ab2e892SShuo Chen explicit OutputFile(const string& filename) 1000ab2e892SShuo Chen : file_(::fopen(filename.c_str(), "w+")) 1010ab2e892SShuo Chen { 1020ab2e892SShuo Chen assert(file_); 1030ab2e892SShuo Chen ::setbuffer(file_, buffer_, sizeof buffer_); 1040ab2e892SShuo Chen } 1050ab2e892SShuo Chen 1060ab2e892SShuo Chen ~OutputFile() 1070ab2e892SShuo Chen { 1080ab2e892SShuo Chen close(); 1090ab2e892SShuo Chen } 1100ab2e892SShuo Chen 1110ab2e892SShuo Chen void append(string_view s) 1120ab2e892SShuo Chen { 1130ab2e892SShuo Chen assert(s.size() < 255); 1140ab2e892SShuo Chen uint8_t len = s.size(); 1150ab2e892SShuo Chen ::fwrite(&len, 1, sizeof len, file_); 1160ab2e892SShuo Chen ::fwrite(s.data(), 1, len, file_); 1170ab2e892SShuo Chen ++items_; 1180ab2e892SShuo Chen } 1190ab2e892SShuo Chen 1200ab2e892SShuo Chen /* 1210ab2e892SShuo Chen void append(uint64_t x) 1220ab2e892SShuo Chen { 1230ab2e892SShuo Chen // FIXME: htobe64(x); 1240ab2e892SShuo Chen ::fwrite(&x, 1, sizeof x, file_); 1250ab2e892SShuo Chen } 1260ab2e892SShuo Chen */ 1270ab2e892SShuo Chen 1280ab2e892SShuo Chen void flush() 1290ab2e892SShuo Chen { 1300ab2e892SShuo Chen ::fflush(file_); 1310ab2e892SShuo Chen } 1320ab2e892SShuo Chen 1330ab2e892SShuo Chen void close() 1340ab2e892SShuo Chen { 1350ab2e892SShuo Chen if (file_) 1360ab2e892SShuo Chen ::fclose(file_); 1370ab2e892SShuo Chen file_ = nullptr; 1380ab2e892SShuo Chen } 1390ab2e892SShuo Chen 1400ab2e892SShuo Chen int64_t tell() 1410ab2e892SShuo Chen { 1420ab2e892SShuo Chen return ::ftell(file_); 1430ab2e892SShuo Chen } 1440ab2e892SShuo Chen 1450ab2e892SShuo Chen int fd() 1460ab2e892SShuo Chen { 1470ab2e892SShuo Chen return ::fileno(file_); 1480ab2e892SShuo Chen } 1490ab2e892SShuo Chen 1500ab2e892SShuo Chen size_t items() 1510ab2e892SShuo Chen { 1520ab2e892SShuo Chen return items_; 1530ab2e892SShuo Chen } 1540ab2e892SShuo Chen 1550ab2e892SShuo Chen private: 1560ab2e892SShuo Chen FILE* file_; 1570ab2e892SShuo Chen char buffer_[64 * 1024]; 1580ab2e892SShuo Chen size_t items_ = 0; 1590ab2e892SShuo Chen 1600ab2e892SShuo Chen OutputFile(const OutputFile&) = delete; 1610ab2e892SShuo Chen void operator=(const OutputFile&) = delete; 1620ab2e892SShuo Chen}; 1630ab2e892SShuo Chen 1640ab2e892SShuo Chenclass Sharder // : boost::noncopyable 1650ab2e892SShuo Chen{ 1660ab2e892SShuo Chen public: 1670ab2e892SShuo Chen Sharder() 1680ab2e892SShuo Chen : files_(kShards) 1690ab2e892SShuo Chen { 1700ab2e892SShuo Chen for (int i = 0; i < kShards; ++i) 1710ab2e892SShuo Chen { 1720ab2e892SShuo Chen char name[256]; 1730ab2e892SShuo Chen snprintf(name, sizeof name, "shard-%05d-of-%05d", i, kShards); 1740ab2e892SShuo Chen files_[i].reset(new OutputFile(name)); 1750ab2e892SShuo Chen } 1760ab2e892SShuo Chen assert(files_.size() == static_cast<size_t>(kShards)); 1770ab2e892SShuo Chen } 1780ab2e892SShuo Chen 1790ab2e892SShuo Chen void output(string_view word) 1800ab2e892SShuo Chen { 1810ab2e892SShuo Chen size_t shard = hash(word) % files_.size(); 1820ab2e892SShuo Chen files_[shard]->append(word); 1830ab2e892SShuo Chen } 1840ab2e892SShuo Chen 1850ab2e892SShuo Chen void finish() 1860ab2e892SShuo Chen { 1874136e585SShuo Chen int shard = 0; 1884136e585SShuo Chen for (const auto& file : files_) 1890ab2e892SShuo Chen { 1902a129a12SShuo Chen // if (verbose) 1914136e585SShuo Chen printf(" shard %d: %ld bytes, %ld items\n", shard, file->tell(), file->items()); 1924136e585SShuo Chen ++shard; 1934136e585SShuo Chen file->close(); 1940ab2e892SShuo Chen } 1950ab2e892SShuo Chen } 1960ab2e892SShuo Chen 1970ab2e892SShuo Chen private: 1980ab2e892SShuo Chen std::hash<string_view> hash; 1990ab2e892SShuo Chen vector<unique_ptr<OutputFile>> files_; 2000ab2e892SShuo Chen}; 2010ab2e892SShuo Chen 2024136e585SShuo Chenint64_t shard_(int argc, char* argv[]) 2030ab2e892SShuo Chen{ 2040ab2e892SShuo Chen Sharder sharder; 2054136e585SShuo Chen Timer timer; 2064136e585SShuo Chen int64_t total = 0; 2072a129a12SShuo Chen for (int i = optind; i < argc; ++i) 2080ab2e892SShuo Chen { 2092a129a12SShuo Chen LOG_INFO << "Processing input file " << argv[i]; 2100ab2e892SShuo Chen double t = now(); 2110ab2e892SShuo Chen char line[1024]; 2120ab2e892SShuo Chen FILE* fp = fopen(argv[i], "r"); 2130ab2e892SShuo Chen char buffer[65536]; 2140ab2e892SShuo Chen ::setbuffer(fp, buffer, sizeof buffer); 2150ab2e892SShuo Chen while (fgets(line, sizeof line, fp)) 2160ab2e892SShuo Chen { 2170ab2e892SShuo Chen size_t len = strlen(line); 2180ab2e892SShuo Chen if (len > 0 && line[len-1] == '\n') 2190ab2e892SShuo Chen line[len-1] = '\0'; 2200ab2e892SShuo Chen sharder.output(line); 2210ab2e892SShuo Chen } 2224136e585SShuo Chen size_t len = ftell(fp); 2230ab2e892SShuo Chen fclose(fp); 2244136e585SShuo Chen total += len; 2250ab2e892SShuo Chen double sec = now() - t; 2262a129a12SShuo Chen LOG_INFO << "Done file " << argv[i] << absl::StrFormat(" %.3f sec %.2f MiB/s", sec, len / sec / 1024 / 1024); 2270ab2e892SShuo Chen } 2280ab2e892SShuo Chen sharder.finish(); 2292a129a12SShuo Chen LOG_INFO << "Sharding done " << timer.report(total); 2304136e585SShuo Chen return total; 2310ab2e892SShuo Chen} 2320ab2e892SShuo Chen 2330ab2e892SShuo Chen// ======= count_shards ======= 2340ab2e892SShuo Chen 2352a129a12SShuo Chenint64_t count_shard(int shard, int fd, const char* output) 2360ab2e892SShuo Chen{ 2370ab2e892SShuo Chen const int64_t len = lseek(fd, 0, SEEK_END); 2380ab2e892SShuo Chen lseek(fd, 0, SEEK_SET); 2390ab2e892SShuo Chen double t = now(); 2402a129a12SShuo Chen LOG_INFO << absl::StrFormat("counting shard %d: input file size %ld", shard, len); 2414136e585SShuo Chen { 2420ab2e892SShuo Chen void* mapped = mmap(NULL, len, PROT_READ, MAP_PRIVATE, fd, 0); 2430ab2e892SShuo Chen assert(mapped != MAP_FAILED); 2440ab2e892SShuo Chen const uint8_t* const start = static_cast<const uint8_t*>(mapped); 2450ab2e892SShuo Chen const uint8_t* const end = start + len; 2460ab2e892SShuo Chen 2474136e585SShuo Chen // std::unordered_map<string_view, uint64_t> items; 2484136e585SShuo Chen absl::flat_hash_map<string_view, uint64_t> items; 2492a129a12SShuo Chen int64_t count = 0; 2500ab2e892SShuo Chen for (const uint8_t* p = start; p < end;) 2510ab2e892SShuo Chen { 2520ab2e892SShuo Chen string_view s((const char*)p+1, *p); 2530ab2e892SShuo Chen items[s]++; 2540ab2e892SShuo Chen p += 1 + *p; 2552a129a12SShuo Chen ++count; 2560ab2e892SShuo Chen } 2572a129a12SShuo Chen LOG_INFO << "counting " << count << " unique " << items.size(); 2582a129a12SShuo Chen if (verbose) 2590ab2e892SShuo Chen printf(" count %.3f sec %ld items\n", now() - t, items.size()); 2600ab2e892SShuo Chen 2610ab2e892SShuo Chen t = now(); 2620ab2e892SShuo Chen vector<std::pair<size_t, string_view>> counts; 2630ab2e892SShuo Chen for (const auto& it : items) 2640ab2e892SShuo Chen { 2650ab2e892SShuo Chen if (it.second > 1) 2660ab2e892SShuo Chen counts.push_back(make_pair(it.second, it.first)); 2670ab2e892SShuo Chen } 2682a129a12SShuo Chen if (verbose) 2690ab2e892SShuo Chen printf(" select %.3f sec %ld\n", now() - t, counts.size()); 2700ab2e892SShuo Chen 2710ab2e892SShuo Chen t = now(); 2720ab2e892SShuo Chen std::sort(counts.begin(), counts.end()); 2732a129a12SShuo Chen if (verbose) 2740ab2e892SShuo Chen printf(" sort %.3f sec\n", now() - t); 2750ab2e892SShuo Chen 2760ab2e892SShuo Chen t = now(); 2770ab2e892SShuo Chen { 2782a129a12SShuo Chen std::ofstream out(output); 2794136e585SShuo Chen for (auto it = counts.rbegin(); it != counts.rend(); ++it) 2800ab2e892SShuo Chen { 2814136e585SShuo Chen out << it->first << '\t' << it->second << '\n'; 2824136e585SShuo Chen } 2834136e585SShuo Chen for (const auto& it : items) 2844136e585SShuo Chen { 2854136e585SShuo Chen if (it.second == 1) 2864136e585SShuo Chen { 2874136e585SShuo Chen out << "1\t" << it.first << '\n'; 2884136e585SShuo Chen } 2890ab2e892SShuo Chen } 2900ab2e892SShuo Chen } 2912a129a12SShuo Chen //if (verbose) 2922a129a12SShuo Chen //printf(" output %.3f sec %lu\n", now() - t, st.st_size); 2930ab2e892SShuo Chen 2944136e585SShuo Chen t = now(); 2950ab2e892SShuo Chen if (munmap(mapped, len)) 2960ab2e892SShuo Chen perror("munmap"); 2974136e585SShuo Chen } 2982a129a12SShuo Chen // printf(" destruct %.3f sec\n", now() - t); 2994136e585SShuo Chen return len; 3000ab2e892SShuo Chen} 3010ab2e892SShuo Chen 3020ab2e892SShuo Chenvoid count_shards() 3030ab2e892SShuo Chen{ 3044136e585SShuo Chen Timer timer; 3054136e585SShuo Chen int64_t total = 0; 3060ab2e892SShuo Chen for (int shard = 0; shard < kShards; ++shard) 3070ab2e892SShuo Chen { 3082a129a12SShuo Chen Timer timer; 3090ab2e892SShuo Chen char buf[256]; 3100ab2e892SShuo Chen snprintf(buf, sizeof buf, "shard-%05d-of-%05d", shard, kShards); 3110ab2e892SShuo Chen int fd = open(buf, O_RDONLY); 3122a129a12SShuo Chen if (!keep) 3130ab2e892SShuo Chen ::unlink(buf); 3142a129a12SShuo Chen 3152a129a12SShuo Chen snprintf(buf, sizeof buf, "count-%05d-of-%05d", shard, kShards); 3162a129a12SShuo Chen int64_t len = count_shard(shard, fd, buf); 3172a129a12SShuo Chen ::close(fd); 3184136e585SShuo Chen total += len; 3192a129a12SShuo Chen struct stat st; 3202a129a12SShuo Chen ::stat(buf, &st); 3212a129a12SShuo Chen LOG_INFO << "shard " << shard << " done " << timer.report(len) << " output " << st.st_size; 3220ab2e892SShuo Chen } 3232a129a12SShuo Chen LOG_INFO << "count done "<< timer.report(total); 3240ab2e892SShuo Chen} 3250ab2e892SShuo Chen 3260ab2e892SShuo Chen// ======= merge ======= 3270ab2e892SShuo Chen 3280ab2e892SShuo Chenclass Source // copyable 3290ab2e892SShuo Chen{ 3300ab2e892SShuo Chen public: 3310ab2e892SShuo Chen explicit Source(std::istream* in) 3320ab2e892SShuo Chen : in_(in), 3330ab2e892SShuo Chen count_(0), 3340ab2e892SShuo Chen word_() 3350ab2e892SShuo Chen { 3360ab2e892SShuo Chen } 3370ab2e892SShuo Chen 3380ab2e892SShuo Chen bool next() 3390ab2e892SShuo Chen { 3400ab2e892SShuo Chen string line; 3410ab2e892SShuo Chen if (getline(*in_, line)) 3420ab2e892SShuo Chen { 3430ab2e892SShuo Chen size_t tab = line.find('\t'); 3440ab2e892SShuo Chen if (tab != string::npos) 3450ab2e892SShuo Chen { 3460ab2e892SShuo Chen count_ = strtol(line.c_str(), NULL, 10); 3470ab2e892SShuo Chen if (count_ > 0) 3480ab2e892SShuo Chen { 3490ab2e892SShuo Chen word_ = line.substr(tab+1); 3500ab2e892SShuo Chen return true; 3510ab2e892SShuo Chen } 3520ab2e892SShuo Chen } 3530ab2e892SShuo Chen } 3540ab2e892SShuo Chen return false; 3550ab2e892SShuo Chen } 3560ab2e892SShuo Chen 3570ab2e892SShuo Chen bool operator<(const Source& rhs) const 3580ab2e892SShuo Chen { 3590ab2e892SShuo Chen return count_ < rhs.count_; 3600ab2e892SShuo Chen } 3610ab2e892SShuo Chen 3620ab2e892SShuo Chen void outputTo(std::ostream& out) const 3630ab2e892SShuo Chen { 3640ab2e892SShuo Chen out << count_ << '\t' << word_ << '\n'; 3650ab2e892SShuo Chen } 3660ab2e892SShuo Chen 3670ab2e892SShuo Chen private: 3680ab2e892SShuo Chen std::istream* in_; 3690ab2e892SShuo Chen int64_t count_; 3700ab2e892SShuo Chen string word_; 3710ab2e892SShuo Chen}; 3720ab2e892SShuo Chen 3732a129a12SShuo Chenint64_t merge(const char* output) 3740ab2e892SShuo Chen{ 3754136e585SShuo Chen Timer timer; 3760ab2e892SShuo Chen vector<unique_ptr<std::ifstream>> inputs; 3770ab2e892SShuo Chen vector<Source> keys; 3780ab2e892SShuo Chen 3794136e585SShuo Chen int64_t total = 0; 3800ab2e892SShuo Chen for (int i = 0; i < kShards; ++i) 3810ab2e892SShuo Chen { 3820ab2e892SShuo Chen char buf[256]; 3830ab2e892SShuo Chen snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, kShards); 3844136e585SShuo Chen struct stat st; 3854136e585SShuo Chen ::stat(buf, &st); 3864136e585SShuo Chen total += st.st_size; 3870ab2e892SShuo Chen inputs.emplace_back(new std::ifstream(buf)); 3880ab2e892SShuo Chen Source rec(inputs.back().get()); 3890ab2e892SShuo Chen if (rec.next()) 3900ab2e892SShuo Chen { 3910ab2e892SShuo Chen keys.push_back(rec); 3920ab2e892SShuo Chen } 3932a129a12SShuo Chen if (!keep) 3940ab2e892SShuo Chen ::unlink(buf); 3950ab2e892SShuo Chen } 3962a129a12SShuo Chen LOG_INFO << "merging " << inputs.size() << " files of " << total << " bytes in total"; 3970ab2e892SShuo Chen 3984136e585SShuo Chen { 3992a129a12SShuo Chen std::ofstream out(output); 4000ab2e892SShuo Chen std::make_heap(keys.begin(), keys.end()); 4010ab2e892SShuo Chen while (!keys.empty()) 4020ab2e892SShuo Chen { 4030ab2e892SShuo Chen std::pop_heap(keys.begin(), keys.end()); 4040ab2e892SShuo Chen keys.back().outputTo(out); 4050ab2e892SShuo Chen 4060ab2e892SShuo Chen if (keys.back().next()) 4070ab2e892SShuo Chen { 4080ab2e892SShuo Chen std::push_heap(keys.begin(), keys.end()); 4090ab2e892SShuo Chen } 4100ab2e892SShuo Chen else 4110ab2e892SShuo Chen { 4120ab2e892SShuo Chen keys.pop_back(); 4130ab2e892SShuo Chen } 4140ab2e892SShuo Chen } 4154136e585SShuo Chen } 4162a129a12SShuo Chen LOG_INFO << "merging done " << timer.report(total); 4172a129a12SShuo Chen return total; 4180ab2e892SShuo Chen} 4190ab2e892SShuo Chen 4200ab2e892SShuo Chenint main(int argc, char* argv[]) 4210ab2e892SShuo Chen{ 4220ab2e892SShuo Chen /* 4230ab2e892SShuo Chen int fd = open("shard-00000-of-00010", O_RDONLY); 4240ab2e892SShuo Chen double t = now(); 4254136e585SShuo Chen int64_t len = count_shard(0, fd); 4264136e585SShuo Chen double sec = now() - t; 4274136e585SShuo Chen printf("count_shard %.3f sec %.2f MB/s\n", sec, len / sec / 1e6); 4284136e585SShuo Chen */ 4290ab2e892SShuo Chen 4302a129a12SShuo Chen int opt; 4312a129a12SShuo Chen while ((opt = getopt(argc, argv, "ks:v")) != -1) 4322a129a12SShuo Chen { 4332a129a12SShuo Chen switch (opt) 4342a129a12SShuo Chen { 4352a129a12SShuo Chen case 'k': 4362a129a12SShuo Chen keep = true; 4372a129a12SShuo Chen break; 4382a129a12SShuo Chen case 's': 4392a129a12SShuo Chen kShards = atoi(optarg); 4402a129a12SShuo Chen break; 4412a129a12SShuo Chen case 'v': 4422a129a12SShuo Chen verbose = true; 4432a129a12SShuo Chen break; 4442a129a12SShuo Chen } 4452a129a12SShuo Chen } 4462a129a12SShuo Chen 4474136e585SShuo Chen Timer timer; 4484136e585SShuo Chen int64_t total = shard_(argc, argv); 4490ab2e892SShuo Chen count_shards(); 4502a129a12SShuo Chen int64_t output = merge("/dev/null"); 4512a129a12SShuo Chen LOG_INFO << "All done " << timer.report(total) << " output " << output; 4520ab2e892SShuo Chen} 453