Libevent学习笔记(一):基本使用

一、前言

  其实,现在高性能网络服务器基本都是异步I/O模式构建的,而Libevent就是对select/poll/epoll这类异步模式接口的封装,通过设置回调函数的方式,在监听文件描述符和套接字读写事件的同时,还兼任定时器和信号接收的管理工作。所以这货对高性能服务器后台开发、跨平台开发、网络开发都具有很大的参考学习价值。官方主页显示很多的项目都用到了Libevent库,而且还可作为主机内部进程间通信和数据交互。这货也考虑到pthread线程模型的同步问题,保证关键数据结构在多线程并行下的数据安全,但是如果能够封装一个线程池模型就更爽了!
  更正:现代服务端的开发,线程池不一定是最合适的服务端模型,像Nginx的实现上,几个进程(当然也可以几个线程)在异步模式下就可以撑起很大的并发量了,协程也是近年来开发的热点,相对来说异步事件的支持下,线程的代价还是略显高了。
  深入了解的第一步就是先学会用它。其实Libevent的主要维护者Nick的博客有一本很好的教程libevent book,看完它后再加上Libevent本身附赠的HTTP和DNS服务器的例子sample(Libevent本身封装了evhttp和evdns),基本就可以耍起来啦。看过后会发现,如果对网络开发本身比较熟悉,Libevent还是比较容易理解和上手的。除此之外,Libevent还有一个比较特色的东西,就是封装产生了Bufferevent和evbuffer结构类型,而两者的关系呢,算是Bufferevent是基于evbuffer封装了I/O事件、I/O调度等内容,而evbuffer则是Bufferevent底层的数据承载。
  需要注意的是由于手册的作者就是维护者,所以手册的内容十分的新,有些手册内容在稳定发布版本2.0.22是没有的,代码切换到稳定分支可以使用git branch stable release-2.0.22-stable建立一个稳定分支。
libevent

二、服务端使用步骤

  这里通过手册描述的过程,对Libevent整个使用过程进行一个梳理。其实,实际使用很多步骤是不用考虑的,因为Libevent在设计上还算是比较智能——当你没有提供参数或者设置的时候,系统会自动给你一个最优的或者常用的配置,比如底层的异步模式。

2.1 配置系统,产生event_base对象

  event_base算是Libevent最基础、最重要的对象,因为修改配置、添加事件等,基本都需要将它作为参数传递进去。
  event算是Libevent最常用的元素,对于event在其生命周期有initialized、pending、active这几种状态,当通过event_new创建了事件并关联到event_base上之后,其状态是initialized;然后通过event_add之后,这个事件便是pending的状态,开始侦听了;然后当条件满足之后,其变为active状态,对应的callback函数被调用。
  这个对象通过event_base_new创建,在创建之前还可以设定某些参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct event_config *cfg;
struct event_base *base;

cfg = event_config_new();
event_config_avoid_method(cfg, "select"); //避免使用低效率select
event_config_require_features(cfg, EV_FEATURE_ET); //使用边沿触发类型
//event_config_set_flag(cfg, EVENT_BASE_FLAG_PRECISE_TIMER);
//event_base_new(void); 为简单版本,会根据系统选择最快最合适的类型
base = event_base_new_with_config(cfg);
event_config_free(cfg);
//显示当前使用的异步类型
st_d_print("Current Using Method: %s", event_base_get_method(base)); // epoll

//可选设置优先级数目,然后通过event_priority_set设置事件的优先级
//0为最高,n_priority-1为最低,此后创建的事件默认优先级为中间优先级
event_base_priority_init(base, 3);

2.2 针对服务器端和客户端类型的操作

  对于网络开发部分,Linux和Windows在网络方面的操作是有差异的,为此Libevent创建了evutil统一的接口来屏蔽两个平台底层的网络开发差异(后悔当时移植程序怎么没有参考这个有价值的东西)。
  由于服务端开发和客户端开发一个主动一个被动,这里分开进行示例。

