首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 互联网 >

一个高在线(可以超过1024)多线程的socket echo server(pthreads 跟 libevent扩展)

2013-10-28 
一个高在线(可以超过1024)多线程的socket echo server(pthreads 和 libevent扩展)研究了3周吧,本来打算用p

一个高在线(可以超过1024)多线程的socket echo server(pthreads 和 libevent扩展)

研究了3周吧,本来打算用pthreads+event扩展的,结果event扩展太原始了,太多函数了,实在不知道怎么在外部随时发送数据给客户端,所以改用libevent,

改用libevent之后花了2个小时就运行起来了。

 

当然并不敢说稳定,而且有个地方用了一个“适应”bug的地方,避免bug

 

两个扩展都从pecl.php.net下载就可以了,

安装,不想写了,16:25了还没吃早饭 午饭

上代码,欢迎讨论:

 

我的QQ群:
PHPer&Webgame&移动开发,群号:95303036

 

<?phpclass statWorker extends Worker {public function __construct() {}public function run(){while(!$this->isShutdown()){sleep(3);}}}class sendAllWorker extends Worker {public function __construct(&$_logicWorker) {$this->logicWorker = $_logicWorker;}public function run(){}}class data extends Stackable{public function __construct() {echo __FILE__.'-'.__LINE__.'<br/>'.chr(10);}public function run(){echo __FILE__.'-'.__LINE__.'<br/>'.chr(10);}}class logicWorker extends Worker{public $data;public function __construct(&$_data) {echo __FILE__.'-'.__LINE__.'<br/>'.chr(10);$this->data = $_data;}public function run(){$data = $this->data;$count=0;while(1){if($arr =(array) $this->shift()){echo __FILE__.'-'.__LINE__.'<br/>'.chr(10);if(!$arr) continue;if(isset($arr[0])){echo $arr[0].' from '.$arr[1].chr(10);//$data[] = array('You say:'.$arr[0], $arr[1]%2==0?0:$arr[1]);$data[] = array('You say:'.$arr[0], 0);}if((++$count)%1000==0){printf("Work Mermory used %.3fMB RAM, time: %3f===> %d \n",  memory_get_peak_usage(true)/1048576, (microtime(true) - $stime), $count);$stime = microtime(true);}}else usleep(100000);}}}class listenerWorkerRunner extends Worker {public function __construct(&$_listener){$this->listener = $_listener;}public function run(){$this->listener->dispatch();}}class senderWorker extends Worker {public function __construct(&$_data, &$clients) {$this->data = $_data;$this->clients = $clients;}public function run(){echo __FILE__.'-'.__LINE__.'<br/>'.chr(10);$data = $this->data;$clients = $this->clients;while(1){$arr= $data->shift();if($arr){$id = $arr[1];$msg = $arr[0];echo '===================================senderWorker GOT: '.trim($msg).' from '.$id.chr(10);//var_dump($clients);if(is_array($id))foreach($id as $i)fwrite($clients[$i], $msg);else if($id===0){//必须这样,否则会报错,$clients的最后一个成员会变成 resource(2) of type (unknown) $_clients = (array) $clients;var_dump($_clients);foreach($_clients as $i=>$c)fwrite($clients[$i], $msg);}elseif($clients[$id]) fwrite($clients[$id], $msg);}else{//echo 'senderWorker IS RUNNING...'.chr(10);usleep(100000);}}}}class epoll{    private static $socket;    public static $connections;    private static $buffers;private static $worker;private static $clients;    function epoll($port, &$worker, &$clients){self::$clients = $clients;self::$worker = $worker;        if($port<1024) die("Port must be a number which bigger than 1024\n");        self::$socket = stream_socket_server ('tcp://0.0.0.0:'.$port, $errno, $errstr);        stream_set_blocking(self::$socket, 0);        $base = event_base_new();        $event = event_new();        event_set($event, self::$socket , EV_READ | EV_PERSIST, 'epoll::ev_accept', $base);        event_base_set($event, $base);        event_add($event);        event_base_loop($base);        self::$connections = array();        self::$buffers = array();    }    public static function ev_accept($socket, $flag, $base) {        static $id = 0;        $connection = stream_socket_accept($socket);        stream_set_blocking($connection, 0);        $id += 1;        $buffer = event_buffer_new($connection, 'epoll::ev_read', NULL, 'epoll::ev_error', $id);        event_buffer_base_set($buffer, $base);        event_buffer_timeout_set($buffer, 30, 30);        event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);        event_buffer_priority_set($buffer, 60);//超时自动断开时间        event_buffer_enable($buffer, EV_READ | EV_PERSIST);        // we need to save both buffer and connection outside        self::$connections[$id] = $connection;self::$clients[$id] = $connection;        self::$buffers[$id] = $buffer;        echo 'In-> $id='.$id.',$connection='.$connection."\n";    }    public static function ev_error($buffer, $error, $id) {        event_buffer_disable(self::$buffers[$id], EV_READ | EV_WRITE);        event_buffer_free(self::$buffers[$id]);        echo 'Ot-> $id='.$id."\n";        fclose(self::$connections[$id]);        unset(self::$buffers[$id], self::$connections[$id], self::$clients[$id]);    }    public static function ev_read($buffer, $id) {        static $ct=0;        while ($read = event_buffer_read($buffer, 256))        {            $ct+=strlen($read);            if(strpos($read,'ct')!==false) echo 'Ct=>'.count(self::$connections).'\n';self::$worker[] = array($read, $id);        }    }//--------------------------------------------//will not work!!public static function sendMsg($msg, $id){if(is_array($id))foreach($id as $i)fwrite(self::$clients[$i], $msg);else if($id===0)foreach($id as $i)fwrite(self::$clients[$i], $msg);elsefwrite(self::$clients[$id], $msg);}}//暂未使用,计划用于统计在线之类的$_statWorker = new statWorker();$_statWorker->start();/*$_logicWorker 处理数据后通过$data传给$_senderWorker下发因为worker stackable 似乎都不能共享变量(他们接收到的参数——通过worker[]=...,stackable=[],worker->stack几个方式收到的数据都是复制的,并且资源复制不成功,是null,通过public方法设定的参数同样是复制的,只有construct构造函数接收的参数才可以以引用的)我了解到的是要给worker共享数据,只能通过给其构造函数传递一个stackable(传worker)类型的变量*/$data = new data();//保存连接的客户端,epoll::sendMsg不会正常工作,在里面获取不到self::$clients,也只能通过向构造函数传递stackable变量的方法实现;是为什么我不知道,可能我的用法不对。$clients = new data();/*处理接收上来的数据,比如查看背包,收取邮件,或者战斗 ,行走,但里面不要进行任何io操作——读写数据库甚至是memcache都不可以logicWorker只做计算型工作,并且是主要的工作线程那么你可能要问,用户的数据从哪里来?如何保存?——答案是用别的线程,至少还需要2个线程:登陆线程,从数据库读数据;存档线程,把数据存放到数据。这样工作按线程分开了,能够有一定并发数,是多少,不知道,我目前的java程序就是这个方式,单服3000+在线,负载不到1,cpu 20%左右。php可能要差一点儿当然这个pthreads应该可以用很不稳定来形容,很容易coredump……,不过怎么都算是个进步吧,因为真的实现了多线程。*/$_logicWorker = new logicWorker($data);/*负责下发数据,其实,可以由$_logicWorker来执行,我这里主要是想测试数据共享,*/$_senderWorker = new senderWorker($data, $clients);//启动线程和监听$_logicWorker->start();$_senderWorker->start();new epoll(9808, $_logicWorker, $clients);echo 'Running ... ----------> ';


 

热点排行