Fork me on GitHub

webRTC使用记录2

副标题:高性能网络服务器的demo 级别实现

  1. fork
  2. select
  3. epoll
  4. IO事件库,以 libevent 举例。

c 语言实现高性能网络服务器

以 fork 方式

(父进程)每收到一个连接,就 fork 一个子进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 在 tcp_server.c 的基础上进行改写

// create tcp socket
...
// bind socket
...
// listen
...

for(;;){
// accept
...
pid = fork() // pid == 0 子进程,pid>0 父进程
if(pid == 0){
// recv & close_accept
...
}
}
if(pid != 0){
// close_socket
...
}

问题:

  1. 资源被长期占用
  2. 分配子进程花费时间长

select 实现

  • 属于异步 IO,即以事件触发的机制对 IO 操作进行处理。
  • 系统开销小,不必创建进程或者线程,也不必维护。
  • 默认只有 1024 个连接,epoll 无此限制

步骤:

  1. 遍历文件描述符集中所有描述符,找到有变化的描述符。

    1. os 底层发现socket 有数据过来时,select 自己还需要做判断,确定到底有没有数据过来。
  2. 监听 socket 跟数据处理 socket 要区别对待。

  3. socket 必须设置为非阻塞方式工作,但 select 是阻塞的。

重要 API:

api FD_ZERO FD_SET FD_ISSET FD_CLEAR
文件描述符集中的所有描述符全部清掉 将某描述符设置到集内 判断某描述符是否在集内
api flag fcntl(fd, F_SETFL/F_GETFL,flag) events select(nfds, readfds, writefds, exceptfds, timeout)
文件描述符控制函数,用于控制阻塞/ 非阻塞

tips:

  1. events——select 找出哪些事件被触发了
  2. nfds——最大的文件描述符+1(约定俗成,意义不明)
  3. readfds、writefds、exceptfds——读、写、异常的文件描述符集
  4. timeout——超时时间,select 以阻塞方式执行,所以达到超时时间后,select 结束掉,进入下一个循环。一般 500 毫秒。

select demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// 在 tcp_server.c 的基础上进行改写
// 重点就是文件描述符集fd_sets,首先制造 socket_fd用于监听
// 在遍历 fd_sets 时,发现有连接,就制造 accept_fd;有数据,就使用accept_fd 传输数据。

int accept_fds[FD_SIZE] = {-1,}; // FD_SIZE 默认 1024
// create tcp socket
...
flags = fcntl(socket_fd, F_GETFL, 0); // 获取 socket_fd 的 flag
fcntl(socket_fd, F_SETFL, flags | O_NONBLOCK); // 设置为非阻塞
max_fd=socket_fd;
// bind socket
...
// listen
...

for(;;){
FD_ZERO(&fd_sets); // 清空fd_sets
FD_SET(socket_fd, &fd_sets); // 将当前 socket (监听 socket)设置到 fd_sets 文件描述符集内
for(int i=0; i<FD_SIZE; i++){
if(accept_fd[i] != -1){
if(accept_fds[i] > max_fd){
max_fd = accept_fds[i]; //使 max_fd 保持为最大文件描述符
}
FD_SET(accept_fds[i], &fd_sets);
}
}
events = select(max_fd+1, &fd_sets,NULL,NULL,NULL); // 读文件描述符集被 select
if(events < 0){
// 调用失败
break;
}else if(events == 0){
// 超时
continues;
}else if(events){
if(FD_ISSET(socket_fd, &fd_sets)){
// socket_fd 发生了变动,说明来了新的连接,可以进行 accept

for(int i=0; FD_SIZE; i++){
if(accept_fds[i] == -1){
curpos = i;
break; // 找到空槽
}
}
socklen_t addr_len = sizeof(struct sockaddr);
accept_fd = accept(socket_fd,
(struct sockaddr *)&remoteaddr,
&addr_len); //创建一个accept_fd
flags = fcntl(accept_fd, F_GETFL, 0); // 获取 socket_fd 的 flag
fcntl(accept_fd, F_SETFL, flags | O_NONBLOCK); // 设置为非阻塞
accept_fd[curpos]=accept_fd; // 插入槽中
}
// 触发的是数据事件
for(int j=0; j < FD_SIZE; j++ ){
if( (accept_fds[j] != -1) && FD_ISSET(accept_fds[j], &fd_sets)){
//有事件时,读数据
printf("accept event :%d, accept_fd: %d\n",j, accept_fds[j]);
char in_buf[MESSAGE_SIZE];
memset(in_buf, 0, MESSAGE_SIZE);
int ret = recv(accept_fds[j], &in_buf, MESSAGE_SIZE, 0);
if(ret == 0){
close(accept_fds[j]);
accept_fds[j] = -1;
}

printf( "receive message:%s\n", in_buf );
send(accept_fds[j], (void*)in_buf, MESSAGE_SIZE, 0);
}
}
}
}
// close_socket
...

