基于Linux的异步I/O实现(使用Linux异步I/O实现高效数据读取)

Linux中的异步I/O实现主要有两种方式:AIO(Asynchronous I/O)和IO多路复用(IO multiplexing)。

图片[1]-基于Linux的异步I/O实现(使用Linux异步I/O实现高效数据读取)-不念博客

AIO(Asynchronous I/O):

AIO是Linux中的一种异步I/O实现,它允许应用程序在不阻塞调用线程的情况下启动I/O操作。

在AIO操作完成后,应用程序可以通过一种称为事件通知的机制获得通知。

Linux中的AIO主要使用libaio库来实现。

以下是一个简单的libaio示例:

/* O_DIRECT is only exposed by glibc when _GNU_SOURCE is defined before the
 * first system header is included. */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <libaio.h>

#define BUFFER_SIZE 1024
#define FILE_PATH "testfile.txt"
/* O_DIRECT requires the user buffer to be aligned to the device's logical
 * block size. 4096 satisfies both 512- and 4096-byte devices.
 * NOTE(review): on 4096-byte-sector devices BUFFER_SIZE must also be a
 * multiple of 4096 - confirm for the target storage. */
#define BUFFER_ALIGN 4096

/*
 * Reads up to BUFFER_SIZE bytes from the start of FILE_PATH with one
 * asynchronous libaio request and prints them to stdout.
 *
 * Returns 0 on success and 1 on any failure. Every acquired resource
 * (AIO context, file descriptor, buffer) is released on all paths.
 */
int main() {
    int exit_code = 1;
    int fd = -1;
    int ctx_ready = 0;
    int ret;
    long nread;
    void *buffer = NULL;
    struct iocb io;
    struct iocb *io_list[1];
    struct io_event event;
    io_context_t ctx;

    /* The context handle must be zeroed before io_queue_init(). */
    memset(&ctx, 0, sizeof(ctx));

    /* libaio wrappers return a negative errno value and do not set errno,
     * so errors are reported with strerror(-ret) rather than perror(). */
    ret = io_queue_init(1, &ctx);
    if (ret < 0) {
        fprintf(stderr, "io_queue_init: %s\n", strerror(-ret));
        goto cleanup;
    }
    ctx_ready = 1;

    fd = open(FILE_PATH, O_RDONLY | O_DIRECT);
    if (fd < 0) {
        perror("Failed to open file");
        goto cleanup;
    }

    /* posix_memalign() returns the error number directly. */
    ret = posix_memalign(&buffer, BUFFER_ALIGN, BUFFER_SIZE);
    if (ret != 0) {
        fprintf(stderr, "posix_memalign: %s\n", strerror(ret));
        buffer = NULL;
        goto cleanup;
    }

    io_prep_pread(&io, fd, buffer, BUFFER_SIZE, 0);
    io_list[0] = &io;

    ret = io_submit(ctx, 1, io_list);
    if (ret != 1) {
        fprintf(stderr, "io_submit: %s\n",
                ret < 0 ? strerror(-ret) : "request was not accepted");
        goto cleanup;
    }

    /* Wait for the single completion, retrying if a signal interrupts us. */
    do {
        ret = io_getevents(ctx, 1, 1, &event, NULL);
    } while (ret == -EINTR);
    if (ret != 1) {
        fprintf(stderr, "io_getevents: %s\n",
                ret < 0 ? strerror(-ret) : "no completion event");
        goto cleanup;
    }

    /* event.res holds the byte count, or a negative errno on failure. */
    nread = (long)event.res;
    if (nread < 0) {
        fprintf(stderr, "read: %s\n", strerror((int)-nread));
        goto cleanup;
    }

    /* The data is not NUL-terminated, so bound the print by the byte count. */
    printf("Read content:\n%.*s\n", (int)nread, (const char *)buffer);
    exit_code = 0;

cleanup:
    /* Destroy the context first so no request can still reference the buffer. */
    if (ctx_ready) {
        io_queue_release(ctx);
    }
    if (fd >= 0) {
        close(fd);
    }
    free(buffer);
    return exit_code;
}
/* O_DIRECT is only exposed by glibc when _GNU_SOURCE is defined before the
 * first system header is included. */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <libaio.h>

