Libevent学习笔记(二):Memcached中Libevent和线程池使用初探

  这些天想弄一下缓存,减少程序查询数据库的压力,而这方面的王者基本就是memcached和redis了。克隆了一份memcached的源码,发现是基于Libevent+线程池的实现方式,大致看了一下感觉很有启发。正好前两天看的Libevent手册,而且相比自己写的线程池模型,也很好奇企业级线程模型的实现方式,就顺着memcached初始化的流程了解梳理一下了。

memcached

1. main [memcached.c]

  memcached启动时候执行memcached.c中的main函数,在加载了好长的初始化配置之后,定义并初始化event_base;

1
2
static struct event_base *main_base;
main_base = event_init();

  然后通过调用memcached_thread_init,创建工作者线程

1
memcached_thread_init(settings.num_threads, main_base);

  创建定时器clock_handler(0, 0, 0);,这个基于Libevent创建的定时器每一秒钟执行一次,用以更新current_time这个表示自从进程启动后的时间长度。
  然后针对服务端参数指定的侦听(ip:port/unix socket)类型,分别调用server_socket_unix/server_sockets函数,绑定指定地址,并为创建的socket添加connect事件,核心代码如下

1
2
3
4
5
6
// unix socket
listen_conn = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1,
local_transport, main_base)))
// tcp
listen_conn_add = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1,
transport, main_base))

  这个conn_new不仅仅在这里用以侦听套接字分配资源、创建事件侦听,之后所有客户端连接的套接字也会用这个函数。这个函数最终回调的响应函数是event_handler,然后最终调用一个碉堡了名字的函数drive_machine,这个函数内部是一个复杂的有限状态机,会处理所有与套接字相关的连接、关闭、读写等操作。
  listen套接字当接收到客户请求的时候,如果连接OK,并且没有超过最大连接数目,就调用dispatch_conn_new接收请求。这个函数中,会轮询选择要添加的工作线程,然后创建一个等待item,并添加到对应线程的new_conn_queue队列上去,然后向这个线程的读取队列里面写入’c’一个字节表明有一个新的请求,然后对应线程管道读事件就会被触发,执行处理回调函数。
  主线ain_base进入Libevent事件循环中

1
2
3
4
/* enter the event loop */
if (event_base_loop(main_base, 0) != 0) {
retval = EXIT_FAILURE;
}

2. memcached_thread_init [thread.c]

  上面我们关注的核心在于调用memcached_thread_init这个函数创建nthreads个工作者线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
typedef struct {
pthread_t thread_id; /* unique ID of this thread */
struct event_base *base; /* libevent handle this thread uses */
struct event notify_event; /* listen event for notify pipe */
int notify_receive_fd; /* receiving end of notify pipe */
int notify_send_fd; /* sending end of notify pipe */
struct thread_stats stats; /* Stats generated by this thread */
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *suffix_cache; /* suffix cache */
} LIBEVENT_THREAD;


void memcached_thread_init(int nthreads, struct event_base *main_base) {
...
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));

dispatcher_thread.base = main_base;
dispatcher_thread.thread_id = pthread_self();

for (i = 0; i < nthreads; i++) {
int fds[2];

threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];

setup_thread(&threads[i]);
/* Reserve three fds for the libevent base, and two for the pipe */
stats.reserved_fds += 5;
}

/* Create threads after we've done all the libevent setup. */
for (i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads[i]);
}
...
}

  上面把非核心的代码剔除掉,就可以看清memcached_thread_init所做的具体工作了。
  (1) 为每个线程创建LIBEVENT_THREAD结构体,并把自我分发线程的信息记录在dispatcher_thread中;
  (2) 对每个线程的结构体LIBEVENT_THREAD初始化,然后通过pipe创建匿名管道,pipefd[0]指向读端,而pipefd[1]指向写端,然后每个线程就通过这个匿名管道同其他线程进行通信;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static void setup_thread(LIBEVENT_THREAD *me) {
...
me->base = event_init();

/* Listen for notifications from other threads */
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);
event_add(&me->notify_event, 0);

me->new_conn_queue = malloc(sizeof(struct conn_queue));
cq_init(me->new_conn_queue);

me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*), NULL, NULL);
...
}

  其中的setup_thread函数中,为每一个线程创建一个event_base,然后添加之前管道的读写时间侦听;同时每个线程还创建了一个等待队列,所有的新请求会添加到这个等待队列上面去。
  在匿名管道的读写事件的相应函数thread_libevent_process上,会尝试读取一个字节,如果是上面写入的’c’,就表明有待处理的请求,然后就从等待队列new_conn_queue中取出一个item,然后处理。处理的方式就是确认这个连接,分配相应的资源,然后再丢到上面的那个event_handler->drive_machine的状态机中去!
  (3) 调用create_worker(worker_libevent, &threads[i]);实行真正创建线程操作,其内部就是一个pthread_create;

3.总结

  Memcached工作方式可以描述如下:
  软件启动的时候,创建event_base,并且根据设置类型创建侦听的tcp/udp socket或者unix socket,然后为这些套接字创建读侦听事件,加入到event_base上,等待客户端连接;
  创建工作线程池,每个工作线程创建自己的event_base;创建一个等待队列,新连接的客户请求都会挂在这个队列上;创建一个匿名管道,并为管道创建读写侦听事件;
  当新的客户端连接上来有请求时候,主线程的侦听事件回调函数会被激活,条件满足后接受这个连接,然后选取一个工作线程,创建等待item挂到其队列上,然后向其管道写入一个c,对应线程管道读事件被激活,读取一个c,并从队列中取出一个请求处理;
  Memcache对所有socket的处理都是event_handler->drive_machine中处理的。
  可以说,memcached在线程池在等待连接和事件处理中都充分利用了Libevent的异步事件,所以效率是非常之高的。自己的那个线程池,主要是将所有的任务都放到一个链表队列中,当线程发现没有任务的时候,就会用pthread_cond_wait阻塞睡眠,当主线程发现等待的任务太多,就会用pthread_cond_signal唤醒睡眠线程(不会惊群)。总体会让人感觉,把任务事先分给各个队列,吞吐量要大一些,让任务阻塞在select、poll、epoll上,会比自己控制睡眠唤醒要高效可靠!

本文完!