epoll 实现

特点:

  1. 没有文件描述符的限制,数量远远超过 1024。
  2. 时间复杂度 O(1),不受文件描述符数量而影响效率。
  3. epoll 经过内核级优化。
  4. 触发模式:
    1. 水平触发Level trigger,默认,如果数据没有处理完,下次过来时会告知。 select 也属于水平触发的情况。一般用于比较重要的场合。
    2. 边缘触发edge trigger,效率更高,但开发难度高。仅触发一次,不会管你有无处理完所有数据。

重要 api:

api int epoll_create() int epoll_ctl(epfd,op,fd,struct epoll_event *event) int epoll_wait(epfd,events, maxevents,timeout)
创建一个 epoll 的文件描述符 向 epoll 添加事件fd 等待事件

tips:

1. epfd:由 epoll_create 创建出的 fd
 2. events:发生改变的事件数组
 3. maxevents:每次返回时最大返回的事件数量
 4. timeout:超时时间,没有事件时,会等待到timeout 后停止 epoll,然后再来一轮 ,500 毫秒即可

epoll 的事件:

  1. EPOLLET:可以用来设置成边缘触发模式 。
  2. EPOLLIN:数据来的事件。
  3. EPOLLOUT:数据往外写的事件。
  4. EPOLLPRI:出现问题、中断的事件。
  5. EPOLLERR:读写出现问题的事件。
  6. EPOLLHUP:挂起的事件。

epoll_ctl 相关操作:

  1. EPOLL_CTL_ADD:将文件描述符 add 到 epoll 里
  2. EPOLL_CTL_MOD: 修改 epoll 里的文件描述符
  3. EPOLL_CTL_DEL: 删除 epoll 里的文件描述符

epoll 重要结构体

1
2
3
4
5
6
7
8
9
10
11
typedef union epoll_data{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
}epoll_data_t;

struct epoll_event{
uint32_t events;
epoll_data_t data; //用户数据
}

epoll demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 在 tcp_server.c 的基础上进行改写

struct epoll_event ev, events[MAX_EVENTS]; // MAX_EVENTS可以设为 20,同时刻的并发数
// create tcp socket
...
flags = fcntl(socket_fd, F_GETFL, 0); // 获取 socket_fd 的 flag
fcntl(socket_fd, F_SETFL, flags | O_NONBLOCK); // 设置为非阻塞

epoll_fd = epoll_create(256);
ev.events = EPOLLIN; //监听一般设置为水平触发
ev.fd = socket_fd;
epoll_ctl(epoll_fd,EPOLL_CTL_ADD, socket_fd, &ev);

for(;;){
// epoll_wait用于等待IO事件。如果当前没有可用的事件,这个函数会阻塞调用线程。
event_number = epoll_wait(epoll_fd, events, MAX_EVENTS, TIMEOUT);
for(int i = 0;i<event_number; i++){
if(events[i].data.fd == socket_fd){
//监听的 socket事件,所以需要创建连接
int addr_len = sizeof(struct sockaddr_in);
accept_fd = accept(socket_fd,
(struct sockaddr *)&remoteaddr,
&addr_len); //创建一个accept_fd
flags = fcntl(accept_fd, F_GETFL, 0);
fcntl(accept_fd, F_SETFL, flags | O_NONBLOCK); // 设置为非阻塞
ev.events = EPOLLIN | EPOLLET; //边缘触发
ev.data.fd = accept_fd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, accept_fd, &ev); // accept_fd 交由 epoll 托管
} else if(events[i].events & EPOLLIN){
do{
memset(in_buf, 0, MESSAGE_SIZE);
int ret = recv(events[i,.data.fd, (void*)in_buf, MESSAGE_SIZE, 0);
if(ret == 0){
close(events[i].data.fd);
}
if(ret == MESSAGE_LEN){ // 缓冲区满了
// 进一步处理
}
}while(ret < -1 && errno == EINTR);
if(ret < 0){
switch(errno){
case EAGAIN: // 数据没了,等下一次再传数据
break;
default:
break;
}
}
} else if(写操作){...略}
}
}

