BoundedBlockingQueue_test.cc revision ecd08fd9
19d9bda4cSShuo Chen#include "../BoundedBlockingQueue.h"
29d9bda4cSShuo Chen#include "../CountDownLatch.h"
39d9bda4cSShuo Chen#include "../Thread.h"
49d9bda4cSShuo Chen
59d9bda4cSShuo Chen#include <boost/bind.hpp>
69d9bda4cSShuo Chen#include <boost/ptr_container/ptr_vector.hpp>
79d9bda4cSShuo Chen#include <string>
89d9bda4cSShuo Chen#include <stdio.h>
99d9bda4cSShuo Chen
109d9bda4cSShuo Chenclass Test
119d9bda4cSShuo Chen{
129d9bda4cSShuo Chen public:
139d9bda4cSShuo Chen  Test(int numThreads)
149d9bda4cSShuo Chen    : queue_(20),
159d9bda4cSShuo Chen      latch_(numThreads),
169d9bda4cSShuo Chen      threads_(numThreads)
179d9bda4cSShuo Chen  {
189d9bda4cSShuo Chen    for (int i = 0; i < numThreads; ++i)
199d9bda4cSShuo Chen    {
209d9bda4cSShuo Chen      char name[32];
219d9bda4cSShuo Chen      snprintf(name, sizeof name, "work thread %d", i);
229d9bda4cSShuo Chen      threads_.push_back(new muduo::Thread(
239d9bda4cSShuo Chen            boost::bind(&Test::threadFunc, this), std::string(name)));
249d9bda4cSShuo Chen    }
259d9bda4cSShuo Chen    for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::start, _1));
269d9bda4cSShuo Chen  }
279d9bda4cSShuo Chen
289d9bda4cSShuo Chen  void run(int times)
299d9bda4cSShuo Chen  {
309d9bda4cSShuo Chen    printf("waiting for count down latch\n");
319d9bda4cSShuo Chen    latch_.wait();
329d9bda4cSShuo Chen    printf("all threads started\n");
339d9bda4cSShuo Chen    for (int i = 0; i < times; ++i)
349d9bda4cSShuo Chen    {
359d9bda4cSShuo Chen      char buf[32];
369d9bda4cSShuo Chen      snprintf(buf, sizeof buf, "hello %d", i);
379d9bda4cSShuo Chen      queue_.put(buf);
389d9bda4cSShuo Chen      printf("tid=%d, put data = %s, size = %zd\n", muduo::CurrentThread::tid(), buf, queue_.size());
399d9bda4cSShuo Chen    }
409d9bda4cSShuo Chen  }
419d9bda4cSShuo Chen
429d9bda4cSShuo Chen  void joinAll()
439d9bda4cSShuo Chen  {
449d9bda4cSShuo Chen    for (size_t i = 0; i < threads_.size(); ++i)
459d9bda4cSShuo Chen    {
469d9bda4cSShuo Chen      queue_.put("stop");
479d9bda4cSShuo Chen    }
489d9bda4cSShuo Chen
499d9bda4cSShuo Chen    for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::join, _1));
509d9bda4cSShuo Chen  }
519d9bda4cSShuo Chen
529d9bda4cSShuo Chen private:
539d9bda4cSShuo Chen
549d9bda4cSShuo Chen  void threadFunc()
559d9bda4cSShuo Chen  {
569d9bda4cSShuo Chen    printf("tid=%d, %s started\n",
579d9bda4cSShuo Chen           muduo::CurrentThread::tid(),
589d9bda4cSShuo Chen           muduo::CurrentThread::name());
599d9bda4cSShuo Chen
609d9bda4cSShuo Chen    latch_.countDown();
619d9bda4cSShuo Chen    bool running = true;
629d9bda4cSShuo Chen    while (running)
639d9bda4cSShuo Chen    {
64ecd08fd9SShuo Chen      std::string d(queue_.take());
659d9bda4cSShuo Chen      printf("tid=%d, get data = %s, size = %zd\n", muduo::CurrentThread::tid(), d.c_str(), queue_.size());
669d9bda4cSShuo Chen      running = (d != "stop");
679d9bda4cSShuo Chen    }
689d9bda4cSShuo Chen
699d9bda4cSShuo Chen    printf("tid=%d, %s stopped\n",
709d9bda4cSShuo Chen           muduo::CurrentThread::tid(),
719d9bda4cSShuo Chen           muduo::CurrentThread::name());
729d9bda4cSShuo Chen  }
739d9bda4cSShuo Chen
749d9bda4cSShuo Chen  muduo::BoundedBlockingQueue<std::string> queue_;
759d9bda4cSShuo Chen  muduo::CountDownLatch latch_;
769d9bda4cSShuo Chen  boost::ptr_vector<muduo::Thread> threads_;
779d9bda4cSShuo Chen};
789d9bda4cSShuo Chen
799d9bda4cSShuo Chenint main()
809d9bda4cSShuo Chen{
819d9bda4cSShuo Chen  printf("pid=%d, tid=%d\n", ::getpid(), muduo::CurrentThread::tid());
829d9bda4cSShuo Chen  Test t(5);
839d9bda4cSShuo Chen  t.run(100);
849d9bda4cSShuo Chen  t.joinAll();
859d9bda4cSShuo Chen
869d9bda4cSShuo Chen  printf("number of created threads %d\n", muduo::Thread::numCreated());
879d9bda4cSShuo Chen}
88