BlockingQueue_test.cc revision 9d9bda4c
1cd139dc7SShuo Chen#include "../BlockingQueue.h" 2cd139dc7SShuo Chen#include "../CountDownLatch.h" 3cd139dc7SShuo Chen#include "../Thread.h" 4cd139dc7SShuo Chen 5cd139dc7SShuo Chen#include <boost/bind.hpp> 6cd139dc7SShuo Chen#include <boost/ptr_container/ptr_vector.hpp> 7cd139dc7SShuo Chen#include <string> 8cd139dc7SShuo Chen#include <stdio.h> 9cd139dc7SShuo Chen 10cd139dc7SShuo Chenclass Test 11cd139dc7SShuo Chen{ 12cd139dc7SShuo Chen public: 13cd139dc7SShuo Chen Test(int numThreads) 14cd139dc7SShuo Chen : latch_(numThreads), 15cd139dc7SShuo Chen threads_(numThreads) 16cd139dc7SShuo Chen { 17cd139dc7SShuo Chen for (int i = 0; i < numThreads; ++i) 18cd139dc7SShuo Chen { 19cd139dc7SShuo Chen char name[32]; 20cd139dc7SShuo Chen snprintf(name, sizeof name, "work thread %d", i); 21cd139dc7SShuo Chen threads_.push_back(new muduo::Thread( 22cd139dc7SShuo Chen boost::bind(&Test::threadFunc, this), std::string(name))); 23cd139dc7SShuo Chen } 24cd139dc7SShuo Chen for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::start, _1)); 25cd139dc7SShuo Chen } 26cd139dc7SShuo Chen 27cd139dc7SShuo Chen void run(int times) 28cd139dc7SShuo Chen { 29cd139dc7SShuo Chen printf("waiting for count down latch\n"); 30cd139dc7SShuo Chen latch_.wait(); 31cd139dc7SShuo Chen printf("all threads started\n"); 32cd139dc7SShuo Chen for (int i = 0; i < times; ++i) 33cd139dc7SShuo Chen { 34cd139dc7SShuo Chen char buf[32]; 35cd139dc7SShuo Chen snprintf(buf, sizeof buf, "hello %d", i); 36cd139dc7SShuo Chen queue_.put(buf); 379d9bda4cSShuo Chen printf("tid=%d, put data = %s, size = %zd\n", muduo::CurrentThread::tid(), buf, queue_.size()); 38cd139dc7SShuo Chen } 39cd139dc7SShuo Chen } 40cd139dc7SShuo Chen 41cd139dc7SShuo Chen void joinAll() 42cd139dc7SShuo Chen { 43cd139dc7SShuo Chen for (size_t i = 0; i < threads_.size(); ++i) 44cd139dc7SShuo Chen { 45cd139dc7SShuo Chen queue_.put("stop"); 46cd139dc7SShuo Chen } 47cd139dc7SShuo Chen 48cd139dc7SShuo Chen for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::join, _1)); 49cd139dc7SShuo Chen } 50cd139dc7SShuo Chen 51cd139dc7SShuo Chen private: 52cd139dc7SShuo Chen 53cd139dc7SShuo Chen void threadFunc() 54cd139dc7SShuo Chen { 55cd139dc7SShuo Chen printf("tid=%d, %s started\n", 56cd139dc7SShuo Chen muduo::CurrentThread::tid(), 57cd139dc7SShuo Chen muduo::CurrentThread::name()); 58cd139dc7SShuo Chen 59cd139dc7SShuo Chen latch_.countDown(); 60cd139dc7SShuo Chen bool running = true; 61cd139dc7SShuo Chen while (running) 62cd139dc7SShuo Chen { 63cd139dc7SShuo Chen std::string d(queue_.take()); 649d9bda4cSShuo Chen printf("tid=%d, get data = %s, size = %zd\n", muduo::CurrentThread::tid(), d.c_str(), queue_.size()); 65cd139dc7SShuo Chen running = (d != "stop"); 66cd139dc7SShuo Chen } 67cd139dc7SShuo Chen 68cd139dc7SShuo Chen printf("tid=%d, %s stopped\n", 69cd139dc7SShuo Chen muduo::CurrentThread::tid(), 70cd139dc7SShuo Chen muduo::CurrentThread::name()); 71cd139dc7SShuo Chen } 72cd139dc7SShuo Chen 73cd139dc7SShuo Chen muduo::BlockingQueue<std::string> queue_; 74cd139dc7SShuo Chen muduo::CountDownLatch latch_; 75cd139dc7SShuo Chen boost::ptr_vector<muduo::Thread> threads_; 76cd139dc7SShuo Chen}; 77cd139dc7SShuo Chen 78cd139dc7SShuo Chenint main() 79cd139dc7SShuo Chen{ 80cd139dc7SShuo Chen printf("pid=%d, tid=%d\n", ::getpid(), muduo::CurrentThread::tid()); 81cd139dc7SShuo Chen Test t(5); 82cd139dc7SShuo Chen t.run(100); 83cd139dc7SShuo Chen t.joinAll(); 84cd139dc7SShuo Chen 85cd139dc7SShuo Chen printf("number of created threads %d\n", muduo::Thread::numCreated()); 86cd139dc7SShuo Chen} 87