close(socket_fd);
return 0;

epoll + fork

实现demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 在执行 epoll 前(即上文 for 死循环前)执行:
for(int i = 0;i<MAX_PROCESS;i++){ //4 核 cpu 时,MAX_PROCESS可以设为4
if(pid != 0){
pid = fork(); // 创建预设的子进程
}
}
if(pid == 0){
//子进程做的事,一直包含到 return0 之前的“epoll执行”部分
// epoll_wait
...
//accept_fd
...
//recv
...
}else{
// 父进程
do{
pid = waitpid(-1,NULL,0); // 等待所有子进程结束
}while(pid != -1);
}
return 0;

libevent

重要函数:

  1. event_base_new:创建实例,并初始化

  2. event_base_dispatch: 事件触发器

  3. event_new: 创建 event 队列,还有event_add、event_del、event_free 等都属于较底层 api。

  4. evconnlistener_new_bind: 综合了socket监听和触发的功能,属于较高级的 api

1
2
3
4
5
6
7
8
9
10
11
12
typedef void (*evconnlistener_cb) (struct evconnlistener *,
evutil_socket_t,
struct sockaddr *,
int socklen, void *);
struct evconnlistener *
evconnlistener_new_bind(struct event_base *base,
evconnlistener_cb cb,
void *ptr,
unsigned flags,
int backlog,
const struct sockaddr *sa,
int socklen);

tips:

  1. event_base :通过event_base_new生成。
  2. evconnlistener_cb:是一种回调函数,当有连接进来时,会触发这个回调函数
  3. *ptr 是一个指针,指向 evconnlistener_cb函数的参数
  4. flags:事件的标识,libevent 提供有重复触发。。。各种标识。
  5. backlog:socket 相关,相当于排队请求的。。。(不明)
  6. sockaddr:服务请求的地址,包括 ip 和 port
  7. socklen:socket 的长度

libevent demo(使用高级 api):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 初始化 event_base
// 监听&触发,使用evconnlistener_new_bind
// 开始循环,使用event_base_dispatch

int main(int argc, char **argv){
struct event_base *base;
struct evconnlistener *listener;
struct sockaddr_in sin;
int port = 8111;

base = event_base_new(); /* 初始化event_base */
if (!base) {
puts("Couldn't open event base");
return 1;
}

/*初始化绑定地址*/
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = INADDR_ANY;
sin.sin_port = htons(port);

/* 初始化evconnlistener(绑定地址、设置回调函数以及连接属性) */
listener = evconnlistener_new_bind(base, accept_conn_cb, NULL, LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1,(struct sockaddr*)&sin, sizeof(sin));
if (!listener) {
perror("Couldn't create listener");
return 1;
}

/* 设置Listen错误回调函数 */
evconnlistener_set_error_cb(listener, accept_error_cb);

/* 开始accept进入循环 */
event_base_dispatch(base);
return 0;
}

以上涉及了accept_conn_cbaccept_error_cb这些回调函数,需要另外实现。

接收数据事件的回调函数accept_conn_cb:

1
2
3
4
5
6
7
8
9
10
// 举例:接收数据事件的回调函数:
static void accept_conn_cb(struct evconnlistener *listener,evutil_socket_t fd, struct sockaddr *address, int socklen,void *ctx){
/* 初始化一个bufferevent用于数据的写入和读取,首先需要从Listerner中获取event_base */
struct event_base *base = evconnlistener_get_base(listener);
struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
/*设置buferevent的回调函数,这里设置了读和事件的回调函数*/
bufferevent_setcb(bev, echo_read_cb, NULL, echo_event_cb, NULL);
/* 启用该bufevent写和读 */
bufferevent_enable(bev, EV_READ|EV_WRITE);
}

以上又涉及了echo_read_cb,echo_event_cb等回调函数,此处略去不表。

-------------The End-------------