PHP用CURL发送Content-type为application/json的POST请求方法

PHP发送JSON POST
/**
 * Curl版本
 * 使用方法:
 * $post_string = "app=request&version=beta";
 * request_by_curl('http://www.waitalone.cn/restServer.php', $post_string);
 */
function json_post($url, $data = NULL)
{

    $curl = curl_init();

    curl_setopt($curl, CURLOPT_URL, $url);
    curl_setopt($curl, CURLOPT_SSL_VERIFYPEER, false);
    curl_setopt($curl, CURLOPT_SSL_VERIFYHOST, false);
    if(!$data){
        return 'data is null';
    }
    if(is_array($data))
    {
        $data = json_encode($data);
    }
    curl_setopt($curl, CURLOPT_POST, 1);
    curl_setopt($curl, CURLOPT_POSTFIELDS, $data);
    curl_setopt($curl, CURLOPT_HEADER, 0);
    curl_setopt($curl, CURLOPT_HTTPHEADER,array(
        'Content-Type: application/json; charset=utf-8',
        'Content-Length:' . strlen($data),
        'Cache-Control: no-cache',
        'Pragma: no-cache'
    ));
    curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1);
    $res = curl_exec($curl);
    $errorno = curl_errno($curl);
    if ($errorno) {
        return $errorno;
    }
    curl_close($curl);
    return $res;

}

PHP接受JSON POST
$data = json_decode(file_get_contents('php://input'), true);

swoole 如何实现 websocket 客户端的功能?

workerman 的 websocket 客服端是这样实现的:(很简单,很容易就使用了,和前端差不多)

http://doc.workerman.net/315306

swoole 如何实现这样相同的功能呢?

看了一下的一些文档,始终无法理解:

https://github.com/matyhtf/framework/blob/master/libs/Swoole/Client/WebSocket.php

https://wiki.swoole.com/wiki/page/p-http_client.html

如果只是获取的话,比较简单:

<?php
$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);

$client->on('connect', function($cli) {
    $cli->send("GET / HTTP/1.1\r\n\r\n");
});
$client->on('receive', function($cli, $data) {
    echo "Received: " . $data;
});
$client->on('error', function($cli) {
    echo "Connect failed\r\n\r\n";
});
$client->on('close', function($cli) {
    echo "Connection close\r\n\r\n";
});

$client->connect('xxx.xxx.xxx.xxx', 1234, true);

但是我这里想要实现的是类似 workerman 那样的,可以一直连接。

使用PHP客户端连接到websocket

这是我一直在使用的代码,已经在不同的论坛上广泛发布.但由于某种原因,我无法让它发挥作用.

任何帮助,将不胜感激.

$host = 'host';  //where is the websocket server
$port = 443; //ssl
$local = "http://www.example.com/";  //url where this script run
$data = '{"id": 2,"command": "server_info"}';  //data to be send

$head =        "GET / HTTP/1.1"."\r\n".
               "Upgrade: WebSocket"."\r\n".
               "Connection: Upgrade"."\r\n".
               "Origin: $local"."\r\n".
               "Host: $host"."\r\n".
               "Content-Length: ".strlen($data)."\r\n"."\r\n";
////WebSocket handshake
$sock = fsockopen($host, $port, $errno, $errstr, 2);
fwrite($sock, $head ) or die('error:'.$errno.':'.$errstr);
$headers = fread($sock, 2000);
fwrite($sock, "\x00$data\xff" ) or die('error:'.$errno.':'.$errstr);
$wsdata = fread($sock, 2000);  //receives the data included in the websocket package "\x00DATA\xff"
$retdata = trim($wsdata,"\x00\xff"); //extracts data
////WebSocket handshake
fclose($sock);

echo $retdata;

我可能更喜欢使用现有的websocket客户端库(可能是 https://github.com/gabrielbull/php-websocket-client或 https://github.com/Devristo/phpws/tree/master/src/Devristo/Phpws/Client?)而不是自己滚动,但我至少通过使用以下方式连接:

$head = "GET / HTTP/1.1"."\r\n".
    "Host: $host"."\r\n".
    "Upgrade: websocket"."\r\n".
    "Connection: Upgrade"."\r\n".
    "Sec-WebSocket-Key: asdasdaas76da7sd6asd6as7d"."\r\n".
    "Sec-WebSocket-Version: 13"."\r\n".
    "Content-Length: ".strlen($data)."\r\n"."\r\n";

我的服务器正在使用TLS / SSL,所以我还需要:

$sock = fsockopen('tls://'.$host, $port, $errno, $errstr, 2);

完整的协议规范是:https://tools.ietf.org/rfc/rfc6455.txt

WebSocket\Server

1.7.9增加了内置的WebSocket服务器支持,通过几行PHP代码就可以写出一个异步非阻塞多进程的WebSocket服务器。

$server = new Swoole\WebSocket\Server("0.0.0.0", 9501);

$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
    echo "server: handshake success with fd{$request->fd}\n";
});

$server->on('message', function (Swoole\WebSocket\Server $server, $frame) {
    echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
    $server->push($frame->fd, "this is server");
});

$server->on('close', function ($ser, $fd) {
    echo "client {$fd} closed\n";
});

$server->start();

onRequest回调

WebSocket\Server 继承自 Http\Server

  • 设置了onRequest回调,WebSocket\Server也可以同时作为http服务器
  • 未设置onRequest回调,WebSocket\Server收到http请求后会返回http 400错误页面
  • 如果想通过接收http触发所有websocket的推送,需要注意作用域的问题,面向过程请使用globalWebSocket\Server进行引用,面向对象可以把WebSocket\Server设置成一个成员属性

1、面向过程代码

$server = new Swoole\WebSocket\Server("0.0.0.0", 9501);
$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
        echo "server: handshake success with fd{$request->fd}\n";
    });
$server->on('message', function (Swoole\WebSocket\Server $server, $frame) {
        echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
        $server->push($frame->fd, "this is server");
    });
