/* Sort words by frequency; this version sorts while counting.

   1. Read the input files and count words. When the count map exceeds 10M keys,
      write it out to a segment file:
      word \t count  -- sorted by word
   2. Read all segment files, merging and summing counts. When the count map exceeds
      10M keys, write it out to a count file; each word goes to exactly one count file:
      count \t word  -- sorted by count
   3. Read all count files, merge them, and write the final output.
*/
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <map>
#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>
16
17#ifdef STD_STRING
18#warning "STD STRING"
19#include <string>
20using std::string;
21#else
22#include <ext/vstring.h>
23typedef __gnu_cxx::__sso_string string;
24#endif
25
26#include <sys/time.h>
27#include <unistd.h>
28
// Number of distinct keys a count map may hold before it is spilled to disk.
constexpr size_t kMaxSize = 10000000;
30
// Returns the time reported by gettimeofday() as fractional seconds.
inline double now()
{
  timeval stamp;
  stamp.tv_sec = 0;
  stamp.tv_usec = 0;
  gettimeofday(&stamp, nullptr);
  const double fraction = stamp.tv_usec / 1000000.0;
  return stamp.tv_sec + fraction;
}
37
38int input(int argc, char* argv[])
39{
40  int count = 0;
41  double t = now();
42  for (int i = 1; i < argc; ++i)
43  {
44    std::cout << "  processing input file " << argv[i] << std::endl;
45    std::map<string, int64_t> counts;
46    std::ifstream in(argv[i]);
47    while (in && !in.eof())
48    {
49      double tt = now();
50      counts.clear();
51      string word;
52      while (in >> word)
53      {
54        counts[word]++;
55        if (counts.size() > kMaxSize)
56        {
57          std::cout << "  split " << now() - tt << " sec" << std::endl;
58          break;
59        }
60      }
61
62      tt = now();
63      char buf[256];
64      snprintf(buf, sizeof buf, "segment-%05d", count);
65      std::ofstream out(buf);
66      ++count;
67      for (const auto& kv : counts)
68      {
69        out << kv.first << '\t' << kv.second << '\n';
70      }
71      std::cout << "  writing " << buf << " " << now() - tt << " sec" << std::endl;
72    }
73  }
74  std::cout << "reading done " << count << " segments " << now() - t << " sec" << std::endl;
75  return count;
76}
77
78// ======= combine =======
79
// Cursor over one segment stream whose lines have the form "word \t count".
//
// After a successful next(), `word` and `count` hold the current record.
// The stream pointer is stored but not owned; the caller keeps the stream
// alive for as long as the Segment is used.
class Segment  // copyable
{
 public:
  string word;
  int64_t count = 0;

  explicit Segment(std::istream* in)
    : in_(in)
  {
  }

  // Reads the next line into `word` / `count`.
  // Returns false at end of stream or when the line contains no tab; in that
  // case `word` and `count` keep their previous values.
  bool next()
  {
    string line;
    if (!getline(*in_, line))
    {
      return false;
    }

    const size_t tab = line.find('\t');
    if (tab == string::npos)
    {
      return false;
    }

    word.assign(line, 0, tab);
    // strtoll rather than strtol: count is 64-bit, while long may be 32-bit.
    count = std::strtoll(line.c_str() + tab + 1, nullptr, 10);
    return true;
  }

  // Deliberately inverted so that the standard heap algorithms, which build
  // a max-heap, keep the lexicographically smallest word on top.
  bool operator<(const Segment& rhs) const
  {
    return word > rhs.word;
  }

