【libuv高效编程】libuv学习超详细教程9——libuv async异步句柄解读

Jacinthe ·
更新时间:2024-09-20
· 972 次阅读

文章目录libuv系列文章async handle数据结构APIuv_async_init()uv_async_send()async的处理过程example参考例程代码获取 libuv系列文章

【libuv高效编程】libuv学习超详细教程1——libuv的编译与安装

【libuv高效编程】libuv学习超详细教程2——libuv框架初窥

【libuv高效编程】libuv学习超详细教程3——libuv事件循环

【libuv高效编程】libuv学习超详细教程4——libuv idle空闲句柄解读

【libuv高效编程】libuv学习超详细教程5——libuv prepare准备句柄解读

【libuv高效编程】libuv学习超详细教程6——libuv check 检查句柄解读

【libuv高效编程】libuv学习超详细教程7——libuv thread 线程句柄解读

【libuv高效编程】libuv学习超详细教程8——libuv signal 信号句柄解读

async handle

async handle可译为异步句柄,它主要是用于提供异步唤醒的功能,比如在用户线程中唤醒主事件循环线程,并且触发对应的回调函数。

从事件循环线程的处理过程可知,它在io循环时会进入阻塞状态,而阻塞的具体时间则通过计算得到,那么在某些情况下,我们想要唤醒事件循环线程,就可以通过ansyc去操作,比如当线程池的线程处理完事件后,执行的结果是需要交给事件循环线程的,这时就需要用到唤醒事件循环线程,当然方法也是很简单,调用一下uv_async_send()函数通知事件循环线程即可。libuv线程池中的线程就是利用这个机制和主事件循环线程通讯。

数据结构

通过 uv_async_t 可以定义一个async handle的实例。

typedef struct uv_async_s uv_async_t;

uv_async_t是属于handle的子类,并且还包含了一个UV_ASYNC_PRIVATE_FIELDS数据类型,里面包括它的回调函数async_cb、队列、以及一个pending成员变量。

结构比较简单,async_cb 保存回调函数指针,queue 作为队列节点插入 loop->async_handles,pending 字段的作用主要是用于保护操作,在唤醒的时候使用,初始化为 0, 在调用唤醒函数的时候会被设置为 1,为什么要这样子做呢,因为async handle是异步句柄,这可能不止一个线程会尝试唤醒事件循环,这一个async handle不能同时被多个线程操作,因此需要进行原子保护,当它为1的时候表示有其他线程在操作这个async handle。

struct uv_async_s { UV_HANDLE_FIELDS UV_ASYNC_PRIVATE_FIELDS }; #define UV_ASYNC_PRIVATE_FIELDS \ uv_async_cb async_cb; \ void* queue[2]; \ int pending; \

uv_async_cb回调函数:

typedef void (*uv_async_cb)(uv_async_t* handle); API

async handle相关的API非常简单,就一个初始化uv_async_init(),还有一个异步通知的函数uv_async_send()

uv_async_init()

函数原型:

UV_EXTERN int uv_async_init(uv_loop_t*, uv_async_t* async, uv_async_cb async_cb);

参数:

uv_loop_t:传入了事件循环的句柄。 async:指定初始化的async handle。 async_cb:指定async handle的回调函数。

uv_async_init()初始化函数不同于其他handle的初始化函数,因为它会立即将async handle设置为活跃状态,所以 async handle 没有 start 相关的函数。

其实深入看uv__async_start()源码你就会发现,它实际上也是通过pipe管道进行唤醒的,因为主线程的io循环其实是在观察是否有可读写的io,libuv将管道等都抽象为io观察者了,在io循环中观察管道的读取端,当有数据到来则唤醒,越是深入了解libuv,你就会发现其实就是各个线程、进程间的通信,一种异步的通信,只不过libuv处理的很好,抽象了很多数据结构,并且衍生出了很多子类。

