sender.cc revision 5f798dd9
1#include <muduo/base/Logging.h> 2#include <muduo/net/EventLoop.h> 3#include <muduo/net/TcpServer.h> 4 5#include <fstream> 6#include <vector> 7 8#include <stdio.h> 9 10using namespace muduo; 11using namespace muduo::net; 12 13typedef std::vector<std::pair<int64_t, string> > WordCountList; 14 15WordCountList g_wordCounts; 16 17void read(const char* file) 18{ 19 std::ifstream in(file); 20 std::string line; 21 while (getline(in, line)) 22 { 23 size_t tab = line.find('\t'); 24 if (tab != string::npos) 25 { 26 int64_t count = strtoll(line.c_str() + tab, NULL, 10); 27 if (count > 0) 28 { 29 string word(line.begin(), line.begin()+tab); 30 g_wordCounts.push_back(make_pair(count, word)); 31 } 32 } 33 } 34 std::sort(g_wordCounts.begin(), g_wordCounts.end(), 35 std::greater<WordCountList::value_type>()); 36} 37 38WordCountList::iterator fillBuffer(WordCountList::iterator first, Buffer* buf) 39{ 40 LogStream stream; 41 while (first != g_wordCounts.end()) 42 { 43 buf->append(first->second); 44 stream.resetBuffer(); 45 stream << '\t' << first->first << "\r\n"; 46 buf->append(stream.buffer().data(), stream.buffer().length()); 47 ++first; 48 if (buf->readableBytes() > 65536) 49 { 50 break; 51 } 52 } 53 return first; 54} 55 56void send(const TcpConnectionPtr& conn, WordCountList::iterator first) 57{ 58 Buffer buf; 59 WordCountList::iterator last = fillBuffer(first, &buf); 60 conn->setContext(last); 61 conn->send(&buf); 62} 63 64void onConnection(const TcpConnectionPtr& conn) 65{ 66 LOG_INFO << "Sender - " << conn->peerAddress().toIpPort() << " -> " 67 << conn->localAddress().toIpPort() << " is " 68 << (conn->connected() ? "UP" : "DOWN"); 69 if (conn->connected()) 70 { 71 send(conn, g_wordCounts.begin()); 72 } 73} 74 75void onWriteComplete(const TcpConnectionPtr& conn) 76{ 77 WordCountList::iterator first = boost::any_cast<WordCountList::iterator>(conn->getContext()); 78 if (first != g_wordCounts.end()) 79 { 80 send(conn, first); 81 } 82 else 83 { 84 conn->shutdown(); 85 LOG_INFO << "Sender - done"; 86 } 87} 88 89void serve(int16_t port) 90{ 91 LOG_INFO << "Listen on port " << port; 92 EventLoop loop; 93 InetAddress listenAddr(port); 94 TcpServer server(&loop, listenAddr, "Sender"); 95 server.setConnectionCallback(onConnection); 96 server.setWriteCompleteCallback(onWriteComplete); 97 server.start(); 98 loop.loop(); 99} 100 101int main(int argc, char* argv[]) 102{ 103 if (argc > 1) 104 { 105 read(argv[1]); 106 int port = argc > 2 ? atoi(argv[2]) : 2013; 107 serve(static_cast<int16_t>(port)); 108 } 109 else 110 { 111 fprintf(stderr, "Usage: %s shard_file [port]\n", argv[0]); 112 } 113} 114