sender.cc revision 69ab6119
15f798dd9SShuo Chen#include <muduo/base/Logging.h> 25f798dd9SShuo Chen#include <muduo/net/EventLoop.h> 35f798dd9SShuo Chen#include <muduo/net/TcpServer.h> 45f798dd9SShuo Chen 55f798dd9SShuo Chen#include <fstream> 65f798dd9SShuo Chen#include <vector> 75f798dd9SShuo Chen 85f798dd9SShuo Chen#include <stdio.h> 969ab6119SShuo Chen#define __STDC_FORMAT_MACROS 1069ab6119SShuo Chen#include <inttypes.h> 115f798dd9SShuo Chen 125f798dd9SShuo Chenusing namespace muduo; 135f798dd9SShuo Chenusing namespace muduo::net; 145f798dd9SShuo Chen 155f798dd9SShuo Chentypedef std::vector<std::pair<int64_t, string> > WordCountList; 165f798dd9SShuo Chen 175f798dd9SShuo ChenWordCountList g_wordCounts; 185f798dd9SShuo Chen 195f798dd9SShuo Chenvoid read(const char* file) 205f798dd9SShuo Chen{ 215f798dd9SShuo Chen std::ifstream in(file); 225f798dd9SShuo Chen std::string line; 235f798dd9SShuo Chen while (getline(in, line)) 245f798dd9SShuo Chen { 255f798dd9SShuo Chen size_t tab = line.find('\t'); 265f798dd9SShuo Chen if (tab != string::npos) 275f798dd9SShuo Chen { 285f798dd9SShuo Chen int64_t count = strtoll(line.c_str() + tab, NULL, 10); 295f798dd9SShuo Chen if (count > 0) 305f798dd9SShuo Chen { 315f798dd9SShuo Chen string word(line.begin(), line.begin()+tab); 325f798dd9SShuo Chen g_wordCounts.push_back(make_pair(count, word)); 335f798dd9SShuo Chen } 345f798dd9SShuo Chen } 355f798dd9SShuo Chen } 365f798dd9SShuo Chen std::sort(g_wordCounts.begin(), g_wordCounts.end(), 375f798dd9SShuo Chen std::greater<WordCountList::value_type>()); 385f798dd9SShuo Chen} 395f798dd9SShuo Chen 405f798dd9SShuo ChenWordCountList::iterator fillBuffer(WordCountList::iterator first, Buffer* buf) 415f798dd9SShuo Chen{ 425f798dd9SShuo Chen while (first != g_wordCounts.end()) 435f798dd9SShuo Chen { 4469ab6119SShuo Chen char count[32]; 4569ab6119SShuo Chen snprintf(count, sizeof count, "%" PRId64 "\t", first->first); 4669ab6119SShuo Chen buf->append(count); 475f798dd9SShuo Chen buf->append(first->second); 4869ab6119SShuo Chen buf->append("\n", 1); 495f798dd9SShuo Chen ++first; 505f798dd9SShuo Chen if (buf->readableBytes() > 65536) 515f798dd9SShuo Chen { 525f798dd9SShuo Chen break; 535f798dd9SShuo Chen } 545f798dd9SShuo Chen } 555f798dd9SShuo Chen return first; 565f798dd9SShuo Chen} 575f798dd9SShuo Chen 585f798dd9SShuo Chenvoid send(const TcpConnectionPtr& conn, WordCountList::iterator first) 595f798dd9SShuo Chen{ 605f798dd9SShuo Chen Buffer buf; 615f798dd9SShuo Chen WordCountList::iterator last = fillBuffer(first, &buf); 625f798dd9SShuo Chen conn->setContext(last); 635f798dd9SShuo Chen conn->send(&buf); 645f798dd9SShuo Chen} 655f798dd9SShuo Chen 665f798dd9SShuo Chenvoid onConnection(const TcpConnectionPtr& conn) 675f798dd9SShuo Chen{ 685f798dd9SShuo Chen LOG_INFO << "Sender - " << conn->peerAddress().toIpPort() << " -> " 695f798dd9SShuo Chen << conn->localAddress().toIpPort() << " is " 705f798dd9SShuo Chen << (conn->connected() ? "UP" : "DOWN"); 715f798dd9SShuo Chen if (conn->connected()) 725f798dd9SShuo Chen { 735f798dd9SShuo Chen send(conn, g_wordCounts.begin()); 745f798dd9SShuo Chen } 755f798dd9SShuo Chen} 765f798dd9SShuo Chen 775f798dd9SShuo Chenvoid onWriteComplete(const TcpConnectionPtr& conn) 785f798dd9SShuo Chen{ 795f798dd9SShuo Chen WordCountList::iterator first = boost::any_cast<WordCountList::iterator>(conn->getContext()); 805f798dd9SShuo Chen if (first != g_wordCounts.end()) 815f798dd9SShuo Chen { 825f798dd9SShuo Chen send(conn, first); 835f798dd9SShuo Chen } 845f798dd9SShuo Chen else 855f798dd9SShuo Chen { 865f798dd9SShuo Chen conn->shutdown(); 875f798dd9SShuo Chen LOG_INFO << "Sender - done"; 885f798dd9SShuo Chen } 895f798dd9SShuo Chen} 905f798dd9SShuo Chen 9169ab6119SShuo Chenvoid serve(uint16_t port) 925f798dd9SShuo Chen{ 935f798dd9SShuo Chen LOG_INFO << "Listen on port " << port; 945f798dd9SShuo Chen EventLoop loop; 955f798dd9SShuo Chen InetAddress listenAddr(port); 965f798dd9SShuo Chen TcpServer server(&loop, listenAddr, "Sender"); 975f798dd9SShuo Chen server.setConnectionCallback(onConnection); 985f798dd9SShuo Chen server.setWriteCompleteCallback(onWriteComplete); 995f798dd9SShuo Chen server.start(); 1005f798dd9SShuo Chen loop.loop(); 1015f798dd9SShuo Chen} 1025f798dd9SShuo Chen 1035f798dd9SShuo Chenint main(int argc, char* argv[]) 1045f798dd9SShuo Chen{ 1055f798dd9SShuo Chen if (argc > 1) 1065f798dd9SShuo Chen { 1075f798dd9SShuo Chen read(argv[1]); 1085f798dd9SShuo Chen int port = argc > 2 ? atoi(argv[2]) : 2013; 10969ab6119SShuo Chen serve(static_cast<uint16_t>(port)); 1105f798dd9SShuo Chen } 1115f798dd9SShuo Chen else 1125f798dd9SShuo Chen { 1135f798dd9SShuo Chen fprintf(stderr, "Usage: %s shard_file [port]\n", argv[0]); 1145f798dd9SShuo Chen } 1155f798dd9SShuo Chen} 116