word_freq_shards.cc revision 144e8e4e
1144e8e4eSShuo Chen#include <boost/noncopyable.hpp>
2144e8e4eSShuo Chen#include <boost/ptr_container/ptr_vector.hpp>
3144e8e4eSShuo Chen
4144e8e4eSShuo Chen#include <fstream>
5144e8e4eSShuo Chen#include <iostream>
6144e8e4eSShuo Chen#include <unordered_map>
7144e8e4eSShuo Chen
8144e8e4eSShuo Chen#ifdef STD_STRING
9144e8e4eSShuo Chen#warning "STD STRING"
10144e8e4eSShuo Chen#include <string>
11144e8e4eSShuo Chenusing std::string;
12144e8e4eSShuo Chen#else
13144e8e4eSShuo Chen#include <ext/vstring.h>
14144e8e4eSShuo Chentypedef __gnu_cxx::__sso_string string;
15144e8e4eSShuo Chen#endif
16144e8e4eSShuo Chen
17144e8e4eSShuo Chenconst size_t kMaxSize = 10 * 1000 * 1000;
18144e8e4eSShuo Chen
19144e8e4eSShuo Chenclass Sharder : boost::noncopyable
20144e8e4eSShuo Chen{
21144e8e4eSShuo Chen public:
22144e8e4eSShuo Chen  explicit Sharder(int nbuckets)
23144e8e4eSShuo Chen    : buckets_(nbuckets)
24144e8e4eSShuo Chen  {
25144e8e4eSShuo Chen    for (int i = 0; i < nbuckets; ++i)
26144e8e4eSShuo Chen    {
27144e8e4eSShuo Chen      char buf[256];
28144e8e4eSShuo Chen      snprintf(buf, sizeof buf, "shard-%05d-of-%05d", i, nbuckets);
29144e8e4eSShuo Chen      buckets_.push_back(new std::ofstream(buf));
30144e8e4eSShuo Chen    }
31144e8e4eSShuo Chen    assert(buckets_.size() == static_cast<size_t>(nbuckets));
32144e8e4eSShuo Chen  }
33144e8e4eSShuo Chen
34144e8e4eSShuo Chen  void output(const string& query, int64_t count)
35144e8e4eSShuo Chen  {
36144e8e4eSShuo Chen    size_t idx = std::hash<string>()(query) % buckets_.size();
37144e8e4eSShuo Chen    buckets_[idx] << query << '\t' << count << '\n';
38144e8e4eSShuo Chen  }
39144e8e4eSShuo Chen
40144e8e4eSShuo Chen protected:
41144e8e4eSShuo Chen  boost::ptr_vector<std::ofstream> buckets_;
42144e8e4eSShuo Chen};
43144e8e4eSShuo Chen
44144e8e4eSShuo Chenvoid shard(int nbuckets, int argc, char* argv[])
45144e8e4eSShuo Chen{
46144e8e4eSShuo Chen  Sharder sharder(nbuckets);
47144e8e4eSShuo Chen  for (int i = 1; i < argc; ++i)
48144e8e4eSShuo Chen  {
49144e8e4eSShuo Chen    std::cout << "  processing input file " << argv[i] << std::endl;
50144e8e4eSShuo Chen    std::unordered_map<string, int64_t> queries;
51144e8e4eSShuo Chen    std::ifstream in(argv[i]);
52144e8e4eSShuo Chen    while (in && !in.eof())
53144e8e4eSShuo Chen    {
54144e8e4eSShuo Chen      queries.clear();
55144e8e4eSShuo Chen      string query;
56144e8e4eSShuo Chen      while (getline(in, query))
57144e8e4eSShuo Chen      {
58144e8e4eSShuo Chen        queries[query] += 1;
59144e8e4eSShuo Chen        if (queries.size() > kMaxSize)
60144e8e4eSShuo Chen        {
61144e8e4eSShuo Chen          std::cout << "    split" << std::endl;
62144e8e4eSShuo Chen          break;
63144e8e4eSShuo Chen        }
64144e8e4eSShuo Chen      }
65144e8e4eSShuo Chen
66144e8e4eSShuo Chen      for (auto kv : queries)
67144e8e4eSShuo Chen      {
68144e8e4eSShuo Chen        sharder.output(kv.first, kv.second);
69144e8e4eSShuo Chen      }
70144e8e4eSShuo Chen    }
71144e8e4eSShuo Chen  }
72144e8e4eSShuo Chen  std::cout << "shuffling done" << std::endl;
73144e8e4eSShuo Chen}
74144e8e4eSShuo Chen
75144e8e4eSShuo Chen// ======= combine =======
76144e8e4eSShuo Chen
77144e8e4eSShuo Chenstd::unordered_map<string, int64_t> read_shard(int idx, int nbuckets)
78144e8e4eSShuo Chen{
79144e8e4eSShuo Chen  std::unordered_map<string, int64_t> queries;
80144e8e4eSShuo Chen
81144e8e4eSShuo Chen  char buf[256];
82144e8e4eSShuo Chen  snprintf(buf, sizeof buf, "shard-%05d-of-%05d", idx, nbuckets);
83144e8e4eSShuo Chen  std::cout << "  reading " << buf << std::endl;
84144e8e4eSShuo Chen  {
85144e8e4eSShuo Chen    std::ifstream in(buf);
86144e8e4eSShuo Chen    string line;
87144e8e4eSShuo Chen
88144e8e4eSShuo Chen    while (getline(in, line))
89144e8e4eSShuo Chen    {
90144e8e4eSShuo Chen      size_t tab = line.find('\t');
91144e8e4eSShuo Chen      if (tab != string::npos)
92144e8e4eSShuo Chen      {
93144e8e4eSShuo Chen        int64_t count = strtol(line.c_str() + tab, NULL, 10);
94144e8e4eSShuo Chen        if (count > 0)
95144e8e4eSShuo Chen        {
96144e8e4eSShuo Chen          queries[line.substr(0, tab)] += count;
97144e8e4eSShuo Chen        }
98144e8e4eSShuo Chen      }
99144e8e4eSShuo Chen    }
100144e8e4eSShuo Chen  }
101144e8e4eSShuo Chen
102144e8e4eSShuo Chen  ::unlink(buf);
103144e8e4eSShuo Chen  return queries;
104144e8e4eSShuo Chen}
105144e8e4eSShuo Chen
106144e8e4eSShuo Chenvoid combine(const int nbuckets)
107144e8e4eSShuo Chen{
108144e8e4eSShuo Chen  for (int i = 0; i < nbuckets; ++i)
109144e8e4eSShuo Chen  {
110144e8e4eSShuo Chen    std::unordered_map<string, int64_t> queries(read_shard(i, nbuckets));
111144e8e4eSShuo Chen
112144e8e4eSShuo Chen    // std::cout << "  sorting " << std::endl;
113144e8e4eSShuo Chen    std::vector<std::pair<int64_t, string>> counts;
114144e8e4eSShuo Chen    for (const auto& entry : queries)
115144e8e4eSShuo Chen    {
116144e8e4eSShuo Chen      counts.push_back(make_pair(entry.second, entry.first));
117144e8e4eSShuo Chen    }
118144e8e4eSShuo Chen    std::sort(counts.begin(), counts.end());
119144e8e4eSShuo Chen
120144e8e4eSShuo Chen    char buf[256];
121144e8e4eSShuo Chen    snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, nbuckets);
122144e8e4eSShuo Chen    std::ofstream out(buf);
123144e8e4eSShuo Chen    std::cout << "  writing " << buf << std::endl;
124144e8e4eSShuo Chen    for (auto it = counts.rbegin(); it != counts.rend(); ++it)
125144e8e4eSShuo Chen    {
126144e8e4eSShuo Chen      out << it->first << '\t' << it->second << '\n';
127144e8e4eSShuo Chen    }
128144e8e4eSShuo Chen  }
129144e8e4eSShuo Chen
130144e8e4eSShuo Chen  std::cout << "reducing done" << std::endl;
131144e8e4eSShuo Chen}
132144e8e4eSShuo Chen
133144e8e4eSShuo Chen// ======= merge =======
134144e8e4eSShuo Chen
135144e8e4eSShuo Chenclass Source
136144e8e4eSShuo Chen{
137144e8e4eSShuo Chen public:
138144e8e4eSShuo Chen  explicit Source(std::ifstream* in)
139144e8e4eSShuo Chen    : in_(in),
140144e8e4eSShuo Chen      count_(0),
141144e8e4eSShuo Chen      query_()
142144e8e4eSShuo Chen  {
143144e8e4eSShuo Chen  }
144144e8e4eSShuo Chen
145144e8e4eSShuo Chen  bool next()
146144e8e4eSShuo Chen  {
147144e8e4eSShuo Chen    string line;
148144e8e4eSShuo Chen    if (getline(*in_, line))
149144e8e4eSShuo Chen    {
150144e8e4eSShuo Chen      size_t tab = line.find('\t');
151144e8e4eSShuo Chen      if (tab != string::npos)
152144e8e4eSShuo Chen      {
153144e8e4eSShuo Chen        count_ = strtol(line.c_str(), NULL, 10);
154144e8e4eSShuo Chen        if (count_ > 0)
155144e8e4eSShuo Chen        {
156144e8e4eSShuo Chen          query_ = line.substr(tab+1);
157144e8e4eSShuo Chen          return true;
158144e8e4eSShuo Chen        }
159144e8e4eSShuo Chen      }
160144e8e4eSShuo Chen    }
161144e8e4eSShuo Chen    return false;
162144e8e4eSShuo Chen  }
163144e8e4eSShuo Chen
164144e8e4eSShuo Chen  bool operator<(const Source& rhs) const
165144e8e4eSShuo Chen  {
166144e8e4eSShuo Chen    return count_ < rhs.count_;
167144e8e4eSShuo Chen  }
168144e8e4eSShuo Chen
169144e8e4eSShuo Chen  void output(std::ostream& out) const
170144e8e4eSShuo Chen  {
171144e8e4eSShuo Chen    out << count_ << '\t' << query_ << '\n';
172144e8e4eSShuo Chen  }
173144e8e4eSShuo Chen
174144e8e4eSShuo Chen private:
175144e8e4eSShuo Chen  std::ifstream* in_;
176144e8e4eSShuo Chen  int64_t count_;
177144e8e4eSShuo Chen  string query_;
178144e8e4eSShuo Chen};
179144e8e4eSShuo Chen
180144e8e4eSShuo Chenvoid merge(const int nbuckets)
181144e8e4eSShuo Chen{
182144e8e4eSShuo Chen  boost::ptr_vector<std::ifstream> inputs;
183144e8e4eSShuo Chen  std::vector<Source> keys;
184144e8e4eSShuo Chen
185144e8e4eSShuo Chen  for (int i = 0; i < nbuckets; ++i)
186144e8e4eSShuo Chen  {
187144e8e4eSShuo Chen    char buf[256];
188144e8e4eSShuo Chen    snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, nbuckets);
189144e8e4eSShuo Chen    inputs.push_back(new std::ifstream(buf));
190144e8e4eSShuo Chen    Source rec(&inputs.back());
191144e8e4eSShuo Chen    if (rec.next())
192144e8e4eSShuo Chen    {
193144e8e4eSShuo Chen      keys.push_back(rec);
194144e8e4eSShuo Chen    }
195144e8e4eSShuo Chen    ::unlink(buf);
196144e8e4eSShuo Chen  }
197144e8e4eSShuo Chen
198144e8e4eSShuo Chen  std::ofstream out("output");
199144e8e4eSShuo Chen  std::make_heap(keys.begin(), keys.end());
200144e8e4eSShuo Chen  while (!keys.empty())
201144e8e4eSShuo Chen  {
202144e8e4eSShuo Chen    std::pop_heap(keys.begin(), keys.end());
203144e8e4eSShuo Chen    keys.back().output(out);
204144e8e4eSShuo Chen
205144e8e4eSShuo Chen    if (keys.back().next())
206144e8e4eSShuo Chen    {
207144e8e4eSShuo Chen      std::push_heap(keys.begin(), keys.end());
208144e8e4eSShuo Chen    }
209144e8e4eSShuo Chen    else
210144e8e4eSShuo Chen    {
211144e8e4eSShuo Chen      keys.pop_back();
212144e8e4eSShuo Chen    }
213144e8e4eSShuo Chen  }
214144e8e4eSShuo Chen  std::cout << "merging done\n";
215144e8e4eSShuo Chen}
216144e8e4eSShuo Chen
217144e8e4eSShuo Chenint main(int argc, char* argv[])
218144e8e4eSShuo Chen{
219144e8e4eSShuo Chen  int nbuckets = 10;
220144e8e4eSShuo Chen  shard(nbuckets, argc, argv);
221144e8e4eSShuo Chen  combine(nbuckets);
222144e8e4eSShuo Chen  merge(nbuckets);
223144e8e4eSShuo Chen}
224