// merger.cc revision cc454125
169ab6119SShuo Chen#include <boost/asio/io_service.hpp> 269ab6119SShuo Chen#include <boost/asio/ip/tcp.hpp> 369ab6119SShuo Chen#include <boost/ptr_container/ptr_vector.hpp> 469ab6119SShuo Chen 569ab6119SShuo Chen#include <fstream> 669ab6119SShuo Chen 769ab6119SShuo Chen#include <stdio.h> 869ab6119SShuo Chen 969ab6119SShuo Chenclass Source 1069ab6119SShuo Chen{ 1169ab6119SShuo Chen public: 1269ab6119SShuo Chen explicit Source(std::istream* in) 1369ab6119SShuo Chen : in_(in), 1469ab6119SShuo Chen count_(0), 15cc454125SShuo Chen word_() 1669ab6119SShuo Chen { 1769ab6119SShuo Chen } 1869ab6119SShuo Chen 1969ab6119SShuo Chen bool next() 2069ab6119SShuo Chen { 2169ab6119SShuo Chen std::string line; 2269ab6119SShuo Chen if (getline(*in_, line)) 2369ab6119SShuo Chen { 2469ab6119SShuo Chen size_t tab = line.find('\t'); 2569ab6119SShuo Chen if (tab != std::string::npos) 2669ab6119SShuo Chen { 2769ab6119SShuo Chen count_ = atoll(line.c_str()); 2869ab6119SShuo Chen if (count_ > 0) 2969ab6119SShuo Chen { 30cc454125SShuo Chen word_ = line.substr(tab+1); 3169ab6119SShuo Chen return true; 3269ab6119SShuo Chen } 3369ab6119SShuo Chen } 3469ab6119SShuo Chen } 3569ab6119SShuo Chen return false; 3669ab6119SShuo Chen } 3769ab6119SShuo Chen 3869ab6119SShuo Chen bool operator<(const Source& rhs) const 3969ab6119SShuo Chen { 4069ab6119SShuo Chen return count_ < rhs.count_; 4169ab6119SShuo Chen } 4269ab6119SShuo Chen 4369ab6119SShuo Chen void output(std::ostream& out) 4469ab6119SShuo Chen { 45cc454125SShuo Chen out << count_ << '\t' << word_ << '\n'; 4669ab6119SShuo Chen } 4769ab6119SShuo Chen 4869ab6119SShuo Chen private: 4969ab6119SShuo Chen std::istream* in_; 5069ab6119SShuo Chen int64_t count_; 51cc454125SShuo Chen std::string word_; 5269ab6119SShuo Chen}; 5369ab6119SShuo Chen 5469ab6119SShuo Chenboost::asio::ip::tcp::endpoint get_endpoint(const std::string& ipport) 5569ab6119SShuo Chen{ 5669ab6119SShuo Chen size_t colon = ipport.find(':'); 5769ab6119SShuo Chen if (colon != 
std::string::npos) 5869ab6119SShuo Chen { 5969ab6119SShuo Chen std::string ip = ipport.substr(0, colon); 6069ab6119SShuo Chen uint16_t port = static_cast<uint16_t>(atoi(ipport.c_str() + colon + 1)); 6169ab6119SShuo Chen return boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(ip), port); 6269ab6119SShuo Chen } 6369ab6119SShuo Chen else 6469ab6119SShuo Chen { 6569ab6119SShuo Chen throw std::invalid_argument("Invalid format of endpoint"); 6669ab6119SShuo Chen } 6769ab6119SShuo Chen} 6869ab6119SShuo Chen 6969ab6119SShuo Chenint main(int argc, char* argv[]) 7069ab6119SShuo Chen{ 7169ab6119SShuo Chen if (argc >= 3) 7269ab6119SShuo Chen { 7369ab6119SShuo Chen boost::ptr_vector<boost::asio::ip::tcp::iostream> inputs; 7469ab6119SShuo Chen std::vector<Source> keys; 7569ab6119SShuo Chen const int64_t topK = atoll(argv[1]); 7669ab6119SShuo Chen 7769ab6119SShuo Chen for (int i = 2; i < argc; ++i) 7869ab6119SShuo Chen { 7969ab6119SShuo Chen inputs.push_back(new boost::asio::ip::tcp::iostream(get_endpoint(argv[i]))); 8069ab6119SShuo Chen Source src(&inputs.back()); 8169ab6119SShuo Chen if (src.next()) 8269ab6119SShuo Chen { 8369ab6119SShuo Chen keys.push_back(src); 8469ab6119SShuo Chen } 8569ab6119SShuo Chen } 8669ab6119SShuo Chen printf("Connected to %zd sender(s)\n", keys.size()); 8769ab6119SShuo Chen 8869ab6119SShuo Chen std::ofstream out("output"); 8969ab6119SShuo Chen int64_t cnt = 0; 9069ab6119SShuo Chen std::make_heap(keys.begin(), keys.end()); 9169ab6119SShuo Chen while (!keys.empty() && cnt < topK) 9269ab6119SShuo Chen { 9369ab6119SShuo Chen std::pop_heap(keys.begin(), keys.end()); 9469ab6119SShuo Chen keys.back().output(out); 9569ab6119SShuo Chen ++cnt; 9669ab6119SShuo Chen 9769ab6119SShuo Chen if (keys.back().next()) 9869ab6119SShuo Chen { 9969ab6119SShuo Chen std::push_heap(keys.begin(), keys.end()); 10069ab6119SShuo Chen } 10169ab6119SShuo Chen else 10269ab6119SShuo Chen { 10369ab6119SShuo Chen keys.pop_back(); 10469ab6119SShuo Chen } 
10569ab6119SShuo Chen } 10669ab6119SShuo Chen printf("merging done\n"); 10769ab6119SShuo Chen } 10869ab6119SShuo Chen else 10969ab6119SShuo Chen { 11069ab6119SShuo Chen printf("Usage: %s topK ip1:port1 [ip2:port2 ...]\n", argv[0]); 11169ab6119SShuo Chen } 11269ab6119SShuo Chen} 11369ab6119SShuo Chen 114