$server->on('close', function ($ser, $fd) {
        echo "client {$fd} closed\n";
    });
$server->on('request', function (Swoole\Http\Request $request, Swoole\Http\Response $response) {
    global $server;//调用外部的server
    // $server->connections 遍历所有websocket连接用户的fd,给所有用户推送
    foreach ($server->connections as $fd) {
        // 需要先判断是否是正确的websocket连接,否则有可能会push失败
        if ($server->isEstablished($fd)) {
            $server->push($fd, $request->get['message']);
        }
    }
});
$server->start();

2、面向对象代码

class WebsocketTest {
    public $server;
    public function __construct() {
        $this->server = new Swoole\WebSocket\Server("0.0.0.0", 9501);
        $this->server->on('open', function (swoole_websocket_server $server, $request) {
            echo "server: handshake success with fd{$request->fd}\n";
        });
        $this->server->on('message', function (Swoole\WebSocket\Server $server, $frame) {
            echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
            $server->push($frame->fd, "this is server");
        });
        $this->server->on('close', function ($ser, $fd) {
            echo "client {$fd} closed\n";
        });
        $this->server->on('request', function ($request, $response) {
            // 接收http请求从get获取message参数的值,给用户推送
            // $this->server->connections 遍历所有websocket连接用户的fd,给所有用户推送
            foreach ($this->server->connections as $fd) {
                // 需要先判断是否是正确的websocket连接,否则有可能会push失败
                if ($this->server->isEstablished($fd)) {
                    $this->server->push($fd, $request->get['message']);
                }
            }
        });
        $this->server->start();
    }
}
new WebsocketTest();

客户端

  • Chrome/Firefox/高版本IE/Safari等浏览器内置了JS语言的WebSocket客户端
  • 微信小程序开发框架内置的WebSocket客户端
  • 异步的PHP程序中可以使用Swoole\Http\Client作为WebSocket客户端
  • apache/php-fpm或其他同步阻塞的PHP程序中可以使用swoole/framework提供的同步WebSocket客户端
  • WebSocket客户端不能与WebSocket服务器通信

swoole 的websocket 连接

介绍下两种连接websocket服务器的方法

1 通过访问http服务器然后访问我们的html页面连接我们的websouket服务器,这样我们就需要3个文件。1 http.php  2 ws.php 3 ws.html 简单贴下我的图,也是参考官方文档写的

http.php

<?php
/**

  • Created by PhpStorm.
  • User: xyj
  • Date: 18-9-9
  • Time: 下午4:31
    */

//实例化
$http_server = new swoole_http_server(‘0.0.0.0’,9501);

//服务器配置
$http_server->set(
[
‘enable_static_handler’ => true,
‘document_root’ => ‘/www/wwwroot/test_project/swoole’,
]
);

$http_server->on(‘request’,function($request ,$response){
//print_r($request->get);
//设置响应头信息
$response->cookie(‘xyj’,’hello’,86400);
//服务器返回信息
$response->end(‘http_server’ . json_encode($request->get));
});

$http_server->start();
ws.php

<?php
/**

  • Created by PhpStorm.
  • User: xyj
  • Date: 18-9-9
  • Time: 下午5:02
    */

//创建websocket服务器对象,监听0.0.0.0:9502端口
$ws = new swoole_websocket_server(“0.0.0.0”, 9502);

$ws->set(
[
‘enable_static_handler’ => true,
‘document_root’ => ‘/www/wwwroot/test_project/swoole’,
]
);
//监听WebSocket连接打开事件
$ws->on(‘open’, function ($ws, $request) {
var_dump($request->fd, $request->get, $request->server);
$ws->push($request->fd, “hello, welcome\n”);
});

//监听WebSocket消息事件
$ws->on(‘message’, function ($ws, $frame) {
echo “Message: {$frame->data}\n”;
$ws->push($frame->fd, “server: {$frame->data}”);
});

//监听WebSocket连接关闭事件
$ws->on(‘close’, function ($ws, $fd) {
echo “client-{$fd} is closed\n”;
});

$ws->start();
ws.html



Document

hello swoole 测试

如果是在本地测试环境运行那么wsServer = ‘ws://127.0.0.1:9502’,如果是外网 那么127.0.0.1 就要改成你浏览器上所访问的地址,这里的端口必须要和你websocket 服务器端口一致否则无法访问的

准备工作做好了那么我们就开始访问下websocket,首先在终端 运行http.php 和 ws.php, 正常情况下是这样如果遇到错误一般常见的就是端口被占用,如果是swoole扩展没装好或者是语法错误,那么请在仔细的读啃下官方文档。

如果是端口被占用我们可以使用 lsof -i:9501 命令查看 端口信息, 在kill之前请先认真的看下 该端口是否是重要端口,不要盲目的kill 。

还有一种就是我们开启过http.php 在关闭后 可能被占用的端口没有及时的释放,当我们再次php http.php的时候他也会抛出端口被占用的错误,这里我们可以用 这个命令 netstat -ntlp 来查看tcp监听的端口,其父进程 kill 然后在运行就ok了,继续使用这个命令来查看tcp端口如图

走到这里我们的两个服务器都已经开启了 。我们就可以通过浏览器访问了,在浏览器上输入地址:9501 , 此时这里的端口是要个http服务器的端口一致而不是websocket的端口,访问成功会出现如下图

————————————————
版权声明:本文为CSDN博主「君子有攸往」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/xiao199306/article/details/82621140

swoole的process模块创建和使用子进程

Version:1.0 StartHTML:000000210 EndHTML:000047154 StartFragment:000003232 EndFragment:000047098 StartSelection:000003301 EndSelection:000047086 SourceURL:https://www.cnblogs.com/jkko123/p/10918056.html swoole的process模块创建和使用子进程 – 怀素真 – 博客园

swoole中为我们提供了一个进程管理模块 Process,替换PHP的 pcntl 扩展,方便我们创建进程,管理进程,和进程间的通信。

