Linux中的异步I/O实现主要有两种方式:AIO(Asynchronous I/O)和IO多路复用(IO multiplexing)。
![基于Linux的异步I/O实现(使用Linux异步I/O实现高效数据读取)](https://www.bunian.cn/wp-content/uploads/2023/04/weixintupian20230423120248.png)
AIO(Asynchronous I/O):
AIO是Linux中的一种异步I/O实现,它允许应用程序在不阻塞调用线程的情况下启动I/O操作。
在AIO操作完成后,应用程序可以通过一种称为事件通知的机制获得通知。
Linux中的AIO主要使用libaio库来实现。
以下是一个简单的libaio示例:
#include <stdio.h>#include <stdlib.h>#include <fcntl.h>#include <unistd.h>#include <string.h>#include <errno.h>#include <libaio.h>#define BUFFER_SIZE 1024#define FILE_PATH "testfile.txt"int main() {int fd;char buffer[BUFFER_SIZE];struct iocb io;struct iocb *io_list[1];io_context_t ctx;memset(&ctx, 0, sizeof(ctx));io_queue_init(1, &ctx);fd = open(FILE_PATH, O_RDONLY | O_DIRECT);if (fd < 0) {perror("Failed to open file");return 1;}io_prep_pread(&io, fd, buffer, BUFFER_SIZE, 0);io_list[0] = &io;if (io_submit(ctx, 1, io_list) != 1) {perror("io_submit");return 1;}struct io_event event;int ret = io_getevents(ctx, 1, 1, &event, NULL);if (ret != 1) {perror("io_getevents");return 1;}printf("Read content:\n%s\n", buffer);io_queue_release(ctx);close(fd);return 0;}#include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <unistd.h> #include <string.h> #include <errno.h> #include <libaio.h> #define BUFFER_SIZE 1024 #define FILE_PATH "testfile.txt" int main() { int fd; char buffer[BUFFER_SIZE]; struct iocb io; struct iocb *io_list[1]; io_context_t ctx; memset(&ctx, 0, sizeof(ctx)); io_queue_init(1, &ctx); fd = open(FILE_PATH, O_RDONLY | O_DIRECT); if (fd < 0) { perror("Failed to open file"); return 1; } io_prep_pread(&io, fd, buffer, BUFFER_SIZE, 0); io_list[0] = &io; if (io_submit(ctx, 1, io_list) != 1) { perror("io_submit"); return 1; } struct io_event event; int ret = io_getevents(ctx, 1, 1, &event, NULL); if (ret != 1) { perror("io_getevents"); return 1; } printf("Read content:\n%s\n", buffer); io_queue_release(ctx); close(fd); return 0; }#include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <unistd.h> #include <string.h> #include <errno.h> #include <libaio.h> #define BUFFER_SIZE 1024 #define FILE_PATH "testfile.txt" int main() { int fd; char buffer[BUFFER_SIZE]; struct iocb io; struct iocb *io_list[1]; io_context_t ctx; memset(&ctx, 0, sizeof(ctx)); io_queue_init(1, &ctx); fd = open(FILE_PATH, O_RDONLY | O_DIRECT); if (fd < 0) { perror("Failed to open 
file"); return 1; } io_prep_pread(&io, fd, buffer, BUFFER_SIZE, 0); io_list[0] = &io; if (io_submit(ctx, 1, io_list) != 1) { perror("io_submit"); return 1; } struct io_event event; int ret = io_getevents(ctx, 1, 1, &event, NULL); if (ret != 1) { perror("io_getevents"); return 1; } printf("Read content:\n%s\n", buffer); io_queue_release(ctx); close(fd); return 0; }
IO多路复用(IO multiplexing):
IO多路复用常被用来实现类似异步I/O的效果,主要通过select、poll和epoll来实现。严格来说,它属于同步的事件通知机制:内核只负责通知哪些描述符已经就绪,实际的读写仍由应用程序自己调用read/write完成。
这些技术允许应用程序监视多个文件描述符的I/O状态,当其中一个或多个描述符准备好I/O操作时,应用程序会收到通知。
以下是一个简单的使用epoll的TCP回显服务器示例:
/*
 * Minimal epoll-based TCP echo server: listens on PORT, accepts clients and
 * writes every received chunk of data back to the client that sent it.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>

#define PORT 8080
#define MAX_EVENTS 10
#define BUFFER_SIZE 1024

/*
 * Send exactly len bytes from buf on socket fd, retrying after short writes
 * and EINTR. MSG_NOSIGNAL keeps a disconnected peer from killing the process
 * with SIGPIPE. Returns 0 on success, -1 on error (errno is set by send()).
 */
static int send_all(int fd, const char *buf, size_t len) {
    size_t sent = 0;

    while (sent < len) {
        ssize_t n = send(fd, buf + sent, len - sent, MSG_NOSIGNAL);
        if (n < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        sent += (size_t)n;
    }
    return 0;
}

/*
 * Stop watching fd and close it. The descriptor is removed from the epoll
 * set BEFORE close(): EPOLL_CTL_DEL on an already closed descriptor fails
 * with EBADF.
 */
static void drop_connection(int epoll_fd, int fd) {
    if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0) {
        perror("epoll_ctl");
    }
    close(fd);
}

/*
 * Runs the server loop forever; exits with status 1 if the listening socket
 * or the epoll instance cannot be set up, or if epoll_wait() fails.
 */
int main(void) {
    int listener, conn, epoll_fd;
    struct sockaddr_in addr;
    struct epoll_event ev, events[MAX_EVENTS];

    listener = socket(AF_INET, SOCK_STREAM, 0);
    if (listener < 0) {
        perror("socket");
        exit(1);
    }

    /* Zero the whole structure so padding bytes are not left uninitialised. */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(PORT);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bind(listener, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind");
        exit(1);
    }

    if (listen(listener, 10) < 0) {
        perror("listen");
        exit(1);
    }

    epoll_fd = epoll_create1(0);
    if (epoll_fd < 0) {
        perror("epoll_create1");
        exit(1);
    }

    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.fd = listener;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listener, &ev) < 0) {
        perror("epoll_ctl");
        exit(1);
    }

    while (1) {
        int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (nfds < 0) {
            if (errno == EINTR) {
                continue; /* interrupted by a signal: just wait again */
            }
            perror("epoll_wait");
            exit(1);
        }

        for (int i = 0; i < nfds; i++) {
            if (events[i].data.fd == listener) {
                conn = accept(listener, NULL, NULL);
                if (conn < 0) {
                    /* A failed accept (e.g. the client already went away)
                     * must not bring the whole server down. */
                    perror("accept");
                    continue;
                }

                ev.events = EPOLLIN;
                ev.data.fd = conn;
                if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn, &ev) < 0) {
                    perror("epoll_ctl");
                    close(conn);
                }
            } else {
                int fd = events[i].data.fd;
                char buffer[BUFFER_SIZE];
                ssize_t n = read(fd, buffer, BUFFER_SIZE);

                if (n < 0 && errno == EINTR) {
                    continue; /* fd is still readable; handled on next wait */
                }
                if (n <= 0) {
                    /* n == 0: peer closed the connection; n < 0: error. */
                    if (n < 0) {
                        perror("read");
                    }
                    drop_connection(epoll_fd, fd);
                } else if (send_all(fd, buffer, (size_t)n) < 0) {
                    perror("send");
                    drop_connection(epoll_fd, fd);
                }
            }
        }
    }

    /* Not reached: the loop above only ends through exit(). */
    close(listener);
    close(epoll_fd);
    return 0;
}
这个示例中的TCP回显服务器在监听端口8080。
当收到客户端的连接请求时,它会接受这个连接并将新的套接字添加到epoll事件集合。
当epoll检测到有数据可读时,服务器会读取数据并将其回显到客户端。
如果检测到客户端关闭连接或发生错误,服务器将关闭相应的套接字并从epoll事件集合中删除它。
© 版权声明
本站文章由不念博客原创,未经允许严禁转载!
THE END