sender.cc revision 5f798dd9
15f798dd9SShuo Chen#include <muduo/base/Logging.h>
25f798dd9SShuo Chen#include <muduo/net/EventLoop.h>
35f798dd9SShuo Chen#include <muduo/net/TcpServer.h>
45f798dd9SShuo Chen
55f798dd9SShuo Chen#include <fstream>
65f798dd9SShuo Chen#include <vector>
75f798dd9SShuo Chen
85f798dd9SShuo Chen#include <stdio.h>
95f798dd9SShuo Chen
105f798dd9SShuo Chenusing namespace muduo;
115f798dd9SShuo Chenusing namespace muduo::net;
125f798dd9SShuo Chen
135f798dd9SShuo Chentypedef std::vector<std::pair<int64_t, string> > WordCountList;
145f798dd9SShuo Chen
155f798dd9SShuo ChenWordCountList g_wordCounts;
165f798dd9SShuo Chen
175f798dd9SShuo Chenvoid read(const char* file)
185f798dd9SShuo Chen{
195f798dd9SShuo Chen  std::ifstream in(file);
205f798dd9SShuo Chen  std::string line;
215f798dd9SShuo Chen  while (getline(in, line))
225f798dd9SShuo Chen  {
235f798dd9SShuo Chen    size_t tab = line.find('\t');
245f798dd9SShuo Chen    if (tab != string::npos)
255f798dd9SShuo Chen    {
265f798dd9SShuo Chen      int64_t count = strtoll(line.c_str() + tab, NULL, 10);
275f798dd9SShuo Chen      if (count > 0)
285f798dd9SShuo Chen      {
295f798dd9SShuo Chen        string word(line.begin(), line.begin()+tab);
305f798dd9SShuo Chen        g_wordCounts.push_back(make_pair(count, word));
315f798dd9SShuo Chen      }
325f798dd9SShuo Chen    }
335f798dd9SShuo Chen  }
345f798dd9SShuo Chen  std::sort(g_wordCounts.begin(), g_wordCounts.end(),
355f798dd9SShuo Chen            std::greater<WordCountList::value_type>());
365f798dd9SShuo Chen}
375f798dd9SShuo Chen
385f798dd9SShuo ChenWordCountList::iterator fillBuffer(WordCountList::iterator first, Buffer* buf)
395f798dd9SShuo Chen{
405f798dd9SShuo Chen  LogStream stream;
415f798dd9SShuo Chen  while (first != g_wordCounts.end())
425f798dd9SShuo Chen  {
435f798dd9SShuo Chen    buf->append(first->second);
445f798dd9SShuo Chen    stream.resetBuffer();
455f798dd9SShuo Chen    stream << '\t' << first->first << "\r\n";
465f798dd9SShuo Chen    buf->append(stream.buffer().data(), stream.buffer().length());
475f798dd9SShuo Chen    ++first;
485f798dd9SShuo Chen    if (buf->readableBytes() > 65536)
495f798dd9SShuo Chen    {
505f798dd9SShuo Chen      break;
515f798dd9SShuo Chen    }
525f798dd9SShuo Chen  }
535f798dd9SShuo Chen  return first;
545f798dd9SShuo Chen}
555f798dd9SShuo Chen
565f798dd9SShuo Chenvoid send(const TcpConnectionPtr& conn, WordCountList::iterator first)
575f798dd9SShuo Chen{
585f798dd9SShuo Chen  Buffer buf;
595f798dd9SShuo Chen  WordCountList::iterator last = fillBuffer(first, &buf);
605f798dd9SShuo Chen  conn->setContext(last);
615f798dd9SShuo Chen  conn->send(&buf);
625f798dd9SShuo Chen}
635f798dd9SShuo Chen
645f798dd9SShuo Chenvoid onConnection(const TcpConnectionPtr& conn)
655f798dd9SShuo Chen{
665f798dd9SShuo Chen  LOG_INFO << "Sender - " << conn->peerAddress().toIpPort() << " -> "
675f798dd9SShuo Chen           << conn->localAddress().toIpPort() << " is "
685f798dd9SShuo Chen           << (conn->connected() ? "UP" : "DOWN");
695f798dd9SShuo Chen  if (conn->connected())
705f798dd9SShuo Chen  {
715f798dd9SShuo Chen    send(conn, g_wordCounts.begin());
725f798dd9SShuo Chen  }
735f798dd9SShuo Chen}
745f798dd9SShuo Chen
755f798dd9SShuo Chenvoid onWriteComplete(const TcpConnectionPtr& conn)
765f798dd9SShuo Chen{
775f798dd9SShuo Chen  WordCountList::iterator first = boost::any_cast<WordCountList::iterator>(conn->getContext());
785f798dd9SShuo Chen  if (first != g_wordCounts.end())
795f798dd9SShuo Chen  {
805f798dd9SShuo Chen    send(conn, first);
815f798dd9SShuo Chen  }
825f798dd9SShuo Chen  else
835f798dd9SShuo Chen  {
845f798dd9SShuo Chen    conn->shutdown();
855f798dd9SShuo Chen    LOG_INFO << "Sender - done";
865f798dd9SShuo Chen  }
875f798dd9SShuo Chen}
885f798dd9SShuo Chen
895f798dd9SShuo Chenvoid serve(int16_t port)
905f798dd9SShuo Chen{
915f798dd9SShuo Chen  LOG_INFO << "Listen on port " << port;
925f798dd9SShuo Chen  EventLoop loop;
935f798dd9SShuo Chen  InetAddress listenAddr(port);
945f798dd9SShuo Chen  TcpServer server(&loop, listenAddr, "Sender");
955f798dd9SShuo Chen  server.setConnectionCallback(onConnection);
965f798dd9SShuo Chen  server.setWriteCompleteCallback(onWriteComplete);
975f798dd9SShuo Chen  server.start();
985f798dd9SShuo Chen  loop.loop();
995f798dd9SShuo Chen}
1005f798dd9SShuo Chen
1015f798dd9SShuo Chenint main(int argc, char* argv[])
1025f798dd9SShuo Chen{
1035f798dd9SShuo Chen  if (argc > 1)
1045f798dd9SShuo Chen  {
1055f798dd9SShuo Chen    read(argv[1]);
1065f798dd9SShuo Chen    int port = argc > 2 ? atoi(argv[2]) : 2013;
1075f798dd9SShuo Chen    serve(static_cast<int16_t>(port));
1085f798dd9SShuo Chen  }
1095f798dd9SShuo Chen  else
1105f798dd9SShuo Chen  {
1115f798dd9SShuo Chen    fprintf(stderr, "Usage: %s shard_file [port]\n", argv[0]);
1125f798dd9SShuo Chen  }
1135f798dd9SShuo Chen}
114