1#include <muduo/base/Logging.h> 2#include <muduo/net/EventLoop.h> 3#include <muduo/net/TcpServer.h> 4 5#include <fstream> 6#include <vector> 7 8#include <stdio.h> 9#define __STDC_FORMAT_MACROS 10#include <inttypes.h> 11 12using namespace muduo; 13using namespace muduo::net; 14 15typedef std::vector<std::pair<int64_t, string> > WordCountList; 16 17WordCountList g_wordCounts; 18 19void read(const char* file) 20{ 21 std::ifstream in(file); 22 std::string line; 23 while (getline(in, line)) 24 { 25 size_t tab = line.find('\t'); 26 if (tab != string::npos) 27 { 28 int64_t count = strtoll(line.c_str() + tab, NULL, 10); 29 if (count > 0) 30 { 31 string word(line.begin(), line.begin()+tab); 32 g_wordCounts.push_back(make_pair(count, word)); 33 } 34 } 35 } 36 std::sort(g_wordCounts.begin(), g_wordCounts.end(), 37 std::greater<WordCountList::value_type>()); 38} 39 40WordCountList::iterator fillBuffer(WordCountList::iterator first, Buffer* buf) 41{ 42 while (first != g_wordCounts.end()) 43 { 44 char count[32]; 45 snprintf(count, sizeof count, "%" PRId64 "\t", first->first); 46 buf->append(count); 47 buf->append(first->second); 48 buf->append("\n", 1); 49 ++first; 50 if (buf->readableBytes() > 65536) 51 { 52 break; 53 } 54 } 55 return first; 56} 57 58void send(const TcpConnectionPtr& conn, WordCountList::iterator first) 59{ 60 Buffer buf; 61 WordCountList::iterator last = fillBuffer(first, &buf); 62 conn->setContext(last); 63 conn->send(&buf); 64} 65 66void onConnection(const TcpConnectionPtr& conn) 67{ 68 LOG_INFO << "Sender - " << conn->peerAddress().toIpPort() << " -> " 69 << conn->localAddress().toIpPort() << " is " 70 << (conn->connected() ? "UP" : "DOWN"); 71 if (conn->connected()) 72 { 73 send(conn, g_wordCounts.begin()); 74 } 75} 76 77void onWriteComplete(const TcpConnectionPtr& conn) 78{ 79 WordCountList::iterator first = boost::any_cast<WordCountList::iterator>(conn->getContext()); 80 if (first != g_wordCounts.end()) 81 { 82 send(conn, first); 83 } 84 else 85 { 86 conn->shutdown(); 87 LOG_INFO << "Sender - done"; 88 } 89} 90 91void serve(uint16_t port) 92{ 93 LOG_INFO << "Listen on port " << port; 94 EventLoop loop; 95 InetAddress listenAddr(port); 96 TcpServer server(&loop, listenAddr, "Sender"); 97 server.setConnectionCallback(onConnection); 98 server.setWriteCompleteCallback(onWriteComplete); 99 server.start(); 100 loop.loop(); 101} 102 103int main(int argc, char* argv[]) 104{ 105 if (argc > 1) 106 { 107 read(argv[1]); 108 int port = argc > 2 ? atoi(argv[2]) : 2013; 109 serve(static_cast<uint16_t>(port)); 110 } 111 else 112 { 113 fprintf(stderr, "Usage: %s shard_file [port]\n", argv[0]); 114 } 115} 116