您的位置 首页 > 腾讯云社区

libuv源码分析之unix域---theanarkh

unix域是一种基于单主机的进程间通信方式。实现模式类似tcp通信。今天先分析他的实现,后续会分析他的使用。在libuv中,unix域用uv_pipe_t表示。

struct uv_pipe_s { // uv_handle_s的字段 void* data; // 所属事件循环 uv_loop_t* loop; // handle类型 uv_handle_type type; // 关闭handle时的回调 uv_close_cb close_cb; // 用于插入事件循环的handle队列 void* handle_queue[2]; union { int fd; void* reserved[4]; } u; // 用于插入事件循环的closing阶段对应的队列 uv_handle_t* next_closing; // 各种标记 unsigned int flags; // 流拓展的字段 // 用户写入流的字节大小,流缓存用户的输入,然后等到可写的时候才做真正的写 size_t write_queue_size; // 分配内存的函数,内存由用户定义,主要用来保存读取的数据 uv_alloc_cb alloc_cb; // 读取数据的回调 uv_read_cb read_cb; // 连接成功后,执行connect_req的回调(connect_req在uv__xxx_connect中赋值) uv_connect_t *connect_req; // 关闭写端的时候,发送完缓存的数据,执行shutdown_req的回调(shutdown_req在uv_shutdown的时候赋值) uv_shutdown_t *shutdown_req; // 流对应的io观察者,即文件描述符+一个文件描述符事件触发时执行的回调 uv__io_t io_watcher; // 流缓存下来的,待写的数据 void* write_queue[2]; // 已经完成了数据写入的队列 void* write_completed_queue[2]; // 完成三次握手后,执行的回调 uv_connection_cb connection_cb; // 操作流时出错码 int delayed_error; // accept返回的通信socket对应的文件描述符 int accepted_fd; // 同上,用于缓存更多的通信socket对应的文件描述符 void* queued_fds; // 标记管道是否能在进程间传递 int ipc; // 用于unix域通信的文件路径 const char* pipe_fname; }

unix域继承域handle和stream。下面看一下他的具体实现逻辑。

int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE); handle->shutdown_req = NULL; handle->connect_req = NULL; handle->pipe_fname = NULL; handle->ipc = ipc; return 0; }

uv_pipe_init逻辑很简单,就是初始化uv_pipe_t结构体。刚才已经见过uv_pipe_t继承于stream,uv__stream_init就是初始化stream(父类)的字段。文章开头说过,unix域的实现类似tcp的实现。遵循网络socket编程那一套。服务端使用bind,listen等函数启动服务。

// name是unix域的文件路径 int uv_pipe_bind(uv_pipe_t* handle, const char* name) { struct sockaddr_un saddr; const char* pipe_fname; int sockfd; int err; pipe_fname = NULL; pipe_fname = uv__strdup(name); name = NULL; // unix域套接字 sockfd = uv__socket(AF_UNIX, SOCK_STREAM, 0); memset(&saddr, 0, sizeof saddr); strncpy(saddr.sun_path, pipe_fname, sizeof(saddr.sun_path) - 1); saddr.sun_path[sizeof(saddr.sun_path) - 1] = ''; saddr.sun_family = AF_UNIX; // 绑定到路径,tcp是绑定到ip和端口 if (bind(sockfd, (struct sockaddr*)&saddr, sizeof saddr)) { // ... } // 已经绑定 handle->flags |= UV_HANDLE_BOUND; handle->pipe_fname = pipe_fname; // 保存socket fd,用于后面监听 handle->io_watcher.fd = sockfd; return 0; }

uv_pipe_bind函数的逻辑也比较简单,就是类似tcp的bind行为。 1 申请一个socket套接字。 2 绑定unix域路径到socket中。 绑定了路径后,就可以调用listen函数开始监听。

int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { if (uv__stream_fd(handle) == -1) return UV_EINVAL; // uv__stream_fd(handle)得到bind函数中获取的socket if (listen(uv__stream_fd(handle), backlog)) return UV__ERR(errno); // 保存回调,有进程调用connect的时候时触发,由uv__server_io函数触发 handle->connection_cb = cb; // io观察者的回调,有进程调用connect的时候时触发(io观察者的fd在init函数里设置了) handle->io_watcher.cb = uv__server_io; // 注册io观察者到libuv,等待连接,即读事件到来 uv__io_start(handle->loop, &handle->io_watcher, POLLIN); return 0; }

uv_pipe_listen执行listen函数使得socket成为监听型的套接字。然后把socket对应的文件描述符和回调封装成io观察者。注册到libuv。等到有读事件到来(有连接到来)。就会执行uv__server_io函数,摘下对应的客户端节点。最后执行connection_cb回调。

这时候,使用unix域成功启动了一个服务。接下来就是看客户端的逻辑。

void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, const char* name, uv_connect_cb cb) { struct sockaddr_un saddr; int new_sock; int err; int r; // 判断是否已经有socket了,没有的话需要申请一个,见下面 new_sock = (uv__stream_fd(handle) == -1); // 客户端还没有对应的socket fd if (new_sock) { err = uv__socket(AF_UNIX, SOCK_STREAM, 0); if (err < 0) goto out; // 保存socket对应的文件描述符到io观察者 handle->io_watcher.fd = err; } // 需要连接的服务器信息。主要是unix域路径信息 memset(&saddr, 0, sizeof saddr); strncpy(saddr.sun_path, name, sizeof(saddr.sun_path) - 1); saddr.sun_path[sizeof(saddr.sun_path) - 1] = ''; saddr.sun_family = AF_UNIX; // 连接服务器,unix域路径是name do { r = connect(uv__stream_fd(handle),(struct sockaddr*)&saddr, sizeof saddr); } while (r == -1 && errno == EINTR); // 忽略错误处理逻辑 err = 0; // 设置socket的可读写属性 if (new_sock) { err = uv__stream_open((uv_stream_t*)handle, uv__stream_fd(handle), UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); } // 把io观察者注册到libuv,等到连接成功或者可以发送请求 if (err == 0) uv__io_start(handle->loop, &handle->io_watcher, POLLIN | POLLOUT); out: // 记录错误码,如果有的话 handle->delayed_error = err; // 连接成功时的回调 handle->connect_req = req; uv__req_init(handle->loop, req, UV_CONNECT); req->handle = (uv_stream_t*)handle; req->cb = cb; QUEUE_INIT(&req->queue); // 如果连接出错,在pending节点会执行req对应的回调。错误码是delayed_error if (err) uv__io_feed(handle->loop, &handle->io_watcher); }

本文大致分析了unix域在libuv中是如何封装的。大致的流程和网络编程一样。分为服务端和客户端两面。libuv在操作系统提供的api的基础上。和libuv的异步非阻塞结合。在libuv中为进程间提供了一种通信方式。后续会继续分析本文提到的内容。unix域在操作系统的实现可以参考unix域源码解析。

---来自腾讯云社区的---theanarkh

关于作者: 瞎采新闻

这里可以显示个人介绍!这里可以显示个人介绍!

热门文章

留言与评论(共有 0 条评论)
   
验证码: