内存共享实现聊天室程序(附带一个问题求解)
聊天室程序中至少要求每个用户的发言能立即呈现给其它用户,为了提高效率,每个用户连接在服务端都对应一个子进程处理该用户连接。所有用户的发言数据记录在一个用户共享内存中,假设A用户发言了那么共享内存中某段数据t对应A的发言数据,用户B对应的子进程是pid_b处理用户B,那么pid_b只要到到共享内存位置t读取A的发言数据并发送给B,则聊天室逻辑就成立了。为了达到该设计需求,服务端主进程监听端口遇见有用户连接请求就fork一个子进程处理该用户连接。子进程收到其对应的用户发言数据了就通过管道告诉主进程我(这个子进程)有话要说,此时主进程将这些话传送给其它子进程要求这些子进程将这些话发送给它们对用的客户。而这些发言数据通过共享内存实现只需要传递些指针即可,且每个子进程只需要向其对应的位置写数据所以不会出现交叉写的竞态。
服务端程序:(文章最后有个问题希望有人能指教)
#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <assert.h>#include <stdio.h>#include <unistd.h>#include <errno.h>#include <string.h>#include <fcntl.h>#include <stdlib.h>#include <sys/epoll.h>#include <signal.h>#include <sys/wait.h>#include <sys/mman.h>#include <sys/stat.h>#include <fcntl.h>#define USER_LIMIT 5//最大客户数#define BUFFER_SIZE 1024//缓冲区#define FD_LIMIT 65535//最大支持文件数目#define MAX_EVENT_NUMBER 1024//最大事件数#define PROCESS_LIMIT 65536//最大子进程数struct client_data//客户数据{ sockaddr_in address;//客户端地址 int connfd;//客户连接描述符 pid_t pid;//处理该连接的子进程号 int pipefd[2];//管道描述符用户主进程向子进程之间传递数据是socketpair全双工管道,pipefd[0]用于主进程向子进程写入数据,pipefd[1]用于子进程监听主进程是否有数据发送到来};static const char* shm_name = "/my_shm";int sig_pipefd[2];//传递信号值的管道(用于将信号事件与IO事件统一监听)int epollfd;//事件表描述符int listenfd;//服务端监听端口int shmfd;//共享内存标示符char* share_mem = 0;//共享内存地址client_data* users = 0;//客户数据数组int* sub_process = 0;//每个子进程pid对应处理的那个客户编号,采用pid作为数组索引下标得到客户编号int user_count = 0;//当前连接用户bool stop_child = false;//终止子进程int setnonblocking( int fd )//将文件描述符设置为非阻塞{ int old_option = fcntl( fd, F_GETFL ); int new_option = old_option | O_NONBLOCK; fcntl( fd, F_SETFL, new_option ); return old_option;}void addfd( int epollfd, int fd )//向注册表添加新的可读事件{ epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event ); setnonblocking( fd );}void sig_handler( int sig )//信号处理函数{ int save_errno = errno; int msg = sig; send( sig_pipefd[1], ( char* )&msg, 1, 0 );//通过管道sig_pipefd向主进程发送信号值 errno = save_errno;}void addsig( int sig, void(*handler)(int), bool restart = true )//安装信号{ struct sigaction sa; memset( &sa, '\0', sizeof( sa ) ); sa.sa_handler = handler; if( restart ) { sa.sa_flags |= SA_RESTART;//是否重启由于信号中断的系统调用 } sigfillset( &sa.sa_mask ); assert( sigaction( sig, &sa, NULL ) != -1 );}void del_resource()//清理资源{ close( sig_pipefd[0] ); close( sig_pipefd[1] ); close( listenfd ); close( epollfd ); shm_unlink( shm_name ); delete [] users; delete [] sub_process;}void child_term_handler( int sig )//设置终止子进程标志{ stop_child = true;}int run_child( int idx, client_data* users, char* share_mem )//子进程运行逻辑,idx是子进程处理的客户连接描述符,users是所有用户数据数组名,share_mem是共享内存地址{ epoll_event events[ MAX_EVENT_NUMBER ]; int child_epollfd = epoll_create( 5 ); assert( child_epollfd != -1 ); int connfd = users[idx].connfd;//获取该子进程对应处理的那个客户连接描述符 addfd( child_epollfd, connfd );//监听客户连接可读事件(客户端发送数据到来) int pipefd = users[idx].pipefd[1];//获取该连接上子进程与主进程通信管道的读端 addfd( child_epollfd, pipefd );//监听管道可读事件即主进程向子进程发送数据(这个数据是个子进程号pid是处理某个客户连接子进程pid意味着该pid对应的客户发言了,其它客户(本子进程处理的客户就要接收发言)该接收这个发言) int ret; addsig( SIGTERM, child_term_handler, false );//子进程终止信号安装,就是将子进程终止标志stop_child置为真 while( !stop_child ) { int number = epoll_wait( child_epollfd, events, MAX_EVENT_NUMBER, -1 );//无限期等待客户连接是否有数据发送到来和主进程是否通过管道发送pid来 if ( ( number < 0 ) && ( errno != EINTR ) ) { printf( "epoll failure\n" ); break; } for ( int i = 0; i < number; i++ ) { int sockfd = events[i].data.fd; if( ( sockfd == connfd ) && ( events[i].events & EPOLLIN ) )//若该子进程对应的客户连接有数据发送到来 { memset( share_mem + idx*BUFFER_SIZE, '\0', BUFFER_SIZE ); ret = recv( connfd, share_mem + idx*BUFFER_SIZE, BUFFER_SIZE-1, 0 );//接收 if( ret < 0 ) { if( errno != EAGAIN )//非阻塞EAGAIN非网络出错而是设备当前不可用即数据读完了 { stop_child = true; } } else if( ret == 0 )//关闭连接 { stop_child = true; } else { send( pipefd, ( char* )&idx, sizeof( idx ), 0 );//通过全双工管道向主进程通知该子进程处理的客户有数据发送到来 } } else if( ( sockfd == pipefd ) && ( events[i].events & EPOLLIN ) )//若管道可读则是父进程通知子进程其它某个用户发言了需要将该发言数据发送给子进程对应的客户 { int client = 0; ret = recv( sockfd, ( char* )&client, sizeof( client ), 0 );//获取那个用户发言了 if( ret < 0 ) { if( errno != EAGAIN ) { stop_child = true; } } else if( ret == 0 )// { stop_child = true; } else { send( connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0 );//读取client那个发言用户的数据发送给该子进程对应的客户 } } else { continue; } } } close( connfd ); close( pipefd ); close( child_epollfd ); return 0;}int main( int argc, char* argv[] ){ if( argc <= 2 ) { printf( "usage: %s ip_address port_number\n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); int ret = 0; struct sockaddr_in address; bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); listenfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( listenfd >= 0 ); ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) ); assert( ret != -1 ); ret = listen( listenfd, 5 ); assert( ret != -1 ); user_count = 0; users = new client_data [ USER_LIMIT+1 ];//用户数组 sub_process = new int [ PROCESS_LIMIT ];//子进程对应的客户编号,便于通过子进程pid作为下标索引到子进程处理的客户连接描述符 for( int i = 0; i < PROCESS_LIMIT; ++i ) { sub_process[i] = -1;//初始化为不可能的描述符号 } epoll_event events[ MAX_EVENT_NUMBER ]; epollfd = epoll_create( 5 );//创建主进程事件表 assert( epollfd != -1 ); addfd( epollfd, listenfd );//将监听端口加入事件表用于与客户端建立连接 ret = socketpair( PF_UNIX, SOCK_STREAM, 0, sig_pipefd );//创建管道用于信号处理函数将信号值传给主进程 assert( ret != -1 ); setnonblocking( sig_pipefd[1] ); addfd( epollfd, sig_pipefd[0] ); addsig( SIGCHLD, sig_handler );//注册信号回调函数 addsig( SIGTERM, sig_handler ); addsig( SIGINT, sig_handler ); addsig( SIGPIPE, SIG_IGN ); bool stop_server = false;//是否停止服务端 bool terminate = false;//终止服务端 shmfd = shm_open( shm_name, O_CREAT | O_RDWR, 0666 );//创建一个共享内存对象 assert( shmfd != -1 ); ret = ftruncate( shmfd, USER_LIMIT * BUFFER_SIZE ); //设置共享内存大小 assert( ret != -1 ); share_mem = (char*)mmap( NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0 );//将共享内存映射到share_mem中 assert( share_mem != MAP_FAILED ); close( shmfd );//关闭共享标识符 while( !stop_server ) { int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );//主进程等待客户端连接事件和子进程发送某个用户发言事件 if ( ( number < 0 ) && ( errno != EINTR ) ) { printf( "epoll failure\n" ); break; } for ( int i = 0; i < number; i++ ) { int sockfd = events[i].data.fd;//获取事件对应的描述符 if( sockfd == listenfd )//监听端口事件表明有客户端请求建立连接 { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); if ( connfd < 0 ) { printf( "errno is: %d\n", errno ); continue; } if( user_count >= USER_LIMIT )//客户连接已满拒绝连接 { const char* info = "too many users\n"; printf( "%s", info ); send( connfd, info, strlen( info ), 0 ); close( connfd ); continue; } users[user_count].address = client_address;//初始化客户连接数据 users[user_count].connfd = connfd; ret = socketpair( PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd );//用于该客户连接对应的子进程与主进程通信 assert( ret != -1 ); pid_t pid = fork(); if( pid < 0 ) { close( connfd ); continue; } else if( pid == 0 )//子进程 { close( epollfd );//关闭继承来的描述符 close( listenfd ); close( users[user_count].pipefd[0] ); close( sig_pipefd[0] ); close( sig_pipefd[1] ); run_child( user_count, users, share_mem );//运行子进程逻辑 munmap( (void*)share_mem, USER_LIMIT * BUFFER_SIZE );//释放共享内存 exit( 0 ); } else { close( connfd );//父进程不需要这个客户连接描述符了 close( users[user_count].pipefd[1] );//关闭pipefd主进程中的写端 addfd( epollfd, users[user_count].pipefd[0] );//将pipefd读端加入事件侦听,若侦听到事件表明某个子进程收到客户发来的数据主进程此时要将该数据发送给其它用户(聊天室所有发言所有用户可见) users[user_count].pid = pid; sub_process[pid] = user_count; user_count++;//用户数增加 } } else if( ( sockfd == sig_pipefd[0] ) && ( events[i].events & EPOLLIN ) )//信号管道有事件就绪 { int sig; char signals[1024]; ret = recv( sig_pipefd[0], signals, sizeof( signals ), 0 );//信号值为1B所以每个字节是一个信号 if( ret == -1 ) { continue; } else if( ret == 0 ) { continue; } else { for( int i = 0; i < ret; ++i )//挨个处理信号值 { switch( signals[i] ) { case SIGCHLD: { pid_t pid; int stat; while ( ( pid = waitpid( -1, &stat, WNOHANG ) ) > 0 ) { int del_user = sub_process[pid]; sub_process[pid] = -1; if( ( del_user < 0 ) || ( del_user > USER_LIMIT ) ) { printf( "the deleted user was not change\n" ); continue; } epoll_ctl( epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0 );//删除注册的事件 close( users[del_user].pipefd[0] ); users[del_user] = users[--user_count]; sub_process[users[del_user].pid] = del_user; printf( "child %d exit, now we have %d users\n", del_user, user_count ); } if( terminate && user_count == 0 ) { stop_server = true; } break; } case SIGTERM: case SIGINT: { printf( "kill all the clild now\n" ); //addsig( SIGTERM, SIG_IGN ); //addsig( SIGINT, SIG_IGN ); if( user_count == 0 ) { stop_server = true; break; } for( int i = 0; i < user_count; ++i ) { int pid = users[i].pid; kill( pid, SIGTERM );//杀死子进程 } terminate = true; break; } default: { break; } } } } } else if( events[i].events & EPOLLIN )//某个子进程向父进程发送通知客户有发言 { int child = 0; ret = recv( sockfd, ( char* )&child, sizeof( child ), 0 );//接收子进程号 printf( "read data from child accross pipe\n" ); if( ret == -1 ) { continue; } else if( ret == 0 ) { continue; } else { for( int j = 0; j < user_count; ++j )//除去该子进程child,其余的子进程都要向这些子进程对应的客户发送child对应用户发言的数据 { if( users[j].pipefd[0] != sockfd ) { printf( "send data to child accross pipe\n" ); send( users[j].pipefd[0], ( char* )&child, sizeof( child ), 0 );//向子进程通知发送数据 } } } } } } del_resource(); return 0;}