PHP发送JSON POST
/**
* Curl版本
* 使用方法:
* $post_string = "app=request&version=beta";
* request_by_curl('http://www.waitalone.cn/restServer.php', $post_string);
*/
function json_post($url, $data = NULL)
{
$curl = curl_init();
curl_setopt($curl, CURLOPT_URL, $url);
curl_setopt($curl, CURLOPT_SSL_VERIFYPEER, false);
curl_setopt($curl, CURLOPT_SSL_VERIFYHOST, false);
if(!$data){
return 'data is null';
}
if(is_array($data))
{
$data = json_encode($data);
}
curl_setopt($curl, CURLOPT_POST, 1);
curl_setopt($curl, CURLOPT_POSTFIELDS, $data);
curl_setopt($curl, CURLOPT_HEADER, 0);
curl_setopt($curl, CURLOPT_HTTPHEADER,array(
'Content-Type: application/json; charset=utf-8',
'Content-Length:' . strlen($data),
'Cache-Control: no-cache',
'Pragma: no-cache'
));
curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1);
$res = curl_exec($curl);
$errorno = curl_errno($curl);
if ($errorno) {
return $errorno;
}
curl_close($curl);
return $res;
}
PHP接受JSON POST
$data = json_decode(file_get_contents('php://input'), true);
月份:2021年3月
swoole 如何实现 websocket 客户端的功能?
workerman 的 websocket 客服端是这样实现的:(很简单,很容易就使用了,和前端差不多)
http://doc.workerman.net/315306
swoole 如何实现这样相同的功能呢?
看了一下的一些文档,始终无法理解:
https://github.com/matyhtf/framework/blob/master/libs/Swoole/Client/WebSocket.php
https://wiki.swoole.com/wiki/page/p-http_client.html
如果只是获取的话,比较简单:
<?php
$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
$client->on('connect', function($cli) {
$cli->send("GET / HTTP/1.1\r\n\r\n");
});
$client->on('receive', function($cli, $data) {
echo "Received: " . $data;
});
$client->on('error', function($cli) {
echo "Connect failed\r\n\r\n";
});
$client->on('close', function($cli) {
echo "Connection close\r\n\r\n";
});
$client->connect('xxx.xxx.xxx.xxx', 1234, true);
但是我这里想要实现的是类似 workerman 那样的,可以一直连接。
使用PHP客户端连接到websocket
这是我一直在使用的代码,已经在不同的论坛上广泛发布.但由于某种原因,我无法让它发挥作用.
任何帮助,将不胜感激.
$host = 'host'; //where is the websocket server
$port = 443; //ssl
$local = "http://www.example.com/"; //url where this script run
$data = '{"id": 2,"command": "server_info"}'; //data to be send
$head = "GET / HTTP/1.1"."\r\n".
"Upgrade: WebSocket"."\r\n".
"Connection: Upgrade"."\r\n".
"Origin: $local"."\r\n".
"Host: $host"."\r\n".
"Content-Length: ".strlen($data)."\r\n"."\r\n";
////WebSocket handshake
$sock = fsockopen($host, $port, $errno, $errstr, 2);
fwrite($sock, $head ) or die('error:'.$errno.':'.$errstr);
$headers = fread($sock, 2000);
fwrite($sock, "\x00$data\xff" ) or die('error:'.$errno.':'.$errstr);
$wsdata = fread($sock, 2000); //receives the data included in the websocket package "\x00DATA\xff"
$retdata = trim($wsdata,"\x00\xff"); //extracts data
////WebSocket handshake
fclose($sock);
echo $retdata;
我可能更喜欢使用现有的websocket客户端库(可能是 https://github.com/gabrielbull/php-websocket-client或 https://github.com/Devristo/phpws/tree/master/src/Devristo/Phpws/Client?)而不是自己滚动,但我至少通过使用以下方式连接:
$head = "GET / HTTP/1.1"."\r\n".
"Host: $host"."\r\n".
"Upgrade: websocket"."\r\n".
"Connection: Upgrade"."\r\n".
"Sec-WebSocket-Key: asdasdaas76da7sd6asd6as7d"."\r\n".
"Sec-WebSocket-Version: 13"."\r\n".
"Content-Length: ".strlen($data)."\r\n"."\r\n";
我的服务器正在使用TLS / SSL,所以我还需要:
$sock = fsockopen('tls://'.$host, $port, $errno, $errstr, 2);
WebSocket\Server
1.7.9增加了内置的WebSocket服务器支持,通过几行PHP代码就可以写出一个异步非阻塞多进程的WebSocket服务器。
$server = new Swoole\WebSocket\Server("0.0.0.0", 9501);
$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
echo "server: handshake success with fd{$request->fd}\n";
});
$server->on('message', function (Swoole\WebSocket\Server $server, $frame) {
echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
$server->push($frame->fd, "this is server");
});
$server->on('close', function ($ser, $fd) {
echo "client {$fd} closed\n";
});
$server->start();
onRequest回调
WebSocket\Server 继承自 Http\Server
- 设置了
onRequest回调,WebSocket\Server也可以同时作为http服务器 - 未设置
onRequest回调,WebSocket\Server收到http请求后会返回http 400错误页面 - 如果想通过接收
http触发所有websocket的推送,需要注意作用域的问题,面向过程请使用global对WebSocket\Server进行引用,面向对象可以把WebSocket\Server设置成一个成员属性
1、面向过程代码
$server = new Swoole\WebSocket\Server("0.0.0.0", 9501);
$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
echo "server: handshake success with fd{$request->fd}\n";
});
$server->on('message', function (Swoole\WebSocket\Server $server, $frame) {
echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
$server->push($frame->fd, "this is server");
});
$server->on('close', function ($ser, $fd) {
echo "client {$fd} closed\n";
});
$server->on('request', function (Swoole\Http\Request $request, Swoole\Http\Response $response) {
global $server;//调用外部的server
// $server->connections 遍历所有websocket连接用户的fd,给所有用户推送
foreach ($server->connections as $fd) {
// 需要先判断是否是正确的websocket连接,否则有可能会push失败
if ($server->isEstablished($fd)) {
$server->push($fd, $request->get['message']);
}
}
});
$server->start();
2、面向对象代码
class WebsocketTest {
public $server;
public function __construct() {
$this->server = new Swoole\WebSocket\Server("0.0.0.0", 9501);
$this->server->on('open', function (swoole_websocket_server $server, $request) {
echo "server: handshake success with fd{$request->fd}\n";
});
$this->server->on('message', function (Swoole\WebSocket\Server $server, $frame) {
echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
$server->push($frame->fd, "this is server");
});
$this->server->on('close', function ($ser, $fd) {
echo "client {$fd} closed\n";
});
$this->server->on('request', function ($request, $response) {
// 接收http请求从get获取message参数的值,给用户推送
// $this->server->connections 遍历所有websocket连接用户的fd,给所有用户推送
foreach ($this->server->connections as $fd) {
// 需要先判断是否是正确的websocket连接,否则有可能会push失败
if ($this->server->isEstablished($fd)) {
$this->server->push($fd, $request->get['message']);
}
}
});
$this->server->start();
}
}
new WebsocketTest();
客户端
Chrome/Firefox/高版本IE/Safari等浏览器内置了JS语言的WebSocket客户端- 微信小程序开发框架内置的
WebSocket客户端 - 异步的
PHP程序中可以使用Swoole\Http\Client作为WebSocket客户端 apache/php-fpm或其他同步阻塞的PHP程序中可以使用swoole/framework提供的同步WebSocket客户端- 非
WebSocket客户端不能与WebSocket服务器通信
swoole 的websocket 连接
介绍下两种连接websocket服务器的方法
1 通过访问http服务器然后访问我们的html页面连接我们的websouket服务器,这样我们就需要3个文件。1 http.php 2 ws.php 3 ws.html 简单贴下我的图,也是参考官方文档写的
http.php
<?php
/**
- Created by PhpStorm.
- User: xyj
- Date: 18-9-9
- Time: 下午4:31
*/
//实例化
$http_server = new swoole_http_server(‘0.0.0.0’,9501);
//服务器配置
$http_server->set(
[
‘enable_static_handler’ => true,
‘document_root’ => ‘/www/wwwroot/test_project/swoole’,
]
);
$http_server->on(‘request’,function($request ,$response){
//print_r($request->get);
//设置响应头信息
$response->cookie(‘xyj’,’hello’,86400);
//服务器返回信息
$response->end(‘http_server’ . json_encode($request->get));
});
$http_server->start();
ws.php
<?php
/**
- Created by PhpStorm.
- User: xyj
- Date: 18-9-9
- Time: 下午5:02
*/
//创建websocket服务器对象,监听0.0.0.0:9502端口
$ws = new swoole_websocket_server(“0.0.0.0”, 9502);
$ws->set(
[
‘enable_static_handler’ => true,
‘document_root’ => ‘/www/wwwroot/test_project/swoole’,
]
);
//监听WebSocket连接打开事件
$ws->on(‘open’, function ($ws, $request) {
var_dump($request->fd, $request->get, $request->server);
$ws->push($request->fd, “hello, welcome\n”);
});
//监听WebSocket消息事件
$ws->on(‘message’, function ($ws, $frame) {
echo “Message: {$frame->data}\n”;
$ws->push($frame->fd, “server: {$frame->data}”);
});
//监听WebSocket连接关闭事件
$ws->on(‘close’, function ($ws, $fd) {
echo “client-{$fd} is closed\n”;
});
$ws->start();
ws.html
Document
hello swoole 测试
如果是在本地测试环境运行那么wsServer = ‘ws://127.0.0.1:9502’,如果是外网 那么127.0.0.1 就要改成你浏览器上所访问的地址,这里的端口必须要和你websocket 服务器端口一致否则无法访问的
准备工作做好了那么我们就开始访问下websocket,首先在终端 运行http.php 和 ws.php, 正常情况下是这样如果遇到错误一般常见的就是端口被占用,如果是swoole扩展没装好或者是语法错误,那么请在仔细的读啃下官方文档。
如果是端口被占用我们可以使用 lsof -i:9501 命令查看 端口信息, 在kill之前请先认真的看下 该端口是否是重要端口,不要盲目的kill 。
还有一种就是我们开启过http.php 在关闭后 可能被占用的端口没有及时的释放,当我们再次php http.php的时候他也会抛出端口被占用的错误,这里我们可以用 这个命令 netstat -ntlp 来查看tcp监听的端口,其父进程 kill 然后在运行就ok了,继续使用这个命令来查看tcp端口如图
走到这里我们的两个服务器都已经开启了 。我们就可以通过浏览器访问了,在浏览器上输入地址:9501 , 此时这里的端口是要个http服务器的端口一致而不是websocket的端口,访问成功会出现如下图
————————————————
版权声明:本文为CSDN博主「君子有攸往」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/xiao199306/article/details/82621140
swoole的process模块创建和使用子进程
Version:1.0 StartHTML:000000210 EndHTML:000047154 StartFragment:000003232 EndFragment:000047098 StartSelection:000003301 EndSelection:000047086 SourceURL:https://www.cnblogs.com/jkko123/p/10918056.html swoole的process模块创建和使用子进程 – 怀素真 – 博客园
swoole中为我们提供了一个进程管理模块 Process,替换PHP的 pcntl 扩展,方便我们创建进程,管理进程,和进程间的通信。
swoole提供了2种进程间的通信:
1、基于 unix socket 的管道 pipe。
2、基于 sysvmsg 的消息队列。
我们可以通过 new swoole_process() 快速的创建一个进程,默认会创建一个 SOCK_DGRAM 类型的管道,用于进程间的通信,当然可以设置成其他类型,也可以不创建。
一、通过同步阻塞管道进行进程间通信?
| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748 | <?php $worker_process_nums = 5;$worker_process = []; for ($i = 0; $i < $worker_process_nums; $i++) {//创建子进程//默认为每个子进程创建一个管道,如果不想创建设置$pipe_type参数为false//注意管道默认是同步阻塞,半双工,如果读取不到数据就会阻塞$worker = new swoole_process(function (swoole_process $worker) {//注意,如果主进程中不写数据write(),那么子进程这里read()就会阻塞$task = json_decode($worker->read(), true); //进行计算任务$tmp = 0;for ($i = $task['start']; $i < $task['end']; $i++) {$tmp += $i;} echo '子进程 PID : ', $worker->pid, ' 计算 ', $task['start'], ' - ', $task['end'], ' 结果 : ', $tmp, PHP_EOL;//往管道中写入计算的结果$worker->write($tmp);//子进程退出$worker->exit();}); //保存子进程$worker_process[$i] = $worker; //启动子进程$worker->start();} //往每个子进程管道中投递任务for ($i = 0; $i < $worker_process_nums; $i++) {$worker_process[$i]->write(json_encode(['start' => mt_rand(1, 10),'end' => mt_rand(50, 100),]));} //父进程监听子进程退出信号,回收子进程,防止出现僵尸进程swoole_process::signal(SIGCHLD, function ($sig) {//必须为false,非阻塞模式while ($ret = swoole_process::wait(false)) {echo "子进程 PID : {$ret['pid']} 退出\n";}}); |
二、通过 swoole_event_add 将管道设为异步,来进行通信?
| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 | <?php $worker_process_nums = 5;$worker_process = []; for ($i = 0; $i < $worker_process_nums; $i++) {$worker = new swoole_process(function ($worker) {//在子进程中给管道添加事件监听//底层会自动将该管道设置为非阻塞模式//参数二,是可读事件回调函数,表示管道可以读了swoole_event_add($worker->pipe, function ($pipe) use ($worker) {$task = json_decode($worker->read(), true); $tmp = 0;for ($i = $task['start']; $i < $task['end']; $i++) {$tmp += $i;}echo "子进程 : {$worker->pid} 计算 {$task['start']} - {$task['end']} \n";//子进程把计算的结果,写入管道$worker->write($tmp);//注意,swoole_event_add与swoole_event_del要成对使用swoole_event_del($worker->pipe);//退出子进程$worker->exit();});}); $worker_process[$i] = $worker; //启动子进程$worker->start();} for ($i = 0; $i < $worker_process_nums; $i++) {$worker = $worker_process[$i]; $worker->write(json_encode(['start' => mt_rand(1, 10),'end' => mt_rand(50, 100),])); //主进程中,监听子进程管道事件swoole_event_add($worker->pipe, function ($pipe) use ($worker) {$result = $worker->read();echo "子进程 : {$worker->pid} 计算结果 {$result} \n";swoole_event_del($worker->pipe);});} //父进程监听子进程退出信号,回收子进程,防止出现僵尸进程swoole_process::signal(SIGCHLD, function ($sig) {//必须为false,非阻塞模式while ($ret = swoole_process::wait(false)) {echo "子进程 PID : {$ret['pid']} 退出\n";}}); |
三、使用消息队列来完成进程间通信?
| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152 | <?php $worker_process_nums = 5;$worker_process = []; for ($i = 0; $i < $worker_process_nums; $i++) {//注意,这里将参数$pipe_type设为false,表示不创建管道$worker = new swoole_process(function ($worker) {$task = json_decode($worker->pop(), true); $tmp = 0;for ($i = $task['start']; $i < $task['end']; $i++) {$tmp += $i;}echo "子进程 : {$worker->pid} 计算 {$task['start']} - {$task['end']} \n";$worker->push($tmp);$worker->exit();}, false, false); //使用消息队列,作为进程间的通信//注意,消息队列是共享的$worker->useQueue(); $worker_process[$i] = $worker; //启动子进程$worker->start();} for ($i = 0; $i < $worker_process_nums; $i++) {//只需用一个子进程发送消息即可,因为消息队列是共享的$worker_process[0]->push(json_encode(['start' => mt_rand(1, 10),'end' => mt_rand(50, 100),]));} //注意,这里要暂停,防止加入队列的任务,立刻被主进程读出来。sleep(1); for ($i = 0; $i < $worker_process_nums; $i++) {$result = $worker_process[0]->pop();echo "计算结果 : {$result} \n";} //父进程监听子进程退出信号,回收子进程,防止出现僵尸进程swoole_process::signal(SIGCHLD, function ($sig) {//必须为false,非阻塞模式while ($ret = swoole_process::wait(false)) {echo "子进程 PID : {$ret['pid']} 退出\n";}}); |
四、进程可以通过 signal 监听信号,和 alarm 设置定时器。
我们可以在父进程上设置监听信号,当子进程退出时,重新挂起子进程。
也可以设置定时器,通过 swoole_process::kill($pid, 0); 定时检测进程是否存活。?
| 1234567891011121314151617181920212223 | <?php //每隔1秒触发SIGALAM信号//注意,alarm不能和Timer同时使用swoole_process::alarm(1000 * 1000, 0); swoole_process::signal(SIGALRM, function ($signo) {static $cnt = 0;$cnt++;echo "时钟定时信号\n"; if ($cnt > 10) {//清除定时器swoole_process::alarm(-1);}}); swoole_process::signal(SIGINT, function ($signo) {echo "我被ctrl+c了\n"; //退出主进程,不然将一直无法正常退出exit(0);}); |
https://www.cnblogs.com/jkko123/p/10918056.html
版权声明:博主文章,可以不经博主允许随意转载,随意修改,知识是用来传播的。
php+swoole多线程的操作
多个任务同时执行
比如,我们要对已知的用户数据进行判断,是否需要发送邮件和短信,如果需要发送则发送。
不使用多进程时,我们首先判断是否发送邮件,如果需要则发送;然后再判断是否需要发送短信,如果需要则发送。如果发送邮件耗时2s,发送短信耗时2s,那么我们完成任务大概需要4s左右的时间。
如果我们使用多线程的话,可以开两个线程,一个用于处理邮件,一个用于处理短信,则耗时一共需要2s左右,处理时间缩短了一半。
以下是代码实例:

<?php
/**
* Created by PhpStorm.
* User: cyw0413
* Date: 2018/10/20
* Time: 10:37
*/
$info = array(
“sendmail”=>1,
“mailto”=>”12345@qq.com”,
“sendsms”=>1,
“smsto”=>”123456”
);
echo “start:”.date(“Y-m-d H:i:s”).PHP_EOL;
$mail_process = new swoole_process(‘sendMail’,true);
$mail_process->start();
$sms_process = new swoole_process(‘sendSMS’,true);
$sms_process->start();
//主进程输出子进程范围内容
echo $mail_process->read();
echo PHP_EOL;
echo $sms_process->read();
echo PHP_EOL;
echo “end:”.date(“Y-m-d H:i:s”).PHP_EOL;
//并行函数
function sendMail(swoole_process $worker){
global $info;
if($info[‘sendmail’]==1){
sleep(2);
$worker->write(“send mail to “.$info[‘mailto’]);
}
}
function sendSMS(swoole_process $worker){
global $info;
if($info[‘sendmail’]==1){
sleep(2);
$worker->write(“send sms to “.$info[‘smsto’]);
}
}

大任务划分成多个小任务
假设我们现在有一个通过curl抓取网页内容的需求,需要抓取10个网页,url地址通过数组读取,每个curl耗时2s。如果我们通过for循环来抓取这10个网页,需要耗时20s,使用多进程我们可以将任务划分成5份,分别由5个进程执行,每个进程抓取2个url,并发执行,共耗时4s,效率提高5倍。
以下是代码实例

<?php
/**
* Created by PhpStorm.
* User: cyw0413
* Date: 2018/10/20
* Time: 10:51
*/
$url_arr = array();
for ($i=0;$i<10;$i++){
$url_arr[] = “www.baidu.com?wd=”.$i;
}
echo “start:”.date(“Y-m-d H:i:s”).PHP_EOL;
$workers = array();
for ($i=0;$i<5;$i++){
$process = new swoole_process(‘getContents’,true);
$process->start();
$process->write($i);
$workers[] = $process;
}
//主进程数据结果
foreach ($workers as $process){
echo $process->read();
echo PHP_EOL;
}
echo “end:”.date(“Y-m-d H:i:s”).PHP_EOL;
function getContents(swoole_process $worker){
$i = $worker->read();
global $url_arr;
$res1 = execCurl($url_arr[($i*2)]);
$res2 = execCurl($url_arr[($i*2+1)]);
echo $res1.PHP_EOL.$res2;
}
function execCurl($url){
sleep(2);
return “handle “.$url.” finished”;
}

总结
以上两种情况,本质上都是将逻辑上没有先后关系的任务,用多个进程程并发执行,提高效率。
php机制本身不提供多线程的操作,ptcl扩展提供了php操作linux多进程的接口。
Swoole| Swoole 中 Process
Version:1.0 StartHTML:000000201 EndHTML:000101815 StartFragment:000009570 EndFragment:000101777 StartSelection:000009570 EndSelection:000101777 SourceURL:https://www.jianshu.com/p/4b6326cdaaa7 Swoole| Swoole 中 Process – 简书
date: 2018-1-8 20:56:08
title: Swoole| Swoole 中 Process
这篇 blog 折腾了很久才写出来, 问题主要还是在 理解 上. 有时候就是这样,
理解了之后就很简单, 不理解就很难; 知道了就很简单, 不知道往往就很难. 所以 stay hungry stay foolish stay young 真的很重要
本来计划开发 swoft 框架 中的 Process 模块, 所以需要对 swoole 的 Process 模块要有比较深入的了解才行. 不过根据 swoole 官方 wiki 的实践过程中, 一直有未理解的部分. 之前虽然也做过多次 多进程编程, 但是当真正需要进行框架开发的时候, 就会发现以前学到的知识不够全面, 无法指导整体的设计. 好在一直在坚持, 奉上现在理解的程度.
内容一览:
- 进程相关基础操作: fork/exit/kill/wait
- 进程相关高级操作: 主进程退出子进程干完活后也退出; 子进程异常退出主进程自动重启
- 进程间通信(IPC) – 管道(pipe)
- 进程间通信(IPC) – 消息队列(message queue)
- swoole process 模块提供的更多功能
进程相关基础操作
进程是什么: 进程是运行者的程序
先来看看一个最简单的例子:
<?php
echo posix_getpid(); // 获取当前进程的 pid
swoole_set_process_name('swoole process master'); // 修改所在进程的进程名
sleep(100); // 模拟一个持续运行 100s 的程序, 这样就可以在进程中查看到它, 而不是运行完了就结束
通过 ps aux 查看进程:

未设置进程名

设置进程名
再来看看 swoole 中使用子进程的基础操作:
use Swoole\Process;
$process = new Process(function (Process $worker) {
if (Process::kill($worker->pid, 0)) { // kill操作常用来杀死进程, 传入 0 可以用来检测进程是否存在
$worker->exit(); // 退出子进程
}
});
$process->start(); // 启动子进程
Process::wait(); // 回收退出的子进程
new Process(): 通过回调函数来设置子进程将要执行的逻辑$process->start(): 调用fork()系统调用, 来生成子进程Process::kill(): kill操作给进程发送信号, 常用来杀死进程, 传入 0 可以用来检测进程是否存在Process::wait(): 调用wait()系统调用, 回收子进程, 如果不回收, 子进程会编程 僵尸进程, 浪费系统资源$worker->exit(): 子进程主动退出
我在这里有一个疑问:
主进程的生命周期是怎么样的? 子进程的生命周期是怎么样的?
有这样一个疑问也来自于我之前的思维惯性: 理解一个事物时从事物的生命周期进行理解. 结合 进程是运行着的程序 来一起理解:
new Process(): 只有回调函数的逻辑会在进程中执行- 除此之外的代码都是在主进程中执行
进程相关高级操作
- 主进程退出子进程干完活后也退出
- 子进程异常退出主进程自动重启
<?php
use Swoole\Process;
class MyProcess1
{
public $mpid = 0; // master pid, 即当前程序的进程ID
public $works = []; // 记录子进程的 pid
public $maxProcessNum = 1;
public $newIndex = 0;
public function __construct()
{
try {
swoole_set_process_name(__CLASS__. ' : master');
$this->mpid = posix_getpid();
$this->run();
$this->processWait();
} catch (\Exception $e) {
die('Error: '. $e->getMessage());
}
}
public function run()
{
for ($i=0; $i<$this->maxProcessNum; $i++) {
$this->createProcess();
}
}
public function createProcess($index = null)
{
if (is_null($index)) {
$index = $this->newIndex;
$this->newIndex++;
}
$process = new Process(function (Process $worker) use($index) { // 子进程创建后需要执行的函数
swoole_set_process_name(__CLASS__. ": worker $index");
for ($j=0; $j<3; $j++) { // 模拟子进程执行耗时任务
$this->checkMpid($worker);
echo "msg: {$j}\n";
sleep(1);
}
}, false, false); // 不重定向输入输出; 不使用管道
$pid = $process->start();
$this->works[$index] = $pid;
return $pid;
}
// 主进程异常退出, 子进程工作完后退出
public function checkMpid(Process $worker) // demo中使用的引用, 引用表示传的参数可以被改变, 由于传入 $worker 是 \Swoole\Process 对象, 所以不用使用 &
{
if (!Process::kill($this->mpid, 0)) { // 0 可以用来检测进程是否存在
$worker->exit();
$msg = "master process exited, worker {$worker->pid} also quit\n"; // 需要写入到日志中
file_put_contents('process.log', $msg, FILE_APPEND); // todo: 这句话没有执行
}
}
// 重启子进程
public function rebootProcess($pid)
{
$index = array_search($pid, $this->works);
if ($index !== false) {
$newPid = $this->createProcess($index);
echo "rebootProcess: {$index}={$pid}->{$newPid} Done\n";
return;
}
throw new \Exception("rebootProcess error: no pid {$pid}");
}
// 自动重启子进程
public function processWait()
{
while (1) {
if (count($this->works)) {
$ret = Process::wait(); // 子进程退出
if ($ret) {
$this->rebootProcess($ret['pid']);
}
} else {
break;
}
}
}
}
new MyProcess1();
说明以下几点:
- 子进程运行结束后就会退出, 通过
Process::wait()检测到子进程退出信号执行自动重启, 子进程就会一直执行下去 - 关于函数参数传 引用/指针, 一个很好的理解方式是: 参数可以被修改
运行并模拟主进程异常退出:

模拟主进程异常退出

输出
进程间通信(IPC) – 管道(pipe)
管道的几个关键词:
- 半双工: 数据单向流动, 一端只读, 一端只写.
- 同步 vs 异步: 默认为同步阻塞模式, 可以使用
swoole_event_add()添加管道到 swoole 的 event loop 中, 实现异步IO - 管道类型(数据格式):
SOCK_STREAM, 流式, 需要用户自己处理数据的封包/解包;SOCK_DGRAM, 数据报, 每次收发都是一次完整的数据包 (DGRAM/STREAM)
注意, swoole wiki – process->write() 中提到 SOCK_DGRAM 并不会乱序丢包
先来看一个简单的例子, php从shell管道中读取数据:
// get pip data
$fp = fopen('php://stdin', 'r');
if ($fp) {
while ($line = fgets($fp, 4096)) {
echo "php get pip data: ". $line;
}
fclose($fp);
}

从shell管道读取数据
swoole process中的管道很强大, 支持 子进程写, 主进程读 以及 主进程写, 子进程读:
use Swoole\Process;
// 子进程写, 父进程读
$process = new Process(function (Process $worker) {
$worker->write("worker");
});
$process->start();
$msg = $process->read();
echo "from process: $msg", "\n";
// 父进程写, 子进程读
$process = new Process(function (Process $worker) {
$msg = $worker->read();
echo "from master: $msg", "\n";
});
$process->start();
$process->write('master');

使用管道多次读写
注意区分 $worker->write() 和 $process->write(), 之前一直错误的以为这 2 个是相同的, 其实就是把 $process 误以为是子进程, 从而相当于 $process->write() 就是子进程写管道 — 其实这里是主进程内执行的逻辑, 是主进程写数据到管道, 供子进程读取
swoole中其他管道相关操作:
- 异步IO
use Swoole\Process;
use Swoole\Event;
// 异步IO
$process = new Process(function (Process $worker) {
$GLOBALS['worker'] = $worker;
Event::add($worker->pipe, function (int $pipe) { // 使用 swoole_event_add 添加管道到异步IO
/** @var Process $worker */
$worker = $GLOBALS['worker'];
$msg = $worker->read();
echo "from master: $msg \n";
$worker->write("hello master");
sleep(2);
$worker->exit(0);
});
});
$process->start();
$process->write("master msg 1");
$msg = $process->read();
echo "from process: $msg \n";

异步IO
- 设置超时
use Swoole\Process;
// 设置管道超时
$process = new Process(function (Process $worker) {
sleep(5);
});
$process->start();
$process->setTimeout(0.5);
$ret = $process->read();
var_dump($ret);
var_dump(swoole_errno());

管道超时
插播一个趣事, @thinkpc 看完 2017北京PHP开发者年会, 就知道为啥会点赞了
- 关闭管道
// 关闭管道: 默认值0->关闭读写 1->关闭写 2->关闭读
$process->close();
进程间通信(IPC) – 消息队列(message queue)
消息队列:
- 一系列保存在内核中的消息链表
- 有一个 msgKey, 可以通过此访问不同的消息队列
- 有数据大小限制, 默认 8192, 可以通过内核修改
- 阻塞 vs 非阻塞: 阻塞模式下
pop()空消息队列/push()满消息队列会阻塞, 非阻塞模式可以直接返回
swoole 中使用消息队列:
- 通信模式: 默认为争抢模式, 无法将消息投递给指定子进程
- 新建消息队列后, 主进程就可以使用
- 消息队列不可和管道一起使用, 也无法使用 swoole event loop
- 主进程中要调用
wait(), 否则子进程中调用pop()/push()会报错
use Swoole\Process;
$process = new Process(function (Process $worker) {
// $worker->push('worker');
echo "from master: ". $worker->pop(). "\n";
sleep(2);
// $worker->exit();
}, false, false); // 关闭管道
// 参数一为 msgKey, 这里是默认值
// 参数二为 通信模式, 默认值 2 表示争抢模式, 这里还加上了 非阻塞
$process->useQueue(ftok(__FILE__, 1), 2| Process::IPC_NOWAIT);
$process->push('hello1'); // 使用 useQueue 后, 主进程就可以读写消息队列了
$process->push('hello2');
echo "from woker: ". $process->pop(). "\n";
// echo "from woker: ". $process->pop(). "\n";
$process->start(); // 启动子进程
// 消息队列状态
var_dump($process->statQueue());
// 删除队列, 如果不调用则不会在程序结束时清楚数据, 下次使用相同 msgKey 时还可以访问数据
$process->freeQueue();
var_dump(Process::wait()); // 要调用 wait(), 否则子进程中 push()/pop() 会报错

消息队列
swoole process 模块提供的更多功能
swoole_set_process_name(): 修改进程名, 不兼容 macswoole_process->exec(string $execfile, array $args)执行外部程序
参数 $execfile 需要使用可执行文件的绝对路径, 参数 args 为参数数组
// 比如 python test.py 123
swoole_process->exec('/usr/bin/python', ['test.py', 123]);
// 更复杂的例子
swoole_process->exec(('/usr/local/bin/php', ['/var/www/project/yii-best-practice/cli/yii', 't/index', '-m=123', 'abc', 'xyz']);
// 父进程 exec 进程进行管道通信
use Swoole\Process;
$process = new Process(function (Process $worker) {
$worker->exec('/bin/echo', ['hello']);
$worker->write('hello');
}, true); // 需要启用标准输入输出重定向
$process->start();
echo "from exec: ". $process->read(). "\n";

父进程与exec进程通过管道通信
\Swoole\Process::kill($pid, $signo = SIGTERM): 向指定进程发送信号, 默认是终止进程, 传 0 可检测进程是否存在\Swoole\Process::wait(): 回收子进程, 如果主进程不调用此方法, 子进程会变成 僵尸进程, 浪费系统资源\Swoole\Process::signal(): 异步信号监听
use Swoole\Process;
// 异步信号监听 + wait
Process::signal(SIGCHLD, function ($signal) { // 监听子进程退出信号
// 可能同时有多个子进程退出, 所以要while循环
while ($ret = Process::wait(false)) { // false 表示不阻塞
var_dump($ret);
}
});
\Swoole\Process::daemon(): 将当前进程变为一个守护进程
use Swoole\Process;
// daemon
Process::daemon();
swoole_set_process_name('test daemon process');
sleep(100);

daemon-守护进程
\Swoole\Process::alarm(): 高精度定时器(微秒级), 对setitimer系统调用的封装, 可以配合\Swoole\Process::signal()/pcntl_signal使用
注意不可和 \Swoole\Timer 同时使用
// signal + alarm
// 第一个参数表示时间, 单位 us, -1 表示清除定时器
// 第二个参数表示类型 0->真实时间->SIGALAM 1->cpu时间->SIGVTALAM 2->用户态+内核态时间->SIGPROF
Process::alarm(100*1000); // 100ms
Process::signal(SIGALRM, function ($signal) {
static $i = 0;
echo "#$i \t alarm \n";
$i++;
if ($i>20) {
Process::alarm(-1); // -1 表示清除
}
});

alarm
\Swoole\Process::setaffinity(): 设置CPU亲和, 即将进程绑定到指定CPU核上
传值范围: [0, swoole_cpu_num())
CPU亲和: CPU的速度远远高于IO的速度, 所以CPU有多级缓存来解决IO等待的问题, 绑定指定CPU, 更容易命中CPU缓存
写在最后
资源推荐:
todo:
- 使用输入输出重定向
- 管道类型为
SOCK_STREAM时的情况, 是否需要 封包/解包 处理, 即 swoole wiki – process->write() 中提到的 管道通信默认的方式是流式,write写入的数据在read可能会被底层合并 - 多进程 + 异步IO 的注意事项
能理解 因为子进程会继承父进程的内存和IO句柄 这个会产生的影响, 但是给的示例并没有说明这个问题
use Swoole\Process;
use Swoole\Event;
// 多个子进程 + 异步IO
$workers = [];
$workerNum = 3;
for ($i=0; $i<$workerNum; $i++) {
$process = new Process(function (Process $worker) {
$worker->write($worker->pid);
echo "worker: {$worker->pid} \n";
});
$pid = $process->start();
$workers[$pid] = $process;
// Event::add($process->pipe, function (int $pipe) use ($process) {
// $data = $process->read();
// echo "recv: $data \n";
// });
}
foreach ($workers as $worker) {
Event::add($worker->pipe, function (int $pipe) use ($worker) {
$data = $worker->read();
echo "recv: $data \n";
});
}

多进程异步IO
作者:daydaygo
链接:https://www.jianshu.com/p/4b6326cdaaa7
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
通过 Swoole\Table 实现 Swoole 多进程数据共享
第三方存储媒介
前面我们介绍了基于 Swoole 的 Process 及 Process\Pool 模块在 PHP 中实现多进程管理,但是多进程模式下进程间是相互隔离的,无法共享数据和变量,即便是通过 global 定义的全局或超全局变量,也只是在所属进程中有效,如果要在 Swoole 实现的多进程间共享数据,需要借助第三方存储媒介实现:
- 数据库:MySQL、MongoDB
- 缓存:Redis、Memcached
- 磁盘文件
但是这也会引入新的问题,多进程同时操作一条记录或一个文件存在并发访问问题,以数据库操作为例,两个进程可能会同时读取一条数据,或者一个进程对某条记录进行更新处理时,另一个进程也来读取这条记录并进行操作,会导致最终结果数据与预期不一致的情况,这个时候,我们就需要引入锁的概念,当一个进程(比如进程A)对某个记录进行写操作时,对该记录加锁,这样其它进程就无法操作该条记录, 直到进程 A 事务提交再释放这个锁,让其他进程可以进行操作。
内存共享
PHP 相关扩展
对于单机操作来说,除了这些第三方存储媒介之外,还可以通过共享内存的方式实现进程间数据读写操作,有多个 PHP 扩展可以支持共享内存数据操作:
- Semaphore 扩展:可通过该扩展包提供的
shm_get_var和shm_put_var函数实现内存共享数据的读写操作; - Shmop 扩展:可通过该扩展包提供的
shmop_read和shmop_write函数实现内存共享数据的读写操作; - APCu(APC User Cache)扩展:可通过该扩展包提供的
apc_fetch和apc_store实现内存共享数据的读写操作。
Swoole Table
但是上述扩展要么不支持锁,要么高并发时性能比较差,所以 Swoole 自己实现了一个共享内存读写工具 —— Swoole\Table,该工具是一个基于共享内存和锁实现的高性能并发数据结构,可用于解决多进程/多线程数据共享和同步加锁问题:
- 性能强悍,单线程每秒可读写200万次;
- 应用代码无需加锁,内置行锁自旋锁,所有操作均是多线程/多进程安全,用户层完全不需要考虑数据同步问题;
- 支持多进程,可用于多进程之间共享数据;
- 使用行锁,而不是全局锁,仅当 2 个进程在同一 CPU 时间,并发读取同一条数据才会进行发生抢锁。
Swoole\Table 支持以 Key-Value 方式读写,使用起来非常简单:
<?php
// 初始化一个容量为 1024 的 Swoole Table
$table = new \Swoole\Table(1024);
// 在 Table 中新增 id 列
$table->column('id', \Swoole\Table::TYPE_INT);
// 在 Table 中新增 name 列,长度为 50
$table->column('name', \Swoole\Table::TYPE_STRING, 10);
// 在 Table 中新泽 score 列
$table->column('score', \Swoole\Table::TYPE_FLOAT);
// 创建这个 Swoole Table
$table->create();
// 设置 Key-Value 值
$table->set('student-1', ['id' => 1, 'name' => '学小君', 'score' => 80]);
$table->set('student-2', ['id' => 2, 'name' => '学院君', 'score' => 90]);
// 如果指定 Key 值存在则打印对应 Value 值
if ($table->exist('student-1')) {
echo "Student-" . $table->get('student-1', 'id') . ':' . $table->get('student-1', 'name').":".
$table->get('student-1', 'score') . "\n";
}
// 自增操作
$table->incr('student-2', 'score', 5);
// 自减操作
$table->decr('student-2', 'score', 5);
// 表中总记录数
$count = $table->count();
// 删除指定表记录
$table->del('student-1');
此外 Swoole\Table 类还实现了迭代器接口,支持通过 foreach 进行遍历。
在 Laravel 中使用 Swoole\Table
如果要在 Laravel 中集成 Swoole 使用 Swoole\Table,以 LaravelS 扩展包为例,首先要在配置文件 config/laravels.php 中定义 swoole_tables 配置项:
'swoole_tables' => [
'ws' => [ // 表名,会加上 Table 后缀,比如这里是 wsTable
'size' => 102400, // 表容量
'column' => [ // 表字段,字段名为 value
['name' => 'value', 'type' => \Swoole\Table::TYPE_INT, 'size' => 8],
],
],
... // 还可以定义其它表
],
然后我们可以在代码中通过swoole实例上的wsTable属性访问 SwooleTable:
class WebSocketService implements WebSocketHandlerInterface
{
...
// 连接建立时触发
public function onOpen(Server $server, Request $request)
{
// 在触发 WebSocket 连接建立事件之前,Laravel 应用初始化的生命周期已经结束,你可以在这里获取 Laravel 请求和会话数据
// 调用 push 方法向客户端推送数据,fd 是客户端连接标识字段
Log::info('WebSocket 连接建立:' . $request->fd);
app('swoole')->wsTable->set('fd:' . $request->fd, ['value' => $request->fd]);
$server->push($request->fd, 'Welcome to WebSocket Server built on LaravelS');
}
// 收到消息时触发
public function onMessage(Server $server, Frame $frame)
{
foreach (app('swoole')->wsTable as $key => $row) {
if (strpos($key, 'fd:') === 0 && $server->exist($row['value'])) {
Log::info('Receive message from client: ' . $row['value']);
// 调用 push 方法向客户端推送数据
$server->push($frame->fd, 'This is a message sent from WebSocket Server at ' . date('Y-m-d H:i:s'));
}
}
}
...
}
然后我们参考在 Laravel 中集成 Swoole 实现 WebSocket 服务器这篇教程从客户端向 WebSocket 服务器发起请求,即可在最新日志文件中看到相应的日志信息:
[2019-06-19 22:09:03] local.INFO: WebSocket 连接建立:1
[2019-06-19 22:09:07] local.INFO: Receive message from client: 1
apiDoc 使用指南
Version:1.0 StartHTML:000000201 EndHTML:000036428 StartFragment:000010634 EndFragment:000036390 StartSelection:000010634 EndSelection:000036390 SourceURL:https://www.jianshu.com/p/34eac66b47e3 apiDoc 使用指南 – 简书
安装
- 安装node.js
- 安装apiDoc
npm install apidoc -g
配置
在你的项目根目录下新建apidoc.json文件,该文件描述了项目对外提供接口的概要信息如名称、版本、描述、文档打开时浏览器显示标题和接口缺省访问地址。
apidoc.json
{
"name": "ServiceEbikeAPIs",
"version": "3.1.0",
"description": "车辆服务接口文档",
"title": "ServiceEbikeAPIs",
"url" : "http://cjl3.rokyinfo.net:7190/api-ebike"
}
使用样例
/**
*
* @apiDefine RkNotFoundException
*
* @apiError RkNotFoundException 找不到相关数据
*
* @apiErrorExample Error-Response:
* HTTP/1.1 404 Not Found
* {
* "error": {
* "code": 404,
* "msg": "",
* "path" ""
* }
* }
*
*/
/**
*
* @api {get} /v3.1/ues/:sn/rt-info 获取设备上报实时信息
* @apiVersion 3.1.0
* @apiName GetUeRealTimeInfo
* @apiGroup UE
*
* @apiHeader {String} Authorization 用户授权token
* @apiHeader {String} firm 厂商编码
* @apiHeaderExample {json} Header-Example:
* {
* "Authorization": "eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOjM2NzgsImF1ZGllbmNlIjoid2ViIiwib3BlbkFJZCI6MTM2NywiY3JlYXRlZCI6MTUzMzg3OTM2ODA0Nywicm9sZXMiOiJVU0VSIiwiZXhwIjoxNTM0NDg0MTY4fQ.Gl5L-NpuwhjuPXFuhPax8ak5c64skjDTCBC64N_QdKQ2VT-zZeceuzXB9TqaYJuhkwNYEhrV3pUx1zhMWG7Org",
* "firm": "cnE="
* }
*
* @apiParam {String} sn 设备序列号
*
* @apiSuccess {String} sn 设备序列号
* @apiSuccess {Number} status 设备状态
* @apiSuccess {Number} soc 电池电量百分比
* @apiSuccess {Number} voltage 电池电压
* @apiSuccess {Number} current 电池电流
* @apiSuccess {Number} temperature 电池温度
* @apiSuccess {String} reportTime 上报时间(yyyy-MM-dd HH:mm:ss)
*
* @apiSuccessExample Success-Response:
* HTTP/1.1 200 OK
* {
* "sn": "P000000000",
* "status": 0,
* "soc": 80,
* "voltage": 60.0,
* "current": 10.0,
* "temperature": null,
* "reportTime": "2018-08-13 18:11:00"
* }
*
* @apiUse RkNotFoundException
*
*/
@RequestMapping(value = "/{sn}/rt-info", method = RequestMethod.GET)
public UeRealTimeInfo getUeRealTimeInfo(@RequestHeader(Constants.HEADER_LOGIN_USER_KEY) long userId, @PathVariable("sn") String sn) {
return ueService.getRealTimeInfo(sn);
}
@api
@api {method} path [title]
HTTP接口调用方法、路径及名称
@apiVersion
@apiVersion version
api 版本号
@apiName
@apiName name
api 名称
@apiGroup
@apiGroup name
api 分组
@apiHeader
@apiHeader [(group)] [{type}] [field=defaultValue] [description]
请求头参数
@apiParam
@apiParam [(group)] [{type}] [field=defaultValue] [description]
请求参数
@apiSuccess
@apiSuccess [(group)] [{type}] field [description]
返回数据描述
@apiSuccessExample
@apiSuccessExample [{type}] [title]
example
接口成功返回样例
@apiError
@apiError [(group)] [{type}] field [description]
接口失败描述
@apiErrorExample
@apiErrorExample [{type}] [title]
example
接口失败返回样例
@apiDefine
@apiDefine name [title]
[description]
类似于宏定义,可以被引用
@apiUse
@apiUse name
使用@apiDefine定义的描述
更详细的说明请参考官方文档http://apidocjs.com
生成文档
cd到apidoc.json所在路径(即项目根目录)执行如下命令即可
apidoc -i src/ -o apidoc/
执行成功后会生成apidoc文件夹如下图所示

图片
点开apidoc文件夹中index.html会发现已经生成的漂亮的api文档

图片
作者:袁志健
链接:https://www.jianshu.com/p/34eac66b47e3
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。