#define BUFFER_SIZE 1024
#define FILE_PATH "testfile.txt"
/* O_DIRECT requires the user buffer to be aligned to the device's logical
 * block size. 4096 satisfies both 512- and 4096-byte devices.
 * NOTE(review): on 4096-byte-sector devices BUFFER_SIZE must also be a
 * multiple of 4096 - confirm for the target storage. */
#define BUFFER_ALIGN 4096

/*
 * Reads up to BUFFER_SIZE bytes from the start of FILE_PATH with one
 * asynchronous libaio request and prints them to stdout.
 *
 * Returns 0 on success and 1 on any failure. Every acquired resource
 * (AIO context, file descriptor, buffer) is released on all paths.
 */
int main() {
    int exit_code = 1;
    int fd = -1;
    int ctx_ready = 0;
    int ret;
    long nread;
    void *buffer = NULL;
    struct iocb io;
    struct iocb *io_list[1];
    struct io_event event;
    io_context_t ctx;

    /* The context handle must be zeroed before io_queue_init(). */
    memset(&ctx, 0, sizeof(ctx));

    /* libaio wrappers return a negative errno value and do not set errno,
     * so errors are reported with strerror(-ret) rather than perror(). */
    ret = io_queue_init(1, &ctx);
    if (ret < 0) {
        fprintf(stderr, "io_queue_init: %s\n", strerror(-ret));
        goto cleanup;
    }
    ctx_ready = 1;

    fd = open(FILE_PATH, O_RDONLY | O_DIRECT);
    if (fd < 0) {
        perror("Failed to open file");
        goto cleanup;
    }

    /* posix_memalign() returns the error number directly. */
    ret = posix_memalign(&buffer, BUFFER_ALIGN, BUFFER_SIZE);
    if (ret != 0) {
        fprintf(stderr, "posix_memalign: %s\n", strerror(ret));
        buffer = NULL;
        goto cleanup;
    }

    io_prep_pread(&io, fd, buffer, BUFFER_SIZE, 0);
    io_list[0] = &io;

    ret = io_submit(ctx, 1, io_list);
    if (ret != 1) {
        fprintf(stderr, "io_submit: %s\n",
                ret < 0 ? strerror(-ret) : "request was not accepted");
        goto cleanup;
    }

    /* Wait for the single completion, retrying if a signal interrupts us. */
    do {
        ret = io_getevents(ctx, 1, 1, &event, NULL);
    } while (ret == -EINTR);
    if (ret != 1) {
        fprintf(stderr, "io_getevents: %s\n",
                ret < 0 ? strerror(-ret) : "no completion event");
        goto cleanup;
    }

    /* event.res holds the byte count, or a negative errno on failure. */
    nread = (long)event.res;
    if (nread < 0) {
        fprintf(stderr, "read: %s\n", strerror((int)-nread));
        goto cleanup;
    }

    /* The data is not NUL-terminated, so bound the print by the byte count. */
    printf("Read content:\n%.*s\n", (int)nread, (const char *)buffer);
    exit_code = 0;

cleanup:
    /* Destroy the context first so no request can still reference the buffer. */
    if (ctx_ready) {
        io_queue_release(ctx);
    }
    if (fd >= 0) {
        close(fd);
    }
    free(buffer);
    return exit_code;
}
#include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <unistd.h> #include <string.h> #include <errno.h> #include <libaio.h> #define BUFFER_SIZE 1024 #define FILE_PATH "testfile.txt" int main() { int fd; char buffer[BUFFER_SIZE]; struct iocb io; struct iocb *io_list[1]; io_context_t ctx; memset(&ctx, 0, sizeof(ctx)); io_queue_init(1, &ctx); fd = open(FILE_PATH, O_RDONLY | O_DIRECT); if (fd < 0) { perror("Failed to open file"); return 1; } io_prep_pread(&io, fd, buffer, BUFFER_SIZE, 0); io_list[0] = &io; if (io_submit(ctx, 1, io_list) != 1) { perror("io_submit"); return 1; } struct io_event event; int ret = io_getevents(ctx, 1, 1, &event, NULL); if (ret != 1) { perror("io_getevents"); return 1; } printf("Read content:\n%s\n", buffer); io_queue_release(ctx); close(fd); return 0; }