swoole提供了2种进程间的通信:

1、基于 unix socket 的管道 pipe。

2、基于 sysvmsg 的消息队列。

我们可以通过 new swoole_process() 快速的创建一个进程,默认会创建一个 SOCK_DGRAM 类型的管道,用于进程间的通信,当然可以设置成其他类型,也可以不创建。

一、通过同步阻塞管道进行进程间通信?

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748<?php $worker_process_nums = 5;$worker_process = []; for ($i = 0; $i < $worker_process_nums; $i++) {//创建子进程//默认为每个子进程创建一个管道,如果不想创建设置$pipe_type参数为false//注意管道默认是同步阻塞,半双工,如果读取不到数据就会阻塞$worker = new swoole_process(function (swoole_process $worker) {//注意,如果主进程中不写数据write(),那么子进程这里read()就会阻塞$task = json_decode($worker->read(), true); //进行计算任务$tmp = 0;for ($i = $task['start']; $i < $task['end']; $i++) {$tmp += $i;} echo '子进程 PID : ', $worker->pid, ' 计算 ', $task['start'], ' - ', $task['end'], ' 结果 : ', $tmp, PHP_EOL;//往管道中写入计算的结果$worker->write($tmp);//子进程退出$worker->exit();}); //保存子进程$worker_process[$i] = $worker; //启动子进程$worker->start();} //往每个子进程管道中投递任务for ($i = 0; $i < $worker_process_nums; $i++) {$worker_process[$i]->write(json_encode(['start' => mt_rand(1, 10),'end' => mt_rand(50, 100),]));} //父进程监听子进程退出信号,回收子进程,防止出现僵尸进程swoole_process::signal(SIGCHLD, function ($sig) {//必须为false,非阻塞模式while ($ret = swoole_process::wait(false)) {echo "子进程 PID : {$ret['pid']} 退出\n";}});

二、通过 swoole_event_add 将管道设为异步,来进行通信?

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556<?php $worker_process_nums = 5;$worker_process = []; for ($i = 0; $i < $worker_process_nums; $i++) {$worker = new swoole_process(function ($worker) {//在子进程中给管道添加事件监听//底层会自动将该管道设置为非阻塞模式//参数二,是可读事件回调函数,表示管道可以读了swoole_event_add($worker->pipe, function ($pipe) use ($worker) {$task = json_decode($worker->read(), true); $tmp = 0;for ($i = $task['start']; $i < $task['end']; $i++) {$tmp += $i;}echo "子进程 : {$worker->pid} 计算 {$task['start']} - {$task['end']} \n";//子进程把计算的结果,写入管道$worker->write($tmp);//注意,swoole_event_add与swoole_event_del要成对使用swoole_event_del($worker->pipe);//退出子进程$worker->exit();});}); $worker_process[$i] = $worker; //启动子进程$worker->start();} for ($i = 0; $i < $worker_process_nums; $i++) {$worker = $worker_process[$i]; $worker->write(json_encode(['start' => mt_rand(1, 10),'end' => mt_rand(50, 100),])); //主进程中,监听子进程管道事件swoole_event_add($worker->pipe, function ($pipe) use ($worker) {$result = $worker->read();echo "子进程 : {$worker->pid} 计算结果 {$result} \n";swoole_event_del($worker->pipe);});} //父进程监听子进程退出信号,回收子进程,防止出现僵尸进程swoole_process::signal(SIGCHLD, function ($sig) {//必须为false,非阻塞模式while ($ret = swoole_process::wait(false)) {echo "子进程 PID : {$ret['pid']} 退出\n";}});

三、使用消息队列来完成进程间通信?

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152<?php $worker_process_nums = 5;$worker_process = []; for ($i = 0; $i < $worker_process_nums; $i++) {//注意,这里将参数$pipe_type设为false,表示不创建管道$worker = new swoole_process(function ($worker) {$task = json_decode($worker->pop(), true); $tmp = 0;for ($i = $task['start']; $i < $task['end']; $i++) {$tmp += $i;}echo "子进程 : {$worker->pid} 计算 {$task['start']} - {$task['end']} \n";$worker->push($tmp);$worker->exit();}, false, false); //使用消息队列,作为进程间的通信//注意,消息队列是共享的$worker->useQueue(); $worker_process[$i] = $worker; //启动子进程$worker->start();} for ($i = 0; $i < $worker_process_nums; $i++) {//只需用一个子进程发送消息即可,因为消息队列是共享的$worker_process[0]->push(json_encode(['start' => mt_rand(1, 10),'end' => mt_rand(50, 100),]));} //注意,这里要暂停,防止加入队列的任务,立刻被主进程读出来。sleep(1); for ($i = 0; $i < $worker_process_nums; $i++) {$result = $worker_process[0]->pop();echo "计算结果 : {$result} \n";} //父进程监听子进程退出信号,回收子进程,防止出现僵尸进程swoole_process::signal(SIGCHLD, function ($sig) {//必须为false,非阻塞模式while ($ret = swoole_process::wait(false)) {echo "子进程 PID : {$ret['pid']} 退出\n";}});

四、进程可以通过 signal 监听信号,和 alarm 设置定时器。

我们可以在父进程上设置监听信号,当子进程退出时,重新挂起子进程。

也可以设置定时器,通过 swoole_process::kill($pid, 0); 定时检测进程是否存活。?

1234567891011121314151617181920212223<?php //每隔1秒触发SIGALAM信号//注意,alarm不能和Timer同时使用swoole_process::alarm(1000 * 1000, 0); swoole_process::signal(SIGALRM, function ($signo) {static $cnt = 0;$cnt++;echo "时钟定时信号\n"; if ($cnt > 10) {//清除定时器swoole_process::alarm(-1);}}); swoole_process::signal(SIGINT, function ($signo) {echo "我被ctrl+c了\n"; //退出主进程,不然将一直无法正常退出exit(0);});

https://www.cnblogs.com/jkko123/p/10918056.html

版权声明:博主文章,可以不经博主允许随意转载,随意修改,知识是用来传播的。

php+swoole多线程的操作

多个任务同时执行

比如,我们要对已知的用户数据进行判断,是否需要发送邮件和短信,如果需要发送则发送。

不使用多进程时,我们首先判断是否发送邮件,如果需要则发送;然后再判断是否需要发送短信,如果需要则发送。如果发送邮件耗时2s,发送短信耗时2s,那么我们完成任务大概需要4s左右的时间。

如果我们使用多线程的话,可以开两个线程,一个用于处理邮件,一个用于处理短信,则耗时一共需要2s左右,处理时间缩短了一半。

以下是代码实例:

复制代码

<?php
/**
 * Created by PhpStorm.
 * User: cyw0413
 * Date: 2018/10/20
 * Time: 10:37
 */
$info = array(
    “sendmail”=>1,
    “mailto”=>”12345@qq.com”,
    “sendsms”=>1,
    “smsto”=>”123456”
);
echo “start:”.date(“Y-m-d H:i:s”).PHP_EOL;
$mail_process = new swoole_process(‘sendMail’,true);
$mail_process->start();
$sms_process = new swoole_process(‘sendSMS’,true);
$sms_process->start();
//主进程输出子进程范围内容
echo $mail_process->read();
echo PHP_EOL;
echo $sms_process->read();
echo PHP_EOL;
echo “end:”.date(“Y-m-d H:i:s”).PHP_EOL;
//并行函数
function sendMail(swoole_process $worker){
    global $info;
    if($info[‘sendmail’]==1){
        sleep(2);
        $worker->write(“send mail to “.$info[‘mailto’]);
    }
}
function sendSMS(swoole_process $worker){
    global $info;
    if($info[‘sendmail’]==1){
        sleep(2);
        $worker->write(“send sms to “.$info[‘smsto’]);
    }
}

复制代码

大任务划分成多个小任务

假设我们现在有一个通过curl抓取网页内容的需求,需要抓取10个网页,url地址通过数组读取,每个curl耗时2s。如果我们通过for循环来抓取这10个网页,需要耗时20s,使用多进程我们可以将任务划分成5份,分别由5个进程执行,每个进程抓取2个url,并发执行,共耗时4s,效率提高5倍。

以下是代码实例

复制代码

<?php
/**
 * Created by PhpStorm.
 * User: cyw0413
 * Date: 2018/10/20
 * Time: 10:51
 */
$url_arr = array();
for ($i=0;$i<10;$i++){
    $url_arr[] = “www.baidu.com?wd=”.$i;
}
echo “start:”.date(“Y-m-d H:i:s”).PHP_EOL;
$workers = array();
for ($i=0;$i<5;$i++){
    $process = new swoole_process(‘getContents’,true);
    $process->start();
    $process->write($i);
    $workers[] = $process;
}
//主进程数据结果
foreach ($workers as $process){
    echo $process->read();
    echo PHP_EOL;
}
echo “end:”.date(“Y-m-d H:i:s”).PHP_EOL;
function getContents(swoole_process $worker){
    $i = $worker->read();
    global $url_arr;
    $res1 = execCurl($url_arr[($i*2)]);
    $res2 = execCurl($url_arr[($i*2+1)]);
    echo $res1.PHP_EOL.$res2;
}
function execCurl($url){
    sleep(2);
    return “handle “.$url.” finished”;
}

复制代码

总结

以上两种情况,本质上都是将逻辑上没有先后关系的任务,用多个进程程并发执行,提高效率。

php机制本身不提供多线程的操作,ptcl扩展提供了php操作linux多进程的接口。

Swoole| Swoole 中 Process

Version:1.0 StartHTML:000000201 EndHTML:000101815 StartFragment:000009570 EndFragment:000101777 StartSelection:000009570 EndSelection:000101777 SourceURL:https://www.jianshu.com/p/4b6326cdaaa7 Swoole| Swoole 中 Process – 简书

date: 2018-1-8 20:56:08
title: Swoole| Swoole 中 Process

这篇 blog 折腾了很久才写出来, 问题主要还是在 理解 上. 有时候就是这样,

理解了之后就很简单, 不理解就很难; 知道了就很简单, 不知道往往就很难. 所以 stay hungry stay foolish stay young 真的很重要

本来计划开发 swoft 框架 中的 Process 模块, 所以需要对 swoole 的 Process 模块要有比较深入的了解才行. 不过根据 swoole 官方 wiki 的实践过程中, 一直有未理解的部分. 之前虽然也做过多次 多进程编程, 但是当真正需要进行框架开发的时候, 就会发现以前学到的知识不够全面, 无法指导整体的设计. 好在一直在坚持, 奉上现在理解的程度.

内容一览:

  • 进程相关基础操作: fork/exit/kill/wait
  • 进程相关高级操作: 主进程退出子进程干完活后也退出; 子进程异常退出主进程自动重启
  • 进程间通信(IPC) – 管道(pipe)
  • 进程间通信(IPC) – 消息队列(message queue)
  • swoole process 模块提供的更多功能

进程相关基础操作

进程是什么: 进程是运行者的程序

先来看看一个最简单的例子:

<?php
echo posix_getpid(); // 获取当前进程的 pid
swoole_set_process_name('swoole process master'); // 修改所在进程的进程名
sleep(100); // 模拟一个持续运行 100s 的程序, 这样就可以在进程中查看到它, 而不是运行完了就结束

通过 ps aux 查看进程:

未设置进程名

设置进程名

再来看看 swoole 中使用子进程的基础操作:

use Swoole\Process;

$process = new Process(function (Process $worker) {
    if (Process::kill($worker->pid, 0)) { // kill操作常用来杀死进程, 传入 0 可以用来检测进程是否存在
        $worker->exit(); // 退出子进程
    }
});
$process->start(); // 启动子进程
Process::wait(); // 回收退出的子进程
  • new Process(): 通过回调函数来设置子进程将要执行的逻辑
  • $process->start(): 调用 fork() 系统调用, 来生成子进程
  • Process::kill(): kill操作给进程发送信号, 常用来杀死进程, 传入 0 可以用来检测进程是否存在
  • Process::wait(): 调用 wait() 系统调用, 回收子进程, 如果不回收, 子进程会编程 僵尸进程, 浪费系统资源
  • $worker->exit(): 子进程主动退出

我在这里有一个疑问:

主进程的生命周期是怎么样的? 子进程的生命周期是怎么样的?

有这样一个疑问也来自于我之前的思维惯性: 理解一个事物时从事物的生命周期进行理解. 结合 进程是运行着的程序 来一起理解:

  • new Process(): 只有回调函数的逻辑会在进程中执行
  • 除此之外的代码都是在主进程中执行

进程相关高级操作

  • 主进程退出子进程干完活后也退出
  • 子进程异常退出主进程自动重启

<?php

use Swoole\Process;

class MyProcess1
{
    public $mpid = 0; // master pid, 即当前程序的进程ID
    public $works = []; // 记录子进程的 pid
    public $maxProcessNum = 1;
    public $newIndex = 0;

    public function __construct()
    {
        try {
            swoole_set_process_name(__CLASS__. ' : master');
            $this->mpid = posix_getpid();
            $this->run();
            $this->processWait();
        } catch (\Exception $e) {
            die('Error: '. $e->getMessage());
        }
    }

    public function run()
    {
        for ($i=0; $i<$this->maxProcessNum; $i++) {
            $this->createProcess();
        }
    }

    public function createProcess($index = null)
    {
        if (is_null($index)) {
            $index = $this->newIndex;
            $this->newIndex++;
        }
        $process = new Process(function (Process $worker) use($index) { // 子进程创建后需要执行的函数
            swoole_set_process_name(__CLASS__. ": worker $index");
            for ($j=0; $j<3; $j++) { // 模拟子进程执行耗时任务
                $this->checkMpid($worker);
                echo "msg: {$j}\n";
                sleep(1);
            }
        }, false, false); // 不重定向输入输出; 不使用管道
        $pid = $process->start();
        $this->works[$index] = $pid;
        return $pid;
    }

    // 主进程异常退出, 子进程工作完后退出
    public function checkMpid(Process $worker) // demo中使用的引用, 引用表示传的参数可以被改变, 由于传入 $worker 是 \Swoole\Process 对象, 所以不用使用 &
    {
        if (!Process::kill($this->mpid, 0)) { // 0 可以用来检测进程是否存在
            $worker->exit();
            $msg = "master process exited, worker {$worker->pid} also quit\n"; // 需要写入到日志中
            file_put_contents('process.log', $msg, FILE_APPEND); // todo: 这句话没有执行
        }
    }

    // 重启子进程
    public function rebootProcess($pid)
    {
        $index = array_search($pid, $this->works);
        if ($index !== false) {
            $newPid = $this->createProcess($index);
            echo "rebootProcess: {$index}={$pid}->{$newPid} Done\n";
            return;
        }
        throw new \Exception("rebootProcess error: no pid {$pid}");
    }

    // 自动重启子进程
    public function processWait()
    {
        while (1) {
            if (count($this->works)) {
                $ret = Process::wait(); // 子进程退出
                if ($ret) {
                    $this->rebootProcess($ret['pid']);
                }
            } else {
                break;
            }
        }
    }
}

new MyProcess1();

说明以下几点:

  • 子进程运行结束后就会退出, 通过 Process::wait() 检测到子进程退出信号执行自动重启, 子进程就会一直执行下去
  • 关于函数参数传 引用/指针, 一个很好的理解方式是: 参数可以被修改

运行并模拟主进程异常退出:

模拟主进程异常退出

输出

进程间通信(IPC) – 管道(pipe)

管道的几个关键词:

  • 半双工: 数据单向流动, 一端只读, 一端只写.
  • 同步 vs 异步: 默认为同步阻塞模式, 可以使用 swoole_event_add() 添加管道到 swoole 的 event loop 中, 实现异步IO
  • 管道类型(数据格式): SOCK_STREAM, 流式, 需要用户自己处理数据的封包/解包; SOCK_DGRAM, 数据报, 每次收发都是一次完整的数据包 (DGRAM/STREAM)

注意, swoole wiki – process->write() 中提到 SOCK_DGRAM 并不会乱序丢包

先来看一个简单的例子, php从shell管道中读取数据:

// get pip data
$fp = fopen('php://stdin', 'r');
if ($fp) {
    while ($line = fgets($fp, 4096)) {
        echo "php get pip data: ". $line;
    }
    fclose($fp);
}

从shell管道读取数据

swoole process中的管道很强大, 支持 子进程写, 主进程读 以及 主进程写, 子进程读:

use Swoole\Process;

// 子进程写, 父进程读
$process = new Process(function (Process $worker) {
    $worker->write("worker");
});
$process->start();
$msg = $process->read();
echo "from process: $msg", "\n";

// 父进程写, 子进程读
$process = new Process(function (Process $worker) {
    $msg = $worker->read();
    echo "from master: $msg", "\n";
});
$process->start();
$process->write('master');

使用管道多次读写

注意区分 $worker->write()$process->write(), 之前一直错误的以为这 2 个是相同的, 其实就是把 $process 误以为是子进程, 从而相当于 $process->write() 就是子进程写管道 — 其实这里是主进程内执行的逻辑, 是主进程写数据到管道, 供子进程读取

swoole中其他管道相关操作:

  • 异步IO

use Swoole\Process;
use Swoole\Event;

// 异步IO
$process = new Process(function (Process $worker) {
    $GLOBALS['worker'] = $worker;
    Event::add($worker->pipe, function (int $pipe) { // 使用 swoole_event_add 添加管道到异步IO
        /** @var Process $worker */
        $worker = $GLOBALS['worker'];
        $msg = $worker->read();
        echo "from master: $msg \n";
        $worker->write("hello master");
        sleep(2);
        $worker->exit(0);
    });
});
$process->start();
$process->write("master msg 1");
$msg = $process->read();
echo "from process: $msg \n";

异步IO

  • 设置超时

use Swoole\Process;

// 设置管道超时
$process = new Process(function (Process $worker) {
    sleep(5);
});
$process->start();
$process->setTimeout(0.5);
$ret = $process->read();
var_dump($ret);
var_dump(swoole_errno());

管道超时

插播一个趣事, @thinkpc 看完 2017北京PHP开发者年会, 就知道为啥会点赞了

  • 关闭管道

// 关闭管道: 默认值0->关闭读写 1->关闭写 2->关闭读
$process->close();

进程间通信(IPC) – 消息队列(message queue)

消息队列:

  • 一系列保存在内核中的消息链表
  • 有一个 msgKey, 可以通过此访问不同的消息队列
  • 有数据大小限制, 默认 8192, 可以通过内核修改
  • 阻塞 vs 非阻塞: 阻塞模式下 pop()空消息队列/push()满消息队列会阻塞, 非阻塞模式可以直接返回

swoole 中使用消息队列:

  • 通信模式: 默认为争抢模式, 无法将消息投递给指定子进程
  • 新建消息队列后, 主进程就可以使用
  • 消息队列不可和管道一起使用, 也无法使用 swoole event loop
  • 主进程中要调用 wait(), 否则子进程中调用 pop()/push() 会报错

use Swoole\Process;
$process = new Process(function (Process $worker) {
    // $worker->push('worker');
    echo "from master: ". $worker->pop(). "\n";
    sleep(2);
    // $worker->exit();
}, false, false); // 关闭管道
// 参数一为 msgKey, 这里是默认值
// 参数二为 通信模式, 默认值 2 表示争抢模式, 这里还加上了 非阻塞
$process->useQueue(ftok(__FILE__, 1), 2| Process::IPC_NOWAIT);
$process->push('hello1'); // 使用 useQueue 后, 主进程就可以读写消息队列了
$process->push('hello2');
echo "from woker: ". $process->pop(). "\n";
// echo "from woker: ". $process->pop(). "\n";
$process->start(); // 启动子进程
// 消息队列状态
var_dump($process->statQueue());
// 删除队列, 如果不调用则不会在程序结束时清楚数据, 下次使用相同 msgKey 时还可以访问数据
$process->freeQueue();
var_dump(Process::wait()); // 要调用 wait(), 否则子进程中 push()/pop() 会报错

消息队列

swoole process 模块提供的更多功能

  • swoole_set_process_name(): 修改进程名, 不兼容 mac
  • swoole_process->exec(string $execfile, array $args) 执行外部程序

参数 $execfile 需要使用可执行文件的绝对路径, 参数 args 为参数数组

// 比如 python test.py 123
swoole_process->exec('/usr/bin/python', ['test.py', 123]);

// 更复杂的例子
swoole_process->exec(('/usr/local/bin/php', ['/var/www/project/yii-best-practice/cli/yii', 't/index', '-m=123', 'abc', 'xyz']);

// 父进程 exec 进程进行管道通信
use Swoole\Process;
$process = new Process(function (Process $worker) {
    $worker->exec('/bin/echo', ['hello']);
    $worker->write('hello');
}, true); // 需要启用标准输入输出重定向
$process->start();
echo "from exec: ". $process->read(). "\n";

父进程与exec进程通过管道通信

  • \Swoole\Process::kill($pid, $signo = SIGTERM): 向指定进程发送信号, 默认是终止进程, 传 0 可检测进程是否存在
  • \Swoole\Process::wait(): 回收子进程, 如果主进程不调用此方法, 子进程会变成 僵尸进程, 浪费系统资源
  • \Swoole\Process::signal(): 异步信号监听

use Swoole\Process;

// 异步信号监听 + wait
Process::signal(SIGCHLD, function ($signal) { // 监听子进程退出信号
    // 可能同时有多个子进程退出, 所以要while循环
    while ($ret = Process::wait(false)) { // false 表示不阻塞
        var_dump($ret);
    }
});

\Swoole\Process::daemon(): 将当前进程变为一个守护进程

use Swoole\Process;

// daemon
Process::daemon();
swoole_set_process_name('test daemon process');
sleep(100);

daemon-守护进程

  • \Swoole\Process::alarm(): 高精度定时器(微秒级), 对 setitimer 系统调用的封装, 可以配合 \Swoole\Process::signal() / pcntl_signal 使用

注意不可和 \Swoole\Timer 同时使用

// signal + alarm
// 第一个参数表示时间, 单位 us, -1 表示清除定时器
// 第二个参数表示类型 0->真实时间->SIGALAM 1->cpu时间->SIGVTALAM 2->用户态+内核态时间->SIGPROF
Process::alarm(100*1000); // 100ms
Process::signal(SIGALRM, function ($signal) {
    static $i = 0;
    echo "#$i \t alarm \n";
    $i++;
    if ($i>20) {
        Process::alarm(-1); // -1 表示清除
    }
});

alarm

  • \Swoole\Process::setaffinity(): 设置CPU亲和, 即将进程绑定到指定CPU核上

传值范围: [0, swoole_cpu_num())
CPU亲和: CPU的速度远远高于IO的速度, 所以CPU有多级缓存来解决IO等待的问题, 绑定指定CPU, 更容易命中CPU缓存

写在最后

资源推荐:

todo:

  • 使用输入输出重定向
  • 管道类型为 SOCK_STREAM 时的情况, 是否需要 封包/解包 处理, 即 swoole wiki – process->write() 中提到的 管道通信默认的方式是流式,write写入的数据在read可能会被底层合并
  • 多进程 + 异步IO 的注意事项

能理解 因为子进程会继承父进程的内存和IO句柄 这个会产生的影响, 但是给的示例并没有说明这个问题

use Swoole\Process;
use Swoole\Event;

// 多个子进程 + 异步IO
$workers = [];
$workerNum = 3;
for ($i=0; $i<$workerNum; $i++) {
    $process = new Process(function (Process $worker) {
        $worker->write($worker->pid);
        echo "worker: {$worker->pid} \n";
    });
    $pid = $process->start();
    $workers[$pid] = $process;
    // Event::add($process->pipe, function (int $pipe) use ($process) {
    //  $data = $process->read();
    //  echo "recv: $data \n";
    // });
}
foreach ($workers as $worker) {
    Event::add($worker->pipe, function (int $pipe) use ($worker) {
        $data = $worker->read();
        echo "recv: $data \n";
    });
}

多进程异步IO

作者:daydaygo
链接:https://www.jianshu.com/p/4b6326cdaaa7
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

通过 Swoole\Table 实现 Swoole 多进程数据共享

第三方存储媒介

前面我们介绍了基于 Swoole 的 ProcessProcess\Pool 模块在 PHP 中实现多进程管理,但是多进程模式下进程间是相互隔离的,无法共享数据和变量,即便是通过 global 定义的全局或超全局变量,也只是在所属进程中有效,如果要在 Swoole 实现的多进程间共享数据,需要借助第三方存储媒介实现:

  • 数据库:MySQL、MongoDB
  • 缓存:Redis、Memcached
  • 磁盘文件

但是这也会引入新的问题,多进程同时操作一条记录或一个文件存在并发访问问题,以数据库操作为例,两个进程可能会同时读取一条数据,或者一个进程对某条记录进行更新处理时,另一个进程也来读取这条记录并进行操作,会导致最终结果数据与预期不一致的情况,这个时候,我们就需要引入锁的概念,当一个进程(比如进程A)对某个记录进行写操作时,对该记录加锁,这样其它进程就无法操作该条记录, 直到进程 A 事务提交再释放这个锁,让其他进程可以进行操作。

内存共享

PHP 相关扩展

对于单机操作来说,除了这些第三方存储媒介之外,还可以通过共享内存的方式实现进程间数据读写操作,有多个 PHP 扩展可以支持共享内存数据操作:

  • Semaphore 扩展:可通过该扩展包提供的 shm_get_varshm_put_var 函数实现内存共享数据的读写操作;
  • Shmop 扩展:可通过该扩展包提供的 shmop_readshmop_write 函数实现内存共享数据的读写操作;
  • APCu(APC User Cache)扩展:可通过该扩展包提供的 apc_fetchapc_store 实现内存共享数据的读写操作。

Swoole Table

但是上述扩展要么不支持锁,要么高并发时性能比较差,所以 Swoole 自己实现了一个共享内存读写工具 —— Swoole\Table,该工具是一个基于共享内存和锁实现的高性能并发数据结构,可用于解决多进程/多线程数据共享和同步加锁问题:

  • 性能强悍,单线程每秒可读写200万次;
  • 应用代码无需加锁,内置行锁自旋锁,所有操作均是多线程/多进程安全,用户层完全不需要考虑数据同步问题;
  • 支持多进程,可用于多进程之间共享数据;
  • 使用行锁,而不是全局锁,仅当 2 个进程在同一 CPU 时间,并发读取同一条数据才会进行发生抢锁。

Swoole\Table 支持以 Key-Value 方式读写,使用起来非常简单:

<?php

// 初始化一个容量为 1024 的 Swoole Table
$table = new \Swoole\Table(1024);
// 在 Table 中新增 id 列
$table->column('id', \Swoole\Table::TYPE_INT);
// 在 Table 中新增 name 列,长度为 50
$table->column('name', \Swoole\Table::TYPE_STRING, 10);
// 在 Table 中新泽 score 列
$table->column('score', \Swoole\Table::TYPE_FLOAT);
// 创建这个 Swoole Table
$table->create();


// 设置 Key-Value 值
$table->set('student-1', ['id' => 1, 'name' => '学小君', 'score' => 80]);
$table->set('student-2', ['id' => 2, 'name' => '学院君', 'score' => 90]);

// 如果指定 Key 值存在则打印对应 Value 值
if ($table->exist('student-1')) {
    echo "Student-" . $table->get('student-1', 'id') . ':' . $table->get('student-1', 'name').":".
        $table->get('student-1', 'score') . "\n";
}

// 自增操作
$table->incr('student-2', 'score', 5);
// 自减操作
$table->decr('student-2', 'score', 5);

// 表中总记录数
$count = $table->count();

// 删除指定表记录
$table->del('student-1');

此外 Swoole\Table 类还实现了迭代器接口,支持通过 foreach 进行遍历。

在 Laravel 中使用 Swoole\Table

如果要在 Laravel 中集成 Swoole 使用 Swoole\Table,以 LaravelS 扩展包为例,首先要在配置文件 config/laravels.php 中定义 swoole_tables 配置项:

'swoole_tables'            => [
    'ws' => [ // 表名,会加上 Table 后缀,比如这里是 wsTable
        'size'   => 102400, //  表容量
        'column' => [ // 表字段,字段名为 value
            ['name' => 'value', 'type' => \Swoole\Table::TYPE_INT, 'size' => 8],
        ],
    ],
    ... // 还可以定义其它表
],

然后我们可以在代码中通过swoole实例上的wsTable属性访问 SwooleTable:

class WebSocketService implements WebSocketHandlerInterface
{
    ...

    // 连接建立时触发
    public function onOpen(Server $server, Request $request)
    {
        // 在触发 WebSocket 连接建立事件之前,Laravel 应用初始化的生命周期已经结束,你可以在这里获取 Laravel 请求和会话数据
        // 调用 push 方法向客户端推送数据,fd 是客户端连接标识字段
        Log::info('WebSocket 连接建立:' . $request->fd);
        app('swoole')->wsTable->set('fd:' . $request->fd, ['value' => $request->fd]);
        $server->push($request->fd, 'Welcome to WebSocket Server built on LaravelS');
    }

    // 收到消息时触发
    public function onMessage(Server $server, Frame $frame)
    {
        foreach (app('swoole')->wsTable as $key => $row) {
            if (strpos($key, 'fd:') === 0 && $server->exist($row['value'])) {
                Log::info('Receive message from client: ' . $row['value']);
                // 调用 push 方法向客户端推送数据
                $server->push($frame->fd, 'This is a message sent from WebSocket Server at ' . date('Y-m-d H:i:s'));
            }
        }
    }
    
    ...

}

然后我们参考在 Laravel 中集成 Swoole 实现 WebSocket 服务器这篇教程从客户端向 WebSocket 服务器发起请求,即可在最新日志文件中看到相应的日志信息:

[2019-06-19 22:09:03] local.INFO: WebSocket 连接建立:1  
[2019-06-19 22:09:07] local.INFO: Receive message from client: 1

apiDoc 使用指南

Version:1.0 StartHTML:000000201 EndHTML:000036428 StartFragment:000010634 EndFragment:000036390 StartSelection:000010634 EndSelection:000036390 SourceURL:https://www.jianshu.com/p/34eac66b47e3 apiDoc 使用指南 – 简书

安装

  1. 安装node.js
  2. 安装apiDoc

npm install apidoc -g

配置

在你的项目根目录下新建apidoc.json文件,该文件描述了项目对外提供接口的概要信息如名称、版本、描述、文档打开时浏览器显示标题和接口缺省访问地址。
apidoc.json

{
  "name": "ServiceEbikeAPIs",
  "version": "3.1.0",
  "description": "车辆服务接口文档",
  "title": "ServiceEbikeAPIs",
  "url" : "http://cjl3.rokyinfo.net:7190/api-ebike"
}

使用样例

/**
 *
 * @apiDefine RkNotFoundException
 *
 * @apiError RkNotFoundException 找不到相关数据
 *
 * @apiErrorExample Error-Response:
 *     HTTP/1.1 404 Not Found
 *     {
 *       "error": {
 *           "code": 404,
 *           "msg": "",
 *           "path" ""
 *       }
 *     }
 *
 */

/**
 *
 * @api {get} /v3.1/ues/:sn/rt-info 获取设备上报实时信息
 * @apiVersion 3.1.0
 * @apiName GetUeRealTimeInfo
 * @apiGroup UE
 *
 * @apiHeader {String} Authorization 用户授权token
 * @apiHeader {String} firm 厂商编码
 * @apiHeaderExample {json} Header-Example:
 *     {
 *       "Authorization": "eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOjM2NzgsImF1ZGllbmNlIjoid2ViIiwib3BlbkFJZCI6MTM2NywiY3JlYXRlZCI6MTUzMzg3OTM2ODA0Nywicm9sZXMiOiJVU0VSIiwiZXhwIjoxNTM0NDg0MTY4fQ.Gl5L-NpuwhjuPXFuhPax8ak5c64skjDTCBC64N_QdKQ2VT-zZeceuzXB9TqaYJuhkwNYEhrV3pUx1zhMWG7Org",
 *       "firm": "cnE="
 *     }
 *
 * @apiParam {String} sn 设备序列号
 *
 * @apiSuccess {String} sn 设备序列号
 * @apiSuccess {Number} status 设备状态
 * @apiSuccess {Number} soc 电池电量百分比
 * @apiSuccess {Number} voltage 电池电压
 * @apiSuccess {Number} current 电池电流
 * @apiSuccess {Number} temperature 电池温度
 * @apiSuccess {String} reportTime 上报时间(yyyy-MM-dd HH:mm:ss)
 *
 * @apiSuccessExample Success-Response:
 *     HTTP/1.1 200 OK
 *     {
 *       "sn": "P000000000",
 *       "status": 0,
 *       "soc": 80,
 *       "voltage": 60.0,
 *       "current": 10.0,
 *       "temperature": null,
 *       "reportTime": "2018-08-13 18:11:00"
 *     }
 *
 * @apiUse RkNotFoundException
 *
 */
@RequestMapping(value = "/{sn}/rt-info", method = RequestMethod.GET)
public UeRealTimeInfo getUeRealTimeInfo(@RequestHeader(Constants.HEADER_LOGIN_USER_KEY) long userId, @PathVariable("sn") String sn) {

    return ueService.getRealTimeInfo(sn);
}

@api

@api {method} path [title]

HTTP接口调用方法、路径及名称

@apiVersion

@apiVersion version

api 版本号

@apiName

@apiName name

api 名称

@apiGroup

@apiGroup name

api 分组

@apiHeader

@apiHeader [(group)] [{type}] [field=defaultValue] [description]

请求头参数

@apiParam

@apiParam [(group)] [{type}] [field=defaultValue] [description]

请求参数

@apiSuccess

@apiSuccess [(group)] [{type}] field [description]

返回数据描述

@apiSuccessExample

@apiSuccessExample [{type}] [title]
                   example

接口成功返回样例

@apiError

@apiError [(group)] [{type}] field [description]

接口失败描述

@apiErrorExample

@apiErrorExample [{type}] [title]
                 example

接口失败返回样例

@apiDefine

@apiDefine name [title]
                     [description]

类似于宏定义,可以被引用

@apiUse

@apiUse name

使用@apiDefine定义的描述

更详细的说明请参考官方文档http://apidocjs.com

生成文档

cd到apidoc.json所在路径(即项目根目录)执行如下命令即可

apidoc -i src/ -o apidoc/

执行成功后会生成apidoc文件夹如下图所示

图片

点开apidoc文件夹中index.html会发现已经生成的漂亮的api文档

图片

作者:袁志健
链接:https://www.jianshu.com/p/34eac66b47e3
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。