Linux并发IO详解(下)(linux并发执行命令的方式)

Demo

本部分代码实现参考可能是最接地气的 I/O 多路复用小结 (
https://mengkang.net/726.html?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io)

阻塞式网络编程接口

#include <stdio.h>

#include <unistd.h>

#include <sys/types.h>

#include <sys/socket.h>

#include <arpa/inet.h>

#include <netinet/in.h>

#include <string.h>

#define SERV_PORT 8031

#define BUFSIZE 1024

int main(void)

{

int lfd, cfd;

struct sockaddr_in serv_addr,clin_addr;

socklen_t clin_len;

char recvbuf[BUFSIZE];

int len;

lfd = socket(AF_INET,SOCK_STREAM,0);

serv_addr.sin_family = AF_INET;

serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);

serv_addr.sin_port = htons(SERV_PORT);

bind(lfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));

listen(lfd, 128);

while(1){

clin_len = sizeof(clin_addr);

cfd = accept(lfd, (struct sockaddr *)&clin_addr, &clin_len);

while(len = read(cfd,recvbuf,BUFSIZE)){

write(STDOUT_FILENO,recvbuf,len);//把客户端输入的内容输出在终端

// 只有当客户端输入 stop 就停止当前客户端的连接

if (strncasecmp(recvbuf,"stop",4) == 0){

close(cfd);

break;

}

}

}

close(lfd);

return 0;

}

编译运行之后,开启两个终端使用命令nc 10.211.55.4 8031(假如服务器的 ip 为 10.211.55.4)。如果首先连上的客户端一直不输入stop加回车,那么第二个客户端输入任何内容都不会被客户端接收。如下图所示

输入abc的是先连接上的,在其输入stop之前,后面连接上的客户端输入123并不会被服务端收到。也就是说一直阻塞在第一个客户端那里。当第一个客户端输入stop之后,服务端才收到第二个客户端的发送过来的数据。

select

#include <stdio.h>

#include <stdlib.h>

#include <unistd.h>

#include <errno.h>

#include <sys/types.h>

#include <sys/socket.h>

#include <arpa/inet.h>

#include <netinet/in.h>

#include <fcntl.h>

#include <sys/select.h>

#include <sys/time.h>

#include <string.h>

#define SERV_PORT 8031

#define BUFSIZE 1024

#define FD_SET_SIZE 128

int main(void) {

int lfd, cfd, maxfd, scokfd, retval;

struct sockaddr_in serv_addr, clin_addr;

socklen_t clin_len; // 地址信息结构体大小

char recvbuf[BUFSIZE];

int len;

fd_set read_set, read_set_init;

int client[FD_SET_SIZE];

int i;

int maxi = -1;

if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {

perror("套接字描述符创建失败");

exit(1);

}

int opt = 1;

setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

memset(&serv_addr, 0, sizeof(serv_addr));

serv_addr.sin_family = AF_INET;

serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);

serv_addr.sin_port = htons(SERV_PORT);

if (bind(lfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1) {

perror("绑定失败");

exit(1);

}

if (listen(lfd, FD_SET_SIZE) == -1) {

perror("监听失败");

exit(1);

}

maxfd = lfd;

for (i = 0; i < FD_SET_SIZE; ++i) {

client[i] = -1;

}

FD_ZERO(&read_set_init);

FD_SET(lfd, &read_set_init);

while (1) {

// 每次循环开始时,都初始化 read_set

read_set = read_set_init;

// 因为上一步 read_set 已经重置,所以需要已连接上的客户端 fd (由上次循环后产生)重新添加进 read_set

for (i = 0; i < FD_SET_SIZE; ++i) {

if (client[i] > 0) {

FD_SET(client[i], &read_set);

}

}

printf("select 等待\n");

// 这里会阻塞,直到 read_set 中某一个 fd 有数据可读才返回,注意 read_set 中除了客户端 fd 还有服务端监听的 fd

retval = select(maxfd + 1, &read_set, NULL, NULL, NULL);

if (retval == -1) {

perror("select 错误\n");

} else if (retval == 0) {

printf("超时\n");

continue;

}

printf("select 返回\n");

//------------------------------------------------------------------------------------------------

// 用 FD_ISSET 来判断 lfd (服务端监听的fd)是否可读。只有当新的客户端连接时,lfd 才可读

if (FD_ISSET(lfd, &read_set)) {

clin_len = sizeof(clin_addr);

if ((cfd = accept(lfd, (struct sockaddr *) &clin_addr, &clin_len)) == -1) {

perror("接收错误\n");

continue;

}

for (i = 0; i < FD_SET_SIZE; ++i) {

if (client[i] < 0) {

// 把客户端 fd 放入 client 数组

client[i] = cfd;

printf("接收client[%d]一个请求来自于: %s:%d\n", i, inet_ntoa(clin_addr.sin_addr), ntohs(clin_addr.sin_port));

break;

}

}

// 最大的描述符值也要重新计算

maxfd = (cfd > maxfd) ? cfd : maxfd;

// maxi 用于下面遍历所有有效客户端 fd 使用,以免遍历整个 client 数组

maxi = (i >= maxi) ? ++i : maxi;

}

//------------------------------------------------------------------------------------------------

for (i = 0; i < maxi; ++i) {

if (client[i] < 0) {

continue;

}

// 如果客户端 fd 中有数据可读,则进行读取

if (FD_ISSET(client[i], &read_set)) {

// 注意:这里没有使用 while 循环读取,如果使用 while 循环读取,则有阻塞在一个客户端了。

// 可能你会想到如果一次读取不完怎么办?

// 读取不完时,在循环到 select 时 由于未读完的 fd 还有数据可读,那么立即返回,然后到这里继续读取,原来的 while 循环读取直接提到最外层的 while(1) + select 来判断是否有数据继续可读

len = read(client[i], recvbuf, BUFSIZE);

if (len > 0) {

write(STDOUT_FILENO, recvbuf, len);

}else if (len == 0){

// 如果在客户端 ctrl+z

close(client[i]);

printf("clinet[%d] 连接关闭\n", i);

FD_CLR(client[i], &read_set);

client[i] = -1;

break;

}

}

}

}

close(lfd);

return 0;

}