IO多路复用(IO multiplexing):

IO多路复用常被用来实现类似异步的事件驱动I/O(严格来说,它属于同步的就绪通知模型,而不是真正的异步I/O),主要使用select、poll和epoll来实现。

这些技术允许应用程序监视多个文件描述符的I/O状态,当其中一个或多个描述符准备好I/O操作时,应用程序会收到通知。

以下是一个简单的使用epoll的TCP回显服务器示例:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>

#define PORT 8080
#define MAX_EVENTS 10
#define BUFFER_SIZE 1024

/*
 * Stops watching a client socket and closes it.
 * The descriptor is deregistered BEFORE close(): EPOLL_CTL_DEL on an
 * already-closed descriptor fails with EBADF.
 */
static void drop_client(int epoll_fd, int fd) {
    if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0) {
        perror("epoll_ctl");
    }
    close(fd);
}

/*
 * Sends exactly len bytes, continuing after short writes and EINTR.
 * MSG_NOSIGNAL turns a vanished peer into an EPIPE error instead of a
 * process-killing SIGPIPE. Returns 0 on success, -1 on error (errno set).
 */
static int send_all(int fd, const char *data, size_t len) {
    while (len > 0) {
        ssize_t sent = send(fd, data, len, MSG_NOSIGNAL);
        if (sent < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        data += sent;
        len -= (size_t)sent;
    }
    return 0;
}

/*
 * Accepts one pending connection and registers it for read events.
 * Returns 0 when the server may keep running (including transient accept
 * failures) and -1 on an unrecoverable error.
 */
static int accept_client(int epoll_fd, int listener) {
    struct epoll_event ev;
    int conn = accept(listener, NULL, NULL);

    if (conn < 0) {
        /* The peer gave up or a signal arrived: not a server failure. */
        if (errno == EINTR || errno == ECONNABORTED) {
            return 0;
        }
        perror("accept");
        return -1;
    }

    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.fd = conn;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn, &ev) < 0) {
        perror("epoll_ctl");
        close(conn);
        return -1;
    }
    return 0;
}

/*
 * Reads one chunk from a readable client and echoes it back.
 * The client is dropped on EOF, read error or send error.
 */
static void echo_client(int epoll_fd, int fd) {
    char buffer[BUFFER_SIZE];
    ssize_t n = read(fd, buffer, BUFFER_SIZE);

    if (n < 0 && errno == EINTR) {
        /* Level-triggered epoll reports the descriptor again. */
        return;
    }
    if (n <= 0) {
        if (n < 0) {
            perror("read");
        }
        drop_client(epoll_fd, fd);
        return;
    }
    if (send_all(fd, buffer, (size_t)n) < 0) {
        perror("write");
        drop_client(epoll_fd, fd);
    }
}

/*
 * TCP echo server on PORT driven by a single epoll instance.
 * Runs until an unrecoverable error occurs; setup failures exit(1), and a
 * fatal error in the event loop closes the descriptors and returns 1.
 */
int main() {
    int listener, epoll_fd;
    struct sockaddr_in addr;
    struct epoll_event ev, events[MAX_EVENTS];

    listener = socket(AF_INET, SOCK_STREAM, 0);
    if (listener < 0) {
        perror("socket");
        exit(1);
    }

    /* Zero the whole structure so padding (sin_zero) is well-defined. */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(PORT);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bind(listener, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind");
        exit(1);
    }

    if (listen(listener, 10) < 0) {
        perror("listen");
        exit(1);
    }

    epoll_fd = epoll_create1(0);
    if (epoll_fd < 0) {
        perror("epoll_create1");
        exit(1);
    }

    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.fd = listener;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listener, &ev) < 0) {
        perror("epoll_ctl");
        exit(1);
    }

    for (;;) {
        int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (nfds < 0) {
            /* A signal interrupting the wait is not an error. */
            if (errno == EINTR) {
                continue;
            }
            perror("epoll_wait");
            goto shutdown;
        }

        for (int i = 0; i < nfds; i++) {
            int fd = events[i].data.fd;
            if (fd == listener) {
                if (accept_client(epoll_fd, listener) < 0) {
                    goto shutdown;
                }
            } else {
                echo_client(epoll_fd, fd);
            }
        }
    }

