#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#include <algorithm>
#include <exception>
#include <fstream>
#include <istream>
#include <ostream>
#include <stdexcept>
#include <string>
#include <vector>

#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
8
/// One sender's input stream plus the record most recently read from it.
///
/// A record is a line of the form "<count>\t<word>". Sources compare by the
/// count of their current record (operator<), so a std::vector<Source> can be
/// kept as a max-heap with std::make_heap / std::pop_heap / std::push_heap.
/// The stream is NOT owned: it must outlive this Source and every copy of it.
class Source
{
 public:
  /// @param in  stream to read records from; not owned, must be non-null.
  explicit Source(std::istream* in)
    : in_(in),
      count_(0),
      word_()
  {
  }

  /// Reads one line and, if it is a valid record, makes it the current record.
  ///
  /// @return true if the line contains a tab and starts with a positive count.
  ///         false at end of stream, on a read error, or when the line is
  ///         malformed or its count is not positive. On false the previous
  ///         current record is left untouched.
  bool next()
  {
    std::string line;
    if (!std::getline(*in_, line))
    {
      return false;
    }

    const std::string::size_type tab = line.find('\t');
    if (tab == std::string::npos)
    {
      return false;
    }

    // strtoll (unlike atoll) is well-defined on overflow: it saturates at
    // LLONG_MAX/LLONG_MIN. Non-numeric text parses as 0 and is rejected below.
    const int64_t count = strtoll(line.c_str(), NULL, 10);
    if (count <= 0)
    {
      return false;
    }

    // Commit only a complete record so count_ and word_ always belong together.
    count_ = count;
    word_ = line.substr(tab + 1);
    return true;
  }

  /// Orders Sources by the count of their current record.
  bool operator<(const Source& rhs) const
  {
    return count_ < rhs.count_;
  }

  /// Writes the current record to @p out as "<count>\t<word>\n".
  void output(std::ostream& out) const
  {
    out << count_ << '\t' << word_ << '\n';
  }

 private:
  std::istream* in_;  // not owned
  int64_t count_;     // count of the current record (0 before the first successful next())
  std::string word_;  // text after the first tab of the current record
};
53
54boost::asio::ip::tcp::endpoint get_endpoint(const std::string& ipport)
55{
56  size_t colon = ipport.find(':');
57  if (colon != std::string::npos)
58  {
59    std::string ip = ipport.substr(0, colon);
60    uint16_t port = static_cast<uint16_t>(atoi(ipport.c_str() + colon + 1));
61    return boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(ip), port);
62  }
63  else
64  {
65    throw std::invalid_argument("Invalid format of endpoint");
66  }
67}
68
69int main(int argc, char* argv[])
70{
71  if (argc >= 3)
72  {
73    boost::ptr_vector<boost::asio::ip::tcp::iostream> inputs;
74    std::vector<Source> keys;
75    const int64_t topK = atoll(argv[1]);
76
77    for (int i = 2; i < argc; ++i)
78    {
79      inputs.push_back(new boost::asio::ip::tcp::iostream(get_endpoint(argv[i])));
80      Source src(&inputs.back());
81      if (src.next())
82      {
83        keys.push_back(src);
84      }
85    }
86    printf("Connected to %zd sender(s)\n", keys.size());
87
88    std::ofstream out("output");
89    int64_t cnt = 0;
90    std::make_heap(keys.begin(), keys.end());
91    while (!keys.empty() && cnt < topK)
92    {
93      std::pop_heap(keys.begin(), keys.end());
94      keys.back().output(out);
95      ++cnt;
96
97      if (keys.back().next())
98      {
99        std::push_heap(keys.begin(), keys.end());
100      }
101      else
102      {
103        keys.pop_back();
104      }
105    }
106    printf("merging done\n");
107  }
108  else
109  {
110    printf("Usage: %s topK ip1:port1 [ip2:port2 ...]\n", argv[0]);
111  }
112}
113
114