【libuv高效编程】libuv学习超详细教程1——libuv的编译与安装
【libuv高效编程】libuv学习超详细教程2——libuv框架初窥
【libuv高效编程】libuv学习超详细教程3——libuv事件循环
【libuv高效编程】libuv学习超详细教程4——libuv idle空闲句柄解读
【libuv高效编程】libuv学习超详细教程5——libuv prepare准备句柄解读
【libuv高效编程】libuv学习超详细教程6——libuv check 检查句柄解读
【libuv高效编程】libuv学习超详细教程7——libuv thread 线程句柄解读
【libuv高效编程】libuv学习超详细教程8——libuv signal 信号句柄解读
async handleasync handle可译为异步句柄,它主要是用于提供异步唤醒的功能,比如在用户线程中唤醒主事件循环线程,并且触发对应的回调函数。
从事件循环线程的处理过程可知,它在io循环时会进入阻塞状态,而阻塞的具体时间则通过计算得到,那么在某些情况下,我们想要唤醒事件循环线程,就可以通过ansyc去操作,比如当线程池的线程处理完事件后,执行的结果是需要交给事件循环线程的,这时就需要用到唤醒事件循环线程,当然方法也是很简单,调用一下uv_async_send()函数通知事件循环线程即可。libuv线程池中的线程就是利用这个机制和主事件循环线程通讯。
数据结构通过 uv_async_t 可以定义一个async handle的实例。
typedef struct uv_async_s uv_async_t;
uv_async_t是属于handle的子类,并且还包含了一个UV_ASYNC_PRIVATE_FIELDS数据类型,里面包括它的回调函数async_cb、队列、以及一个pending成员变量。
结构比较简单,async_cb 保存回调函数指针,queue 作为队列节点插入 loop->async_handles,pending 字段的作用主要是用于保护操作,在唤醒的时候使用,初始化为 0, 在调用唤醒函数的时候会被设置为 1,为什么要这样子做呢,因为async handle是异步句柄,这可能不止一个线程会尝试唤醒事件循环,这一个async handle不能同时被多个线程操作,因此需要进行原子保护,当它为1的时候表示有其他线程在操作这个async handle。
struct uv_async_s {
UV_HANDLE_FIELDS
UV_ASYNC_PRIVATE_FIELDS
};
#define UV_ASYNC_PRIVATE_FIELDS \
uv_async_cb async_cb; \
void* queue[2]; \
int pending; \
uv_async_cb回调函数:
typedef void (*uv_async_cb)(uv_async_t* handle);
API
async handle相关的API非常简单,就一个初始化uv_async_init(),还有一个异步通知的函数uv_async_send()
uv_async_init()函数原型:
UV_EXTERN int uv_async_init(uv_loop_t*,
uv_async_t* async,
uv_async_cb async_cb);
参数:
uv_loop_t:传入了事件循环的句柄。 async:指定初始化的async handle。 async_cb:指定async handle的回调函数。uv_async_init()初始化函数不同于其他handle的初始化函数,因为它会立即将async handle设置为活跃状态,所以 async handle 没有 start 相关的函数。
其实深入看uv__async_start()源码你就会发现,它实际上也是通过pipe管道进行唤醒的,因为主线程的io循环其实是在观察是否有可读写的io,libuv将管道等都抽象为io观察者了,在io循环中观察管道的读取端,当有数据到来则唤醒,越是深入了解libuv,你就会发现其实就是各个线程、进程间的通信,一种异步的通信,只不过libuv处理的很好,抽象了很多数据结构,并且衍生出了很多子类。
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
int err;
/* 其实这个函数的内部主要做了以下操作:创建一个管道,并且将管道注册到loop->async_io_watcher,并且start */
err = uv__async_start(loop);
if (err)
return err;
/* 设置UV_HANDLE_REF标记,并且将async handle 插入loop->handle_queue */
uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
/* 注册回调函数、将 pending 字段初始化为0 */
handle->async_cb = async_cb;
handle->pending = 0;
/* queue 作为队列节点插入 loop->async_handles */
QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
uv__handle_start(handle);
return 0;
}
static int uv__async_start(uv_loop_t* loop) {
int pipefd[2];
int err;
if (loop->async_io_watcher.fd != -1)
return 0;
/* 创建管道 */
#ifdef __linux__
err = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (err < 0)
return UV__ERR(errno);
pipefd[0] = err;
pipefd[1] = -1;
#else
err = uv__make_pipe(pipefd, UV__F_NONBLOCK);
if (err async_io_watcher */
uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
uv__io_start(loop, &loop->async_io_watcher, POLLIN);
loop->async_wfd = pipefd[1];
return 0;
}
uv_async_send()
函数原型:
int uv_async_send(uv_async_t* handle)
参数:
handle:要唤醒指定的async handle。uv_async_send()函数发送消息唤醒事件循环线程并触发回调函数调用,其实我们不难想象出来,它的唤醒就是将消息写入管道中,让io观察者发现管道有数据从而唤醒事件循环线程,并随之处理这个async handle。
其实真正的唤醒操作是uv__async_send()函数,它往管道中写入消息就行了。
int uv_async_send(uv_async_t* handle) {
if (ACCESS_ONCE(int, handle->pending) != 0)
return 0;
/* 将handle->pending设置为1,表示此时有线程在操作这个async handle */
if (cmpxchgi(&handle->pending, 0, 1) != 0)
return 0;
/* 唤醒事件循环线程 */
uv__async_send(handle->loop);
/* 操作完成 */
if (cmpxchgi(&handle->pending, 1, 2) != 1)
abort();
return 0;
}
static void uv__async_send(uv_loop_t* loop) {
const void* buf;
ssize_t len;
int fd;
int r;
buf = "";
len = 1;
fd = loop->async_wfd;
#if defined(__linux__)
if (fd == -1) {
static const uint64_t val = 1;
buf = &val;
len = sizeof(val);
fd = loop->async_io_watcher.fd; /* eventfd */
}
#endif
do
r = write(fd, buf, len);
while (r == -1 && errno == EINTR);
if (r == len)
return;
if (r == -1)
if (errno == EAGAIN || errno == EWOULDBLOCK)
return;
abort();
}
async的处理过程
前面所介绍的都是初始化与通知的方式,那么在事件循环中怎么去处理async handle呢?
我们注意到uv_async_init()
函数可能多次被调用,初始化多个async handle,但是 loop->async_io_watcher 只有一个,那么问题来了,那么多个async handle都共用一个io观察者(假设loop是一个),那么在 loop->async_io_watcher 上有 I/O 事件时,并不知道是哪个 async handle 发送的,因此我们要知道async handle是如何处理这些的。
我们也知道从uv__io_init()函数中已经注册了一个uv__async_io()函数用于处理loop->async_io_watcher 的 I/O 事件,那么我们就看uv__async_io()函数的处理过程即可:
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
char buf[1024];
ssize_t r;
QUEUE queue;
QUEUE* q;
uv_async_t* h;
assert(w == &loop->async_io_watcher);
for (;;) {
/* 不断的读取 w->fd 上的数据到 buf 中直到为空,buf 中的数据无实际用途 */
r = read(w->fd, buf, sizeof(buf));
if (r == sizeof(buf))
continue;
/* 读取到数据跳出循环 */
if (r != -1)
break;
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
if (errno == EINTR)
continue;
abort();
}
QUEUE_MOVE(&loop->async_handles, &queue);
/* 遍历队列 */
while (!QUEUE_EMPTY(&queue)) {
q = QUEUE_HEAD(&queue);
/* 获取async handle */
h = QUEUE_DATA(q, uv_async_t, queue);
QUEUE_REMOVE(q);
/* 重新插入队列 */
QUEUE_INSERT_TAIL(&loop->async_handles, q);
if (0 == uv__async_spin(h))
continue; /* Not pending. */
if (h->async_cb == NULL)
continue;
/* 调用async 的回调函数 */
h->async_cb(h);
}
}
example
接着来一个例子吧,都是非常简单的,创建1个线程,在事件循环中等待async handle。
#include
#include
#include
void wake_entry(void *arg)
{
sleep(5);
printf("wake_entry running, wake async!\n");
uv_async_send((uv_async_t*)arg);
uv_stop(uv_default_loop());
}
void my_async_cb(uv_async_t* handle)
{
printf("my async running!\n");
}
int main()
{
uv_thread_t wake;
uv_async_t async;
uv_async_init(uv_default_loop(), &async, my_async_cb);
uv_thread_create(&wake, wake_entry, &async);
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
uv_thread_join(&wake);
return 0;
}
参考
libuv官方文档
例程代码获取libuv-learning-code