首页 > C/C++ > 从头编写高性能服务程序9-多进程非阻塞epoll-prefork-hook

从头编写高性能服务程序9-多进程非阻塞epoll-prefork-hook

整个基础结构已经基本确定了
接下来做一些细节工作
首先把一些函数抽取出来.
例如prefork独立出来.socket->bind->listen独立出来

这里我们引入一个新的思路
原先由统一的函数在epoll_wait之后对events里面的fd进行处理
但是每个fd可能需要处理的方式都不同.
怎么样针对不同的fd来调用特定的函数呢?

首先在epoll_event结构中有data成员
而data的定义如下

typedef union epoll_data {
                void *ptr;
                int fd;
                __uint32_t u32;
                __uint64_t u64;
        } epoll_data_t;

        struct epoll_event {
                __uint32_t events;      /* Epoll events */
                epoll_data_t data;      /* User data variable */
        };

可见既可以在events里面放data.fd
也可以使用data.ptr来指向一个指针
当fd有消息时内核将对应的ev变量塞入events数组的时候
如果我们只是用fd来指向注册的,那么获取数据的时候只能得到对应的fd
这样使用什么函数来处理这个fd就需要另行判断

那么如果使用ptr来指向一个结构
而结构内保存了fd以及处理这个fd所使用的函数指针
那当我们得到events数组内的事件时
就可以直接调用ptr指向的函数指针了.
这就类似Nginx中的hook函数.
在Nginx中几乎任何一种事件都会绑定其处理函数
而由模块实现距离的函数,然后在hook上去.

那么下面的代码我们就模拟这个方法:
我们建立一个数据结构来保存每个fd以及对应的处理函数

struct event_handle{
    int fd;
    int (* handle)(int fd);
};

handle_hook是我们为每个fd注册的处理函数
当accept获得新的accept_fd之后
我们使用

ev_handles[accept_handles].handle = handle_hook

来将对应的函数注册到对应的events内
在fd得到通知的时候
使用

(*current_handle)(current_fd)

来进行处理

