1#include <boost/asio/io_service.hpp> 2#include <boost/asio/ip/tcp.hpp> 3#include <boost/ptr_container/ptr_vector.hpp> 4 5#include <fstream> 6 7#include <stdio.h> 8 9class Source 10{ 11 public: 12 explicit Source(std::istream* in) 13 : in_(in), 14 count_(0), 15 word_() 16 { 17 } 18 19 bool next() 20 { 21 std::string line; 22 if (getline(*in_, line)) 23 { 24 size_t tab = line.find('\t'); 25 if (tab != std::string::npos) 26 { 27 count_ = atoll(line.c_str()); 28 if (count_ > 0) 29 { 30 word_ = line.substr(tab+1); 31 return true; 32 } 33 } 34 } 35 return false; 36 } 37 38 bool operator<(const Source& rhs) const 39 { 40 return count_ < rhs.count_; 41 } 42 43 void output(std::ostream& out) 44 { 45 out << count_ << '\t' << word_ << '\n'; 46 } 47 48 private: 49 std::istream* in_; 50 int64_t count_; 51 std::string word_; 52}; 53 54boost::asio::ip::tcp::endpoint get_endpoint(const std::string& ipport) 55{ 56 size_t colon = ipport.find(':'); 57 if (colon != std::string::npos) 58 { 59 std::string ip = ipport.substr(0, colon); 60 uint16_t port = static_cast<uint16_t>(atoi(ipport.c_str() + colon + 1)); 61 return boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(ip), port); 62 } 63 else 64 { 65 throw std::invalid_argument("Invalid format of endpoint"); 66 } 67} 68 69int main(int argc, char* argv[]) 70{ 71 if (argc >= 3) 72 { 73 boost::ptr_vector<boost::asio::ip::tcp::iostream> inputs; 74 std::vector<Source> keys; 75 const int64_t topK = atoll(argv[1]); 76 77 for (int i = 2; i < argc; ++i) 78 { 79 inputs.push_back(new boost::asio::ip::tcp::iostream(get_endpoint(argv[i]))); 80 Source src(&inputs.back()); 81 if (src.next()) 82 { 83 keys.push_back(src); 84 } 85 } 86 printf("Connected to %zd sender(s)\n", keys.size()); 87 88 std::ofstream out("output"); 89 int64_t cnt = 0; 90 std::make_heap(keys.begin(), keys.end()); 91 while (!keys.empty() && cnt < topK) 92 { 93 std::pop_heap(keys.begin(), keys.end()); 94 keys.back().output(out); 95 ++cnt; 96 97 if (keys.back().next()) 98 { 99 std::push_heap(keys.begin(), keys.end()); 100 } 101 else 102 { 103 keys.pop_back(); 104 } 105 } 106 printf("merging done\n"); 107 } 108 else 109 { 110 printf("Usage: %s topK ip1:port1 [ip2:port2 ...]\n", argv[0]); 111 } 112} 113 114