word_freq_shards_basic.cc revision 85147189
10ab2e892SShuo Chen/* sort word by frequency, sharding version. 20ab2e892SShuo Chen 30ab2e892SShuo Chen 1. read input file, shard to N files: 40ab2e892SShuo Chen word 50ab2e892SShuo Chen 2. assume each shard file fits in memory, read each shard file, count words and sort by count, then write to N count files: 60ab2e892SShuo Chen count \t word 70ab2e892SShuo Chen 3. merge N count files using heap. 80ab2e892SShuo Chen 90ab2e892SShuo ChenLimits: each shard must fit in memory. 100ab2e892SShuo Chen*/ 110ab2e892SShuo Chen 120ab2e892SShuo Chen#include <assert.h> 130ab2e892SShuo Chen 1485147189SShuo Chen#include "file.h" 1585147189SShuo Chen#include "timer.h" 1685147189SShuo Chen 174136e585SShuo Chen#include "absl/container/flat_hash_map.h" 182a129a12SShuo Chen#include "absl/strings/str_format.h" 1985147189SShuo Chen#include "muduo/base/BoundedBlockingQueue.h" 202a129a12SShuo Chen#include "muduo/base/Logging.h" 21a251380aSShuo Chen#include "muduo/base/ThreadPool.h" 224136e585SShuo Chen 230ab2e892SShuo Chen#include <algorithm> 240ab2e892SShuo Chen#include <memory> 250ab2e892SShuo Chen#include <string> 260ab2e892SShuo Chen#include <unordered_map> 270ab2e892SShuo Chen#include <vector> 280ab2e892SShuo Chen 292a129a12SShuo Chen#include <boost/program_options.hpp> 302a129a12SShuo Chen 310ab2e892SShuo Chen#include <fcntl.h> 320ab2e892SShuo Chen#include <string.h> 330ab2e892SShuo Chen#include <sys/mman.h> 344136e585SShuo Chen#include <sys/stat.h> 350ab2e892SShuo Chen#include <unistd.h> 360ab2e892SShuo Chen 370ab2e892SShuo Chenusing std::string; 380ab2e892SShuo Chenusing std::string_view; 390ab2e892SShuo Chenusing std::vector; 400ab2e892SShuo Chenusing std::unique_ptr; 410ab2e892SShuo Chen 4285147189SShuo Chenint kShards = 10, kThreads = 4; 4385147189SShuo Chenbool g_verbose = false, g_keep = false; 44a6693141SShuo Chenconst char* shard_dir = "."; 4585147189SShuo Chenconst char* g_output = "output"; 46270b6cceSShuo Chen 470ab2e892SShuo Chenclass Sharder // : boost::noncopyable 480ab2e892SShuo Chen{ 490ab2e892SShuo Chen public: 500ab2e892SShuo Chen Sharder() 510ab2e892SShuo Chen : files_(kShards) 520ab2e892SShuo Chen { 530ab2e892SShuo Chen for (int i = 0; i < kShards; ++i) 540ab2e892SShuo Chen { 550ab2e892SShuo Chen char name[256]; 56a6693141SShuo Chen snprintf(name, sizeof name, "%s/shard-%05d-of-%05d", shard_dir, i, kShards); 570ab2e892SShuo Chen files_[i].reset(new OutputFile(name)); 580ab2e892SShuo Chen } 590ab2e892SShuo Chen assert(files_.size() == static_cast<size_t>(kShards)); 600ab2e892SShuo Chen } 610ab2e892SShuo Chen 620ab2e892SShuo Chen void output(string_view word) 630ab2e892SShuo Chen { 640ab2e892SShuo Chen size_t shard = hash(word) % files_.size(); 65270b6cceSShuo Chen files_[shard]->appendRecord(word); 660ab2e892SShuo Chen } 670ab2e892SShuo Chen 680ab2e892SShuo Chen void finish() 690ab2e892SShuo Chen { 704136e585SShuo Chen int shard = 0; 714136e585SShuo Chen for (const auto& file : files_) 720ab2e892SShuo Chen { 7385147189SShuo Chen // if (g_verbose) 744136e585SShuo Chen printf(" shard %d: %ld bytes, %ld items\n", shard, file->tell(), file->items()); 754136e585SShuo Chen ++shard; 764136e585SShuo Chen file->close(); 770ab2e892SShuo Chen } 780ab2e892SShuo Chen } 790ab2e892SShuo Chen 800ab2e892SShuo Chen private: 810ab2e892SShuo Chen std::hash<string_view> hash; 820ab2e892SShuo Chen vector<unique_ptr<OutputFile>> files_; 830ab2e892SShuo Chen}; 840ab2e892SShuo Chen 854136e585SShuo Chenint64_t shard_(int argc, char* argv[]) 860ab2e892SShuo Chen{ 870ab2e892SShuo Chen Sharder sharder; 884136e585SShuo Chen Timer timer; 894136e585SShuo Chen int64_t total = 0; 902a129a12SShuo Chen for (int i = optind; i < argc; ++i) 910ab2e892SShuo Chen { 922a129a12SShuo Chen LOG_INFO << "Processing input file " << argv[i]; 9385147189SShuo Chen double t = Timer::now(); 9485147189SShuo Chen string line; 9585147189SShuo Chen InputFile input(argv[i]); 9685147189SShuo Chen while (input.getline(&line)) 970ab2e892SShuo Chen { 9885147189SShuo Chen sharder.output(line); 990ab2e892SShuo Chen } 10085147189SShuo Chen size_t len = input.tell(); 1014136e585SShuo Chen total += len; 10285147189SShuo Chen double sec = Timer::now() - t; 1032a129a12SShuo Chen LOG_INFO << "Done file " << argv[i] << absl::StrFormat(" %.3f sec %.2f MiB/s", sec, len / sec / 1024 / 1024); 1040ab2e892SShuo Chen } 1050ab2e892SShuo Chen sharder.finish(); 1062a129a12SShuo Chen LOG_INFO << "Sharding done " << timer.report(total); 1074136e585SShuo Chen return total; 1080ab2e892SShuo Chen} 1090ab2e892SShuo Chen 1100ab2e892SShuo Chen// ======= count_shards ======= 1110ab2e892SShuo Chen 112ecd7048bSShuo Chenvoid count_shard(int shard, int fd, size_t len) 1130ab2e892SShuo Chen{ 114ecd7048bSShuo Chen Timer timer; 115ecd7048bSShuo Chen 11685147189SShuo Chen double t = Timer::now(); 1172a129a12SShuo Chen LOG_INFO << absl::StrFormat("counting shard %d: input file size %ld", shard, len); 1184136e585SShuo Chen { 1190ab2e892SShuo Chen void* mapped = mmap(NULL, len, PROT_READ, MAP_PRIVATE, fd, 0); 1200ab2e892SShuo Chen assert(mapped != MAP_FAILED); 1210ab2e892SShuo Chen const uint8_t* const start = static_cast<const uint8_t*>(mapped); 1220ab2e892SShuo Chen const uint8_t* const end = start + len; 1230ab2e892SShuo Chen 1244136e585SShuo Chen // std::unordered_map<string_view, uint64_t> items; 1254136e585SShuo Chen absl::flat_hash_map<string_view, uint64_t> items; 1262a129a12SShuo Chen int64_t count = 0; 1270ab2e892SShuo Chen for (const uint8_t* p = start; p < end;) 1280ab2e892SShuo Chen { 1290ab2e892SShuo Chen string_view s((const char*)p+1, *p); 1300ab2e892SShuo Chen items[s]++; 1310ab2e892SShuo Chen p += 1 + *p; 1322a129a12SShuo Chen ++count; 1330ab2e892SShuo Chen } 134270b6cceSShuo Chen LOG_INFO << "items " << count << " unique " << items.size(); 13585147189SShuo Chen if (g_verbose) 13685147189SShuo Chen printf(" count %.3f sec %ld items\n", Timer::now() - t, items.size()); 1370ab2e892SShuo Chen 13885147189SShuo Chen t = Timer::now(); 1390ab2e892SShuo Chen vector<std::pair<size_t, string_view>> counts; 1400ab2e892SShuo Chen for (const auto& it : items) 1410ab2e892SShuo Chen { 1420ab2e892SShuo Chen if (it.second > 1) 1430ab2e892SShuo Chen counts.push_back(make_pair(it.second, it.first)); 1440ab2e892SShuo Chen } 14585147189SShuo Chen if (g_verbose) 14685147189SShuo Chen printf(" select %.3f sec %ld\n", Timer::now() - t, counts.size()); 1470ab2e892SShuo Chen 14885147189SShuo Chen t = Timer::now(); 1490ab2e892SShuo Chen std::sort(counts.begin(), counts.end()); 15085147189SShuo Chen if (g_verbose) 15185147189SShuo Chen printf(" sort %.3f sec\n", Timer::now() - t); 1520ab2e892SShuo Chen 15385147189SShuo Chen t = Timer::now(); 1540ab2e892SShuo Chen { 155ecd7048bSShuo Chen char buf[256]; 156ecd7048bSShuo Chen snprintf(buf, sizeof buf, "count-%05d-of-%05d", shard, kShards); 157ecd7048bSShuo Chen OutputFile output(buf); 158ecd7048bSShuo Chen 1594136e585SShuo Chen for (auto it = counts.rbegin(); it != counts.rend(); ++it) 1600ab2e892SShuo Chen { 161270b6cceSShuo Chen string s(it->second); 162ecd7048bSShuo Chen output.write(absl::StrFormat("%d\t%s\n", it->first, s)); // FIXME %s with string_view doesn't work in C++17 163270b6cceSShuo Chen /* 164270b6cceSShuo Chen char buf[1024]; 165270b6cceSShuo Chen snprintf(buf, sizeof buf, "%zd\t%s\n", 166270b6cceSShuo Chen out.write(buf); 167270b6cceSShuo Chen */ 1684136e585SShuo Chen } 169270b6cceSShuo Chen 1704136e585SShuo Chen for (const auto& it : items) 1714136e585SShuo Chen { 1724136e585SShuo Chen if (it.second == 1) 1734136e585SShuo Chen { 174270b6cceSShuo Chen string s(it.first); 175270b6cceSShuo Chen // FIXME: bug of absl? 176270b6cceSShuo Chen // out.write(absl::StrCat("1\t", s, "\n")); 177ecd7048bSShuo Chen output.write(absl::StrFormat("1\t%s\n", s)); 1784136e585SShuo Chen } 1790ab2e892SShuo Chen } 1800ab2e892SShuo Chen } 18185147189SShuo Chen //if (g_verbose) 18285147189SShuo Chen //printf(" output %.3f sec %lu\n", Timer::now() - t, st.st_size); 1830ab2e892SShuo Chen 1840ab2e892SShuo Chen if (munmap(mapped, len)) 1850ab2e892SShuo Chen perror("munmap"); 1864136e585SShuo Chen } 187ecd7048bSShuo Chen ::close(fd); 188ecd7048bSShuo Chen LOG_INFO << "shard " << shard << " done " << timer.report(len); 1890ab2e892SShuo Chen} 1900ab2e892SShuo Chen 19185147189SShuo Chenvoid count_shards(int shards) 1920ab2e892SShuo Chen{ 19385147189SShuo Chen assert(shards <= kShards); 1944136e585SShuo Chen Timer timer; 1954136e585SShuo Chen int64_t total = 0; 196a251380aSShuo Chen muduo::ThreadPool threadPool; 19785147189SShuo Chen threadPool.setMaxQueueSize(2*kThreads); 19885147189SShuo Chen threadPool.start(kThreads); 19985147189SShuo Chen 20085147189SShuo Chen for (int shard = 0; shard < shards; ++shard) 2010ab2e892SShuo Chen { 2020ab2e892SShuo Chen char buf[256]; 203a6693141SShuo Chen snprintf(buf, sizeof buf, "%s/shard-%05d-of-%05d", shard_dir, shard, kShards); 2040ab2e892SShuo Chen int fd = open(buf, O_RDONLY); 205ecd7048bSShuo Chen assert(fd >= 0); 20685147189SShuo Chen if (!g_keep) 207ecd7048bSShuo Chen ::unlink(buf); 2082a129a12SShuo Chen 209ecd7048bSShuo Chen struct stat st; 210ecd7048bSShuo Chen if (::fstat(fd, &st) == 0) 211ecd7048bSShuo Chen { 212ecd7048bSShuo Chen size_t len = st.st_size; 213ecd7048bSShuo Chen total += len; 214ecd7048bSShuo Chen threadPool.run([shard, fd, len]{ count_shard(shard, fd, len); }); 215ecd7048bSShuo Chen } 216a251380aSShuo Chen } 217a251380aSShuo Chen while (threadPool.queueSize() > 0) 218a251380aSShuo Chen { 21985147189SShuo Chen LOG_DEBUG << "waiting for ThreadPool " << threadPool.queueSize(); 220ecd7048bSShuo Chen muduo::CurrentThread::sleepUsec(1000*1000); 2210ab2e892SShuo Chen } 222a251380aSShuo Chen threadPool.stop(); 223270b6cceSShuo Chen LOG_INFO << "Counting done "<< timer.report(total); 2240ab2e892SShuo Chen} 2250ab2e892SShuo Chen 2260ab2e892SShuo Chen// ======= merge ======= 2270ab2e892SShuo Chen 2280ab2e892SShuo Chenclass Source // copyable 2290ab2e892SShuo Chen{ 2300ab2e892SShuo Chen public: 231270b6cceSShuo Chen explicit Source(InputFile* in) 2320ab2e892SShuo Chen : in_(in), 2330ab2e892SShuo Chen count_(0), 2340ab2e892SShuo Chen word_() 2350ab2e892SShuo Chen { 2360ab2e892SShuo Chen } 2370ab2e892SShuo Chen 2380ab2e892SShuo Chen bool next() 2390ab2e892SShuo Chen { 2400ab2e892SShuo Chen string line; 241270b6cceSShuo Chen if (in_->getline(&line)) 2420ab2e892SShuo Chen { 2430ab2e892SShuo Chen size_t tab = line.find('\t'); 2440ab2e892SShuo Chen if (tab != string::npos) 2450ab2e892SShuo Chen { 2460ab2e892SShuo Chen count_ = strtol(line.c_str(), NULL, 10); 2470ab2e892SShuo Chen if (count_ > 0) 2480ab2e892SShuo Chen { 2490ab2e892SShuo Chen word_ = line.substr(tab+1); 2500ab2e892SShuo Chen return true; 2510ab2e892SShuo Chen } 2520ab2e892SShuo Chen } 2530ab2e892SShuo Chen } 2540ab2e892SShuo Chen return false; 2550ab2e892SShuo Chen } 2560ab2e892SShuo Chen 2570ab2e892SShuo Chen bool operator<(const Source& rhs) const 2580ab2e892SShuo Chen { 2590ab2e892SShuo Chen return count_ < rhs.count_; 2600ab2e892SShuo Chen } 2610ab2e892SShuo Chen 262270b6cceSShuo Chen void outputTo(OutputFile* out) const 2630ab2e892SShuo Chen { 26485147189SShuo Chen //char buf[1024]; 26585147189SShuo Chen //snprintf(buf, sizeof buf, "%ld\t%s\n", count_, word_.c_str()); 26685147189SShuo Chen //out->write(buf); 267270b6cceSShuo Chen out->write(absl::StrFormat("%d\t%s\n", count_, word_)); 2680ab2e892SShuo Chen } 2690ab2e892SShuo Chen 27085147189SShuo Chen std::pair<int64_t, string> item() 27185147189SShuo Chen { 27285147189SShuo Chen return make_pair(count_, std::move(word_)); 27385147189SShuo Chen } 27485147189SShuo Chen 2750ab2e892SShuo Chen private: 276270b6cceSShuo Chen InputFile* in_; // not owned 2770ab2e892SShuo Chen int64_t count_; 2780ab2e892SShuo Chen string word_; 2790ab2e892SShuo Chen}; 2800ab2e892SShuo Chen 28185147189SShuo Chenint64_t merge() 2820ab2e892SShuo Chen{ 2834136e585SShuo Chen Timer timer; 284270b6cceSShuo Chen vector<unique_ptr<InputFile>> inputs; 2850ab2e892SShuo Chen vector<Source> keys; 2860ab2e892SShuo Chen 2874136e585SShuo Chen int64_t total = 0; 2880ab2e892SShuo Chen for (int i = 0; i < kShards; ++i) 2890ab2e892SShuo Chen { 2900ab2e892SShuo Chen char buf[256]; 2910ab2e892SShuo Chen snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, kShards); 2924136e585SShuo Chen struct stat st; 293a6693141SShuo Chen if (::stat(buf, &st) == 0) 2940ab2e892SShuo Chen { 295a6693141SShuo Chen total += st.st_size; 29685147189SShuo Chen // TODO: select buffer size based on kShards. 29785147189SShuo Chen inputs.push_back(std::make_unique<InputFile>(buf, 32 * 1024 * 1024)); 298a6693141SShuo Chen Source rec(inputs.back().get()); 299a6693141SShuo Chen if (rec.next()) 300a6693141SShuo Chen { 301a6693141SShuo Chen keys.push_back(rec); 302a6693141SShuo Chen } 30385147189SShuo Chen if (!g_keep) 304a6693141SShuo Chen ::unlink(buf); 305a6693141SShuo Chen } 306a6693141SShuo Chen else 307a6693141SShuo Chen { 308a6693141SShuo Chen perror("Unable to stat file:"); 3090ab2e892SShuo Chen } 3100ab2e892SShuo Chen } 3112a129a12SShuo Chen LOG_INFO << "merging " << inputs.size() << " files of " << total << " bytes in total"; 3120ab2e892SShuo Chen 3134136e585SShuo Chen { 31485147189SShuo Chen OutputFile out(g_output); 31585147189SShuo Chen /* 31685147189SShuo Chen muduo::BoundedBlockingQueue<vector<std::pair<int64_t, string>>> queue(1024); 31785147189SShuo Chen muduo::Thread thr([&queue] { 31885147189SShuo Chen OutputFile out(g_output); 31985147189SShuo Chen while (true) { 32085147189SShuo Chen auto vec = queue.take(); 32185147189SShuo Chen if (vec.size() == 0) 32285147189SShuo Chen break; 32385147189SShuo Chen for (const auto& x : vec) 32485147189SShuo Chen out.write(absl::StrFormat("%d\t%s\n", x.first, x.second)); 32585147189SShuo Chen } 32685147189SShuo Chen }); 32785147189SShuo Chen thr.start(); 32885147189SShuo Chen 32985147189SShuo Chen vector<std::pair<int64_t, string>> batch; 33085147189SShuo Chen */ 3310ab2e892SShuo Chen std::make_heap(keys.begin(), keys.end()); 3320ab2e892SShuo Chen while (!keys.empty()) 3330ab2e892SShuo Chen { 3340ab2e892SShuo Chen std::pop_heap(keys.begin(), keys.end()); 335270b6cceSShuo Chen keys.back().outputTo(&out); 33685147189SShuo Chen /* 33785147189SShuo Chen batch.push_back(std::move(keys.back().item())); 33885147189SShuo Chen if (batch.size() >= 10*1024*1024) 33985147189SShuo Chen { 34085147189SShuo Chen queue.put(std::move(batch)); 34185147189SShuo Chen batch.clear(); 34285147189SShuo Chen } 34385147189SShuo Chen */ 3440ab2e892SShuo Chen 3450ab2e892SShuo Chen if (keys.back().next()) 3460ab2e892SShuo Chen { 3470ab2e892SShuo Chen std::push_heap(keys.begin(), keys.end()); 3480ab2e892SShuo Chen } 3490ab2e892SShuo Chen else 3500ab2e892SShuo Chen { 3510ab2e892SShuo Chen keys.pop_back(); 3520ab2e892SShuo Chen } 3530ab2e892SShuo Chen } 35485147189SShuo Chen /* 35585147189SShuo Chen queue.put(batch); 35685147189SShuo Chen batch.clear(); 35785147189SShuo Chen queue.put(batch); 35885147189SShuo Chen thr.join(); 35985147189SShuo Chen */ 3604136e585SShuo Chen } 361a251380aSShuo Chen LOG_INFO << "Merging done " << timer.report(total); 3622a129a12SShuo Chen return total; 3630ab2e892SShuo Chen} 3640ab2e892SShuo Chen 3650ab2e892SShuo Chenint main(int argc, char* argv[]) 3660ab2e892SShuo Chen{ 3670ab2e892SShuo Chen /* 3680ab2e892SShuo Chen int fd = open("shard-00000-of-00010", O_RDONLY); 36985147189SShuo Chen double t = Timer::now(); 3704136e585SShuo Chen int64_t len = count_shard(0, fd); 37185147189SShuo Chen double sec = Timer::now() - t; 3724136e585SShuo Chen printf("count_shard %.3f sec %.2f MB/s\n", sec, len / sec / 1e6); 3734136e585SShuo Chen */ 3740ab2e892SShuo Chen 3752a129a12SShuo Chen int opt; 37685147189SShuo Chen int count_only = 0; 37785147189SShuo Chen bool merge_only = false; 37885147189SShuo Chen while ((opt = getopt(argc, argv, "c:kmo:p:s:t:v")) != -1) 3792a129a12SShuo Chen { 3802a129a12SShuo Chen switch (opt) 3812a129a12SShuo Chen { 38285147189SShuo Chen case 'c': 38385147189SShuo Chen count_only = atoi(optarg); 38485147189SShuo Chen break; 3852a129a12SShuo Chen case 'k': 38685147189SShuo Chen g_keep = true; 38785147189SShuo Chen break; 38885147189SShuo Chen case 'm': 38985147189SShuo Chen merge_only = true; 3902a129a12SShuo Chen break; 391a6693141SShuo Chen case 'o': 39285147189SShuo Chen g_output = optarg; 39385147189SShuo Chen break; 39485147189SShuo Chen case 'p': // Path for temp shard files 39585147189SShuo Chen shard_dir = optarg; 396a6693141SShuo Chen break; 3972a129a12SShuo Chen case 's': 3982a129a12SShuo Chen kShards = atoi(optarg); 3992a129a12SShuo Chen break; 400a6693141SShuo Chen case 't': 40185147189SShuo Chen kThreads = atoi(optarg); 402a6693141SShuo Chen break; 4032a129a12SShuo Chen case 'v': 40485147189SShuo Chen g_verbose = true; 4052a129a12SShuo Chen break; 4062a129a12SShuo Chen } 4072a129a12SShuo Chen } 4082a129a12SShuo Chen 40985147189SShuo Chen if (count_only > 0 || merge_only) 41085147189SShuo Chen { 41185147189SShuo Chen g_keep = true; 41285147189SShuo Chen g_verbose = true; 41385147189SShuo Chen count_only = std::min(count_only, kShards); 41485147189SShuo Chen 41585147189SShuo Chen if (count_only > 0) 41685147189SShuo Chen { 41785147189SShuo Chen count_shards(count_only); 41885147189SShuo Chen } 41985147189SShuo Chen 42085147189SShuo Chen if (merge_only) 42185147189SShuo Chen { 42285147189SShuo Chen merge(); 42385147189SShuo Chen } 42485147189SShuo Chen } 42585147189SShuo Chen else 42685147189SShuo Chen { 42785147189SShuo Chen // Run all three steps 42885147189SShuo Chen Timer timer; 42985147189SShuo Chen LOG_INFO << argc - optind << " input files, " << kShards << " shards, " 43085147189SShuo Chen << "output " << g_output <<" , temp " << shard_dir; 43185147189SShuo Chen int64_t input = 0; 43285147189SShuo Chen input = shard_(argc, argv); 43385147189SShuo Chen count_shards(kShards); 43485147189SShuo Chen int64_t output_size = merge(); 43585147189SShuo Chen LOG_INFO << "All done " << timer.report(input) << " output " << output_size; 43685147189SShuo Chen } 4370ab2e892SShuo Chen} 438