1#include <muduo/base/Logging.h>
2#include <muduo/net/EventLoop.h>
3#include <muduo/net/TcpServer.h>
4
5#include <fstream>
6#include <vector>
7
8#include <stdio.h>
9#define __STDC_FORMAT_MACROS
10#include <inttypes.h>
11
12using namespace muduo;
13using namespace muduo::net;
14
15typedef std::vector<std::pair<int64_t, string> > WordCountList;
16
17WordCountList g_wordCounts;
18
19void read(const char* file)
20{
21  std::ifstream in(file);
22  std::string line;
23  while (getline(in, line))
24  {
25    size_t tab = line.find('\t');
26    if (tab != string::npos)
27    {
28      int64_t count = strtoll(line.c_str() + tab, NULL, 10);
29      if (count > 0)
30      {
31        string word(line.begin(), line.begin()+tab);
32        g_wordCounts.push_back(make_pair(count, word));
33      }
34    }
35  }
36  std::sort(g_wordCounts.begin(), g_wordCounts.end(),
37            std::greater<WordCountList::value_type>());
38}
39
40WordCountList::iterator fillBuffer(WordCountList::iterator first, Buffer* buf)
41{
42  while (first != g_wordCounts.end())
43  {
44    char count[32];
45    snprintf(count, sizeof count, "%" PRId64 "\t", first->first);
46    buf->append(count);
47    buf->append(first->second);
48    buf->append("\n", 1);
49    ++first;
50    if (buf->readableBytes() > 65536)
51    {
52      break;
53    }
54  }
55  return first;
56}
57
58void send(const TcpConnectionPtr& conn, WordCountList::iterator first)
59{
60  Buffer buf;
61  WordCountList::iterator last = fillBuffer(first, &buf);
62  conn->setContext(last);
63  conn->send(&buf);
64}
65
66void onConnection(const TcpConnectionPtr& conn)
67{
68  LOG_INFO << "Sender - " << conn->peerAddress().toIpPort() << " -> "
69           << conn->localAddress().toIpPort() << " is "
70           << (conn->connected() ? "UP" : "DOWN");
71  if (conn->connected())
72  {
73    send(conn, g_wordCounts.begin());
74  }
75}
76
77void onWriteComplete(const TcpConnectionPtr& conn)
78{
79  WordCountList::iterator first = boost::any_cast<WordCountList::iterator>(conn->getContext());
80  if (first != g_wordCounts.end())
81  {
82    send(conn, first);
83  }
84  else
85  {
86    conn->shutdown();
87    LOG_INFO << "Sender - done";
88  }
89}
90
91void serve(uint16_t port)
92{
93  LOG_INFO << "Listen on port " << port;
94  EventLoop loop;
95  InetAddress listenAddr(port);
96  TcpServer server(&loop, listenAddr, "Sender");
97  server.setConnectionCallback(onConnection);
98  server.setWriteCompleteCallback(onWriteComplete);
99  server.start();
100  loop.loop();
101}
102
103int main(int argc, char* argv[])
104{
105  if (argc > 1)
106  {
107    read(argv[1]);
108    int port = argc > 2 ? atoi(argv[2]) : 2013;
109    serve(static_cast<uint16_t>(port));
110  }
111  else
112  {
113    fprintf(stderr, "Usage: %s shard_file [port]\n", argv[0]);
114  }
115}
116