int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) { int err; /* 其实这个函数的内部主要做了以下操作:创建一个管道,并且将管道注册到loop->async_io_watcher,并且start */ err = uv__async_start(loop); if (err) return err; /* 设置UV_HANDLE_REF标记,并且将async handle 插入loop->handle_queue */ uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC); /* 注册回调函数、将 pending 字段初始化为0 */ handle->async_cb = async_cb; handle->pending = 0; /* queue 作为队列节点插入 loop->async_handles */ QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue); uv__handle_start(handle); return 0; } static int uv__async_start(uv_loop_t* loop) { int pipefd[2]; int err; if (loop->async_io_watcher.fd != -1) return 0; /* 创建管道 */ #ifdef __linux__ err = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (err < 0) return UV__ERR(errno); pipefd[0] = err; pipefd[1] = -1; #else err = uv__make_pipe(pipefd, UV__F_NONBLOCK); if (err async_io_watcher */ uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]); uv__io_start(loop, &loop->async_io_watcher, POLLIN); loop->async_wfd = pipefd[1]; return 0; } uv_async_send()

函数原型:

int uv_async_send(uv_async_t* handle)

参数:

handle:要唤醒指定的async handle。

uv_async_send()函数发送消息唤醒事件循环线程并触发回调函数调用,其实我们不难想象出来,它的唤醒就是将消息写入管道中,让io观察者发现管道有数据从而唤醒事件循环线程,并随之处理这个async handle。

其实真正的唤醒操作是uv__async_send()函数,它往管道中写入消息就行了。

int uv_async_send(uv_async_t* handle) { if (ACCESS_ONCE(int, handle->pending) != 0) return 0; /* 将handle->pending设置为1,表示此时有线程在操作这个async handle */ if (cmpxchgi(&handle->pending, 0, 1) != 0) return 0; /* 唤醒事件循环线程 */ uv__async_send(handle->loop); /* 操作完成 */ if (cmpxchgi(&handle->pending, 1, 2) != 1) abort(); return 0; } static void uv__async_send(uv_loop_t* loop) { const void* buf; ssize_t len; int fd; int r; buf = ""; len = 1; fd = loop->async_wfd; #if defined(__linux__) if (fd == -1) { static const uint64_t val = 1; buf = &val; len = sizeof(val); fd = loop->async_io_watcher.fd; /* eventfd */ } #endif do r = write(fd, buf, len); while (r == -1 && errno == EINTR); if (r == len) return; if (r == -1) if (errno == EAGAIN || errno == EWOULDBLOCK) return; abort(); } async的处理过程

前面所介绍的都是初始化与通知的方式,那么在事件循环中怎么去处理async handle呢?

我们注意到uv_async_init()函数可能多次被调用,初始化多个async handle,但是 loop->async_io_watcher 只有一个,那么问题来了,那么多个async handle都共用一个io观察者(假设loop是一个),那么在 loop->async_io_watcher 上有 I/O 事件时,并不知道是哪个 async handle 发送的,因此我们要知道async handle是如何处理这些的。

我们也知道从uv__io_init()函数中已经注册了一个uv__async_io()函数用于处理loop->async_io_watcher 的 I/O 事件,那么我们就看uv__async_io()函数的处理过程即可:

static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { char buf[1024]; ssize_t r; QUEUE queue; QUEUE* q; uv_async_t* h; assert(w == &loop->async_io_watcher); for (;;) { /* 不断的读取 w->fd 上的数据到 buf 中直到为空,buf 中的数据无实际用途 */ r = read(w->fd, buf, sizeof(buf)); if (r == sizeof(buf)) continue; /* 读取到数据跳出循环 */ if (r != -1) break; if (errno == EAGAIN || errno == EWOULDBLOCK) break; if (errno == EINTR) continue; abort(); } QUEUE_MOVE(&loop->async_handles, &queue); /* 遍历队列 */ while (!QUEUE_EMPTY(&queue)) { q = QUEUE_HEAD(&queue); /* 获取async handle */ h = QUEUE_DATA(q, uv_async_t, queue); QUEUE_REMOVE(q); /* 重新插入队列 */ QUEUE_INSERT_TAIL(&loop->async_handles, q); if (0 == uv__async_spin(h)) continue; /* Not pending. */ if (h->async_cb == NULL) continue; /* 调用async 的回调函数 */ h->async_cb(h); } } example

接着来一个例子吧,都是非常简单的,创建1个线程,在事件循环中等待async handle。

#include #include #include void wake_entry(void *arg) { sleep(5); printf("wake_entry running, wake async!\n"); uv_async_send((uv_async_t*)arg); uv_stop(uv_default_loop()); } void my_async_cb(uv_async_t* handle) { printf("my async running!\n"); } int main() { uv_thread_t wake; uv_async_t async; uv_async_init(uv_default_loop(), &async, my_async_cb); uv_thread_create(&wake, wake_entry, &async); uv_run(uv_default_loop(), UV_RUN_DEFAULT); uv_thread_join(&wake); return 0; } 参考

libuv官方文档

例程代码获取

libuv-learning-code


作者:_杰杰_



libuv async 句柄 教程

需要 登录 后方可回复, 如果你还没有账号请 注册新账号