10ab2e892SShuo Chen/* sort word by frequency, sharding version. 20ab2e892SShuo Chen 30ab2e892SShuo Chen 1. read input file, shard to N files: 40ab2e892SShuo Chen word 50ab2e892SShuo Chen 2. assume each shard file fits in memory, read each shard file, count words and sort by count, then write to N count files: 60ab2e892SShuo Chen count \t word 70ab2e892SShuo Chen 3. merge N count files using heap. 80ab2e892SShuo Chen 90ab2e892SShuo ChenLimits: each shard must fit in memory. 100ab2e892SShuo Chen*/ 110ab2e892SShuo Chen 120ab2e892SShuo Chen#include <assert.h> 130ab2e892SShuo Chen 1485147189SShuo Chen#include "file.h" 15da39c979SShuo Chen#include "merge.h" 1685147189SShuo Chen#include "timer.h" 1785147189SShuo Chen 184136e585SShuo Chen#include "absl/container/flat_hash_map.h" 192cf09315SShuo Chen#include "absl/hash/hash.h" 202a129a12SShuo Chen#include "absl/strings/str_format.h" 212a129a12SShuo Chen#include "muduo/base/Logging.h" 22a251380aSShuo Chen#include "muduo/base/ThreadPool.h" 234136e585SShuo Chen 240ab2e892SShuo Chen#include <algorithm> 250ab2e892SShuo Chen#include <memory> 260ab2e892SShuo Chen#include <string> 270ab2e892SShuo Chen#include <unordered_map> 280ab2e892SShuo Chen#include <vector> 290ab2e892SShuo Chen 300ab2e892SShuo Chen#include <fcntl.h> 310ab2e892SShuo Chen#include <string.h> 320ab2e892SShuo Chen#include <sys/mman.h> 334136e585SShuo Chen#include <sys/stat.h> 340ab2e892SShuo Chen#include <unistd.h> 350ab2e892SShuo Chen 362cf09315SShuo Chenusing absl::string_view; 370ab2e892SShuo Chenusing std::string; 380ab2e892SShuo Chenusing std::vector; 390ab2e892SShuo Chenusing std::unique_ptr; 400ab2e892SShuo Chen 4185147189SShuo Chenint kShards = 10, kThreads = 4; 4285147189SShuo Chenbool g_verbose = false, g_keep = false; 43a6693141SShuo Chenconst char* shard_dir = "."; 4485147189SShuo Chenconst char* g_output = "output"; 45270b6cceSShuo Chen 460ab2e892SShuo Chenclass Sharder // : boost::noncopyable 470ab2e892SShuo Chen{ 480ab2e892SShuo Chen public: 490ab2e892SShuo Chen Sharder() 500ab2e892SShuo Chen : files_(kShards) 510ab2e892SShuo Chen { 520ab2e892SShuo Chen for (int i = 0; i < kShards; ++i) 530ab2e892SShuo Chen { 540ab2e892SShuo Chen char name[256]; 55a6693141SShuo Chen snprintf(name, sizeof name, "%s/shard-%05d-of-%05d", shard_dir, i, kShards); 560ab2e892SShuo Chen files_[i].reset(new OutputFile(name)); 570ab2e892SShuo Chen } 580ab2e892SShuo Chen assert(files_.size() == static_cast<size_t>(kShards)); 590ab2e892SShuo Chen } 600ab2e892SShuo Chen 610ab2e892SShuo Chen void output(string_view word) 620ab2e892SShuo Chen { 630ab2e892SShuo Chen size_t shard = hash(word) % files_.size(); 64270b6cceSShuo Chen files_[shard]->appendRecord(word); 650ab2e892SShuo Chen } 660ab2e892SShuo Chen 670ab2e892SShuo Chen void finish() 680ab2e892SShuo Chen { 694136e585SShuo Chen int shard = 0; 704136e585SShuo Chen for (const auto& file : files_) 710ab2e892SShuo Chen { 7285147189SShuo Chen // if (g_verbose) 734136e585SShuo Chen printf(" shard %d: %ld bytes, %ld items\n", shard, file->tell(), file->items()); 744136e585SShuo Chen ++shard; 754136e585SShuo Chen file->close(); 760ab2e892SShuo Chen } 770ab2e892SShuo Chen } 780ab2e892SShuo Chen 790ab2e892SShuo Chen private: 802cf09315SShuo Chen absl::Hash<string_view> hash; 810ab2e892SShuo Chen vector<unique_ptr<OutputFile>> files_; 820ab2e892SShuo Chen}; 830ab2e892SShuo Chen 844136e585SShuo Chenint64_t shard_(int argc, char* argv[]) 850ab2e892SShuo Chen{ 860ab2e892SShuo Chen Sharder sharder; 874136e585SShuo Chen Timer timer; 884136e585SShuo Chen int64_t total = 0; 892a129a12SShuo Chen for (int i = optind; i < argc; ++i) 900ab2e892SShuo Chen { 912a129a12SShuo Chen LOG_INFO << "Processing input file " << argv[i]; 9285147189SShuo Chen double t = Timer::now(); 9385147189SShuo Chen string line; 9485147189SShuo Chen InputFile input(argv[i]); 9585147189SShuo Chen while (input.getline(&line)) 960ab2e892SShuo Chen { 9785147189SShuo Chen sharder.output(line); 980ab2e892SShuo Chen } 9985147189SShuo Chen size_t len = input.tell(); 1004136e585SShuo Chen total += len; 10185147189SShuo Chen double sec = Timer::now() - t; 1022a129a12SShuo Chen LOG_INFO << "Done file " << argv[i] << absl::StrFormat(" %.3f sec %.2f MiB/s", sec, len / sec / 1024 / 1024); 1030ab2e892SShuo Chen } 1040ab2e892SShuo Chen sharder.finish(); 1052a129a12SShuo Chen LOG_INFO << "Sharding done " << timer.report(total); 1064136e585SShuo Chen return total; 1070ab2e892SShuo Chen} 1080ab2e892SShuo Chen 1090ab2e892SShuo Chen// ======= count_shards ======= 1100ab2e892SShuo Chen 111ecd7048bSShuo Chenvoid count_shard(int shard, int fd, size_t len) 1120ab2e892SShuo Chen{ 113ecd7048bSShuo Chen Timer timer; 114ecd7048bSShuo Chen 11585147189SShuo Chen double t = Timer::now(); 1162a129a12SShuo Chen LOG_INFO << absl::StrFormat("counting shard %d: input file size %ld", shard, len); 1174136e585SShuo Chen { 1180ab2e892SShuo Chen void* mapped = mmap(NULL, len, PROT_READ, MAP_PRIVATE, fd, 0); 1190ab2e892SShuo Chen assert(mapped != MAP_FAILED); 1200ab2e892SShuo Chen const uint8_t* const start = static_cast<const uint8_t*>(mapped); 1210ab2e892SShuo Chen const uint8_t* const end = start + len; 1220ab2e892SShuo Chen 1234136e585SShuo Chen // std::unordered_map<string_view, uint64_t> items; 1244136e585SShuo Chen absl::flat_hash_map<string_view, uint64_t> items; 1252a129a12SShuo Chen int64_t count = 0; 1260ab2e892SShuo Chen for (const uint8_t* p = start; p < end;) 1270ab2e892SShuo Chen { 1280ab2e892SShuo Chen string_view s((const char*)p+1, *p); 1290ab2e892SShuo Chen items[s]++; 1300ab2e892SShuo Chen p += 1 + *p; 1312a129a12SShuo Chen ++count; 1320ab2e892SShuo Chen } 133270b6cceSShuo Chen LOG_INFO << "items " << count << " unique " << items.size(); 13485147189SShuo Chen if (g_verbose) 13585147189SShuo Chen printf(" count %.3f sec %ld items\n", Timer::now() - t, items.size()); 1360ab2e892SShuo Chen 13785147189SShuo Chen t = Timer::now(); 1380ab2e892SShuo Chen vector<std::pair<size_t, string_view>> counts; 1390ab2e892SShuo Chen for (const auto& it : items) 1400ab2e892SShuo Chen { 1410ab2e892SShuo Chen if (it.second > 1) 1422cf09315SShuo Chen counts.push_back(std::make_pair(it.second, it.first)); 1430ab2e892SShuo Chen } 14485147189SShuo Chen if (g_verbose) 14585147189SShuo Chen printf(" select %.3f sec %ld\n", Timer::now() - t, counts.size()); 1460ab2e892SShuo Chen 14785147189SShuo Chen t = Timer::now(); 1480ab2e892SShuo Chen std::sort(counts.begin(), counts.end()); 14985147189SShuo Chen if (g_verbose) 15085147189SShuo Chen printf(" sort %.3f sec\n", Timer::now() - t); 1510ab2e892SShuo Chen 15285147189SShuo Chen t = Timer::now(); 153c377920eSShuo Chen int64_t out_len = 0; 1540ab2e892SShuo Chen { 155ecd7048bSShuo Chen char buf[256]; 156da39c979SShuo Chen snprintf(buf, sizeof buf, "count-%05d", shard); 157ecd7048bSShuo Chen OutputFile output(buf); 158ecd7048bSShuo Chen 1594136e585SShuo Chen for (auto it = counts.rbegin(); it != counts.rend(); ++it) 1600ab2e892SShuo Chen { 161c377920eSShuo Chen output.write(absl::StrFormat("%d\t%s\n", it->first, it->second)); 1624136e585SShuo Chen } 163270b6cceSShuo Chen 1644136e585SShuo Chen for (const auto& it : items) 1654136e585SShuo Chen { 1664136e585SShuo Chen if (it.second == 1) 1674136e585SShuo Chen { 168c377920eSShuo Chen output.write(absl::StrFormat("1\t%s\n", it.first)); 1694136e585SShuo Chen } 1700ab2e892SShuo Chen } 171c377920eSShuo Chen out_len = output.tell(); 1720ab2e892SShuo Chen } 173c377920eSShuo Chen if (g_verbose) 174c377920eSShuo Chen printf(" output %.3f sec %lu\n", Timer::now() - t, out_len); 1750ab2e892SShuo Chen 1760ab2e892SShuo Chen if (munmap(mapped, len)) 1770ab2e892SShuo Chen perror("munmap"); 1784136e585SShuo Chen } 179ecd7048bSShuo Chen ::close(fd); 180ecd7048bSShuo Chen LOG_INFO << "shard " << shard << " done " << timer.report(len); 1810ab2e892SShuo Chen} 1820ab2e892SShuo Chen 18385147189SShuo Chenvoid count_shards(int shards) 1840ab2e892SShuo Chen{ 18585147189SShuo Chen assert(shards <= kShards); 1864136e585SShuo Chen Timer timer; 1874136e585SShuo Chen int64_t total = 0; 188a251380aSShuo Chen muduo::ThreadPool threadPool; 18985147189SShuo Chen threadPool.setMaxQueueSize(2*kThreads); 19085147189SShuo Chen threadPool.start(kThreads); 19185147189SShuo Chen 19285147189SShuo Chen for (int shard = 0; shard < shards; ++shard) 1930ab2e892SShuo Chen { 1940ab2e892SShuo Chen char buf[256]; 195a6693141SShuo Chen snprintf(buf, sizeof buf, "%s/shard-%05d-of-%05d", shard_dir, shard, kShards); 1960ab2e892SShuo Chen int fd = open(buf, O_RDONLY); 197ecd7048bSShuo Chen assert(fd >= 0); 19885147189SShuo Chen if (!g_keep) 199ecd7048bSShuo Chen ::unlink(buf); 2002a129a12SShuo Chen 201ecd7048bSShuo Chen struct stat st; 202ecd7048bSShuo Chen if (::fstat(fd, &st) == 0) 203ecd7048bSShuo Chen { 204ecd7048bSShuo Chen size_t len = st.st_size; 205ecd7048bSShuo Chen total += len; 206ecd7048bSShuo Chen threadPool.run([shard, fd, len]{ count_shard(shard, fd, len); }); 207ecd7048bSShuo Chen } 208a251380aSShuo Chen } 209a251380aSShuo Chen while (threadPool.queueSize() > 0) 210a251380aSShuo Chen { 21185147189SShuo Chen LOG_DEBUG << "waiting for ThreadPool " << threadPool.queueSize(); 212ecd7048bSShuo Chen muduo::CurrentThread::sleepUsec(1000*1000); 2130ab2e892SShuo Chen } 214a251380aSShuo Chen threadPool.stop(); 215270b6cceSShuo Chen LOG_INFO << "Counting done "<< timer.report(total); 2160ab2e892SShuo Chen} 2170ab2e892SShuo Chen 2180ab2e892SShuo Chen// ======= merge ======= 2190ab2e892SShuo Chen 2200ab2e892SShuo Chenint main(int argc, char* argv[]) 2210ab2e892SShuo Chen{ 2220ab2e892SShuo Chen /* 2230ab2e892SShuo Chen int fd = open("shard-00000-of-00010", O_RDONLY); 22485147189SShuo Chen double t = Timer::now(); 2254136e585SShuo Chen int64_t len = count_shard(0, fd); 22685147189SShuo Chen double sec = Timer::now() - t; 2274136e585SShuo Chen printf("count_shard %.3f sec %.2f MB/s\n", sec, len / sec / 1e6); 2284136e585SShuo Chen */ 2293e607da5SShuo Chen setlocale(LC_NUMERIC, ""); 2300ab2e892SShuo Chen 2312a129a12SShuo Chen int opt; 23285147189SShuo Chen int count_only = 0; 233da39c979SShuo Chen int merge_only = 0; 234da39c979SShuo Chen while ((opt = getopt(argc, argv, "c:km:o:p:s:t:v")) != -1) 2352a129a12SShuo Chen { 2362a129a12SShuo Chen switch (opt) 2372a129a12SShuo Chen { 23885147189SShuo Chen case 'c': 23985147189SShuo Chen count_only = atoi(optarg); 24085147189SShuo Chen break; 2412a129a12SShuo Chen case 'k': 24285147189SShuo Chen g_keep = true; 24385147189SShuo Chen break; 24485147189SShuo Chen case 'm': 245da39c979SShuo Chen merge_only = atoi(optarg); 2462a129a12SShuo Chen break; 247a6693141SShuo Chen case 'o': 24885147189SShuo Chen g_output = optarg; 24985147189SShuo Chen break; 25085147189SShuo Chen case 'p': // Path for temp shard files 25185147189SShuo Chen shard_dir = optarg; 252a6693141SShuo Chen break; 2532a129a12SShuo Chen case 's': 2542a129a12SShuo Chen kShards = atoi(optarg); 2552a129a12SShuo Chen break; 256a6693141SShuo Chen case 't': 25785147189SShuo Chen kThreads = atoi(optarg); 258a6693141SShuo Chen break; 2592a129a12SShuo Chen case 'v': 26085147189SShuo Chen g_verbose = true; 2612a129a12SShuo Chen break; 2622a129a12SShuo Chen } 2632a129a12SShuo Chen } 2642a129a12SShuo Chen 26585147189SShuo Chen if (count_only > 0 || merge_only) 26685147189SShuo Chen { 26785147189SShuo Chen g_keep = true; 2683a0488b5SShuo Chen //g_verbose = true; 26985147189SShuo Chen count_only = std::min(count_only, kShards); 27085147189SShuo Chen 27185147189SShuo Chen if (count_only > 0) 27285147189SShuo Chen { 27385147189SShuo Chen count_shards(count_only); 27485147189SShuo Chen } 27585147189SShuo Chen 276da39c979SShuo Chen if (merge_only > 0) 27785147189SShuo Chen { 278da39c979SShuo Chen merge(merge_only); 27985147189SShuo Chen } 28085147189SShuo Chen } 28185147189SShuo Chen else 28285147189SShuo Chen { 28385147189SShuo Chen // Run all three steps 28485147189SShuo Chen Timer timer; 28585147189SShuo Chen LOG_INFO << argc - optind << " input files, " << kShards << " shards, " 28685147189SShuo Chen << "output " << g_output <<" , temp " << shard_dir; 28785147189SShuo Chen int64_t input = 0; 28885147189SShuo Chen input = shard_(argc, argv); 28985147189SShuo Chen count_shards(kShards); 290da39c979SShuo Chen int64_t output_size = merge(kShards); 29185147189SShuo Chen LOG_INFO << "All done " << timer.report(input) << " output " << output_size; 29285147189SShuo Chen } 2930ab2e892SShuo Chen} 294