#include #include #include #include #include #include using namespace muduo; using namespace muduo::net; typedef std::vector > WordCountList; WordCountList g_wordCounts; void read(const char* file) { std::ifstream in(file); std::string line; while (getline(in, line)) { size_t tab = line.find('\t'); if (tab != string::npos) { int64_t count = strtoll(line.c_str() + tab, NULL, 10); if (count > 0) { string word(line.begin(), line.begin()+tab); g_wordCounts.push_back(make_pair(count, word)); } } } std::sort(g_wordCounts.begin(), g_wordCounts.end(), std::greater()); } WordCountList::iterator fillBuffer(WordCountList::iterator first, Buffer* buf) { LogStream stream; while (first != g_wordCounts.end()) { buf->append(first->second); stream.resetBuffer(); stream << '\t' << first->first << "\r\n"; buf->append(stream.buffer().data(), stream.buffer().length()); ++first; if (buf->readableBytes() > 65536) { break; } } return first; } void send(const TcpConnectionPtr& conn, WordCountList::iterator first) { Buffer buf; WordCountList::iterator last = fillBuffer(first, &buf); conn->setContext(last); conn->send(&buf); } void onConnection(const TcpConnectionPtr& conn) { LOG_INFO << "Sender - " << conn->peerAddress().toIpPort() << " -> " << conn->localAddress().toIpPort() << " is " << (conn->connected() ? "UP" : "DOWN"); if (conn->connected()) { send(conn, g_wordCounts.begin()); } } void onWriteComplete(const TcpConnectionPtr& conn) { WordCountList::iterator first = boost::any_cast(conn->getContext()); if (first != g_wordCounts.end()) { send(conn, first); } else { conn->shutdown(); LOG_INFO << "Sender - done"; } } void serve(int16_t port) { LOG_INFO << "Listen on port " << port; EventLoop loop; InetAddress listenAddr(port); TcpServer server(&loop, listenAddr, "Sender"); server.setConnectionCallback(onConnection); server.setWriteCompleteCallback(onWriteComplete); server.start(); loop.loop(); } int main(int argc, char* argv[]) { if (argc > 1) { read(argv[1]); int port = argc > 2 ? atoi(argv[2]) : 2013; serve(static_cast(port)); } else { fprintf(stderr, "Usage: %s shard_file [port]\n", argv[0]); } }