/* Sort words by frequency -- "sort while counting" version.

   1. Read the input files and count words. Whenever the count map grows
      beyond kMaxSize (10M) keys, write it out to a segment file:
        word \t count   -- sorted by word
   2. Read all segment files, merging and counting. Whenever the count map
      grows beyond kMaxSize keys, write it out to a count file; each word
      goes to exactly one count file:
        count \t word   -- sorted by count, descending
   3. Read all count files, merge them and write the final "output" file.
*/
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

// __gnu_cxx::__sso_string is a libstdc++ extension: use std::string when it
// is explicitly requested (STD_STRING) or when not building on libstdc++.
#if defined(STD_STRING) || !defined(__GLIBCXX__)
#ifdef STD_STRING
#warning "STD STRING"
#endif
using std::string;
#else
#include <ext/vstring.h>
typedef __gnu_cxx::__sso_string string;
#endif

#include <sys/time.h>
#include <unistd.h>

// Maximum number of distinct words kept in memory before spilling to disk.
const size_t kMaxSize = 10 * 1000 * 1000;

// Wall-clock time in seconds; used only for progress/timing messages.
inline double now()
{
  struct timeval tv = { 0, 0 };
  gettimeofday(&tv, nullptr);
  return tv.tv_sec + tv.tv_usec / 1000000.0;
}

// Phase 1: counts the whitespace-separated words of every file named in
// argv[1..argc-1] and writes the counts to segment-NNNNN files as
// "word\tcount" lines sorted by word. A new segment is started whenever
// more than kMaxSize distinct words have been counted.
// Returns the number of segment files written.
int input(int argc, char* argv[])
{
  int count = 0;
  double t = now();
  for (int i = 1; i < argc; ++i)
  {
    std::cout << " processing input file " << argv[i] << std::endl;
    std::ifstream in(argv[i]);
    if (!in)
    {
      std::cerr << "cannot open input file " << argv[i] << std::endl;
      continue;
    }
    std::map<string, int64_t> counts;
    // The eof() check matters after a split: the last word of the file may
    // have been read (setting eofbit) just before the break.
    while (in && !in.eof())
    {
      double tt = now();
      counts.clear();
      string word;
      while (in >> word)
      {
        counts[word]++;
        if (counts.size() > kMaxSize)
        {
          std::cout << " split " << now() - tt << " sec" << std::endl;
          break;
        }
      }

      // Nothing was read (empty file, or only whitespace left after a
      // split): the stream has failed, so stop without writing an empty
      // segment file.
      if (counts.empty())
      {
        break;
      }

      tt = now();
      char buf[256];
      std::snprintf(buf, sizeof buf, "segment-%05d", count);
      std::ofstream out(buf);
      ++count;
      for (const auto& kv : counts)
      {
        out << kv.first << '\t' << kv.second << '\n';
      }
      std::cout << " writing " << buf << " " << now() - tt << " sec" << std::endl;
    }
  }
  std::cout << "reading done " << count << " segments " << now() - t << " sec" << std::endl;
  return count;
}

// ======= combine =======

// Cursor over one segment file during phase 2, holding its current record.
// Copyable: in_ is a non-owning pointer to a stream owned by combine().
class Segment
{
 public:
  string word;
  int64_t count = 0;

  explicit Segment(std::istream* in)
    : in_(in)
  {
  }

  // Reads the next "word\tcount" line into word/count.
  // Returns false at end of input or on a line without a tab.
  bool next()
  {
    string line;
    if (getline(*in_, line))
    {
      size_t tab = line.find('\t');
      if (tab != string::npos)
      {
        word = line.substr(0, tab);
        // strtoll, not strtol: long may be only 32 bits (e.g. on LLP64).
        count = std::strtoll(line.c_str() + tab + 1, nullptr, 10);
        return true;
      }
    }
    return false;
  }

  // Deliberately inverted: the std heap algorithms build a max-heap, so this
  // makes the segment with the smallest word surface first.
  bool operator<(const Segment& rhs) const
  {
    return word > rhs.word;
  }

 private:
  std::istream* in_;
};

// Writes counts to count-NNNNN (NNNNN = i) as "count\tword" lines, ordered
// by count descending (ties by word, descending).
void output(int i, const std::unordered_map<string, int64_t>& counts)
{
  double t = now();
  std::vector<std::pair<int64_t, string>> freq;
  freq.reserve(counts.size());
  for (const auto& entry : counts)
  {
    freq.emplace_back(entry.second, entry.first);
  }
  std::sort(freq.begin(), freq.end());
  std::cout << " sorting " << now() - t << " sec" << std::endl;

  t = now();
  char buf[256];
  std::snprintf(buf, sizeof buf, "count-%05d", i);
  std::ofstream out(buf);
  // Ascending sort, written back to front => descending order.
  for (auto it = freq.rbegin(); it != freq.rend(); ++it)
  {
    out << it->first << '\t' << it->second << '\n';
  }
  std::cout << " writing " << buf << " " << now() - t << " sec" << std::endl;
}