2.2.1 服务器端操作

  服务端流程:创建套接字、设置套接字参数(nonblocking等)、绑定地址端口、侦听新连接。
  这么多操作,Libevent封装到了evconnlistener_new_bind中,并创建了连接事件的相应函数accept_conn_cb,同时还可以设置错误回调函数accept_error_cb。

1
2
3
4
5
6
7
8
9
10
11
struct evconnlistener *listener;
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(0); //绑定0.0.0.0地址
sin.sin_port = htons(8080); /* Port 8080 */

listener = evconnlistener_new_bind(base, accept_conn_cb, NULL,
LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1/*backlog 无限制*/,
(struct sockaddr*)&sin, sizeof(sin));
evconnlistener_set_error_cb(listener, accept_error_cb);

  对于上面引用的两个回调函数,其实现的模板为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static void accept_conn_cb(struct evconnlistener *listener,
evutil_socket_t fd, struct sockaddr *address, int socklen, void *ctx)
{
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

getnameinfo (address, socklen,
hbuf, sizeof(hbuf),sbuf, sizeof(sbuf),
NI_NUMERICHOST | NI_NUMERICSERV);
st_print("Welcome (host=%s, port=%s)\n", hbuf, sbuf);

//为新的客户连接socket fd创建Bufferevent事件侦听
struct event_base *base = evconnlistener_get_base(listener);
struct bufferevent *bev = bufferevent_socket_new(
base, fd, BEV_OPT_CLOSE_ON_FREE);
bufferevent_priority_set(bev, 2);

bufferevent_setcb(bev, bufferread_cb, NULL, bufferevent_cb, NULL);
bufferevent_enable(bev, EV_READ|EV_WRITE); //默认EV_WRITE是使能的,但EV_READ不是
}

static void accept_error_cb(struct evconnlistener *listener, void *ctx)
{
struct event_base *base = evconnlistener_get_base(listener);
int err = EVUTIL_SOCKET_ERROR();

st_d_print( "Got an error %d (%s) on the listener. "
"Shutting down.\n", err, evutil_socket_error_to_string(err));
event_base_loopexit(base, NULL);
}

  对于上面的accept_conn_cb函数中,为accept创建的新fd建立EV_READ|EV_WRITE事件侦听。但是上面的回调函数只设置了bufferread_cb和bufferevent_cb,而没有对写设置回调函数。其实这也是现实中常用的情况,程序大多数都阻塞在读的任务上,而一般的写任务也都是基于读到的结果产生对应的写内容,如果为写任务设置回调函数,那么系统检测到输出缓存区可用,便一直调用写回调函数,这可能不是你想要的。
  可以从accept_error_cb中学习Libevent常见的错误处理方式。在Linux中,所有的错误都是通过全局的errno来检测错误信息的,但是Windows使用WSAGetLastError()这种函数得到网络类的错误信息,所以需要使用封装后的EVUTIL_SOCKET_ERROR()和evutil_socket_error_to_string()来实现。
  对于bufferread_cb,就是通用的网络I/O操作,跟服务器端和客户端没有什么差异,这里贴出demo的代码来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void bufferread_cb(struct bufferevent *bev, void *ptr)
{
char *msg = "SERVER MESSAGE: WOSHINICOL 桃子大人";
char buf[1024]; int n;
struct evbuffer *input = bufferevent_get_input(bev);
struct evbuffer *output = bufferevent_get_output(bev);

while ((n = evbuffer_remove(input, buf, sizeof(buf))) > 0)
{
fwrite("BUFFERREAD_CB:", 1, strlen("BUFFERREAD_CB:"), stderr);
fwrite(buf, 1, n, stderr);
}

//bufferevent_write(bev, msg, strlen(msg));
evbuffer_add(output, msg, strlen(msg));
}

  由上面可以看见,对于evbuffer操作,既可以调用Bufferevent层的封装函数,也可以调用底层的evbuffer的函数接口,Bufferevent接口简单,但是evbuffer类接口比较的底层,但是函数功能很多。具体的细节后文再行描述。

