/* Sort words by frequency: sharding-while-counting version.

  1. Read the input files and count words in memory. Whenever the table grows
     past 10M distinct keys, and at the end of every input file, append the
     partial counts to 10 shard files, one record per line:
       word \t count
  2. Assuming each shard file fits in memory, read each shard file, accumulate
     the counts per word, and write them in descending count order to one of
     10 count files:
       count \t word
  3. Merge the 10 count files into the file "output" using a heap.

Limits: each shard must fit in memory.
*/
#include <boost/noncopyable.hpp>
#include <boost/ptr_container/ptr_vector.hpp>

#include <unistd.h>

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <functional>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <unordered_map>
#include <utility>
#include <vector>

#ifdef STD_STRING
#warning "STD STRING"
#include <string>
using std::string;
#else
#include <ext/vstring.h>
typedef __gnu_cxx::__sso_string string;
#endif
26144e8e4eSShuo Chen
// Upper bound on the number of distinct words kept in the in-memory counting
// table; once the table grows past this, the partial counts are written out
// and counting restarts with an empty table.
const size_t kMaxSize = 10 * 1000 * 1000;  // 10M keys
28144e8e4eSShuo Chen
// Distributes "word \t count" records over a fixed number of shard files.
//
// The shard of a record is the hash of its word modulo the number of shards,
// so every record of a given word lands in the same file. The files are named
// "shard-NNNNN-of-MMMMM" (relative paths). Not copyable.
class Sharder
{
 public:
  // Opens nbuckets shard files for writing.
  // Throws std::invalid_argument if nbuckets is not positive (output() would
  // otherwise compute a modulo by zero) and std::runtime_error if a shard file
  // cannot be opened (records would otherwise be dropped silently).
  explicit Sharder(int nbuckets)
  {
    if (nbuckets <= 0)
    {
      throw std::invalid_argument("Sharder: nbuckets must be positive");
    }

    buckets_.reserve(static_cast<size_t>(nbuckets));
    for (int i = 0; i < nbuckets; ++i)
    {
      char buf[256];
      snprintf(buf, sizeof buf, "shard-%05d-of-%05d", i, nbuckets);
      std::unique_ptr<std::ofstream> out(new std::ofstream(buf));
      if (!out->is_open())
      {
        char msg[320];
        snprintf(msg, sizeof msg, "Sharder: cannot open %s for writing", buf);
        throw std::runtime_error(msg);
      }
      buckets_.push_back(std::move(out));
    }
    assert(buckets_.size() == static_cast<size_t>(nbuckets));
  }

  Sharder(const Sharder&) = delete;
  Sharder& operator=(const Sharder&) = delete;

  // Appends the line "word \t count" to the shard selected by the word's hash.
  void output(const string& word, int64_t count)
  {
    size_t idx = std::hash<string>()(word) % buckets_.size();
    *buckets_[idx] << word << '\t' << count << '\n';
  }

