Boost.Coroutine协程库的简单使用

  绕了很久,看了很多的资料,总算对协程(coroutine)是有点眉目了。

  由于C++原生支持多进程多线程,可以被操作系统直接调度,所以感觉对协程的支持没有那么的急迫,不过现在网上搜到很多资料,说是建议要把协程推到标准库中,可见协程还是蛮有用的。从原理上看,协程保存了执行当前位置,后续可以切换回来,像是一个用户态的线程,但是和一般的线程不同的是不是抢占式的(pre-emptive)切换,而是一种协作式的(cooperative)推拉;而对于用户来说,可以类似用符合思维习惯的同步手法,写出具有异步功能的高效代码,而不用像传统异步开发设置各种回调函数把代码割离弄的支离破碎的;最后还是得意于协程比线程更加的轻量级,切换过程也不会陷入内核态,增加系统的运行效率。
coroutine
  同时最近发现了Tecent Phxteam开源出来的phxsql项目,里面就有协程相关的使用,可见协程是可以用在高性能需求的生产环境上的。

  Boost库中的协程支持两种方式:一种是封装了Boost.Coroutine的spawn,是一个stackful类型的协程;一种是asio作者写出的stackless协程。下面就两类分别罗列出相关特性。

一、stackless协程

  在C++中有函数对象的概念后,只要类提供operator()的接口,那么类对象就可以当作函数调用,同时类的其他成员可以保存相关的状态信息。其实stackless就是通过class coroutine这个类本身来实现当前协程的状态保护的,其实其内部就是用的一个int来保留下次resume的行号的,同时提供is_child()、is_parent()、is_complete()三个函数来辅助控制协程的行为。
  要支持协程的函数类必须是可拷贝构造和赋值构造的,其既可以作为实现类的基类派生,也可以作为实现类的一个成员变量,甚至是lambda、bind的参数。其定义了几个C++标准之外的伪关键字方便使用,通过包含就可以使用。
(1) reenter

1
2
reenter(this) {} //继承形式
reenter(coro_) {} //成员变量形式

  当reenter被执行的时候,控制流会跳转到最后yield或者fork的位置。
  需要注意的是reenter宏是通过switch实现的,意味着当在协程体中使用局部变量的时候,当重入协程体时候不能忽略局部变量的定义。如果当前需要局部变量,那么用下面的方式使用符合的语句块。
(2) yield statement
  常常用在异步操作的时候,比如

1
yield socket_->async_read_some(buffer(*buffer_), *this);

  其执行的逻辑为:yield保存了当前协程的状态;其表达式初始化了异步操作;定义恢复点为statement后的一条语句;控制流被转移到了协程体的结尾。
  当异步操作结束的时候,函数对象重新被唤醒,然后reenter使得执行流转移到了恢复点。当然statement表达式也可以是复合表达式,比如:

1
2
3
4
yield{ 
mutable_buffers_1 b = buffer(*buffer_);
socket_->async_read_some(b, *this);
}

(3) yield return expression ;
  通常用于生成器的环境下使用,其return后面的值作为函数的返回值传递出来,比如

1
2
3
4
5
6
7
8
9
10
struct interleave : coroutine
{
istream& is1; istream& is2;
char operator()(char c) {
reenter (this) for (;;) {
yield return is1.get();
yield return is2.get();
}
}
};

  上面的例子会交替的从is1和is2中产生字符,其会使得return后面表达式的值被返回。
(4) yield ;
  用于显式的控制执行的流程,通常在多个协程交替的运行完成协作工作。
(5) yield break ;
  主要用来终止协程的,yield首先设置协程的终止状体,然后流程被转移到了协程体的结尾。
  一旦终止,使用is_complete()就会返回true,同时协程不能够被再次reenter了。当然不一定要yield break,当流程执行到了协程体结尾,这些协程也会自动terminate了。
  突然意识到为啥要break了,因为reenter本来就是用switch实现的嘛。
(6) fork statement
  可以创建多个协程的拷贝,常用的情况是在服务端,协程被fork出来用于处理客户端的请求。父协程和子协程通过is_parent()、is_child()进行界定。

1
2
3
4
5
6
7
8
reenter (this) {
do{
socket_.reset(new tcp::socket(io_service_));
yield acceptor->async_accept(*socket_, *this);
fork server(*this)();
} while (is_parent());
... client-specific handling follows ...
}

  其fork语句会创建当前协程的一个拷贝,然后可能会立即执行或者被后面再调度执行,或者使用io_service::post()调度执行。
  关于stackless协程的设计和实现思路,可以查看参考文献的第一篇文章的介绍,其内部真的是用一个switch实现,使用一个变量记录代码行号的哦!