2.2.2 客户端操作

  客户端的开发比较的简单,主要就是建立套接字,连接服务端,就可以进行I/O操作了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct bufferevent *bev;
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
inet_aton("192.168.1.161", &sin.sin_addr.s_addr);
sin.sin_port = htons(8080); /* Port 8080 */

//int sockfd = socket(AF_INET, SOCK_STREAM, 0);
//evutil_make_listen_socket_reuseable(sockfd);
//evutil_make_socket_nonblocking(sockfd);
//bev = bufferevent_socket_new(base, sockfd, BEV_OPT_CLOSE_ON_FREE);
bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, bufferread_cb, bufferwrite_cb, bufferevent_cb, base);
bufferevent_enable(bev, EV_READ|EV_WRITE);
bufferevent_socket_connect(bev, (struct sockaddr *)&sin, sizeof(sin))

  如上面注释的代码,对客户端的操作,有两个方法:
  (1)首先创建套接字,然后设置套接字参数,在将这个套接字传递给bufferevent_socket_new函数创建Bufferevent事件,并调用bufferevent_socket_connect链接客户端。
  (2)直接调用bufferevent_socket_new并将套接字参数设置为-1,那么系统会自动创建套接字并完成相应的设置。

2.3 侦听信号事件

1
2
3
4
5
6
7
ct event *ev_signal;
//evsignal_new(base, signum, cb, arg)为简洁版本
ev_signal = event_new(base, SIGUSR1, EV_SIGNAL|EV_PERSIST, sigusr1_cb, base);
event_priority_set(ev_signal, 2);
event_add(ev_signal, NULL);

void sigusr1_cb(evutil_socket_t fd, short what, void *arg);

  对于信号事件,手册说明:对于一个进程,如果有多个event_base,那么请只使用一个event_base处理所有的信号事件,一个程序只有一个event_base能接收到信号事件。

2.4 建立定时器回调

1
2
3
4
5
6
7
8
9
10
struct event *ev_timer;
struct timeval one_sec = { 1, 0 }; //1s
int n_calls = 0;
//evtimer_new(base, callback, arg)为简洁版本
//EV_TIMEOUT的参数实际是可被忽略的,不传递也是可以的
ev_timer = event_new(base, -1, EV_PERSIST, timer_cb, &one_sec);
event_priority_set(ev_timer, 2);
event_add(ev_timer, &one_sec);

void timer_cb(evutil_socket_t fd, short what, void *arg);

  对于上面的signal和timer事件,其实都没有关联到某一个具体的socket或者fd,其实可以公用同一个callback,然后在处理的callback中,使用what参数来区分到底是由于信号、定时器哪个事件激活了这个回调函数。

2.5 进入事件循环

  就像是通常的epoll_wait在一个大的循环里,Libevent提供如下函数进行事件循环检测

1
2
3
event_base_loop(base, 0); //进入事件循环直到没有pending的事件就返回
//EVLOOP_ONCE  阻塞直到有event激活,执行回调函数后返回
//EVLOOP_NONBLOCK 非阻塞类型,立即检查event激活,如果有运行最高优先级的那一类,完毕后退出循环

  如果后面的flags参数为0,那么等价于调用event_base_dispatch。
  默认情况下,如果event_base有pending的事件,就不会结束循环,可以通过调用event_base_loopbreak、event_base_loopexit等函数来跳出终止循环。还需要注意的是,event_base_free只会调用event_del接触event和本身的关系,不会释放event相关的资源,所以如果优雅地写代码的话,需要调用event_free、evconnlistener_free等函数来善后。

三、Bufferevent和evbuffer

  如上面介绍的,如果上面的内容是让系统的各类事件和对应回调函数建立关联,助力于整个系统的设计和架构的话,Bufferevent和evbuffer则是关注于整个系统的数据承载,以完成实际的I/O通信。Bufferevent可以看作是基于evbuffer实现对EV_READ|EV_WRITE事件的侦听,而evbuffer是底层实际数据的承载。

