15f798dd9SShuo Chen#include <muduo/base/Logging.h>
25f798dd9SShuo Chen#include <muduo/net/EventLoop.h>
35f798dd9SShuo Chen#include <muduo/net/TcpServer.h>
45f798dd9SShuo Chen
55f798dd9SShuo Chen#include <fstream>
65f798dd9SShuo Chen#include <vector>
75f798dd9SShuo Chen
85f798dd9SShuo Chen#include <stdio.h>
969ab6119SShuo Chen#define __STDC_FORMAT_MACROS
1069ab6119SShuo Chen#include <inttypes.h>
115f798dd9SShuo Chen
125f798dd9SShuo Chenusing namespace muduo;
135f798dd9SShuo Chenusing namespace muduo::net;
145f798dd9SShuo Chen
155f798dd9SShuo Chentypedef std::vector<std::pair<int64_t, string> > WordCountList;
165f798dd9SShuo Chen
175f798dd9SShuo ChenWordCountList g_wordCounts;
185f798dd9SShuo Chen
195f798dd9SShuo Chenvoid read(const char* file)
205f798dd9SShuo Chen{
215f798dd9SShuo Chen  std::ifstream in(file);
225f798dd9SShuo Chen  std::string line;
235f798dd9SShuo Chen  while (getline(in, line))
245f798dd9SShuo Chen  {
255f798dd9SShuo Chen    size_t tab = line.find('\t');
265f798dd9SShuo Chen    if (tab != string::npos)
275f798dd9SShuo Chen    {
285f798dd9SShuo Chen      int64_t count = strtoll(line.c_str() + tab, NULL, 10);
295f798dd9SShuo Chen      if (count > 0)
305f798dd9SShuo Chen      {
315f798dd9SShuo Chen        string word(line.begin(), line.begin()+tab);
325f798dd9SShuo Chen        g_wordCounts.push_back(make_pair(count, word));
335f798dd9SShuo Chen      }
345f798dd9SShuo Chen    }
355f798dd9SShuo Chen  }
365f798dd9SShuo Chen  std::sort(g_wordCounts.begin(), g_wordCounts.end(),
375f798dd9SShuo Chen            std::greater<WordCountList::value_type>());
385f798dd9SShuo Chen}
395f798dd9SShuo Chen
405f798dd9SShuo ChenWordCountList::iterator fillBuffer(WordCountList::iterator first, Buffer* buf)
415f798dd9SShuo Chen{
425f798dd9SShuo Chen  while (first != g_wordCounts.end())
435f798dd9SShuo Chen  {
4469ab6119SShuo Chen    char count[32];
4569ab6119SShuo Chen    snprintf(count, sizeof count, "%" PRId64 "\t", first->first);
4669ab6119SShuo Chen    buf->append(count);
475f798dd9SShuo Chen    buf->append(first->second);
4869ab6119SShuo Chen    buf->append("\n", 1);
495f798dd9SShuo Chen    ++first;
505f798dd9SShuo Chen    if (buf->readableBytes() > 65536)
515f798dd9SShuo Chen    {
525f798dd9SShuo Chen      break;
535f798dd9SShuo Chen    }
545f798dd9SShuo Chen  }
555f798dd9SShuo Chen  return first;
565f798dd9SShuo Chen}
575f798dd9SShuo Chen
585f798dd9SShuo Chenvoid send(const TcpConnectionPtr& conn, WordCountList::iterator first)
595f798dd9SShuo Chen{
605f798dd9SShuo Chen  Buffer buf;
615f798dd9SShuo Chen  WordCountList::iterator last = fillBuffer(first, &buf);
625f798dd9SShuo Chen  conn->setContext(last);
635f798dd9SShuo Chen  conn->send(&buf);
645f798dd9SShuo Chen}
655f798dd9SShuo Chen
665f798dd9SShuo Chenvoid onConnection(const TcpConnectionPtr& conn)
675f798dd9SShuo Chen{
685f798dd9SShuo Chen  LOG_INFO << "Sender - " << conn->peerAddress().toIpPort() << " -> "
695f798dd9SShuo Chen           << conn->localAddress().toIpPort() << " is "
705f798dd9SShuo Chen           << (conn->connected() ? "UP" : "DOWN");
715f798dd9SShuo Chen  if (conn->connected())
725f798dd9SShuo Chen  {
735f798dd9SShuo Chen    send(conn, g_wordCounts.begin());
745f798dd9SShuo Chen  }
755f798dd9SShuo Chen}
765f798dd9SShuo Chen
775f798dd9SShuo Chenvoid onWriteComplete(const TcpConnectionPtr& conn)
785f798dd9SShuo Chen{
795f798dd9SShuo Chen  WordCountList::iterator first = boost::any_cast<WordCountList::iterator>(conn->getContext());
805f798dd9SShuo Chen  if (first != g_wordCounts.end())
815f798dd9SShuo Chen  {
825f798dd9SShuo Chen    send(conn, first);
835f798dd9SShuo Chen  }
845f798dd9SShuo Chen  else
855f798dd9SShuo Chen  {
865f798dd9SShuo Chen    conn->shutdown();
875f798dd9SShuo Chen    LOG_INFO << "Sender - done";
885f798dd9SShuo Chen  }
895f798dd9SShuo Chen}
905f798dd9SShuo Chen
9169ab6119SShuo Chenvoid serve(uint16_t port)
925f798dd9SShuo Chen{
935f798dd9SShuo Chen  LOG_INFO << "Listen on port " << port;
945f798dd9SShuo Chen  EventLoop loop;
955f798dd9SShuo Chen  InetAddress listenAddr(port);
965f798dd9SShuo Chen  TcpServer server(&loop, listenAddr, "Sender");
975f798dd9SShuo Chen  server.setConnectionCallback(onConnection);
985f798dd9SShuo Chen  server.setWriteCompleteCallback(onWriteComplete);
995f798dd9SShuo Chen  server.start();
1005f798dd9SShuo Chen  loop.loop();
1015f798dd9SShuo Chen}
1025f798dd9SShuo Chen
1035f798dd9SShuo Chenint main(int argc, char* argv[])
1045f798dd9SShuo Chen{
1055f798dd9SShuo Chen  if (argc > 1)
1065f798dd9SShuo Chen  {
1075f798dd9SShuo Chen    read(argv[1]);
1085f798dd9SShuo Chen    int port = argc > 2 ? atoi(argv[2]) : 2013;
10969ab6119SShuo Chen    serve(static_cast<uint16_t>(port));
1105f798dd9SShuo Chen  }
1115f798dd9SShuo Chen  else
1125f798dd9SShuo Chen  {
1135f798dd9SShuo Chen    fprintf(stderr, "Usage: %s shard_file [port]\n", argv[0]);
1145f798dd9SShuo Chen  }
1155f798dd9SShuo Chen}
116