二、stackful协程

  其实现使用的Boost.Context来进行上下文的切换。使用需要包含头文件,位于名字空间boost::coroutines。
  其实现原理是每个协程都有自己的stack和control-block(boost::contexts::fcontext_t),在协程需要暂停的时候,当前协程的所有非易失的寄存器(包括ESP、EIP)都会被保存在control-block当中,而新激活的协程会从其相关的control-block中加载回复相关的寄存器信息,称之为上下文切换,相关的上下文切换不需要系统特权。
  Boost.Context提供的协程包括两类:非对称型协程asymmetric_coroutine的和对称型协程symmetric_coroutine,前者协程知道唤醒自己的协程,当需要暂停的时候控制流转换给那个特定的协程;对称协程中所有的协程都是相等的,协程可以把控制流给任何一个其它的协程。所以对称协程主要是表达并发编程中的多个独立的执行单元,而非对称协程常常用于对数据进行顺序处理的过程。
  stackful协程可以从嵌套的stackframe中暂停执行,在恢复的时候可以在其暂停的地方继续执行,而stackless协程只有顶层的函数(top-level routine)可以被暂停,所有顶层函数调用的函数都不允许被暂停,也就是不允许嵌套使用携程。
  stackful的协程可以被嵌套使用,但是要求协程是可移动(move)但不可拷贝(copy)的,因为其为RAII实现的,结束的时候资源会被自动清理释放,但是拷贝会导致内部的状态不可控。同时使用时候在context_switch切换到正在执行的相同协程的行为是未定义的。

2.1 Asymmetric coroutine

1
2
asymmetric_coroutine<>::pull_type
asymmetric_coroutine<>::push_type

  其提供了单向的数据传输操作,在数据传输的过程中伴随着Context的切换。模板参数的类型决定了数据传输的类型,如果不需要传递数据只进行Context切换,可以使用void。
(1) pull_type
  从另外的一个context获取数据,其构造函数的参数是一个cor-function函数对象,cor-function的参数是一个push_type的引用。初始化pull_type的时候,执行流被切换到了cor-function,并且synthesize一个push_type并将引用传递给协程函数。其同时还提供operator(),只进行context切换,不传输数据(即构造函数参数为空而不是模板类型指定的实参)。
  pull_type提供迭代器和std::begin()、std::end()重载,从而可以增量的切换context并进行数据的传输。pull_type的提供的成员函数get()可以从别的context中拉取数据,但是不会造成context切换,如需切换需要手动调用operator()。

1
2
3
4
5
6
7
8
9
10
11
12
13
boost::coroutines::asymmetric_coroutine<int>::pull_type source(
[&](boost::coroutines::asymmetric_coroutine<int>::push_type& sink){
int first=1, second=1;
sink(first); sink(second);
for(int i=0;i<8;++i){
int third=first+second;
first=second; second=third;
sink(third);
}
});

for(auto i:source)
std::cout << i << " ";

(2) push_type
  用于将数据传输到别的执行context,其构造函数接收的cor-function参数类型是pull_type类型的引用。在初始化push_type的时候,不同的是执行流没有转移到cor-function,而是先执行push_type::operator()去synthesize一个pull_type并将其引用传递给协程函数。其push_type::operator(T)成员函数用于推送数据给对应的context。
(3) coroutine-function
  通过pull_type::operator bool可以判断协程是否还有效(即协程函数是否已经terminated),除非第一个模板参数是void,否则返回true的同时也意味着其还可以提供数据的。
  从pull-coroutine向main-context传递数据的例子:

1
2
3
4
5
6
7
8
9
10
boost::coroutines::asymmetric_coroutine<int>::pull_type source( 
[&](boost::coroutines::asymmetric_coroutine<int>::push_type& sink){
sink(1); // push {1} back to main-context
sink(2); // push {2} back to main-context
});

while(source){ // test if pull-coroutine is valid
int ret=source.get(); // access data value
source(); // context-switch to coroutine-function
}

  从main-context向push-coroutine传递数据的例子:

1
2
3
4
5
6
7
8
9
10
11
12
// constructor does NOT enter cor-function
boost::coroutines::asymmetric_coroutine<int>::push_type sink(
[&](boost::coroutines::asymmetric_coroutine<int>::pull_type& source){
for (int i:source) {
std::cout << i << " ";
}
});

std::vector<int> v{1,1,2,3,5,8,13,21,34,55};
for( int i:v) {
sink(i); // push {i} to coroutine-function
}

2.2 Symmetric coroutine

  其caller和callee的关系是不固定的,symmetric的协程可以把执行控制转移给任意的symmetric协程,而不一定是自己的caller。

1
2
symmetric_coroutine<>::call_type
symmetric_coroutine<>::yield_type

(1) call_type
  call_type其构造函数是一个coroutine-function函数对象,协程函数接受一个yield_type的引用作为参数。实例化call_type不会将执行流传递到协程函数,其会先调用operator()去强制合成一个yield_type并将其引用传递给协程函数。
  call_type不提供get()成员函数,即不可以从其他的执行context中获取数据。
(2) yield_type
  通过调用yield_type::operator()并使用其它call_type对象作为参数,可以把数据和执行流传递给其他的context。
  其模板参数规定了传输的数据类型,通过yield_type::get()可以访问该数据。如果实例化模板使用void类型,那么可以只用作控制流传递,而不进行数据的传递。