shutdown:
    /* Only reached after a fatal error in the event loop. */
    close(listener);
    close(epoll_fd);
    return 1;
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>

#define PORT 8080
#define MAX_EVENTS 10
#define BUFFER_SIZE 1024

/*
 * Stops watching a client socket and closes it.
 * The descriptor is deregistered BEFORE close(): EPOLL_CTL_DEL on an
 * already-closed descriptor fails with EBADF.
 */
static void drop_client(int epoll_fd, int fd) {
    if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0) {
        perror("epoll_ctl");
    }
    close(fd);
}

/*
 * Sends exactly len bytes, continuing after short writes and EINTR.
 * MSG_NOSIGNAL turns a vanished peer into an EPIPE error instead of a
 * process-killing SIGPIPE. Returns 0 on success, -1 on error (errno set).
 */
static int send_all(int fd, const char *data, size_t len) {
    while (len > 0) {
        ssize_t sent = send(fd, data, len, MSG_NOSIGNAL);
        if (sent < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        data += sent;
        len -= (size_t)sent;
    }
    return 0;
}

/*
 * Accepts one pending connection and registers it for read events.
 * Returns 0 when the server may keep running (including transient accept
 * failures) and -1 on an unrecoverable error.
 */
static int accept_client(int epoll_fd, int listener) {
    struct epoll_event ev;
    int conn = accept(listener, NULL, NULL);

    if (conn < 0) {
        /* The peer gave up or a signal arrived: not a server failure. */
        if (errno == EINTR || errno == ECONNABORTED) {
            return 0;
        }
        perror("accept");
        return -1;
    }

    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.fd = conn;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn, &ev) < 0) {
        perror("epoll_ctl");
        close(conn);
        return -1;
    }
    return 0;
}

/*
 * Reads one chunk from a readable client and echoes it back.
 * The client is dropped on EOF, read error or send error.
 */
static void echo_client(int epoll_fd, int fd) {
    char buffer[BUFFER_SIZE];
    ssize_t n = read(fd, buffer, BUFFER_SIZE);

    if (n < 0 && errno == EINTR) {
        /* Level-triggered epoll reports the descriptor again. */
        return;
    }
    if (n <= 0) {
        if (n < 0) {
            perror("read");
        }
        drop_client(epoll_fd, fd);
        return;
    }
    if (send_all(fd, buffer, (size_t)n) < 0) {
        perror("write");
        drop_client(epoll_fd, fd);
    }
}

/*
 * TCP echo server on PORT driven by a single epoll instance.
 * Runs until an unrecoverable error occurs; setup failures exit(1), and a
 * fatal error in the event loop closes the descriptors and returns 1.
 */
int main() {
    int listener, epoll_fd;
    struct sockaddr_in addr;
    struct epoll_event ev, events[MAX_EVENTS];

    listener = socket(AF_INET, SOCK_STREAM, 0);
    if (listener < 0) {
        perror("socket");
        exit(1);
    }

    /* Zero the whole structure so padding (sin_zero) is well-defined. */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(PORT);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bind(listener, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind");
        exit(1);
    }

    if (listen(listener, 10) < 0) {
        perror("listen");
        exit(1);
    }

    epoll_fd = epoll_create1(0);
    if (epoll_fd < 0) {
        perror("epoll_create1");
        exit(1);
    }

    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.fd = listener;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listener, &ev) < 0) {
        perror("epoll_ctl");
        exit(1);
    }

    for (;;) {
        int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (nfds < 0) {
            /* A signal interrupting the wait is not an error. */
            if (errno == EINTR) {
                continue;
            }
            perror("epoll_wait");
            goto shutdown;
        }

        for (int i = 0; i < nfds; i++) {
            int fd = events[i].data.fd;
            if (fd == listener) {
                if (accept_client(epoll_fd, listener) < 0) {
                    goto shutdown;
                }
            } else {
                echo_client(epoll_fd, fd);
            }
        }
    }

shutdown:
    /* Only reached after a fatal error in the event loop. */
    close(listener);
    close(epoll_fd);
    return 1;
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>

#define PORT 8080
#define MAX_EVENTS 10
#define BUFFER_SIZE 1024

