// word_freq_shards_basic.cc revision 0ab2e892
/* sort word by frequency, sharding version.

  1. read input file, shard to N files:
       word
     (each word is stored as one length byte followed by the word's bytes)
  2. assume each shard file fits in memory, read each shard file,
     count words and sort by count, then write to N count files:
       count \t word
  3. merge N count files using heap.

Limits: each shard must fit in memory.
*/

#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#include <algorithm>
#include <fstream>
#include <iostream>
#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>

#include <fcntl.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <unistd.h>

using std::string;
using std::string_view;
using std::vector;
using std::unique_ptr;

// Number of shard files and of count files. Deliberately not const so that
// debugging code can override it before the pipeline runs.
int kShards = 10;

// Returns the current wall-clock time in seconds as reported by gettimeofday().
inline double now()
{
  struct timeval tv = { 0, 0 };
  gettimeofday(&tv, nullptr);
  return tv.tv_sec + tv.tv_usec / 1000000.0;
}

// Buffered, write-only record file. Each record is one length byte followed by
// that many bytes of payload. The file is closed by close() or the destructor.
class OutputFile // : boost::noncopyable
{
 public:
  // Creates (or truncates) `filename`. Failing to open the file is fatal; the
  // check is unconditional so that NDEBUG builds never use a null FILE*.
  explicit OutputFile(const string& filename)
    : file_(::fopen(filename.c_str(), "w+"))
  {
    if (file_ == nullptr)
    {
      ::perror(filename.c_str());
      ::abort();
    }
    // Fully buffered through our own 64 KiB buffer.
    ::setvbuf(file_, buffer_, _IOFBF, sizeof buffer_);
  }

  ~OutputFile()
  {
    close();
  }

  // Appends one record. Precondition: s.size() < 255, because the length is
  // stored in a single byte.
  void append(string_view s)
  {
    assert(s.size() < 255);
    const uint8_t len = static_cast<uint8_t>(s.size());
    ::fwrite(&len, 1, sizeof len, file_);
    ::fwrite(s.data(), 1, len, file_);
    ++items_;
  }

  /*
  void append(uint64_t x)
  {
    // FIXME: htobe64(x);
    ::fwrite(&x, 1, sizeof x, file_);
  }
  */

  void flush()
  {
    ::fflush(file_);
  }

  // Closes the file; safe to call more than once.
  void close()
  {
    if (file_)
      ::fclose(file_);
    file_ = nullptr;
  }

  // Current file position as reported by ftell().
  int64_t tell() const
  {
    return ::ftell(file_);
  }

  int fd() const
  {
    return ::fileno(file_);
  }

  // Number of records appended so far.
  size_t items() const
  {
    return items_;
  }

 private:
  FILE* file_;
  char buffer_[64 * 1024];
  size_t items_ = 0;

  OutputFile(const OutputFile&) = delete;
  void operator=(const OutputFile&) = delete;
};

// Distributes words over kShards files named "shard-NNNNN-of-NNNNN", chosen by
// hash, so that all occurrences of a word end up in the same shard.
class Sharder // : boost::noncopyable
{
 public:
  Sharder()
    : files_(kShards)
  {
    for (int i = 0; i < kShards; ++i)
    {
      char name[256];
      snprintf(name, sizeof name, "shard-%05d-of-%05d", i, kShards);
      files_[i] = std::make_unique<OutputFile>(name);
    }
    assert(files_.size() == static_cast<size_t>(kShards));
  }

  // Appends `word` to the shard selected by its hash.
  void output(string_view word)
  {
    const size_t shard = hash_(word) % files_.size();
    files_[shard]->append(word);
  }

  // Prints per-shard statistics. The files themselves are closed when the
  // Sharder is destroyed.
  void finish()
  {
    for (size_t i = 0; i < files_.size(); ++i)
    {
      printf("shard %zu: %lld bytes, %zu items\n",
             i, static_cast<long long>(files_[i]->tell()), files_[i]->items());
    }
  }