下载: code.txt
  1. #include <sys/socket.h>
  2. #include <sys/wait.h>
  3. #include <netinet/in.h>
  4. #include <sys/epoll.h>
  5. #include <sys/sendfile.h> 
  6. #include <unistd.h>
  7. #include <stdio.h>
  8. #include <stdlib.h>
  9. #include <string.h>
  10. #include <strings.h>
  11. #include <fcntl.h>
  12.  
  13. int create_listen_fd(int port){
  14.     int listen_fd;
  15.     struct sockaddr_in my_addr;
  16.     if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1){
  17.         perror("create socket error");
  18.         exit(1);
  19.     }
  20.     int flag;
  21.     if (setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR
  22.     ,(char *)&flag,sizeof(flag)) == -1){
  23.         perror("setsockopt error");
  24.     }
  25.     int flags = fcntl(listen_fd, F_GETFL, 0);
  26.     fcntl(listen_fd, F_SETFL, flags|O_NONBLOCK);
  27.     my_addr.sin_family = AF_INET;
  28.     my_addr.sin_port = htons(port);
  29.     my_addr.sin_addr.s_addr = INADDR_ANY;
  30.     bzero(&(my_addr.sin_zero), 8);
  31.     if (bind(listen_fd, (struct sockaddr *)&my_addr,
  32.     sizeof(struct sockaddr_in)) == -1) {
  33.         perror("bind error");
  34.         exit(1);
  35.     }
  36.     if (listen(listen_fd,1) == -1){
  37.         perror("listen error");
  38.         exit(1);
  39.     }
  40.     return listen_fd;
  41. }
  42.  
  43. int create_accept_fd(int listen_fd){
  44.     int addr_len = sizeof( struct sockaddr_in );
  45.     struct sockaddr_in remote_addr;
  46.     int accept_fd = accept( listen_fd,
  47.         (struct sockaddr *)&remote_addr, &addr_len );
  48.     int flags = fcntl(accept_fd, F_GETFL, 0);
  49.     fcntl(accept_fd, F_SETFL, flags|O_NONBLOCK);
  50.     return accept_fd;
  51. }
  52.  
  53. int fork_process(int process_num){
  54.     int i;
  55.     int pid=-1;
  56.     for(i = 0; i < process_num; i++){
  57.         if(pid != 0){
  58.             pid = fork();
  59.         }
  60.     }
  61.     return pid;
  62. }
  63.  
  64. int handle_normal(int socket_fd){
  65.     char in_buf[1024];
  66.     memset(in_buf, 0, 1024);
  67.     int recv_num = recv( socket_fd, &in_buf, 1024, 0 );
  68.     if( recv_num ==0 ){
  69.         close(socket_fd);
  70.         printf("ProcessID:%d,EPOLLIN,fd:%d,closed\n", getpid(), socket_fd);
  71.     }
  72.     else{
  73.         printf("ProcessID:%d,EPOLLIN,fd:%d,recv:%s\n", getpid(), socket_fd, in_buf);
  74.     }
  75.     return recv_num;
  76. }
  77.  
  78. int handle_hook(int socket_fd){
  79.     char in_buf[1024];
  80.     memset(in_buf, 0, 1024);
  81.     int recv_num = recv( socket_fd, &in_buf, 1024, 0 );
  82.     if( recv_num ==0 ){
  83.         close(socket_fd);
  84.         printf("ProcessID:%d,EPOLLIN,fd:%d,closed\n", getpid(), socket_fd);
  85.     }
  86.     else{
  87.         printf("ProcessID:%d,EPOLLIN,fd:%d,recv_num:%d;recv:", getpid(), socket_fd, recv_num);
  88.         for (int i = 0; i<recv_num; i++){
  89.         printf("%02x ",in_buf[i]);
  90.         }
  91.         printf("\n");
  92.     }
  93.     return recv_num;
  94. }
  95.  
  96. struct event_handle{
  97.     int fd;
  98.     int (* handle)(int fd);
  99. };
  100. typedef int (* EVENT_HANDLE)(int);
  101. typedef struct event_handle * EH;
  102.  
  103. int main(){
  104.     int listen_fd = create_listen_fd(3389);
  105.     int pid = fork_process(3);
  106.     if(pid == 0){
  107.         int accept_handles = 0;
  108.         struct epoll_event ev,events[20];
  109.         int epfd = epoll_create(256);
  110.         int ev_s = 0;
  111.        
  112.         ev.data.fd = listen_fd;
  113.         ev.events = EPOLLIN|EPOLLET;
  114.         epoll_ctl(epfd,EPOLL_CTL_ADD,listen_fd,&ev);
  115.         struct event_handle ev_handles[256];
  116.         for(;;){
  117.             ev_s = epoll_wait( epfd,events, 20, 500 );
  118.             int i = 0;
  119.             for(i = 0; i<ev_s; i++){
  120.                 if(events[i].data.fd == listen_fd){
  121.                     int max_process_accept = 3;
  122.                     if(accept_handles < max_process_accept){
  123.                         accept_handles++;
  124.                         int accept_fd = create_accept_fd(listen_fd);
  125.                         ev_handles[accept_handles].fd = accept_fd;
  126.                         ev_handles[accept_handles].handle = handle_hook;
  127.                         ev.data.ptr = &ev_handles[accept_handles];
  128.                         ev.events = EPOLLIN|EPOLLET;
  129.                         epoll_ctl(epfd,EPOLL_CTL_ADD,accept_fd,&ev);
  130.                         printf("ProcessID:%d,EPOLLIN,fd:%d,accept:%d\n", getpid(), listen_fd, accept_fd);
  131.                     }
  132.                 }
  133.                 else if(events[i].events&EPOLLIN){
  134.                     EVENT_HANDLE current_handle = ((EH)(events[i].data.ptr))->handle;
  135.                     int current_fd = ((EH)(events[i].data.ptr))->fd;
  136.                     if( (*current_handle)(current_fd)  == 0){ 
  137.                         accept_handles--;
  138.                     }
  139.                 }
  140.                 else if(events[i].events&EPOLLOUT){
  141.                     //need add write event process
  142.                 }
  143.             }
  144.         }
  145.     }
  146.     else{
  147.         //manager the process
  148.         int child_process_status;
  149.         wait(&child_process_status);
  150.     }
  151.    
  152.     return 0;
  153. }
分类: C/C++ 标签:
  1. hoterran
    2010年7月4日18:26 | #1

    epoll_data里不是有fd了么?handle还fd干嘛?

    current_fd = events[i].data.fd 即可

  1. 本文目前尚无任何 trackbacks 和 pingbacks.