epoll

#include <stdio.h>

#include <stdlib.h>

#include <unistd.h>

#include <errno.h>

#include <sys/types.h>

#include <sys/socket.h>

#include <arpa/inet.h>

#include <netinet/in.h>

#include <fcntl.h>

#include <sys/epoll.h>

#include <sys/time.h>

#include <string.h>

#define SERV_PORT 8031

#define MAX_EVENT_NUMBER 1024

#define BUFFER_SIZE 10

/* 将文件描述符 fd 上的 EPOLLIN 注册到 epollfd 指示的 epoll 内核事件表中 */

void addfd(int epollfd, int fd) {

struct epoll_event event;

event.data.fd = fd;

event.events = EPOLLIN | EPOLLET;

epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);

int old_option = fcntl(fd, F_GETFL);

int new_option = old_option | O_NONBLOCK;

fcntl(fd, F_SETFL, new_option);

}

void et(struct epoll_event *events, int number, int epollfd, int listenfd) {

char buf[BUFFER_SIZE];

for (int i = 0; i < number; ++i) {

int sockfd = events[i].data.fd;

if (sockfd == listenfd) {

struct sockaddr_in client_address;

socklen_t length = sizeof(client_address);

int connfd = accept(listenfd, (struct sockaddr *) &client_address, &length);

printf("接收一个请求来自于: %s:%d\n", inet_ntoa(client_address.sin_addr), ntohs(client_address.sin_port));

addfd(epollfd, connfd);

} else if (events[i].events & EPOLLIN) {

/* 这段代码不会被重复触发,所以我们循环读取数据,以确保把 socket 缓存中的所有数据读取*/

while (1) {

memset(buf, '\0', BUFFER_SIZE);

int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);

if (ret < 0) {

/* 对非阻塞 I/O ,下面的条件成立表示数据已经全部读取完毕。此后 epoll 就能再次触发 sockfd 上的 EPOLLIN 事件,以驱动下一次读操作 */

if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {

printf("read later\n");

break;

}

close(sockfd);

break;

} else if (ret == 0) {

printf("断开一个连接\n");

close(sockfd);

} else {

printf("get %d bytes of content: %s\n", ret, buf);

}

}

}

}

}

int main(void) {

int lfd, epollfd,ret;

struct sockaddr_in serv_addr;

if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {

perror("套接字描述符创建失败");

exit(1);

}

int opt = 1;

setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

memset(&serv_addr, 0, sizeof(serv_addr));

serv_addr.sin_family = AF_INET;

serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);

serv_addr.sin_port = htons(SERV_PORT);

if (bind(lfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1) {

perror("绑定失败");

exit(1);

}

if (listen(lfd, 5) == -1) {

perror("监听失败");

exit(1);

}

struct epoll_event events[MAX_EVENT_NUMBER];

if ((epollfd = epoll_create(5)) == -1) {

perror("创建失败");

exit(1);

}

// 把服务器端 lfd 添加到 epollfd 指定的 epoll 内核事件表中,添加一个 lfd 可读的事件

addfd(epollfd, lfd);

while (1) {

// 阻塞等待新客户端的连接或者客户端的数据写入,返回需要处理的事件数目

if ((ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1)) < 0) {

perror("epoll_wait失败");

exit(1);

}

et(events, ret, epollfd, lfd);

}

close(lfd);

return 0;

}

原文链接:,转发请注明来源!