/*
 * Stops watching a client socket and closes it.
 * The descriptor is deregistered BEFORE close(): EPOLL_CTL_DEL on an
 * already-closed descriptor fails with EBADF.
 */
static void drop_client(int epoll_fd, int fd) {
    if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0) {
        perror("epoll_ctl");
    }
    close(fd);
}

/*
 * Sends exactly len bytes, continuing after short writes and EINTR.
 * MSG_NOSIGNAL turns a vanished peer into an EPIPE error instead of a
 * process-killing SIGPIPE. Returns 0 on success, -1 on error (errno set).
 */
static int send_all(int fd, const char *data, size_t len) {
    while (len > 0) {
        ssize_t sent = send(fd, data, len, MSG_NOSIGNAL);
        if (sent < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        data += sent;
        len -= (size_t)sent;
    }
    return 0;
}

/*
 * Accepts one pending connection and registers it for read events.
 * Returns 0 when the server may keep running (including transient accept
 * failures) and -1 on an unrecoverable error.
 */
static int accept_client(int epoll_fd, int listener) {
    struct epoll_event ev;
    int conn = accept(listener, NULL, NULL);

    if (conn < 0) {
        /* The peer gave up or a signal arrived: not a server failure. */
        if (errno == EINTR || errno == ECONNABORTED) {
            return 0;
        }
        perror("accept");
        return -1;
    }

    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.fd = conn;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn, &ev) < 0) {
        perror("epoll_ctl");
        close(conn);
        return -1;
    }
    return 0;
}

/*
 * Reads one chunk from a readable client and echoes it back.
 * The client is dropped on EOF, read error or send error.
 */
static void echo_client(int epoll_fd, int fd) {
    char buffer[BUFFER_SIZE];
    ssize_t n = read(fd, buffer, BUFFER_SIZE);

    if (n < 0 && errno == EINTR) {
        /* Level-triggered epoll reports the descriptor again. */
        return;
    }
    if (n <= 0) {
        if (n < 0) {
            perror("read");
        }
        drop_client(epoll_fd, fd);
        return;
    }
    if (send_all(fd, buffer, (size_t)n) < 0) {
        perror("write");
        drop_client(epoll_fd, fd);
    }
}

/*
 * TCP echo server on PORT driven by a single epoll instance.
 * Runs until an unrecoverable error occurs; setup failures exit(1), and a
 * fatal error in the event loop closes the descriptors and returns 1.
 */
int main() {
    int listener, epoll_fd;
    struct sockaddr_in addr;
    struct epoll_event ev, events[MAX_EVENTS];

    listener = socket(AF_INET, SOCK_STREAM, 0);
    if (listener < 0) {
        perror("socket");
        exit(1);
    }

    /* Zero the whole structure so padding (sin_zero) is well-defined. */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(PORT);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bind(listener, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind");
        exit(1);
    }

    if (listen(listener, 10) < 0) {
        perror("listen");
        exit(1);
    }

    epoll_fd = epoll_create1(0);
    if (epoll_fd < 0) {
        perror("epoll_create1");
        exit(1);
    }

    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.fd = listener;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listener, &ev) < 0) {
        perror("epoll_ctl");
        exit(1);
    }

    for (;;) {
        int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (nfds < 0) {
            /* A signal interrupting the wait is not an error. */
            if (errno == EINTR) {
                continue;
            }
            perror("epoll_wait");
            goto shutdown;
        }

        for (int i = 0; i < nfds; i++) {
            int fd = events[i].data.fd;
            if (fd == listener) {
                if (accept_client(epoll_fd, listener) < 0) {
                    goto shutdown;
                }
            } else {
                echo_client(epoll_fd, fd);
            }
        }
    }

shutdown:
    /* Only reached after a fatal error in the event loop. */
    close(listener);
    close(epoll_fd);
    return 1;
}

这个示例中的TCP回显服务器在监听端口8080。

当收到客户端的连接请求时,它会接受这个连接并将新的套接字添加到epoll事件集合。

当epoll检测到有数据可读时,服务器会读取数据并将其回显到客户端。

如果检测到客户端关闭连接或发生错误,服务器将关闭相应的套接字并从epoll事件集合中删除它。

© 版权声明
THE END