1
2
3
4
5
6
7
8
9
10
boost::coroutines::symmetric_coroutine<int>::call_type coro( 
[&](boost::coroutines::symmetric_coroutine<int>::yield_type& yield){
for (;;) {
std::cout << yield.get() << " ";
yield(); // jump back to starting context
}
});

coro(1); // transfer {1} to coroutine-function
coro(2); // transfer {2} to coroutine-function

2.3 spawn

  如果是这么写程序,不是蛋疼,而是要蛋碎了。幸好Boost.Coroutine库给了一个高度的封装,其使用yield_context来保存协程的运行环境,然后允许程序以同步的方式执行各种异步函数操作,而这个yield_context对象是spawn函数自动产生的。当然要想知道其内部封装的实现,还是需要另外花一份功夫的。

1
2
3
4
5
6
7
template<typename Function>
void spawn(boost::asio::io_service::strand strand, Function function);
template<typename Function>
void spawn(boost::asio::io_service & io_service, Function function);

// Function需要的签名
void coroutine(boost::asio::yield_context yield);

  要求spawn的第二个参数Function的签名必须是上面的类型,如果不符合,需要使用bind/lambda方式来包装。
  此后,对于所有的异步操作,只需要把yield作为参数传递在原来需要callback的位置就可以了,当异步操作完成的时候,此处的程序会resume继续执行。
对于异步操作的函数,其前面可能是

1
2
3
4
5
void handler(boost::system::error_code ec);
void handler(boost::system::error_code ec, result_type result);
// std::size_t length = my_socket.async_read_some( boost::asio::buffer(data), yield);
// boost::system::error_code ec;
// std::size_t length = my_socket.async_read_some( boost::asio::buffer(data), yield[ec]);

  对于有result_type的类型,其result的值已经作为参数返回了(比如上面的std::size_t),而如果出错,下面的调用方法会直接抛出异常,如果想使用原先返回错误的方式而不是抛出system_error异常的方式,可以使用yield[ec]的方式调用,operator[]用于外部获取发生的错误码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void do_echo(boost::asio::yield_context yield)
{
char data[128];
for (;;)
{
std::size_t length = my_socket.async_read_some(
boost::asio::buffer(data), yield);

boost::asio::async_write(my_socket,
boost::asio::buffer(data, length), yield);
}
}

boost::asio::spawn(my_strand, do_echo);

2.4 再举个栗子

  下面写一个小例子,看看封装后的协程写异步程序是多么爽的一件事,至于为什么爽是因为同步编程才是符合人类的思维习惯的。以前设置异步读取操作后,数据的处理都必须在回调函数中处理,现在可以直接在异步操作后接着处理啦!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/make_shared.hpp>
#include <string>
#include <ctime>
#include <iostream>
#include <boost/enable_shared_from_this.hpp>

using namespace boost::asio;
using std::cerr; using std::endl;

io_service io_service_;

class session: public boost::enable_shared_from_this<session>
{
public:
explicit session(ip::tcp::socket socket):
sock_(std::move(socket)),
strand_(io_service_),
uuid_(std::rand())
{}

~session() { cerr << "~sessoin ->" << uuid_ << endl; }

void go()
{
auto self(shared_from_this());
boost::asio::spawn(strand_,
boost::bind(&session::do_echo, self, _1));
}

void do_echo(yield_context yield)
{
char data[128];
std::size_t n = sock_.async_read_some(boost::asio::buffer(data), yield);
cerr << "RECVED:【" << data << "】->" << uuid_ <<endl;
std::time_t now = std::time(nullptr);
std::string time_str = std::ctime(&now);
async_write(sock_, buffer(time_str), yield);
sock_.shutdown(ip::tcp::socket::shutdown_send);
}

private:
ip::tcp::socket sock_;
io_service::strand strand_;
std::size_t uuid_;
};


void start_accept(yield_context yield)
{
ip::tcp::acceptor acceptor(io_service_, ip::tcp::endpoint(ip::tcp::v4(), 2016));

for (;;) {
boost::system::error_code ec;
ip::tcp::socket socket(io_service_);

acceptor.async_accept(socket, yield[ec]);
if(!ec)
boost::make_shared<session>(std::move(socket))->go();
}
}

int main(int argc, char* argv[])
{
boost::asio::spawn(io_service_, start_accept);
io_service_.run();
}

编译后就可以看出运行效果了:

1
➜  ~ g++ -std=c++11 test.cpp -lboost_system -lboost_coroutine -lboost_context -o test

  其实,感觉现实中协程更多的是对编程方式的改变,对控制流的操控可以用同步的结构写出异步的效果,但是协程是用户态的而不是原生的多线程,所以并不能并行执行提高并发率。但是协程能够在各个协程间进行高效的切换,这一点可以做到比传统依赖于异步调度的效率更高,这才体现出协作的本质吧!

参考文献