 protected:
  // One output stream per shard, indexed by shard number.
  std::vector<std::unique_ptr<std::ofstream>> buckets_;
};
53144e8e4eSShuo Chen
54144e8e4eSShuo Chenvoid shard(int nbuckets, int argc, char* argv[])
55144e8e4eSShuo Chen{
56144e8e4eSShuo Chen  Sharder sharder(nbuckets);
57144e8e4eSShuo Chen  for (int i = 1; i < argc; ++i)
58144e8e4eSShuo Chen  {
59144e8e4eSShuo Chen    std::cout << "  processing input file " << argv[i] << std::endl;
60cc454125SShuo Chen    std::unordered_map<string, int64_t> counts;
61144e8e4eSShuo Chen    std::ifstream in(argv[i]);
62144e8e4eSShuo Chen    while (in && !in.eof())
63144e8e4eSShuo Chen    {
64cc454125SShuo Chen      counts.clear();
65cc454125SShuo Chen      string word;
66cc454125SShuo Chen      while (in >> word)
67144e8e4eSShuo Chen      {
687e3924f8SShuo Chen        counts[word]++;
69cc454125SShuo Chen        if (counts.size() > kMaxSize)
70144e8e4eSShuo Chen        {
71144e8e4eSShuo Chen          std::cout << "    split" << std::endl;
72144e8e4eSShuo Chen          break;
73144e8e4eSShuo Chen        }
74144e8e4eSShuo Chen      }
75144e8e4eSShuo Chen
767e3924f8SShuo Chen      for (const auto& kv : counts)
77144e8e4eSShuo Chen      {
78144e8e4eSShuo Chen        sharder.output(kv.first, kv.second);
79144e8e4eSShuo Chen      }
80144e8e4eSShuo Chen    }
81144e8e4eSShuo Chen  }
82144e8e4eSShuo Chen  std::cout << "shuffling done" << std::endl;
83144e8e4eSShuo Chen}
84144e8e4eSShuo Chen
857e3924f8SShuo Chen// ======= sort_shards =======
86144e8e4eSShuo Chen
// Reads shard file number idx (out of nbuckets), adds up the partial counts of
// every word, deletes the shard file and returns the totals.
//
// Each line is expected to be "word \t count". Lines without a tab and lines
// whose count does not parse to a positive number are ignored. A shard file
// that cannot be opened yields an empty map.
std::unordered_map<string, int64_t> read_shard(int idx, int nbuckets)
{
  std::unordered_map<string, int64_t> counts;

  char buf[256];
  snprintf(buf, sizeof buf, "shard-%05d-of-%05d", idx, nbuckets);
  std::cout << "  reading " << buf << std::endl;
  {
    std::ifstream in(buf);
    string line;

    // getline is deliberately unqualified so that argument-dependent lookup
    // picks the overload matching whichever type `string` names.
    while (getline(in, line))
    {
      const size_t tab = line.find('\t');
      if (tab == string::npos)
      {
        continue;
      }
      // strtoll (not strtol): counts are 64-bit even where long is 32 bits.
      const int64_t count = std::strtoll(line.c_str() + tab + 1, nullptr, 10);
      if (count > 0)
      {
        counts[line.substr(0, tab)] += count;
      }
    }
  }  // the stream is closed here, before the file is removed

  std::remove(buf);
  return counts;
}
115144e8e4eSShuo Chen
1167e3924f8SShuo Chenvoid sort_shards(const int nbuckets)
117144e8e4eSShuo Chen{
118144e8e4eSShuo Chen  for (int i = 0; i < nbuckets; ++i)
119144e8e4eSShuo Chen  {
120144e8e4eSShuo Chen    // std::cout << "  sorting " << std::endl;
121144e8e4eSShuo Chen    std::vector<std::pair<int64_t, string>> counts;
122bdeb7a78SShuo Chen    for (const auto& entry : read_shard(i, nbuckets))
123144e8e4eSShuo Chen    {
124144e8e4eSShuo Chen      counts.push_back(make_pair(entry.second, entry.first));
125144e8e4eSShuo Chen    }
126144e8e4eSShuo Chen    std::sort(counts.begin(), counts.end());
127144e8e4eSShuo Chen
128144e8e4eSShuo Chen    char buf[256];
129144e8e4eSShuo Chen    snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, nbuckets);
130144e8e4eSShuo Chen    std::ofstream out(buf);
131144e8e4eSShuo Chen    std::cout << "  writing " << buf << std::endl;
132144e8e4eSShuo Chen    for (auto it = counts.rbegin(); it != counts.rend(); ++it)
133144e8e4eSShuo Chen    {
134144e8e4eSShuo Chen      out << it->first << '\t' << it->second << '\n';
135144e8e4eSShuo Chen    }
136144e8e4eSShuo Chen  }
137144e8e4eSShuo Chen
138144e8e4eSShuo Chen  std::cout << "reducing done" << std::endl;
139144e8e4eSShuo Chen}
140144e8e4eSShuo Chen
141144e8e4eSShuo Chen// ======= merge =======
142144e8e4eSShuo Chen
// A cursor over a stream of "count \t word" lines, used as one input of the
// merge. It holds the most recently read record and orders by count only.
// Copyable; the stream is referenced through a raw pointer and is not owned,
// so copies read from the same stream.
class Source  // copyable
{
 public:
  explicit Source(std::istream* in)
    : in_(in),
      count_(0),
      word_()
  {
  }

