sender.cc revision 5f798dd9
1#include <muduo/base/Logging.h>
2#include <muduo/net/EventLoop.h>
3#include <muduo/net/TcpServer.h>
4
5#include <fstream>
6#include <vector>
7
8#include <stdio.h>
9
10using namespace muduo;
11using namespace muduo::net;
12
13typedef std::vector<std::pair<int64_t, string> > WordCountList;
14
15WordCountList g_wordCounts;
16
17void read(const char* file)
18{
19  std::ifstream in(file);
20  std::string line;
21  while (getline(in, line))
22  {
23    size_t tab = line.find('\t');
24    if (tab != string::npos)
25    {
26      int64_t count = strtoll(line.c_str() + tab, NULL, 10);
27      if (count > 0)
28      {
29        string word(line.begin(), line.begin()+tab);
30        g_wordCounts.push_back(make_pair(count, word));
31      }
32    }
33  }
34  std::sort(g_wordCounts.begin(), g_wordCounts.end(),
35            std::greater<WordCountList::value_type>());
36}
37
38WordCountList::iterator fillBuffer(WordCountList::iterator first, Buffer* buf)
39{
40  LogStream stream;
41  while (first != g_wordCounts.end())
42  {
43    buf->append(first->second);
44    stream.resetBuffer();
45    stream << '\t' << first->first << "\r\n";
46    buf->append(stream.buffer().data(), stream.buffer().length());
47    ++first;
48    if (buf->readableBytes() > 65536)
49    {
50      break;
51    }
52  }
53  return first;
54}
55
56void send(const TcpConnectionPtr& conn, WordCountList::iterator first)
57{
58  Buffer buf;
59  WordCountList::iterator last = fillBuffer(first, &buf);
60  conn->setContext(last);
61  conn->send(&buf);
62}
63
64void onConnection(const TcpConnectionPtr& conn)
65{
66  LOG_INFO << "Sender - " << conn->peerAddress().toIpPort() << " -> "
67           << conn->localAddress().toIpPort() << " is "
68           << (conn->connected() ? "UP" : "DOWN");
69  if (conn->connected())
70  {
71    send(conn, g_wordCounts.begin());
72  }
73}
74
75void onWriteComplete(const TcpConnectionPtr& conn)
76{
77  WordCountList::iterator first = boost::any_cast<WordCountList::iterator>(conn->getContext());
78  if (first != g_wordCounts.end())
79  {
80    send(conn, first);
81  }
82  else
83  {
84    conn->shutdown();
85    LOG_INFO << "Sender - done";
86  }
87}
88
89void serve(int16_t port)
90{
91  LOG_INFO << "Listen on port " << port;
92  EventLoop loop;
93  InetAddress listenAddr(port);
94  TcpServer server(&loop, listenAddr, "Sender");
95  server.setConnectionCallback(onConnection);
96  server.setWriteCompleteCallback(onWriteComplete);
97  server.start();
98  loop.loop();
99}
100
101int main(int argc, char* argv[])
102{
103  if (argc > 1)
104  {
105    read(argv[1]);
106    int port = argc > 2 ? atoi(argv[2]) : 2013;
107    serve(static_cast<int16_t>(port));
108  }
109  else
110  {
111    fprintf(stderr, "Usage: %s shard_file [port]\n", argv[0]);
112  }
113}
114