3.1 Bufferevent

  Bufferevent支持的类型有:socket-based、asynchronous-IO、filtering、paired类型。socket-based算是最常见的类型,asynchronous-IO主要是Windows下的完成端口异步非阻塞通信类型,filtering和paired常常是针对特殊通信需求的情况。
  Bufferevent创建时候支持的重要标志有:BEV_OPT_CLOSE_ON_FREE当bufferevent被释放的时候,底层的传输也会被释放,比如关闭套接字、释放底层bufferevent等;BEV_OPT_THREADSAFE为Bufferevent创建锁结构,以保证线程安全的,当用户提供的回调函数被执行的时候,会持有这个锁结构;BEV_OPT_DEFER_CALLBACKS延迟执行,事件的回调函数会被排队,当常规event回调执行完之后,才会执行其回调函数。
  Bufferevent的操作在上面已经有示例了,这里将其数据接口整理出来

1
2
3
4
5
6
7
8
// 将data指向的数据添加到bufev的输出缓冲区尾部
int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size);
// 将buf的整个数据移除移动到bufev输出缓冲区的尾部
int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);

// 将bufev的输入缓冲区的数据移动到目标位置,注意bufferevent_read返回的是实际读取的数目
size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
int bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf);

  然后,当创建Bufferevent的时候如果使用了BEV_OPT_THREADSAFE参数的时候,初始化这个bufferevent会给这个结构本身以及input、output、underlying等结构产生锁,而且在读写callback以及defer延迟的callback时候,都会调用_bufferevent_incref_and_lock、_bufferevent_decref_and_unlock等来获得和释放锁。只有当显式指明BEV_OPT_UNLOCK_CALLBACKS的时候,才会在调用BEV_OPT_DEFER_CALLBACKS的时候不加锁结构。或许也这种情况下才是用户空间锁操作锁的时候,为了更小的锁粒度获得效率的提升?

1
2
void bufferevent_lock(struct bufferevent *bufev);
void bufferevent_unlock(struct bufferevent *bufev);

3.2 evbuffer

  evbuffer是一个被优化为前端删除,后端添加的bytes queue,用于方便高效的网络IO。其代码实现在buffer.c中,而头文件evbuffer-internel.h为其内部数据结构定义的地方,而用户调用接口的声明主要在event2/buffer.h中。结构上,每个evbuffer内部有多个evbuffer_chain结构构成的链表组成,所以数据在其内部不一定是物理连续的。

3.2.1 线程安全

1
2
3
int evbuffer_enable_locking(struct evbuffer *buf, void *lock);
void evbuffer_lock(struct evbuffer *buf);
void evbuffer_unlock(struct evbuffer *buf);

  多个线程访问evbuffer是不安全的,所以如果要在多个线程中访问,首先需要使用evbuffer_enable_locking来让evbuffer支持锁结构。通过查看文档和代码,对于evbuffer的底层函数(比如evbuffer_read、evbuffer_write),都是自动加了锁的,如果函数只调用了这些操作一次,那么不需要额外的加锁结构,如果在函数某个阶段有多次的evbuffer操作,那么需要使用上面的evbuffer_lock/evbuffer_unlock来加解锁保护。

3.2.2 evbuffer常用接口罗列

1
2
size_t evbuffer_get_length(const struct evbuffer *buf);
size_t evbuffer_get_contiguous_space(const struct evbuffer *buf);

  evbuffer_get_length返回evbuffer整体保存了的数据的字节数。
  evbuffer_get_contiguous_space返回第一个evbuffer_chain的offset位置,而offset=buffer+misalign+实际负载,实际就是开头空余空间+实际的负载字节数,也就是末尾空闲空间开始的位置。