  // Reads the next line and stores it as the current record.
  // Returns false when no line could be read, when the line has no tab, or
  // when its count does not parse to a positive number.
  bool next()
  {
    string line;
    // getline is deliberately unqualified so that argument-dependent lookup
    // picks the overload matching whichever type `string` names.
    if (getline(*in_, line))
    {
      const size_t tab = line.find('\t');
      if (tab != string::npos)
      {
        // strtoll (not strtol): counts are 64-bit even where long is 32 bits.
        count_ = std::strtoll(line.c_str(), nullptr, 10);
        if (count_ > 0)
        {
          word_ = line.substr(tab+1);
          return true;
        }
      }
    }
    return false;
  }

  // Orders by count, so that a max-heap of Sources yields the largest count
  // first.
  bool operator<(const Source& rhs) const
  {
    return count_ < rhs.count_;
  }

  // Writes the current record as "count \t word \n".
  void outputTo(std::ostream& out) const
  {
    out << count_ << '\t' << word_ << '\n';
  }

 private:
  std::istream* in_;  // not owned
  int64_t count_;
  string word_;
};
187144e8e4eSShuo Chen
188144e8e4eSShuo Chenvoid merge(const int nbuckets)
189144e8e4eSShuo Chen{
190144e8e4eSShuo Chen  boost::ptr_vector<std::ifstream> inputs;
191144e8e4eSShuo Chen  std::vector<Source> keys;
192144e8e4eSShuo Chen
193144e8e4eSShuo Chen  for (int i = 0; i < nbuckets; ++i)
194144e8e4eSShuo Chen  {
195144e8e4eSShuo Chen    char buf[256];
196144e8e4eSShuo Chen    snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, nbuckets);
197144e8e4eSShuo Chen    inputs.push_back(new std::ifstream(buf));
198144e8e4eSShuo Chen    Source rec(&inputs.back());
199144e8e4eSShuo Chen    if (rec.next())
200144e8e4eSShuo Chen    {
201144e8e4eSShuo Chen      keys.push_back(rec);
202144e8e4eSShuo Chen    }
203144e8e4eSShuo Chen    ::unlink(buf);
204144e8e4eSShuo Chen  }
205144e8e4eSShuo Chen
206144e8e4eSShuo Chen  std::ofstream out("output");
207144e8e4eSShuo Chen  std::make_heap(keys.begin(), keys.end());
208144e8e4eSShuo Chen  while (!keys.empty())
209144e8e4eSShuo Chen  {
210144e8e4eSShuo Chen    std::pop_heap(keys.begin(), keys.end());
211cc454125SShuo Chen    keys.back().outputTo(out);
212144e8e4eSShuo Chen
213144e8e4eSShuo Chen    if (keys.back().next())
214144e8e4eSShuo Chen    {
215144e8e4eSShuo Chen      std::push_heap(keys.begin(), keys.end());
216144e8e4eSShuo Chen    }
217144e8e4eSShuo Chen    else
218144e8e4eSShuo Chen    {
219144e8e4eSShuo Chen      keys.pop_back();
220144e8e4eSShuo Chen    }
221144e8e4eSShuo Chen  }
222144e8e4eSShuo Chen  std::cout << "merging done\n";
223144e8e4eSShuo Chen}
224144e8e4eSShuo Chen
225144e8e4eSShuo Chenint main(int argc, char* argv[])
226144e8e4eSShuo Chen{
227144e8e4eSShuo Chen  int nbuckets = 10;
228144e8e4eSShuo Chen  shard(nbuckets, argc, argv);
2297e3924f8SShuo Chen  sort_shards(nbuckets);
230144e8e4eSShuo Chen  merge(nbuckets);
231144e8e4eSShuo Chen}
232