sender.cc revision 5f798dd9
15f798dd9SShuo Chen#include <muduo/base/Logging.h> 25f798dd9SShuo Chen#include <muduo/net/EventLoop.h> 35f798dd9SShuo Chen#include <muduo/net/TcpServer.h> 45f798dd9SShuo Chen 55f798dd9SShuo Chen#include <fstream> 65f798dd9SShuo Chen#include <vector> 75f798dd9SShuo Chen 85f798dd9SShuo Chen#include <stdio.h> 95f798dd9SShuo Chen 105f798dd9SShuo Chenusing namespace muduo; 115f798dd9SShuo Chenusing namespace muduo::net; 125f798dd9SShuo Chen 135f798dd9SShuo Chentypedef std::vector<std::pair<int64_t, string> > WordCountList; 145f798dd9SShuo Chen 155f798dd9SShuo ChenWordCountList g_wordCounts; 165f798dd9SShuo Chen 175f798dd9SShuo Chenvoid read(const char* file) 185f798dd9SShuo Chen{ 195f798dd9SShuo Chen std::ifstream in(file); 205f798dd9SShuo Chen std::string line; 215f798dd9SShuo Chen while (getline(in, line)) 225f798dd9SShuo Chen { 235f798dd9SShuo Chen size_t tab = line.find('\t'); 245f798dd9SShuo Chen if (tab != string::npos) 255f798dd9SShuo Chen { 265f798dd9SShuo Chen int64_t count = strtoll(line.c_str() + tab, NULL, 10); 275f798dd9SShuo Chen if (count > 0) 285f798dd9SShuo Chen { 295f798dd9SShuo Chen string word(line.begin(), line.begin()+tab); 305f798dd9SShuo Chen g_wordCounts.push_back(make_pair(count, word)); 315f798dd9SShuo Chen } 325f798dd9SShuo Chen } 335f798dd9SShuo Chen } 345f798dd9SShuo Chen std::sort(g_wordCounts.begin(), g_wordCounts.end(), 355f798dd9SShuo Chen std::greater<WordCountList::value_type>()); 365f798dd9SShuo Chen} 375f798dd9SShuo Chen 385f798dd9SShuo ChenWordCountList::iterator fillBuffer(WordCountList::iterator first, Buffer* buf) 395f798dd9SShuo Chen{ 405f798dd9SShuo Chen LogStream stream; 415f798dd9SShuo Chen while (first != g_wordCounts.end()) 425f798dd9SShuo Chen { 435f798dd9SShuo Chen buf->append(first->second); 445f798dd9SShuo Chen stream.resetBuffer(); 455f798dd9SShuo Chen stream << '\t' << first->first << "\r\n"; 465f798dd9SShuo Chen buf->append(stream.buffer().data(), stream.buffer().length()); 475f798dd9SShuo Chen ++first; 485f798dd9SShuo Chen if (buf->readableBytes() > 65536) 495f798dd9SShuo Chen { 505f798dd9SShuo Chen break; 515f798dd9SShuo Chen } 525f798dd9SShuo Chen } 535f798dd9SShuo Chen return first; 545f798dd9SShuo Chen} 555f798dd9SShuo Chen 565f798dd9SShuo Chenvoid send(const TcpConnectionPtr& conn, WordCountList::iterator first) 575f798dd9SShuo Chen{ 585f798dd9SShuo Chen Buffer buf; 595f798dd9SShuo Chen WordCountList::iterator last = fillBuffer(first, &buf); 605f798dd9SShuo Chen conn->setContext(last); 615f798dd9SShuo Chen conn->send(&buf); 625f798dd9SShuo Chen} 635f798dd9SShuo Chen 645f798dd9SShuo Chenvoid onConnection(const TcpConnectionPtr& conn) 655f798dd9SShuo Chen{ 665f798dd9SShuo Chen LOG_INFO << "Sender - " << conn->peerAddress().toIpPort() << " -> " 675f798dd9SShuo Chen << conn->localAddress().toIpPort() << " is " 685f798dd9SShuo Chen << (conn->connected() ? "UP" : "DOWN"); 695f798dd9SShuo Chen if (conn->connected()) 705f798dd9SShuo Chen { 715f798dd9SShuo Chen send(conn, g_wordCounts.begin()); 725f798dd9SShuo Chen } 735f798dd9SShuo Chen} 745f798dd9SShuo Chen 755f798dd9SShuo Chenvoid onWriteComplete(const TcpConnectionPtr& conn) 765f798dd9SShuo Chen{ 775f798dd9SShuo Chen WordCountList::iterator first = boost::any_cast<WordCountList::iterator>(conn->getContext()); 785f798dd9SShuo Chen if (first != g_wordCounts.end()) 795f798dd9SShuo Chen { 805f798dd9SShuo Chen send(conn, first); 815f798dd9SShuo Chen } 825f798dd9SShuo Chen else 835f798dd9SShuo Chen { 845f798dd9SShuo Chen conn->shutdown(); 855f798dd9SShuo Chen LOG_INFO << "Sender - done"; 865f798dd9SShuo Chen } 875f798dd9SShuo Chen} 885f798dd9SShuo Chen 895f798dd9SShuo Chenvoid serve(int16_t port) 905f798dd9SShuo Chen{ 915f798dd9SShuo Chen LOG_INFO << "Listen on port " << port; 925f798dd9SShuo Chen EventLoop loop; 935f798dd9SShuo Chen InetAddress listenAddr(port); 945f798dd9SShuo Chen TcpServer server(&loop, listenAddr, "Sender"); 955f798dd9SShuo Chen server.setConnectionCallback(onConnection); 965f798dd9SShuo Chen server.setWriteCompleteCallback(onWriteComplete); 975f798dd9SShuo Chen server.start(); 985f798dd9SShuo Chen loop.loop(); 995f798dd9SShuo Chen} 1005f798dd9SShuo Chen 1015f798dd9SShuo Chenint main(int argc, char* argv[]) 1025f798dd9SShuo Chen{ 1035f798dd9SShuo Chen if (argc > 1) 1045f798dd9SShuo Chen { 1055f798dd9SShuo Chen read(argv[1]); 1065f798dd9SShuo Chen int port = argc > 2 ? atoi(argv[2]) : 2013; 1075f798dd9SShuo Chen serve(static_cast<int16_t>(port)); 1085f798dd9SShuo Chen } 1095f798dd9SShuo Chen else 1105f798dd9SShuo Chen { 1115f798dd9SShuo Chen fprintf(stderr, "Usage: %s shard_file [port]\n", argv[0]); 1125f798dd9SShuo Chen } 1135f798dd9SShuo Chen} 114