Boost-Asio网络开发基础知识(三):Strand序列化执行用户回调

请参看Boost.Asio的官方文档,上面清楚的描述道:
  strand可以保证严格序列化地(而不会并行地)执行event handlers,通过使用strand可以保证安全的在多线程环境下执行程序,而不需要使用者显式的使用类似mutex等方式来进行保护和同步操作。strand的可以以隐式(implicit)或者显式(explicit)的方式存在:
(1). 如果只在一个线程中调用io_service::run(),那么所有的event handlers都是在一个隐式strand中序列化执行的,因为所有的回调都在这个线程的run()中执行的;
(2). 对于一个连接如果只有一个单链异步操作的时候,其不可能并发的执行event handlers,这也是隐式strand的。(文中提到了比如HTTP的单双工协议,我的理解是HTTP只会进行客户端请求、服务端返回数据这种形式,没有并行执行event handler的可能性);
(3). 可以显式实例化一个strand对象,然后所有的event handlers必须使用io_service::strand::wrap()进行包裹,或者使用该strand对象显式调用post()/dispatch()发布异步操作请求;
(4). 对于composed asynchronous operations的操作,比如async_read()或者async_read_until(),其中间会多次调用async_read_some(),而且会保存部分读取的中间数据,所以为了保证对象的安全,其所有的intermediate handlers都必须在同一个strand中被串行化执行。虽然看似麻烦,其实使用中只要使用io_service::strand::wrap()这个函数包裹需要的handler就可以了。
(5). 序列化是以strand对象为单位的,系统保证同一个strand对象的各个handler会被串行化执行,但是某个strand对象包裹的handler和其他strand对象包裹的handler,以及和其他未被包裹的handler还是并行执行的。

基本上strand对象创建之后,就从另外一个层次替换了io_service的行为,因为strand提供了和ioservice极其类似的成员函数:
(1). dispatch(CompletionHandler handler); 请求strand去执行handler,strand对象保证通过该strand添加的handlers不会被并行的执行,如果这个函数是在相同strand对象对应的handler中被调用的,那么该handler将会在函数中被立即执行(s.running_in_this_thread() == true);
(2). post(CompletionHandler handler); 请求strand去执行handler并立即返回;
(3). get_io_service()
(4). running_in_this_thread(); 如果当前线程正在执行由该strand提交的handler,则返回true;
(5). wrap(Handler handler); 创建一个函数对象,当被invoke的时候会自动传递到strand的dispatch函数;

strand串行化执行的顺序问题:
在strand的文档中,对提交进取的多个handler需要串行化执行,执行的顺序有部分的保证。在以下情况下会保证asio_handler_invoke(a1, &a1)会先于执行asio_handler_invoke(b1, &b1):
(1). s.post(a)先于执行s.post(b);
(2). s.post(a)先于执行s.dispatch(b),同时后者在strand外执行的;
(3). s.dispatch(a)先于执行s.post(b),并且前者是在strand外执行的;
(4). s.dispatch(a)先于执行s.dispatch(b),同时他们都是在strand外执行的;
(5). async_op_1(…, s.wrap(a)); async_op_2(…, s.wrap(b));这两者的执行顺序没有任何的保证,而strand所能给予的保证是a1和b1不会并行执行,如果s.wrap(x1)先被执行,那么x1也会先被执行;
其实,其要诀就是:如果在strand中,那么dispatch会直接在调用函数中执行,否则按照添加到队列中的顺序来排队执行。举个例子,比如假设

1
2
3
4
auto wrapped_handler1 = strand.wrap(handler1);
auto wrapped_handler2 = strand.wrap(handler2);
socket.async_read_some(buffer1, wrapped_handler1); // op1
socket.async_read_some(buffer2, wrapped_handler2); // op2

由于op1先于op2启动,所以保证buffer1在stream中接收到的数据是先于buffer2的,但是wrapped_handler1和wrapped_handler2的调用顺序是没有保证的,strand做出的保证是:
(1). handler1和handler2不会并发的执行;
(2). 如果wrapped_handler1先于wrapped_handler2被执行,那么handler1先于handler2被执行,反之亦然。

strand类定义在[strand.hpp]boost::asio::io_service::strand,这个类只是个空壳,主要包括两个数据成员detail::strandservice& service和detail::strand_service::implementationtype impl两个成员,具体实现需要查看strand_service类的实现细节,该类有几个重要的成员变量:
(1). io_service_impl& ioservice; 构造strand的时候传递进来的ioservice对象;
(2). detail::mutex mutex
; 主要用来保护下面的locked等内部变量;
(3). bool locked
; 如果当前有其他的handler正在被执行或正在被调度执行,那么这个变量是true,如果此时有新的handler需要被加入就需要等待;
(4). op_queue waitingqueue; 正在等待strand但是除非等到下次strand调度,否则不应当被运行,修改时候需要mutex_保护;
(5). op_queue readyqueue; 已经拥有了lock_,即将被运行的队列;