1
2
3
4
5
int evbuffer_add(struct evbuffer *buf, const void *data, size_t datlen);
int evbuffer_add_printf(struct evbuffer *buf, const char *fmt, ...);
int evbuffer_add_vprintf(struct evbuffer *buf, const char *fmt, va_list ap);
int evbuffer_prepend(struct evbuffer *buf, const void *data, size_t size);
int evbuffer_prepend_buffer(struct evbuffer *dst, struct evbuffer* src);

  add类函数都是将数据添加到evbuffer结尾的操作,而prepend类函数是将数据添加到evbuffer开始的操作。

1
2
int evbuffer_add_buffer(struct evbuffer *dst, struct evbuffer *src);
int evbuffer_remove_buffer(struct evbuffer *src, struct evbuffer *dst, size_t datlen);

  都是将evbuffer的数据从src移动到dst中,这里的函数都是优化过的,如果可能就只有evbuffer_chain结构的转移,不会有底层实际数据的拷贝。

1
unsigned char *evbuffer_pullup(struct evbuffer *buf, ev_ssize_t size);

  重新排列evbuffer,使得前size个字节保证在同一个evbuffer_chain的连续位置,当size<0的时候,会对所有的数据进行重排,如果size等于0或者大于实际的datalen,不会进行任何操作,返回NULL;否则返回重排后实际数据的开始地址。

1
2
3
int evbuffer_drain(struct evbuffer *buf, size_t len);
int evbuffer_remove(struct evbuffer *buf, void *data, size_t datlen);
ev_ssize_t evbuffer_copyout(struct evbuffer *buf, void *data, size_t datlen);

  evbuffer_remove会从开头将数据拷贝到data指向的位置,evbuffer_drain会从开头直接释放len长度的buf数据。两者当len的参数大于实际的数据长度的时候,会对所有的数据进行操作,返回实际拷贝/删除的字节数。
  evbuffer_copyout会将数据拷贝到data,但是不会drain删除evbuffer的数据。

1
2
3
4
5
6
enum evbuffer_eol_style {
EVBUFFER_EOL_ANY, //不建议使用
EVBUFFER_EOL_CRLF, //"\r\n"或者"\n"
EVBUFFER_EOL_CRLF_STRICT, //"\r\n"
EVBUFFER_EOL_LF //"\n" };
char *evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out, enum evbuffer_eol_style eol_style);

  这是基于某些程序基于行处理而设计的,比如HTTP协议的头部信息等。其会从头提取并drain到一行内容,然后通过malloc分配的空间,拷贝出不带换行符的信息出来(换行符会在evbuffer中删除掉),并在结尾添加’\0’结束符,拷贝的数据长度会保存在n_read_out参数中。用完之后,用户记得free返回的内存空间。

1
2
3
4
5
6
7
8
9
10
11
struct evbuffer_ptr {
ev_ssize_t pos;
/* Do not alter the values of fields. */
struct {
void *chain;
size_t pos_in_chain;
} _internal;
};
struct evbuffer_ptr evbuffer_search(struct evbuffer *buffer, const char *what, size_t len, const struct evbuffer_ptr *start);
struct evbuffer_ptr evbuffer_search_range(struct evbuffer *buffer, const char *what, size_t len, const struct evbuffer_ptr *start, const struct evbuffer_ptr *end);
struct evbuffer_ptr evbuffer_search_eol(struct evbuffer *buffer, struct evbuffer_ptr *start, size_t *eol_len_out, enum evbuffer_eol_style eol_style);

  提供在evbuffer中搜索特定长度len字符串what的功能。Libevent使用了struct evbuffer_ptr这么一个结构,将evbuffer内部离散的evbufferchain的buffer映射成pos这么一个连续的偏移空间。如果搜索到,那么pos为其位置,否则-1表示没有搜索到。

