BlockingQueue_test.cc revision cd139dc7
#include "../BlockingQueue.h"
#include "../CountDownLatch.h"
#include "../Thread.h"

#include <boost/bind.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <string>
#include <stdio.h>
#include <unistd.h>
9cd139dc7SShuo Chen
10cd139dc7SShuo Chenclass Test
11cd139dc7SShuo Chen{
12cd139dc7SShuo Chen public:
13cd139dc7SShuo Chen  Test(int numThreads)
14cd139dc7SShuo Chen    : latch_(numThreads),
15cd139dc7SShuo Chen      threads_(numThreads)
16cd139dc7SShuo Chen  {
17cd139dc7SShuo Chen    for (int i = 0; i < numThreads; ++i)
18cd139dc7SShuo Chen    {
19cd139dc7SShuo Chen      char name[32];
20cd139dc7SShuo Chen      snprintf(name, sizeof name, "work thread %d", i);
21cd139dc7SShuo Chen      threads_.push_back(new muduo::Thread(
22cd139dc7SShuo Chen            boost::bind(&Test::threadFunc, this), std::string(name)));
23cd139dc7SShuo Chen    }
24cd139dc7SShuo Chen    for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::start, _1));
25cd139dc7SShuo Chen  }
26cd139dc7SShuo Chen
27cd139dc7SShuo Chen  void run(int times)
28cd139dc7SShuo Chen  {
29cd139dc7SShuo Chen    printf("waiting for count down latch\n");
30cd139dc7SShuo Chen    latch_.wait();
31cd139dc7SShuo Chen    printf("all threads started\n");
32cd139dc7SShuo Chen    for (int i = 0; i < times; ++i)
33cd139dc7SShuo Chen    {
34cd139dc7SShuo Chen      char buf[32];
35cd139dc7SShuo Chen      snprintf(buf, sizeof buf, "hello %d", i);
36cd139dc7SShuo Chen      queue_.put(buf);
37cd139dc7SShuo Chen    }
38cd139dc7SShuo Chen  }
39cd139dc7SShuo Chen
40cd139dc7SShuo Chen  void joinAll()
41cd139dc7SShuo Chen  {
42cd139dc7SShuo Chen    for (size_t i = 0; i < threads_.size(); ++i)
43cd139dc7SShuo Chen    {
44cd139dc7SShuo Chen      queue_.put("stop");
45cd139dc7SShuo Chen    }
46cd139dc7SShuo Chen
47cd139dc7SShuo Chen    for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::join, _1));
48cd139dc7SShuo Chen  }
49cd139dc7SShuo Chen
50cd139dc7SShuo Chen private:
51cd139dc7SShuo Chen
52cd139dc7SShuo Chen  void threadFunc()
53cd139dc7SShuo Chen  {
54cd139dc7SShuo Chen    printf("tid=%d, %s started\n",
55cd139dc7SShuo Chen           muduo::CurrentThread::tid(),
56cd139dc7SShuo Chen           muduo::CurrentThread::name());
57cd139dc7SShuo Chen
58cd139dc7SShuo Chen    latch_.countDown();
59cd139dc7SShuo Chen    bool running = true;
60cd139dc7SShuo Chen    while (running)
61cd139dc7SShuo Chen    {
62cd139dc7SShuo Chen      std::string d(queue_.take());
63cd139dc7SShuo Chen      printf("tid=%d, data = %s\n", muduo::CurrentThread::tid(), d.c_str());
64cd139dc7SShuo Chen      running = (d != "stop");
65cd139dc7SShuo Chen    }
66cd139dc7SShuo Chen
67cd139dc7SShuo Chen    printf("tid=%d, %s stopped\n",
68cd139dc7SShuo Chen           muduo::CurrentThread::tid(),
69cd139dc7SShuo Chen           muduo::CurrentThread::name());
70cd139dc7SShuo Chen  }
71cd139dc7SShuo Chen
72cd139dc7SShuo Chen  muduo::BlockingQueue<std::string> queue_;
73cd139dc7SShuo Chen  muduo::CountDownLatch latch_;
74cd139dc7SShuo Chen  boost::ptr_vector<muduo::Thread> threads_;
75cd139dc7SShuo Chen};
76cd139dc7SShuo Chen
77cd139dc7SShuo Chenint main()
78cd139dc7SShuo Chen{
79cd139dc7SShuo Chen  printf("pid=%d, tid=%d\n", ::getpid(), muduo::CurrentThread::tid());
80cd139dc7SShuo Chen  Test t(5);
81cd139dc7SShuo Chen  t.run(100);
82cd139dc7SShuo Chen  t.joinAll();
83cd139dc7SShuo Chen
84cd139dc7SShuo Chen  printf("number of created threads %d\n", muduo::Thread::numCreated());
85cd139dc7SShuo Chen}
86