word_freq_shards_basic.cc revision 4136e585
10ab2e892SShuo Chen/* sort word by frequency, sharding version. 20ab2e892SShuo Chen 30ab2e892SShuo Chen 1. read input file, shard to N files: 40ab2e892SShuo Chen word 50ab2e892SShuo Chen 2. assume each shard file fits in memory, read each shard file, count words and sort by count, then write to N count files: 60ab2e892SShuo Chen count \t word 70ab2e892SShuo Chen 3. merge N count files using heap. 80ab2e892SShuo Chen 90ab2e892SShuo ChenLimits: each shard must fit in memory. 100ab2e892SShuo Chen*/ 110ab2e892SShuo Chen 120ab2e892SShuo Chen#include <assert.h> 130ab2e892SShuo Chen 144136e585SShuo Chen#include "absl/container/flat_hash_map.h" 154136e585SShuo Chen 160ab2e892SShuo Chen#include <algorithm> 170ab2e892SShuo Chen#include <fstream> 180ab2e892SShuo Chen#include <iostream> 190ab2e892SShuo Chen#include <memory> 200ab2e892SShuo Chen#include <string> 210ab2e892SShuo Chen#include <unordered_map> 220ab2e892SShuo Chen#include <vector> 230ab2e892SShuo Chen 240ab2e892SShuo Chen#include <fcntl.h> 250ab2e892SShuo Chen#include <string.h> 260ab2e892SShuo Chen#include <sys/mman.h> 274136e585SShuo Chen#include <sys/stat.h> 280ab2e892SShuo Chen#include <sys/time.h> 294136e585SShuo Chen#include <sys/times.h> 300ab2e892SShuo Chen#include <unistd.h> 310ab2e892SShuo Chen 320ab2e892SShuo Chenusing std::string; 330ab2e892SShuo Chenusing std::string_view; 340ab2e892SShuo Chenusing std::vector; 350ab2e892SShuo Chenusing std::unique_ptr; 360ab2e892SShuo Chen 370ab2e892SShuo Chenint kShards = 10; 380ab2e892SShuo Chen 390ab2e892SShuo Cheninline double now() 400ab2e892SShuo Chen{ 410ab2e892SShuo Chen struct timeval tv = { 0, 0 }; 420ab2e892SShuo Chen gettimeofday(&tv, nullptr); 430ab2e892SShuo Chen return tv.tv_sec + tv.tv_usec / 1000000.0; 440ab2e892SShuo Chen} 450ab2e892SShuo Chen 464136e585SShuo Chenstruct CpuTime 474136e585SShuo Chen{ 484136e585SShuo Chen double userSeconds = 0.0; 494136e585SShuo Chen double systemSeconds = 0.0; 504136e585SShuo Chen 514136e585SShuo Chen double total() const { return userSeconds + systemSeconds; } 524136e585SShuo Chen}; 534136e585SShuo Chen 544136e585SShuo Chenconst int g_clockTicks = static_cast<int>(::sysconf(_SC_CLK_TCK)); 554136e585SShuo Chen 564136e585SShuo ChenCpuTime cpuTime() 574136e585SShuo Chen{ 584136e585SShuo Chen CpuTime t; 594136e585SShuo Chen struct tms tms; 604136e585SShuo Chen if (::times(&tms) >= 0) 614136e585SShuo Chen { 624136e585SShuo Chen const double hz = static_cast<double>(g_clockTicks); 634136e585SShuo Chen t.userSeconds = static_cast<double>(tms.tms_utime) / hz; 644136e585SShuo Chen t.systemSeconds = static_cast<double>(tms.tms_stime) / hz; 654136e585SShuo Chen } 664136e585SShuo Chen return t; 674136e585SShuo Chen} 684136e585SShuo Chen 694136e585SShuo Chenclass Timer 704136e585SShuo Chen{ 714136e585SShuo Chen public: 724136e585SShuo Chen Timer() 734136e585SShuo Chen : start_(now()), 744136e585SShuo Chen start_cpu_(cpuTime()) 754136e585SShuo Chen { 764136e585SShuo Chen } 774136e585SShuo Chen 784136e585SShuo Chen void report(int64_t bytes) const 794136e585SShuo Chen { 804136e585SShuo Chen CpuTime end_cpu(cpuTime()); 814136e585SShuo Chen double end = now(); 824136e585SShuo Chen printf("%.3f real %.3f cpu %.2f MiB/s %ld bytes\n", 834136e585SShuo Chen end - start_, end_cpu.total() - start_cpu_.total(), 844136e585SShuo Chen bytes / (end - start_) / 1024 / 1024, bytes); 854136e585SShuo Chen } 864136e585SShuo Chen private: 874136e585SShuo Chen const double start_ = 0; 884136e585SShuo Chen const CpuTime start_cpu_; 894136e585SShuo Chen}; 904136e585SShuo Chen 910ab2e892SShuo Chenclass OutputFile // : boost::noncopyable 920ab2e892SShuo Chen{ 930ab2e892SShuo Chen public: 940ab2e892SShuo Chen explicit OutputFile(const string& filename) 950ab2e892SShuo Chen : file_(::fopen(filename.c_str(), "w+")) 960ab2e892SShuo Chen { 970ab2e892SShuo Chen assert(file_); 980ab2e892SShuo Chen ::setbuffer(file_, buffer_, sizeof buffer_); 990ab2e892SShuo Chen } 1000ab2e892SShuo Chen 1010ab2e892SShuo Chen ~OutputFile() 1020ab2e892SShuo Chen { 1030ab2e892SShuo Chen close(); 1040ab2e892SShuo Chen } 1050ab2e892SShuo Chen 1060ab2e892SShuo Chen void append(string_view s) 1070ab2e892SShuo Chen { 1080ab2e892SShuo Chen assert(s.size() < 255); 1090ab2e892SShuo Chen uint8_t len = s.size(); 1100ab2e892SShuo Chen ::fwrite(&len, 1, sizeof len, file_); 1110ab2e892SShuo Chen ::fwrite(s.data(), 1, len, file_); 1120ab2e892SShuo Chen ++items_; 1130ab2e892SShuo Chen } 1140ab2e892SShuo Chen 1150ab2e892SShuo Chen /* 1160ab2e892SShuo Chen void append(uint64_t x) 1170ab2e892SShuo Chen { 1180ab2e892SShuo Chen // FIXME: htobe64(x); 1190ab2e892SShuo Chen ::fwrite(&x, 1, sizeof x, file_); 1200ab2e892SShuo Chen } 1210ab2e892SShuo Chen */ 1220ab2e892SShuo Chen 1230ab2e892SShuo Chen void flush() 1240ab2e892SShuo Chen { 1250ab2e892SShuo Chen ::fflush(file_); 1260ab2e892SShuo Chen } 1270ab2e892SShuo Chen 1280ab2e892SShuo Chen void close() 1290ab2e892SShuo Chen { 1300ab2e892SShuo Chen if (file_) 1310ab2e892SShuo Chen ::fclose(file_); 1320ab2e892SShuo Chen file_ = nullptr; 1330ab2e892SShuo Chen } 1340ab2e892SShuo Chen 1350ab2e892SShuo Chen int64_t tell() 1360ab2e892SShuo Chen { 1370ab2e892SShuo Chen return ::ftell(file_); 1380ab2e892SShuo Chen } 1390ab2e892SShuo Chen 1400ab2e892SShuo Chen int fd() 1410ab2e892SShuo Chen { 1420ab2e892SShuo Chen return ::fileno(file_); 1430ab2e892SShuo Chen } 1440ab2e892SShuo Chen 1450ab2e892SShuo Chen size_t items() 1460ab2e892SShuo Chen { 1470ab2e892SShuo Chen return items_; 1480ab2e892SShuo Chen } 1490ab2e892SShuo Chen 1500ab2e892SShuo Chen private: 1510ab2e892SShuo Chen FILE* file_; 1520ab2e892SShuo Chen char buffer_[64 * 1024]; 1530ab2e892SShuo Chen size_t items_ = 0; 1540ab2e892SShuo Chen 1550ab2e892SShuo Chen OutputFile(const OutputFile&) = delete; 1560ab2e892SShuo Chen void operator=(const OutputFile&) = delete; 1570ab2e892SShuo Chen}; 1580ab2e892SShuo Chen 1590ab2e892SShuo Chenclass Sharder // : boost::noncopyable 1600ab2e892SShuo Chen{ 1610ab2e892SShuo Chen public: 1620ab2e892SShuo Chen Sharder() 1630ab2e892SShuo Chen : files_(kShards) 1640ab2e892SShuo Chen { 1650ab2e892SShuo Chen for (int i = 0; i < kShards; ++i) 1660ab2e892SShuo Chen { 1670ab2e892SShuo Chen char name[256]; 1680ab2e892SShuo Chen snprintf(name, sizeof name, "shard-%05d-of-%05d", i, kShards); 1690ab2e892SShuo Chen files_[i].reset(new OutputFile(name)); 1700ab2e892SShuo Chen } 1710ab2e892SShuo Chen assert(files_.size() == static_cast<size_t>(kShards)); 1720ab2e892SShuo Chen } 1730ab2e892SShuo Chen 1740ab2e892SShuo Chen void output(string_view word) 1750ab2e892SShuo Chen { 1760ab2e892SShuo Chen size_t shard = hash(word) % files_.size(); 1770ab2e892SShuo Chen files_[shard]->append(word); 1780ab2e892SShuo Chen } 1790ab2e892SShuo Chen 1800ab2e892SShuo Chen void finish() 1810ab2e892SShuo Chen { 1824136e585SShuo Chen int shard = 0; 1834136e585SShuo Chen for (const auto& file : files_) 1840ab2e892SShuo Chen { 1854136e585SShuo Chen printf(" shard %d: %ld bytes, %ld items\n", shard, file->tell(), file->items()); 1864136e585SShuo Chen ++shard; 1874136e585SShuo Chen file->close(); 1880ab2e892SShuo Chen } 1890ab2e892SShuo Chen } 1900ab2e892SShuo Chen 1910ab2e892SShuo Chen private: 1920ab2e892SShuo Chen std::hash<string_view> hash; 1930ab2e892SShuo Chen vector<unique_ptr<OutputFile>> files_; 1940ab2e892SShuo Chen}; 1950ab2e892SShuo Chen 1964136e585SShuo Chenint64_t shard_(int argc, char* argv[]) 1970ab2e892SShuo Chen{ 1980ab2e892SShuo Chen Sharder sharder; 1994136e585SShuo Chen Timer timer; 2004136e585SShuo Chen int64_t total = 0; 2010ab2e892SShuo Chen for (int i = 1; i < argc; ++i) 2020ab2e892SShuo Chen { 2030ab2e892SShuo Chen std::cout << " processing input file " << argv[i] << std::endl; 2040ab2e892SShuo Chen double t = now(); 2050ab2e892SShuo Chen char line[1024]; 2060ab2e892SShuo Chen FILE* fp = fopen(argv[i], "r"); 2070ab2e892SShuo Chen char buffer[65536]; 2080ab2e892SShuo Chen ::setbuffer(fp, buffer, sizeof buffer); 2090ab2e892SShuo Chen while (fgets(line, sizeof line, fp)) 2100ab2e892SShuo Chen { 2110ab2e892SShuo Chen size_t len = strlen(line); 2120ab2e892SShuo Chen if (len > 0 && line[len-1] == '\n') 2130ab2e892SShuo Chen line[len-1] = '\0'; 2140ab2e892SShuo Chen sharder.output(line); 2150ab2e892SShuo Chen } 2164136e585SShuo Chen size_t len = ftell(fp); 2170ab2e892SShuo Chen fclose(fp); 2184136e585SShuo Chen total += len; 2190ab2e892SShuo Chen double sec = now() - t; 2204136e585SShuo Chen printf("%.3f sec %.2f MiB/s\n", sec, len / sec / 1024 / 1024); 2210ab2e892SShuo Chen } 2220ab2e892SShuo Chen sharder.finish(); 2234136e585SShuo Chen printf("sharding done "); 2244136e585SShuo Chen timer.report(total); 2254136e585SShuo Chen return total; 2260ab2e892SShuo Chen} 2270ab2e892SShuo Chen 2280ab2e892SShuo Chen// ======= count_shards ======= 2290ab2e892SShuo Chen 2304136e585SShuo Chenint64_t count_shard(int shard, int fd) 2310ab2e892SShuo Chen{ 2320ab2e892SShuo Chen const int64_t len = lseek(fd, 0, SEEK_END); 2330ab2e892SShuo Chen lseek(fd, 0, SEEK_SET); 2340ab2e892SShuo Chen double t = now(); 2350ab2e892SShuo Chen printf("shard %d: file size %ld\n", shard, len); 2364136e585SShuo Chen { 2370ab2e892SShuo Chen void* mapped = mmap(NULL, len, PROT_READ, MAP_PRIVATE, fd, 0); 2380ab2e892SShuo Chen assert(mapped != MAP_FAILED); 2390ab2e892SShuo Chen const uint8_t* const start = static_cast<const uint8_t*>(mapped); 2400ab2e892SShuo Chen const uint8_t* const end = start + len; 2410ab2e892SShuo Chen 2424136e585SShuo Chen // std::unordered_map<string_view, uint64_t> items; 2434136e585SShuo Chen absl::flat_hash_map<string_view, uint64_t> items; 2440ab2e892SShuo Chen for (const uint8_t* p = start; p < end;) 2450ab2e892SShuo Chen { 2460ab2e892SShuo Chen string_view s((const char*)p+1, *p); 2470ab2e892SShuo Chen items[s]++; 2480ab2e892SShuo Chen p += 1 + *p; 2490ab2e892SShuo Chen } 2500ab2e892SShuo Chen printf(" count %.3f sec %ld items\n", now() - t, items.size()); 2510ab2e892SShuo Chen 2520ab2e892SShuo Chen t = now(); 2530ab2e892SShuo Chen vector<std::pair<size_t, string_view>> counts; 2540ab2e892SShuo Chen for (const auto& it : items) 2550ab2e892SShuo Chen { 2560ab2e892SShuo Chen if (it.second > 1) 2570ab2e892SShuo Chen counts.push_back(make_pair(it.second, it.first)); 2580ab2e892SShuo Chen } 2590ab2e892SShuo Chen printf(" select %.3f sec %ld\n", now() - t, counts.size()); 2600ab2e892SShuo Chen 2610ab2e892SShuo Chen t = now(); 2620ab2e892SShuo Chen std::sort(counts.begin(), counts.end()); 2630ab2e892SShuo Chen printf(" sort %.3f sec\n", now() - t); 2640ab2e892SShuo Chen 2650ab2e892SShuo Chen t = now(); 2660ab2e892SShuo Chen { 2674136e585SShuo Chen char buf[256]; 2684136e585SShuo Chen snprintf(buf, sizeof buf, "count-%05d-of-%05d", shard, kShards); 2694136e585SShuo Chen std::ofstream out(buf); 2704136e585SShuo Chen for (auto it = counts.rbegin(); it != counts.rend(); ++it) 2710ab2e892SShuo Chen { 2724136e585SShuo Chen out << it->first << '\t' << it->second << '\n'; 2734136e585SShuo Chen } 2744136e585SShuo Chen for (const auto& it : items) 2754136e585SShuo Chen { 2764136e585SShuo Chen if (it.second == 1) 2774136e585SShuo Chen { 2784136e585SShuo Chen out << "1\t" << it.first << '\n'; 2794136e585SShuo Chen } 2800ab2e892SShuo Chen } 2810ab2e892SShuo Chen } 2820ab2e892SShuo Chen printf(" output %.3f sec\n", now() - t); 2830ab2e892SShuo Chen 2844136e585SShuo Chen t = now(); 2850ab2e892SShuo Chen if (munmap(mapped, len)) 2860ab2e892SShuo Chen perror("munmap"); 2874136e585SShuo Chen } 2884136e585SShuo Chen printf(" destruct %.3f sec\n", now() - t); 2894136e585SShuo Chen return len; 2900ab2e892SShuo Chen} 2910ab2e892SShuo Chen 2920ab2e892SShuo Chenvoid count_shards() 2930ab2e892SShuo Chen{ 2944136e585SShuo Chen Timer timer; 2954136e585SShuo Chen int64_t total = 0; 2960ab2e892SShuo Chen for (int shard = 0; shard < kShards; ++shard) 2970ab2e892SShuo Chen { 2980ab2e892SShuo Chen char buf[256]; 2990ab2e892SShuo Chen snprintf(buf, sizeof buf, "shard-%05d-of-%05d", shard, kShards); 3000ab2e892SShuo Chen int fd = open(buf, O_RDONLY); 3014136e585SShuo Chen double t = now(); 3024136e585SShuo Chen int64_t len = count_shard(shard, fd); 3030ab2e892SShuo Chen ::close(fd); 3040ab2e892SShuo Chen ::unlink(buf); 3054136e585SShuo Chen total += len; 3064136e585SShuo Chen printf("shard %d: %.2f MiB/s\n", shard, len / (now() - t) / 1024 / 1024); 3070ab2e892SShuo Chen } 3084136e585SShuo Chen printf("count done "); 3094136e585SShuo Chen timer.report(total); 3100ab2e892SShuo Chen} 3110ab2e892SShuo Chen 3120ab2e892SShuo Chen// ======= merge ======= 3130ab2e892SShuo Chen 3140ab2e892SShuo Chenclass Source // copyable 3150ab2e892SShuo Chen{ 3160ab2e892SShuo Chen public: 3170ab2e892SShuo Chen explicit Source(std::istream* in) 3180ab2e892SShuo Chen : in_(in), 3190ab2e892SShuo Chen count_(0), 3200ab2e892SShuo Chen word_() 3210ab2e892SShuo Chen { 3220ab2e892SShuo Chen } 3230ab2e892SShuo Chen 3240ab2e892SShuo Chen bool next() 3250ab2e892SShuo Chen { 3260ab2e892SShuo Chen string line; 3270ab2e892SShuo Chen if (getline(*in_, line)) 3280ab2e892SShuo Chen { 3290ab2e892SShuo Chen size_t tab = line.find('\t'); 3300ab2e892SShuo Chen if (tab != string::npos) 3310ab2e892SShuo Chen { 3320ab2e892SShuo Chen count_ = strtol(line.c_str(), NULL, 10); 3330ab2e892SShuo Chen if (count_ > 0) 3340ab2e892SShuo Chen { 3350ab2e892SShuo Chen word_ = line.substr(tab+1); 3360ab2e892SShuo Chen return true; 3370ab2e892SShuo Chen } 3380ab2e892SShuo Chen } 3390ab2e892SShuo Chen } 3400ab2e892SShuo Chen return false; 3410ab2e892SShuo Chen } 3420ab2e892SShuo Chen 3430ab2e892SShuo Chen bool operator<(const Source& rhs) const 3440ab2e892SShuo Chen { 3450ab2e892SShuo Chen return count_ < rhs.count_; 3460ab2e892SShuo Chen } 3470ab2e892SShuo Chen 3480ab2e892SShuo Chen void outputTo(std::ostream& out) const 3490ab2e892SShuo Chen { 3500ab2e892SShuo Chen out << count_ << '\t' << word_ << '\n'; 3510ab2e892SShuo Chen } 3520ab2e892SShuo Chen 3530ab2e892SShuo Chen private: 3540ab2e892SShuo Chen std::istream* in_; 3550ab2e892SShuo Chen int64_t count_; 3560ab2e892SShuo Chen string word_; 3570ab2e892SShuo Chen}; 3580ab2e892SShuo Chen 3590ab2e892SShuo Chenvoid merge() 3600ab2e892SShuo Chen{ 3614136e585SShuo Chen Timer timer; 3620ab2e892SShuo Chen vector<unique_ptr<std::ifstream>> inputs; 3630ab2e892SShuo Chen vector<Source> keys; 3640ab2e892SShuo Chen 3654136e585SShuo Chen int64_t total = 0; 3660ab2e892SShuo Chen for (int i = 0; i < kShards; ++i) 3670ab2e892SShuo Chen { 3680ab2e892SShuo Chen char buf[256]; 3690ab2e892SShuo Chen snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, kShards); 3704136e585SShuo Chen struct stat st; 3714136e585SShuo Chen ::stat(buf, &st); 3724136e585SShuo Chen total += st.st_size; 3730ab2e892SShuo Chen inputs.emplace_back(new std::ifstream(buf)); 3740ab2e892SShuo Chen Source rec(inputs.back().get()); 3750ab2e892SShuo Chen if (rec.next()) 3760ab2e892SShuo Chen { 3770ab2e892SShuo Chen keys.push_back(rec); 3780ab2e892SShuo Chen } 3790ab2e892SShuo Chen ::unlink(buf); 3800ab2e892SShuo Chen } 3810ab2e892SShuo Chen 3824136e585SShuo Chen { 3830ab2e892SShuo Chen std::ofstream out("output"); 3840ab2e892SShuo Chen std::make_heap(keys.begin(), keys.end()); 3850ab2e892SShuo Chen while (!keys.empty()) 3860ab2e892SShuo Chen { 3870ab2e892SShuo Chen std::pop_heap(keys.begin(), keys.end()); 3880ab2e892SShuo Chen keys.back().outputTo(out); 3890ab2e892SShuo Chen 3900ab2e892SShuo Chen if (keys.back().next()) 3910ab2e892SShuo Chen { 3920ab2e892SShuo Chen std::push_heap(keys.begin(), keys.end()); 3930ab2e892SShuo Chen } 3940ab2e892SShuo Chen else 3950ab2e892SShuo Chen { 3960ab2e892SShuo Chen keys.pop_back(); 3970ab2e892SShuo Chen } 3980ab2e892SShuo Chen } 3994136e585SShuo Chen } 4004136e585SShuo Chen printf("merging done "); 4014136e585SShuo Chen timer.report(total); 4020ab2e892SShuo Chen} 4030ab2e892SShuo Chen 4040ab2e892SShuo Chenint main(int argc, char* argv[]) 4050ab2e892SShuo Chen{ 4060ab2e892SShuo Chen /* 4070ab2e892SShuo Chen int fd = open("shard-00000-of-00010", O_RDONLY); 4080ab2e892SShuo Chen double t = now(); 4094136e585SShuo Chen int64_t len = count_shard(0, fd); 4104136e585SShuo Chen double sec = now() - t; 4114136e585SShuo Chen printf("count_shard %.3f sec %.2f MB/s\n", sec, len / sec / 1e6); 4124136e585SShuo Chen */ 4130ab2e892SShuo Chen 4144136e585SShuo Chen Timer timer; 4154136e585SShuo Chen int64_t total = shard_(argc, argv); 4160ab2e892SShuo Chen count_shards(); 4170ab2e892SShuo Chen merge(); 4184136e585SShuo Chen printf("All done "); 4194136e585SShuo Chen timer.report(total); 4200ab2e892SShuo Chen} 421