 private:
  std::hash<string_view> hash_;
  vector<unique_ptr<OutputFile>> files_;
};

// Step 1: reads every input file named in argv[1..argc-1], one word per line,
// and writes each word to its shard file. Input files that cannot be opened
// are reported and skipped.
void shard(int argc, char* argv[])
{
  Sharder sharder;
  const double start = now();
  for (int i = 1; i < argc; ++i)
  {
    std::cout << " processing input file " << argv[i] << std::endl;
    const double file_start = now();
    FILE* fp = fopen(argv[i], "r");
    if (fp == nullptr)
    {
      perror(argv[i]);
      continue;
    }
    char buffer[65536];
    ::setvbuf(fp, buffer, _IOFBF, sizeof buffer);
    char line[1024];
    while (fgets(line, sizeof line, fp))
    {
      size_t len = strlen(line);
      // Strip the trailing newline, if any.
      if (len > 0 && line[len-1] == '\n')
        line[--len] = '\0';
      sharder.output(string_view(line, len));
    }
    // After reading to EOF the file position equals the number of bytes read.
    const long total = ftell(fp);
    fclose(fp);
    const double sec = now() - file_start;
    printf("%.3f sec %.2f MB/s\n", sec, total / sec / 1000 / 1000);
  }
  sharder.finish();
  std::cout << "shuffling done " << now() - start << " sec" << std::endl;
}

// ======= count_shards =======

// Step 2 for one shard: maps the shard file open on `fd`, counts every word,
// and writes "count \t word" lines to "count-NNNNN-of-NNNNN". Words seen more
// than once are written in descending order; words seen exactly once follow
// unsorted, since they all share the same count.
void count_shard(int shard, int fd)
{
  const int64_t len = lseek(fd, 0, SEEK_END);
  if (len < 0)
  {
    perror("lseek");
    return;
  }
  lseek(fd, 0, SEEK_SET);
  double t = now();
  printf("shard %d: file size %lld\n", shard, static_cast<long long>(len));

  // mmap() rejects a zero length, so an empty shard is handled without a
  // mapping and simply yields an empty count file.
  void* mapped = nullptr;
  if (len > 0)
  {
    mapped = mmap(nullptr, static_cast<size_t>(len), PROT_READ, MAP_PRIVATE, fd, 0);
    if (mapped == MAP_FAILED)
    {
      perror("mmap");
      return;
    }
  }
  const uint8_t* const start = static_cast<const uint8_t*>(mapped);
  const uint8_t* const end = start + len;

  // The keys are views into the mapping, so it must stay mapped until the
  // output below has been written.
  std::unordered_map<string_view, uint64_t> items;
  for (const uint8_t* p = start; p != end;)
  {
    const size_t word_len = *p;
    if (static_cast<size_t>(end - p) < 1 + word_len)
    {
      // Never read past the end of the mapping on a truncated record.
      fprintf(stderr, "shard %d: truncated record at offset %lld\n",
              shard, static_cast<long long>(p - start));
      break;
    }
    ++items[string_view(reinterpret_cast<const char*>(p + 1), word_len)];
    p += 1 + word_len;
  }
  printf(" count %.3f sec %zu items\n", now() - t, items.size());

  t = now();
  vector<std::pair<size_t, string_view>> counts;
  for (const auto& [word, count] : items)
  {
    if (count > 1)
      counts.emplace_back(count, word);
  }
  printf(" select %.3f sec %zu\n", now() - t, counts.size());

  t = now();
  std::sort(counts.begin(), counts.end());
  printf(" sort %.3f sec\n", now() - t);

  t = now();
  char buf[256];
  snprintf(buf, sizeof buf, "count-%05d-of-%05d", shard, kShards);
  std::ofstream out(buf);
  if (!out)
    perror(buf);
  // counts is sorted ascending; walk it backwards for descending output.
  for (auto it = counts.rbegin(); it != counts.rend(); ++it)
  {
    out << it->first << '\t' << it->second << '\n';
  }
  for (const auto& [word, count] : items)
  {
    if (count == 1)
    {
      out << "1\t" << word << '\n';
    }
  }
  printf(" output %.3f sec\n", now() - t);

  if (mapped != nullptr && munmap(mapped, static_cast<size_t>(len)))
    perror("munmap");
}