1
2
3
4
5
6
enum evbuffer_ptr_how {
EVBUFFER_PTR_SET,
EVBUFFER_PTR_ADD
};
int evbuffer_ptr_set(struct evbuffer *buffer, struct evbuffer_ptr *pos, size_t position, enum evbuffer_ptr_how how);
evbuffer_ptr_set(buf, &p, 0, EVBUFFER_PTR_SET);

  类似文件系统seek的方式来操作pos位置,由于evbuffer内部不一定是连续的位置,所以不能简单的修改pos的位置,只能通过这种方式,将pos的更改更新到内部结构的evbufferchain和偏移上去。返回0表示修改成功,否则-1。

1
2
3
4
5
6
7
8
9
struct evbuffer_iovec {
void *iov_base;
size_t iov_len;
};
int evbuffer_peek(struct evbuffer *buffer, ev_ssize_t len, struct evbuffer_ptr *start_at, struct evbuffer_iovec *vec_out, int n_vec);

n = evbuffer_peek(buf, 4096, NULL, NULL, 0);
v = malloc(sizeof(struct evbuffer_iovec)*n);
n = evbuffer_peek(buf, 4096, NULL, v, n);

  高速网络的一个关键就是避免数据的拷贝,为此,Libevent创建了一个evbuffer_iovec的结构,然后通过evbuffer_peek,可以将evbuffer内部的evbuffer_chain结构的数据位置暴露到evbuffer_iovec,用户可以直接访问读取evbuffer的内部数据了。需要注意的是这里只作读取,修改数据会导致不可预料的结果。
  evbuffer_peek会在要么指定的字节数都映射了,或者传递evbuffer_iovec使用完了就会返回。通常使用方式如上文,是先调用evbuffer_peek决定需要多少个evbuffer_iovec结构数目,然后再进行映射操作,保证需要的字节数目都能映射完成。

1
2
int evbuffer_reserve_space(struct evbuffer *buf, ev_ssize_t size, struct evbuffer_iovec *vec, int n_vecs);
int evbuffer_commit_space(struct evbuffer *buf, struct evbuffer_iovec *vec, int n_vecs);

  这是一个evbuffer高速写入的方式,因为先前的evbuffer_add实际也是先将数据准备好,然后再memcpy拷贝到evbuffer内部的,而这里先通过evbuffer_reserve_space在evbuffer的结尾先预留出需要写的数据空间,然后将空间的地址通过evbuffer_iovec返回,应用程序就可以直接操作这些地址,最后通过evbuffer_commit_space提交就可以了。这里n_vecs可用的只有1、2两个数字,通常推荐2,因为1很有可能会导致数据的重排,降低效率。
  evbuffer_commit_space成功返回0,失败返回-1。
使用这些函数的时候必须格外的小心,在调用evbuffer_reserve_space之后和evbuffer_commit_space之前,不能调用任何重排或者追加evbuffer的操作,那样会导致之前evbuffer_reserve_space返回的地址不一致了。在多线程中也要注意用锁保护相应的数据。

1
2
3
int evbuffer_write(struct evbuffer *buffer, evutil_socket_t fd);
int evbuffer_write_atmost(struct evbuffer *buffer, evutil_socket_t fd, ev_ssize_t howmuch);
int evbuffer_read(struct evbuffer *buffer, evutil_socket_t fd, int howmuch);

  实际的I/O网络操作,注意如果evbuffer被关联到了bufferevent,那么网络I/O是自动触发的,用户不需要使用这些函数。他们会返回实际读取或者写入的字节数目,需要注意的是,如果fd是非阻塞的套接字/文件描述符,需要检查错误的类型来决定是因为I/O当前无法完成还是别的错误类型。

1
int evbuffer_add_file(struct evbuffer *output, int fd, ev_off_t offset, size_t length);

  直接将打开的文件描述符fd作为参数,然后将文件中的数据用于读取、网络发送等操作。其内部运用sendfile/mmap等机制,避免数据拷贝到用户空间再拷贝到内核空间,增加了操作的效率。

1
2
int evbuffer_freeze(struct evbuffer *buf, int at_front);
int evbuffer_unfreeze(struct evbuffer *buf, int at_front);

  会禁止对evbuffer的头部/尾部的修改操作,通常在内部使用。

参考