 private:
  std::istream* in_;
};
115
116void output(int i, const std::unordered_map<string, int64_t>& counts)
117{
118  double t = now();
119  std::vector<std::pair<int64_t, string>> freq;
120  for (const auto& entry : counts)
121  {
122    freq.push_back(make_pair(entry.second, entry.first));
123  }
124  std::sort(freq.begin(), freq.end());
125  std::cout << "  sorting " << now() - t << " sec" << std::endl;
126
127  t = now();
128  char buf[256];
129  snprintf(buf, sizeof buf, "count-%05d", i);
130  std::ofstream out(buf);
131  for (auto it = freq.rbegin(); it != freq.rend(); ++it)
132  {
133    out << it->first << '\t' << it->second << '\n';
134  }
135  std::cout << "  writing " << buf << " " << now() - t << " sec" << std::endl;
136}
137
138int combine(int count)
139{
140  std::vector<std::unique_ptr<std::ifstream>> inputs;
141  std::vector<Segment> keys;
142
143  double t = now();
144
145  for (int i = 0; i < count; ++i)
146  {
147    char buf[256];
148    snprintf(buf, sizeof buf, "segment-%05d", i);
149    inputs.emplace_back(new std::ifstream(buf));
150    Segment rec(inputs.back().get());
151    if (rec.next())
152    {
153      keys.push_back(rec);
154    }
155    ::unlink(buf);
156  }
157
158  // std::cout << keys.size() << '\n';
159  int m = 0;
160  string last;
161  std::unordered_map<string, int64_t> counts;
162  std::make_heap(keys.begin(), keys.end());
163
164  double tt = now();
165  while (!keys.empty())
166  {
167    std::pop_heap(keys.begin(), keys.end());
168
169    if (keys.back().word != last)
170    {
171      last = keys.back().word;
172      if (counts.size() > kMaxSize)
173      {
174        std::cout << "    split " << now() - tt << " sec" << std::endl;
175        output(m++, counts);
176        tt = now();
177        counts.clear();
178      }
179    }
180
181    counts[keys.back().word] += keys.back().count;
182
183    if (keys.back().next())
184    {
185      std::push_heap(keys.begin(), keys.end());
186    }
187    else
188    {
189      keys.pop_back();
190    }
191  }
192
193  if (!counts.empty())
194  {
195    std::cout << "    split " << now() - tt << " sec" << std::endl;
196    output(m++, counts);
197  }
198  std::cout << "combining done " << m << " count files " << now() - t << " sec" << std::endl;
199  return m;
200}
201
202// ======= merge  =======
203
// Cursor over one count stream whose lines have the form "count \t word".
//
// The stream pointer is stored but not owned; the caller keeps the stream
// alive for as long as the Source is used.
class Source  // copyable
{
 public:
  explicit Source(std::istream* in)
    : in_(in)
  {
  }

  // Reads the next record.
  // Returns false at end of stream, when the line contains no tab, or when
  // the parsed count is not positive.
  bool next()
  {
    string line;
    if (!getline(*in_, line))
    {
      return false;
    }

    const size_t tab = line.find('\t');
    if (tab == string::npos)
    {
      return false;
    }

    // strtoll rather than strtol: count_ is 64-bit, while long may be 32-bit.
    count_ = std::strtoll(line.c_str(), nullptr, 10);
    if (count_ <= 0)
    {
      return false;
    }

    word_.assign(line, tab + 1, string::npos);
    return true;
  }

  // Orders by count only, so the standard heap algorithms keep the record
  // with the largest count on top.
  bool operator<(const Source& rhs) const
  {
    return count_ < rhs.count_;
  }

  // Writes the current record as "count \t word \n".
  void outputTo(std::ostream& out) const
  {
    out << count_ << '\t' << word_ << '\n';
  }

 private:
  std::istream* in_;
  int64_t count_ = 0;
  string word_;
};
246
247void merge(int m)
248{
249  std::vector<std::unique_ptr<std::ifstream>> inputs;
250  std::vector<Source> keys;
251
252  double t = now();
253  for (int i = 0; i < m; ++i)
254  {
255    char buf[256];
256    snprintf(buf, sizeof buf, "count-%05d", i);
257    inputs.emplace_back(new std::ifstream(buf));
258    Source rec(inputs.back().get());
259    if (rec.next())
260    {
261      keys.push_back(rec);
262    }
263    ::unlink(buf);
264  }
265
266  std::ofstream out("output");
267  std::make_heap(keys.begin(), keys.end());
268  while (!keys.empty())
269  {
270    std::pop_heap(keys.begin(), keys.end());
271    keys.back().outputTo(out);
272
273    if (keys.back().next())
274    {
275      std::push_heap(keys.begin(), keys.end());
276    }
277    else
278    {
279      keys.pop_back();
280    }
281  }
282  std::cout << "merging done " << now() - t << " sec\n";
283}
284
285int main(int argc, char* argv[])
286{
287  int count = input(argc, argv);
288  int m = combine(count);
289  merge(m);
290}
291