// Step 2: counts every shard file in turn and removes it afterwards. A shard
// file that cannot be opened is reported and skipped.
void count_shards()
{
  const double start = now();
  for (int i = 0; i < kShards; ++i)
  {
    char buf[256];
    snprintf(buf, sizeof buf, "shard-%05d-of-%05d", i, kShards);
    const int fd = open(buf, O_RDONLY);
    if (fd < 0)
    {
      perror(buf);
      continue;
    }
    count_shard(i, fd);
    ::close(fd);
    ::unlink(buf);
  }
  std::cout << "count done " << now() - start << " sec\n";
}

// ======= merge =======

// Cursor over one count file: holds the most recently read (count, word)
// record. Does not own the stream it reads from.
class Source  // copyable
{
 public:
  explicit Source(std::istream* in)
    : in_(in),
      count_(0),
      word_()
  {
  }

  // Reads the next "count \t word" line. Returns false at end of input, or
  // when the line has no tab or its count is not positive.
  bool next()
  {
    string line;
    if (!getline(*in_, line))
      return false;

    const size_t tab = line.find('\t');
    if (tab == string::npos)
      return false;

    count_ = strtoll(line.c_str(), nullptr, 10);
    if (count_ <= 0)
      return false;

    word_ = line.substr(tab+1);
    return true;
  }

  // Orders by count only, so a std heap of Sources keeps the largest count on top.
  bool operator<(const Source& rhs) const
  {
    return count_ < rhs.count_;
  }

  void outputTo(std::ostream& out) const
  {
    out << count_ << '\t' << word_ << '\n';
  }

 private:
  std::istream* in_;
  int64_t count_;
  string word_;
};

// Step 3: k-way merges the count files into the file "output" using a max-heap
// keyed on count, so the result is in descending count order.
void merge()
{
  vector<unique_ptr<std::ifstream>> inputs;
  vector<Source> keys;

  const double start = now();
  for (int i = 0; i < kShards; ++i)
  {
    char buf[256];
    snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, kShards);
    inputs.push_back(std::make_unique<std::ifstream>(buf));
    Source rec(inputs.back().get());
    if (rec.next())
    {
      keys.push_back(std::move(rec));
    }
    // The stream is already open, so the name can be removed right away.
    ::unlink(buf);
  }

  std::ofstream out("output");
  std::make_heap(keys.begin(), keys.end());
  while (!keys.empty())
  {
    // Move the largest record to the back, emit it, then refill from the same
    // source or drop the source once it is exhausted.
    std::pop_heap(keys.begin(), keys.end());
    keys.back().outputTo(out);

    if (keys.back().next())
    {
      std::push_heap(keys.begin(), keys.end());
    }
    else
    {
      keys.pop_back();
    }
  }
  std::cout << "merging done " << now() - start << " sec\n";
}

int main(int argc, char* argv[])
{
  // Disabled debugging snippet kept from the original source (sort_shard is
  // not defined in this file):
  //
  //   kShards = 9;
  //   int fd = open("shard-00000-of-00010", O_RDONLY);
  //   double t = now();
  //   sort_shard(0, fd, 1074462684, 100030936);
  //   printf("sort_shard %.3f sec\n", now() - t);
  //   t = now();
  //   count_shard(1, fd);
  //   printf("count_shard %.3f sec\n", now() - t);

  shard(argc, argv);
  count_shards();
  merge();
  return 0;
}