1前言
一个后台实时处理的业务平台,通常我们会根据数据的输入与输出,依据时间轴进行分解成不同阶段或不同粒度的逻辑任务,而每一个待处理的数据我们称为任务或者消息。任务之间的关系可以分为两类:a 上下游父子关系,b 可以并行运行的兄弟关系。具有上下游关系的任务集合具有逻辑或数据依赖关系,即上游任务执行完后,才能执行下游任务;具有兄弟关系的任务间逻辑上互不影响,可以并行运行。
无论是上面任一情况的业务场景,我们都需要一种管理类,其职责:管理着一堆线程及其待执行的同类型任务集合。线程会等待去执行喂给它的任务,当任务集合大于线程集合的个数时,任务会在队列排队等待;而当线程集合个数大于任务集合时,线程会挂起处于阻塞等待状态,执行器也相应地处于不饱和状态。在jdk里面有现成的管理类ThreadPoolExecutor,那么在c++里面看看类似的实现吧:
2任务与任务池
2.1任务
无论是消息或业务数据,可以抽象地表达为:
struct data_pair
{
char *data;
int len;
}
2.2 任务池
任务的缓存用队列表达:
std::queue
2.3 任务提交入口
int CQueueThread::writeData(void *data, int len)
{
if (data == NULL || len <= 0) {
return EXIT_FAILURE;
}
data_pair *item = new data_pair();
item->data = (char*) malloc(len);
assert(item->data != NULL);
memcpy(item->data, data, len);
item->len = len;
_mutex.lock();
_queue.push(item);
_mutex.signal();
_mutex.unlock();
return EXIT_SUCCESS;
}
3线程池
3.1 线程封装
c++里面类似jdk里面Thread类的封装CThread
{
class CThread {
public:
/**
* 构造函数
*/
CThread() {
tid = 0;
pid = 0;
}
/**
* 起一个线程,开始运行
*/
bool start(Runnable *r, void *a) {
runnable = r;
args = a;
return 0 == pthread_create(&tid, NULL, CThread::hook, this);
}
/**
* 等待线程退出
*/
void join() {
if (tid) {
pthread_join(tid, NULL);
tid = 0;
pid = 0;
}
}
/**
* 得到Runnable对象
*
* @return Runnable
*/
Runnable *getRunnable() {
return runnable;
}
/**
* 得到回调参数
*
* @return args
*/
void *getArgs() {
return args;
}
/***
* 得到线程的进程ID
*/
int getpid() {
return pid;
}
/**
* 线程的回调函数
*
*/
static void *hook(void *arg) {
CThread *thread = (CThread*) arg;
thread->pid = gettid();
if (thread->getRunnable()) {
thread->getRunnable()->run(thread, thread->getArgs());
}
return (void*) NULL;
}
private:
/**
* 得到tid号
*/
#ifdef _syscall0
static _syscall0(pid_t,gettid)
#else
static pid_t gettid() { return static_cast<pid_t>(syscall(__NR_gettid));}
#endif
private:
pthread_t tid; // pthread_self() id
int pid; // 线程的进程ID
Runnable *runnable;
void *args;
};
}
View Code
3.2 线程池
并行处理的能力有线程池的个数决定,定义如下:
CThread *_thread;
int _threadCount; |
4执行器
4.1 执行启动
int CDefaultRunnable::start() {
if (_thread != NULL || _threadCount < 1) {
TBSYS_LOG(ERROR, "start failure, _thread: %p, threadCount: %d", _thread, _threadCount);
return 0;
}
_thread = new CThread[_threadCount];
if (NULL == _thread)
{
TBSYS_LOG(ERROR, "create _thread object failed, threadCount: %d", _threadCount);
return 0;
}
int i = 0;
for (; i<_threadCount; i++)
{
if (!_thread[i].start(this, (void*)((long)i)))
{
return i;
}
}
return i;
}
4.2 执行
执行器包含了具体业务的执行:
void CQueueThread::run(CThread *thread, void *args)
{
int threadIndex = (int)((long)(args));
_mutex.lock();
while(!_stop) {
while(_stop == 0 && _queue.empty()) {
_mutex.wait();
}
if (_stop) {
break;
}
data_pair *item = _queue.front();
_queue.pop();
_mutex.unlock();
if (item != NULL) {
if (_handler) {
_handler->handleQueue(item->data, item->len, threadIndex, _args);
}
if (item->data) {
free(item->data);
}
free(item);
}
_mutex.lock();
}
_mutex.unlock();
5 样例代码
CMyHandler handler;
CQueueThread queueThread(3, &handler, NULL);
queueThread.start();
char data[1024];
for(int i=1; i<=mWriteCount; i++) {
int len = sprintf(data, "data_%05d", i);
queueThread.writeData(data, len+1);
}
queueThread.wait();
参考:
http://code.taobao.org/p/tfs/src/
原文链接: https://www.cnblogs.com/gisorange/p/4891163.html
欢迎关注
微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍
原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/223303
非原创文章文中已经注明原地址,如有侵权,联系删除
关注公众号【高性能架构探索】,第一时间获取最新文章
转载文章受原作者版权保护。转载请注明原作者出处!