merger.cc revision cc454125
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/ptr_container/ptr_vector.hpp>

#include <algorithm>
#include <fstream>
#include <istream>
#include <ostream>
#include <stdexcept>
#include <string>
#include <vector>

#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
869ab6119SShuo Chen
/// One input stream of "count<TAB>word" text records.
///
/// A Source holds the most recently read record (count and word) of its
/// stream.  Sources compare by that count, which lets a collection of them be
/// arranged into a heap keyed on the current record.
class Source
{
 public:
  /// @param in  Stream the records are read from.  The pointer is stored, not
  ///            owned: the stream must stay alive while this object (or any
  ///            copy of it) is used.
  explicit Source(std::istream* in)
    : in_(in),
      count_(0),
      word_()
  {
  }

  /// Reads one line from the stream and, if it is a well-formed record, makes
  /// it the current record.
  ///
  /// A well-formed record is a positive decimal count (optionally preceded by
  /// whitespace) immediately followed by a tab; everything after that first
  /// tab is the word.
  ///
  /// @return true if a record was read and stored; false on end of stream, on
  ///         a read error, or when the line is not a well-formed record.  On
  ///         false the previously stored record is left unchanged.
  bool next()
  {
    std::string line;
    if (!std::getline(*in_, line))
    {
      return false;
    }

    const size_t tab = line.find('\t');
    if (tab == std::string::npos)
    {
      return false;
    }

    // strtoll reports where parsing stopped and has defined behaviour on
    // overflow.  Requiring the number to end exactly at the tab rejects lines
    // whose count field is empty (strtoll would otherwise skip the tab as
    // whitespace and read the count out of the word field) or has trailing
    // garbage.
    const char* const begin = line.c_str();
    char* end = NULL;
    errno = 0;
    const long long value = strtoll(begin, &end, 10);
    if (end != begin + tab || errno == ERANGE || value <= 0)
    {
      return false;
    }

    count_ = static_cast<int64_t>(value);
    word_.assign(line, tab + 1, std::string::npos);
    return true;
  }

  /// Orders sources by the count of their current record.
  bool operator<(const Source& rhs) const
  {
    return count_ < rhs.count_;
  }

  /// Writes the current record to @p out as "count<TAB>word\n".
  void output(std::ostream& out) const
  {
    out << count_ << '\t' << word_ << '\n';
  }

 private:
  std::istream* in_;   // not owned
  int64_t count_;      // count of the current record; 0 until next() succeeds
  std::string word_;   // word of the current record
};
5369ab6119SShuo Chen
5469ab6119SShuo Chenboost::asio::ip::tcp::endpoint get_endpoint(const std::string& ipport)
5569ab6119SShuo Chen{
5669ab6119SShuo Chen  size_t colon = ipport.find(':');
5769ab6119SShuo Chen  if (colon != std::string::npos)
5869ab6119SShuo Chen  {
5969ab6119SShuo Chen    std::string ip = ipport.substr(0, colon);
6069ab6119SShuo Chen    uint16_t port = static_cast<uint16_t>(atoi(ipport.c_str() + colon + 1));
6169ab6119SShuo Chen    return boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(ip), port);
6269ab6119SShuo Chen  }
6369ab6119SShuo Chen  else
6469ab6119SShuo Chen  {
6569ab6119SShuo Chen    throw std::invalid_argument("Invalid format of endpoint");
6669ab6119SShuo Chen  }
6769ab6119SShuo Chen}
6869ab6119SShuo Chen
6969ab6119SShuo Chenint main(int argc, char* argv[])
7069ab6119SShuo Chen{
7169ab6119SShuo Chen  if (argc >= 3)
7269ab6119SShuo Chen  {
7369ab6119SShuo Chen    boost::ptr_vector<boost::asio::ip::tcp::iostream> inputs;
7469ab6119SShuo Chen    std::vector<Source> keys;
7569ab6119SShuo Chen    const int64_t topK = atoll(argv[1]);
7669ab6119SShuo Chen
7769ab6119SShuo Chen    for (int i = 2; i < argc; ++i)
7869ab6119SShuo Chen    {
7969ab6119SShuo Chen      inputs.push_back(new boost::asio::ip::tcp::iostream(get_endpoint(argv[i])));
8069ab6119SShuo Chen      Source src(&inputs.back());
8169ab6119SShuo Chen      if (src.next())
8269ab6119SShuo Chen      {
8369ab6119SShuo Chen        keys.push_back(src);
8469ab6119SShuo Chen      }
8569ab6119SShuo Chen    }
8669ab6119SShuo Chen    printf("Connected to %zd sender(s)\n", keys.size());
8769ab6119SShuo Chen
8869ab6119SShuo Chen    std::ofstream out("output");
8969ab6119SShuo Chen    int64_t cnt = 0;
9069ab6119SShuo Chen    std::make_heap(keys.begin(), keys.end());
9169ab6119SShuo Chen    while (!keys.empty() && cnt < topK)
9269ab6119SShuo Chen    {
9369ab6119SShuo Chen      std::pop_heap(keys.begin(), keys.end());
9469ab6119SShuo Chen      keys.back().output(out);
9569ab6119SShuo Chen      ++cnt;
9669ab6119SShuo Chen
9769ab6119SShuo Chen      if (keys.back().next())
9869ab6119SShuo Chen      {
9969ab6119SShuo Chen        std::push_heap(keys.begin(), keys.end());
10069ab6119SShuo Chen      }
10169ab6119SShuo Chen      else
10269ab6119SShuo Chen      {
10369ab6119SShuo Chen        keys.pop_back();
10469ab6119SShuo Chen      }
10569ab6119SShuo Chen    }
10669ab6119SShuo Chen    printf("merging done\n");
10769ab6119SShuo Chen  }
10869ab6119SShuo Chen  else
10969ab6119SShuo Chen  {
11069ab6119SShuo Chen    printf("Usage: %s topK ip1:port1 [ip2:port2 ...]\n", argv[0]);
11169ab6119SShuo Chen  }
11269ab6119SShuo Chen}
11369ab6119SShuo Chen
114