// Phase 2: k-way merges segment-00000 .. segment-(count-1) in word order,
// summing each word's counts across segments. Each segment file is unlinked
// right after it is opened. Once more than kMaxSize distinct words are
// pending, they are flushed with output(), but only when the word changes,
// so every word ends up in exactly one count file.
// Returns the number of count files written.
int combine(int count)
{
  std::vector<std::unique_ptr<std::ifstream>> inputs;
  std::vector<Segment> keys;

  double t = now();

  for (int i = 0; i < count; ++i)
  {
    char buf[256];
    std::snprintf(buf, sizeof buf, "segment-%05d", i);
    // Own the stream before growing the vector so it cannot leak if the
    // reallocation throws.
    inputs.push_back(std::unique_ptr<std::ifstream>(new std::ifstream(buf)));
    Segment rec(inputs.back().get());
    if (rec.next())
    {
      keys.push_back(rec);
    }
    // POSIX: the already-open stream stays readable after the name is removed.
    ::unlink(buf);
  }

  int m = 0;
  string last;
  std::unordered_map<string, int64_t> counts;
  std::make_heap(keys.begin(), keys.end());

  double tt = now();
  while (!keys.empty())
  {
    // Move the segment with the smallest current word to keys.back().
    std::pop_heap(keys.begin(), keys.end());

    if (keys.back().word != last)
    {
      // Word boundary: the only safe point to flush, because all occurrences
      // of `last` have already been summed.
      last = keys.back().word;
      if (counts.size() > kMaxSize)
      {
        std::cout << " split " << now() - tt << " sec" << std::endl;
        output(m++, counts);
        tt = now();
        counts.clear();
      }
    }

    counts[keys.back().word] += keys.back().count;

    if (keys.back().next())
    {
      std::push_heap(keys.begin(), keys.end());
    }
    else
    {
      keys.pop_back();
    }
  }

  if (!counts.empty())
  {
    std::cout << " split " << now() - tt << " sec" << std::endl;
    output(m++, counts);
  }
  std::cout << "combining done " << m << " count files " << now() - t << " sec" << std::endl;
  return m;
}

// ======= merge =======

// Cursor over one count file during phase 3, holding its current record.
// Copyable: in_ is a non-owning pointer to a stream owned by merge().
class Source
{
 public:
  explicit Source(std::istream* in)
    : in_(in)
  {
  }

  // Reads the next "count\tword" line. Returns false at end of input, on a
  // line without a tab, or when the parsed count is not positive.
  bool next()
  {
    string line;
    if (getline(*in_, line))
    {
      size_t tab = line.find('\t');
      if (tab != string::npos)
      {
        // strtoll, not strtol: long may be only 32 bits (e.g. on LLP64).
        count_ = std::strtoll(line.c_str(), nullptr, 10);
        if (count_ > 0)
        {
          word_ = line.substr(tab+1);
          return true;
        }
      }
    }
    return false;
  }

  // Max-heap on count: the source with the largest current count surfaces first.
  bool operator<(const Source& rhs) const
  {
    return count_ < rhs.count_;
  }

  void outputTo(std::ostream& out) const
  {
    out << count_ << '\t' << word_ << '\n';
  }

 private:
  std::istream* in_;
  int64_t count_ = 0;
  string word_;
};

// Phase 3: k-way merges count-00000 .. count-(m-1) by count, descending,
// into the file "output". Each count file is unlinked right after opening.
void merge(int m)
{
  std::vector<std::unique_ptr<std::ifstream>> inputs;
  std::vector<Source> keys;

  double t = now();
  for (int i = 0; i < m; ++i)
  {
    char buf[256];
    std::snprintf(buf, sizeof buf, "count-%05d", i);
    // Own the stream before growing the vector so it cannot leak if the
    // reallocation throws.
    inputs.push_back(std::unique_ptr<std::ifstream>(new std::ifstream(buf)));
    Source rec(inputs.back().get());
    if (rec.next())
    {
      keys.push_back(rec);
    }
    ::unlink(buf);
  }

  std::ofstream out("output");
  std::make_heap(keys.begin(), keys.end());
  while (!keys.empty())
  {
    std::pop_heap(keys.begin(), keys.end());
    keys.back().outputTo(out);

    if (keys.back().next())
    {
      std::push_heap(keys.begin(), keys.end());
    }
    else
    {
      keys.pop_back();
    }
  }
  std::cout << "merging done " << now() - t << " sec\n";
}

int main(int argc, char* argv[])
{
  int count = input(argc, argv);
  int m = combine(count);
  merge(m);
}