// word_freq_shards.cc (revision 0ab2e892)
10ab2e892SShuo Chen/* sort word by frequency, sharding while counting version. 2df3173cbSShuo Chen 3df3173cbSShuo Chen 1. read input file, do counting, if counts > 10M keys, write counts to 10 shard files: 4df3173cbSShuo Chen word \t count 5df3173cbSShuo Chen 2. assume each shard file fits in memory, read each shard file, accumulate counts, and write to 10 count files: 6df3173cbSShuo Chen count \t word 7df3173cbSShuo Chen 3. merge 10 count files using heap. 8df3173cbSShuo Chen 9df3173cbSShuo ChenLimits: each shard must fit in memory. 10df3173cbSShuo Chen*/ 11144e8e4eSShuo Chen#include <boost/noncopyable.hpp> 12144e8e4eSShuo Chen#include <boost/ptr_container/ptr_vector.hpp> 13144e8e4eSShuo Chen 14144e8e4eSShuo Chen#include <fstream> 15144e8e4eSShuo Chen#include <iostream> 16144e8e4eSShuo Chen#include <unordered_map> 17144e8e4eSShuo Chen 18144e8e4eSShuo Chen#ifdef STD_STRING 19144e8e4eSShuo Chen#warning "STD STRING" 20144e8e4eSShuo Chen#include <string> 21144e8e4eSShuo Chenusing std::string; 22144e8e4eSShuo Chen#else 23144e8e4eSShuo Chen#include <ext/vstring.h> 24144e8e4eSShuo Chentypedef __gnu_cxx::__sso_string string; 25144e8e4eSShuo Chen#endif 26144e8e4eSShuo Chen 27144e8e4eSShuo Chenconst size_t kMaxSize = 10 * 1000 * 1000; 28144e8e4eSShuo Chen 29144e8e4eSShuo Chenclass Sharder : boost::noncopyable 30144e8e4eSShuo Chen{ 31144e8e4eSShuo Chen public: 32144e8e4eSShuo Chen explicit Sharder(int nbuckets) 33144e8e4eSShuo Chen : buckets_(nbuckets) 34144e8e4eSShuo Chen { 35144e8e4eSShuo Chen for (int i = 0; i < nbuckets; ++i) 36144e8e4eSShuo Chen { 37144e8e4eSShuo Chen char buf[256]; 38144e8e4eSShuo Chen snprintf(buf, sizeof buf, "shard-%05d-of-%05d", i, nbuckets); 39144e8e4eSShuo Chen buckets_.push_back(new std::ofstream(buf)); 40144e8e4eSShuo Chen } 41144e8e4eSShuo Chen assert(buckets_.size() == static_cast<size_t>(nbuckets)); 42144e8e4eSShuo Chen } 43144e8e4eSShuo Chen 44cc454125SShuo Chen void output(const string& word, int64_t count) 45144e8e4eSShuo Chen { 
46cc454125SShuo Chen size_t idx = std::hash<string>()(word) % buckets_.size(); 47cc454125SShuo Chen buckets_[idx] << word << '\t' << count << '\n'; 48144e8e4eSShuo Chen } 49144e8e4eSShuo Chen 50144e8e4eSShuo Chen protected: 51144e8e4eSShuo Chen boost::ptr_vector<std::ofstream> buckets_; 52144e8e4eSShuo Chen}; 53144e8e4eSShuo Chen 54144e8e4eSShuo Chenvoid shard(int nbuckets, int argc, char* argv[]) 55144e8e4eSShuo Chen{ 56144e8e4eSShuo Chen Sharder sharder(nbuckets); 57144e8e4eSShuo Chen for (int i = 1; i < argc; ++i) 58144e8e4eSShuo Chen { 59144e8e4eSShuo Chen std::cout << " processing input file " << argv[i] << std::endl; 60cc454125SShuo Chen std::unordered_map<string, int64_t> counts; 61144e8e4eSShuo Chen std::ifstream in(argv[i]); 62144e8e4eSShuo Chen while (in && !in.eof()) 63144e8e4eSShuo Chen { 64cc454125SShuo Chen counts.clear(); 65cc454125SShuo Chen string word; 66cc454125SShuo Chen while (in >> word) 67144e8e4eSShuo Chen { 687e3924f8SShuo Chen counts[word]++; 69cc454125SShuo Chen if (counts.size() > kMaxSize) 70144e8e4eSShuo Chen { 71144e8e4eSShuo Chen std::cout << " split" << std::endl; 72144e8e4eSShuo Chen break; 73144e8e4eSShuo Chen } 74144e8e4eSShuo Chen } 75144e8e4eSShuo Chen 767e3924f8SShuo Chen for (const auto& kv : counts) 77144e8e4eSShuo Chen { 78144e8e4eSShuo Chen sharder.output(kv.first, kv.second); 79144e8e4eSShuo Chen } 80144e8e4eSShuo Chen } 81144e8e4eSShuo Chen } 82144e8e4eSShuo Chen std::cout << "shuffling done" << std::endl; 83144e8e4eSShuo Chen} 84144e8e4eSShuo Chen 857e3924f8SShuo Chen// ======= sort_shards ======= 86144e8e4eSShuo Chen 87144e8e4eSShuo Chenstd::unordered_map<string, int64_t> read_shard(int idx, int nbuckets) 88144e8e4eSShuo Chen{ 89cc454125SShuo Chen std::unordered_map<string, int64_t> counts; 90144e8e4eSShuo Chen 91144e8e4eSShuo Chen char buf[256]; 92144e8e4eSShuo Chen snprintf(buf, sizeof buf, "shard-%05d-of-%05d", idx, nbuckets); 93144e8e4eSShuo Chen std::cout << " reading " << buf << std::endl; 94144e8e4eSShuo Chen { 
95144e8e4eSShuo Chen std::ifstream in(buf); 96144e8e4eSShuo Chen string line; 97144e8e4eSShuo Chen 98144e8e4eSShuo Chen while (getline(in, line)) 99144e8e4eSShuo Chen { 100144e8e4eSShuo Chen size_t tab = line.find('\t'); 101144e8e4eSShuo Chen if (tab != string::npos) 102144e8e4eSShuo Chen { 103144e8e4eSShuo Chen int64_t count = strtol(line.c_str() + tab, NULL, 10); 104144e8e4eSShuo Chen if (count > 0) 105144e8e4eSShuo Chen { 106cc454125SShuo Chen counts[line.substr(0, tab)] += count; 107144e8e4eSShuo Chen } 108144e8e4eSShuo Chen } 109144e8e4eSShuo Chen } 110144e8e4eSShuo Chen } 111144e8e4eSShuo Chen 112144e8e4eSShuo Chen ::unlink(buf); 113cc454125SShuo Chen return counts; 114144e8e4eSShuo Chen} 115144e8e4eSShuo Chen 1167e3924f8SShuo Chenvoid sort_shards(const int nbuckets) 117144e8e4eSShuo Chen{ 118144e8e4eSShuo Chen for (int i = 0; i < nbuckets; ++i) 119144e8e4eSShuo Chen { 120144e8e4eSShuo Chen // std::cout << " sorting " << std::endl; 121144e8e4eSShuo Chen std::vector<std::pair<int64_t, string>> counts; 122bdeb7a78SShuo Chen for (const auto& entry : read_shard(i, nbuckets)) 123144e8e4eSShuo Chen { 124144e8e4eSShuo Chen counts.push_back(make_pair(entry.second, entry.first)); 125144e8e4eSShuo Chen } 126144e8e4eSShuo Chen std::sort(counts.begin(), counts.end()); 127144e8e4eSShuo Chen 128144e8e4eSShuo Chen char buf[256]; 129144e8e4eSShuo Chen snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, nbuckets); 130144e8e4eSShuo Chen std::ofstream out(buf); 131144e8e4eSShuo Chen std::cout << " writing " << buf << std::endl; 132144e8e4eSShuo Chen for (auto it = counts.rbegin(); it != counts.rend(); ++it) 133144e8e4eSShuo Chen { 134144e8e4eSShuo Chen out << it->first << '\t' << it->second << '\n'; 135144e8e4eSShuo Chen } 136144e8e4eSShuo Chen } 137144e8e4eSShuo Chen 138144e8e4eSShuo Chen std::cout << "reducing done" << std::endl; 139144e8e4eSShuo Chen} 140144e8e4eSShuo Chen 141144e8e4eSShuo Chen// ======= merge ======= 142144e8e4eSShuo Chen 143cc454125SShuo Chenclass Source // 
copyable 144144e8e4eSShuo Chen{ 145144e8e4eSShuo Chen public: 1467e3924f8SShuo Chen explicit Source(std::istream* in) 147144e8e4eSShuo Chen : in_(in), 148144e8e4eSShuo Chen count_(0), 149cc454125SShuo Chen word_() 150144e8e4eSShuo Chen { 151144e8e4eSShuo Chen } 152144e8e4eSShuo Chen 153144e8e4eSShuo Chen bool next() 154144e8e4eSShuo Chen { 155144e8e4eSShuo Chen string line; 156144e8e4eSShuo Chen if (getline(*in_, line)) 157144e8e4eSShuo Chen { 158144e8e4eSShuo Chen size_t tab = line.find('\t'); 159144e8e4eSShuo Chen if (tab != string::npos) 160144e8e4eSShuo Chen { 161144e8e4eSShuo Chen count_ = strtol(line.c_str(), NULL, 10); 162144e8e4eSShuo Chen if (count_ > 0) 163144e8e4eSShuo Chen { 164cc454125SShuo Chen word_ = line.substr(tab+1); 165144e8e4eSShuo Chen return true; 166144e8e4eSShuo Chen } 167144e8e4eSShuo Chen } 168144e8e4eSShuo Chen } 169144e8e4eSShuo Chen return false; 170144e8e4eSShuo Chen } 171144e8e4eSShuo Chen 172144e8e4eSShuo Chen bool operator<(const Source& rhs) const 173144e8e4eSShuo Chen { 174144e8e4eSShuo Chen return count_ < rhs.count_; 175144e8e4eSShuo Chen } 176144e8e4eSShuo Chen 177cc454125SShuo Chen void outputTo(std::ostream& out) const 178144e8e4eSShuo Chen { 179cc454125SShuo Chen out << count_ << '\t' << word_ << '\n'; 180144e8e4eSShuo Chen } 181144e8e4eSShuo Chen 182144e8e4eSShuo Chen private: 1837e3924f8SShuo Chen std::istream* in_; 184144e8e4eSShuo Chen int64_t count_; 185cc454125SShuo Chen string word_; 186144e8e4eSShuo Chen}; 187144e8e4eSShuo Chen 188144e8e4eSShuo Chenvoid merge(const int nbuckets) 189144e8e4eSShuo Chen{ 190144e8e4eSShuo Chen boost::ptr_vector<std::ifstream> inputs; 191144e8e4eSShuo Chen std::vector<Source> keys; 192144e8e4eSShuo Chen 193144e8e4eSShuo Chen for (int i = 0; i < nbuckets; ++i) 194144e8e4eSShuo Chen { 195144e8e4eSShuo Chen char buf[256]; 196144e8e4eSShuo Chen snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, nbuckets); 197144e8e4eSShuo Chen inputs.push_back(new std::ifstream(buf)); 198144e8e4eSShuo Chen 
Source rec(&inputs.back()); 199144e8e4eSShuo Chen if (rec.next()) 200144e8e4eSShuo Chen { 201144e8e4eSShuo Chen keys.push_back(rec); 202144e8e4eSShuo Chen } 203144e8e4eSShuo Chen ::unlink(buf); 204144e8e4eSShuo Chen } 205144e8e4eSShuo Chen 206144e8e4eSShuo Chen std::ofstream out("output"); 207144e8e4eSShuo Chen std::make_heap(keys.begin(), keys.end()); 208144e8e4eSShuo Chen while (!keys.empty()) 209144e8e4eSShuo Chen { 210144e8e4eSShuo Chen std::pop_heap(keys.begin(), keys.end()); 211cc454125SShuo Chen keys.back().outputTo(out); 212144e8e4eSShuo Chen 213144e8e4eSShuo Chen if (keys.back().next()) 214144e8e4eSShuo Chen { 215144e8e4eSShuo Chen std::push_heap(keys.begin(), keys.end()); 216144e8e4eSShuo Chen } 217144e8e4eSShuo Chen else 218144e8e4eSShuo Chen { 219144e8e4eSShuo Chen keys.pop_back(); 220144e8e4eSShuo Chen } 221144e8e4eSShuo Chen } 222144e8e4eSShuo Chen std::cout << "merging done\n"; 223144e8e4eSShuo Chen} 224144e8e4eSShuo Chen 225144e8e4eSShuo Chenint main(int argc, char* argv[]) 226144e8e4eSShuo Chen{ 227144e8e4eSShuo Chen int nbuckets = 10; 228144e8e4eSShuo Chen shard(nbuckets, argc, argv); 2297e3924f8SShuo Chen sort_shards(nbuckets); 230144e8e4eSShuo Chen merge(nbuckets); 231144e8e4eSShuo Chen} 232