1#include "../BlockingQueue.h" 2#include "../CountDownLatch.h" 3#include "../Thread.h" 4 5#include <boost/bind.hpp> 6#include <boost/ptr_container/ptr_vector.hpp> 7#include <string> 8#include <stdio.h> 9 10class Test 11{ 12 public: 13 Test(int numThreads) 14 : latch_(numThreads), 15 threads_(numThreads) 16 { 17 for (int i = 0; i < numThreads; ++i) 18 { 19 char name[32]; 20 snprintf(name, sizeof name, "work thread %d", i); 21 threads_.push_back(new muduo::Thread( 22 boost::bind(&Test::threadFunc, this), std::string(name))); 23 } 24 for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::start, _1)); 25 } 26 27 void run(int times) 28 { 29 printf("waiting for count down latch\n"); 30 latch_.wait(); 31 printf("all threads started\n"); 32 for (int i = 0; i < times; ++i) 33 { 34 char buf[32]; 35 snprintf(buf, sizeof buf, "hello %d", i); 36 queue_.put(buf); 37 printf("tid=%d, put data = %s, size = %zd\n", muduo::CurrentThread::tid(), buf, queue_.size()); 38 } 39 } 40 41 void joinAll() 42 { 43 for (size_t i = 0; i < threads_.size(); ++i) 44 { 45 queue_.put("stop"); 46 } 47 48 for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::join, _1)); 49 } 50 51 private: 52 53 void threadFunc() 54 { 55 printf("tid=%d, %s started\n", 56 muduo::CurrentThread::tid(), 57 muduo::CurrentThread::name()); 58 59 latch_.countDown(); 60 bool running = true; 61 while (running) 62 { 63 std::string d(queue_.take()); 64 printf("tid=%d, get data = %s, size = %zd\n", muduo::CurrentThread::tid(), d.c_str(), queue_.size()); 65 running = (d != "stop"); 66 } 67 68 printf("tid=%d, %s stopped\n", 69 muduo::CurrentThread::tid(), 70 muduo::CurrentThread::name()); 71 } 72 73 muduo::BlockingQueue<std::string> queue_; 74 muduo::CountDownLatch latch_; 75 boost::ptr_vector<muduo::Thread> threads_; 76}; 77 78int main() 79{ 80 printf("pid=%d, tid=%d\n", ::getpid(), muduo::CurrentThread::tid()); 81 Test t(5); 82 t.run(100); 83 t.joinAll(); 84 85 printf("number of created threads %d\n", muduo::Thread::numCreated()); 86} 87