想必看到上面的成员之后,对strand的串行化原理会猜个八九分了,但是如我们之前所跟的,一个async_read_some()调用的话,descriptor_state和reactor_op会多次加入到io_service上面,对于这个二段式的操作,其序列化需求还是跟之前的io_service直接调度有些区别的吧。strand其提供最常用的接口包括dispatch、post、wrap,我们可以从这些函数中了解strand串行化的机制:
(1). dispatch
如果当前的线程正在执行strand,那么就直接调用这个handler:

1
2
3
4
5
if (call_stack<strand_impl>::contains(impl)) {
fenced_block b(fenced_block::full);
boost_asio_handler_invoke_helpers::invoke(handler, handler);
return;
}

其中的invoke调用代码为:

1
2
using boost::asio::asio_handler_invoke;
asio_handler_invoke(function, boost::asio::detail::addressof(context));

可见这个asio_handler_invoke就是在strand介绍文档中看到的,其默认操作就是直接调用用户提供的handler。关于这个asio_handler_invoke,如果要深究下去东西也很多,可以参见参考中的When to use asio_handler_invoke?,其主要思想是提供了一个可供记录的上下文环境context,因为比如composed operation中, intermediate handler可能会被创建零或者多次,这个状态必须在外层的wrap中保留下来才可以。不过在上面的例子中,貌似没有做什么额外的事情。

否则上面的dispatch()会调用strand_service::do_dispatch(),这里的判断就更明显了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if (can_dispatch && !impl->locked_){
impl->locked_ = true;
impl->mutex_.unlock();
return true;
}

if (impl->locked_){
impl->waiting_queue_.push(op);
impl->mutex_.unlock();
}
else {
impl->locked_ = true;
impl->mutex_.unlock();
impl->ready_queue_.push(op);
io_service_.post_immediate_completion(impl, false);
}

如果当前是执行strand的线程并且没有lock,那么就返回true,效果是直接执行handler;否则如果lock了就添加到waitingqueue上面;再则没有lock_就添加到readyqueue队列上面,此时通过post_immediate_completion添加到ioservice.opqueue上面被调度执行;
如果上面返回是true,此处就是在do_dispatch()函数内部执行了,通过设置on_dispatch_exit的RAII,在此次调用完后会把waitingqueue的handler全部移动到readyqueue,如果readyqueue中真的有元素,就设置lock_为ture,并将队列中的handler添加到ioservice.opqueue上面去。
上面介绍了都是善后工作,真正的函数内调用代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
completion_handler<Handler>::do_complete(
&io_service_, o, boost::system::error_code(), 0);

static void do_complete(io_service_impl* owner, operation* base,
const boost::system::error_code& /*ec*/, std::size_t /*bytes_transferred*/)
{
// Take ownership of the handler object.
completion_handler* h(static_cast<completion_handler*>(base));
ptr p = { boost::asio::detail::addressof(h->handler_), h, h };
...
Handler handler(BOOST_ASIO_MOVE_CAST(Handler)(h->handler_));
p.h = boost::asio::detail::addressof(handler);
p.reset();

// Make the upcall if required.
if (owner) {
fenced_block b(fenced_block::half);
BOOST_ASIO_HANDLER_INVOCATION_BEGIN(());
boost_asio_handler_invoke_helpers::invoke(handler, handler);
BOOST_ASIO_HANDLER_INVOCATION_END;
}
}

这里调用的时候owner不是空的,所以会调用用户提供的handler。

(2). post
post的逻辑就比上面要简单很多了,如果locked_==true,那么就直接添加到waitingqueue,否则的话说明当前strand没有运行,就设置lock_=true并添加到readyqueue队列上,同时添加到ioservice.opqueue上面等待调度执行;

(3). wrap
wrap函数算是最常用的函数了,当原来所有的asyncxxxx所传入的handler,都可以直接使用strand.wrap进行包装,就可以保证在多线程环境下序列化调用安全了。其wrap成员函数使用了detail::wrappedhandler进行包装,类成员也就dispatcher和handler_两个成员变量。

1
2
3
4
wrap(Handler handler) {
return detail::wrapped_handler<io_service::strand, Handler,
detail::is_continuation_if_running>(*this, handler);
}

由于strand在我们操作接口中的角色就是添加了一个wrap,基本的业务流程还是在io_service中进行的,所以这里预测,有无strand.wrap的差异也就是在需要调用的handler的期间:
当socket的IO操作完成之后,会继续调用o->complete(*this, ec, task_result);,此时会按照如下的调用链:

1
2
3
4
5
6
7
8
9
10
task_io_service_operation::complete() -> boost_asio_handler_invoke_helpers::invoke(handler, handler.handler_);
-> asio_handler_invoke(function, boost::asio::detail::addressof(context));

asio_handler_invoke(Function& function,
wrapped_handler<Dispatcher, Handler, IsContinuation>* this_handler)
{
this_handler->dispatcher_.dispatch(
rewrapped_handler<Function, Handler>(
function, this_handler->handler_));
}

这里看出了dispatcher_.dispatch(),就跟上面分析的dispatch联系起来了,也就是上面的成员函数添加真正的handler。由之前的分析知道,用户提供的handler是在实际的IO执行完成之后才会回掉的,所以可以看出这里的strand不会保护底层的IO操作,只会保护用户提供的回调handler的串行化执行。

参考