swoole的process模块创建和使用子进程

Version:1.0 StartHTML:000000210 EndHTML:000047154 StartFragment:000003232 EndFragment:000047098 StartSelection:000003301 EndSelection:000047086 SourceURL:https://www.cnblogs.com/jkko123/p/10918056.html swoole的process模块创建和使用子进程 – 怀素真 – 博客园

swoole中为我们提供了一个进程管理模块 Process,替换PHP的 pcntl 扩展,方便我们创建进程,管理进程,和进程间的通信。

swoole提供了2种进程间的通信:

1、基于 unix socket 的管道 pipe。

2、基于 sysvmsg 的消息队列。

我们可以通过 new swoole_process() 快速的创建一个进程,默认会创建一个 SOCK_DGRAM 类型的管道,用于进程间的通信,当然可以设置成其他类型,也可以不创建。

一、通过同步阻塞管道进行进程间通信?

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748<?php $worker_process_nums = 5;$worker_process = []; for ($i = 0; $i < $worker_process_nums; $i++) {//创建子进程//默认为每个子进程创建一个管道,如果不想创建设置$pipe_type参数为false//注意管道默认是同步阻塞,半双工,如果读取不到数据就会阻塞$worker = new swoole_process(function (swoole_process $worker) {//注意,如果主进程中不写数据write(),那么子进程这里read()就会阻塞$task = json_decode($worker->read(), true); //进行计算任务$tmp = 0;for ($i = $task['start']; $i < $task['end']; $i++) {$tmp += $i;} echo '子进程 PID : ', $worker->pid, ' 计算 ', $task['start'], ' - ', $task['end'], ' 结果 : ', $tmp, PHP_EOL;//往管道中写入计算的结果$worker->write($tmp);//子进程退出$worker->exit();}); //保存子进程$worker_process[$i] = $worker; //启动子进程$worker->start();} //往每个子进程管道中投递任务for ($i = 0; $i < $worker_process_nums; $i++) {$worker_process[$i]->write(json_encode(['start' => mt_rand(1, 10),'end' => mt_rand(50, 100),]));} //父进程监听子进程退出信号,回收子进程,防止出现僵尸进程swoole_process::signal(SIGCHLD, function ($sig) {//必须为false,非阻塞模式while ($ret = swoole_process::wait(false)) {echo "子进程 PID : {$ret['pid']} 退出\n";}});

二、通过 swoole_event_add 将管道设为异步,来进行通信?

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556<?php $worker_process_nums = 5;$worker_process = []; for ($i = 0; $i < $worker_process_nums; $i++) {$worker = new swoole_process(function ($worker) {//在子进程中给管道添加事件监听//底层会自动将该管道设置为非阻塞模式//参数二,是可读事件回调函数,表示管道可以读了swoole_event_add($worker->pipe, function ($pipe) use ($worker) {$task = json_decode($worker->read(), true); $tmp = 0;for ($i = $task['start']; $i < $task['end']; $i++) {$tmp += $i;}echo "子进程 : {$worker->pid} 计算 {$task['start']} - {$task['end']} \n";//子进程把计算的结果,写入管道$worker->write($tmp);//注意,swoole_event_add与swoole_event_del要成对使用swoole_event_del($worker->pipe);//退出子进程$worker->exit();});}); $worker_process[$i] = $worker; //启动子进程$worker->start();} for ($i = 0; $i < $worker_process_nums; $i++) {$worker = $worker_process[$i]; $worker->write(json_encode(['start' => mt_rand(1, 10),'end' => mt_rand(50, 100),])); //主进程中,监听子进程管道事件swoole_event_add($worker->pipe, function ($pipe) use ($worker) {$result = $worker->read();echo "子进程 : {$worker->pid} 计算结果 {$result} \n";swoole_event_del($worker->pipe);});} //父进程监听子进程退出信号,回收子进程,防止出现僵尸进程swoole_process::signal(SIGCHLD, function ($sig) {//必须为false,非阻塞模式while ($ret = swoole_process::wait(false)) {echo "子进程 PID : {$ret['pid']} 退出\n";}});

三、使用消息队列来完成进程间通信?

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152<?php $worker_process_nums = 5;$worker_process = []; for ($i = 0; $i < $worker_process_nums; $i++) {//注意,这里将参数$pipe_type设为false,表示不创建管道$worker = new swoole_process(function ($worker) {$task = json_decode($worker->pop(), true); $tmp = 0;for ($i = $task['start']; $i < $task['end']; $i++) {$tmp += $i;}echo "子进程 : {$worker->pid} 计算 {$task['start']} - {$task['end']} \n";$worker->push($tmp);$worker->exit();}, false, false); //使用消息队列,作为进程间的通信//注意,消息队列是共享的$worker->useQueue(); $worker_process[$i] = $worker; //启动子进程$worker->start();} for ($i = 0; $i < $worker_process_nums; $i++) {//只需用一个子进程发送消息即可,因为消息队列是共享的$worker_process[0]->push(json_encode(['start' => mt_rand(1, 10),'end' => mt_rand(50, 100),]));} //注意,这里要暂停,防止加入队列的任务,立刻被主进程读出来。sleep(1); for ($i = 0; $i < $worker_process_nums; $i++) {$result = $worker_process[0]->pop();echo "计算结果 : {$result} \n";} //父进程监听子进程退出信号,回收子进程,防止出现僵尸进程swoole_process::signal(SIGCHLD, function ($sig) {//必须为false,非阻塞模式while ($ret = swoole_process::wait(false)) {echo "子进程 PID : {$ret['pid']} 退出\n";}});

四、进程可以通过 signal 监听信号,和 alarm 设置定时器。

我们可以在父进程上设置监听信号,当子进程退出时,重新挂起子进程。

也可以设置定时器,通过 swoole_process::kill($pid, 0); 定时检测进程是否存活。?

1234567891011121314151617181920212223<?php //每隔1秒触发SIGALAM信号//注意,alarm不能和Timer同时使用swoole_process::alarm(1000 * 1000, 0); swoole_process::signal(SIGALRM, function ($signo) {static $cnt = 0;$cnt++;echo "时钟定时信号\n"; if ($cnt > 10) {//清除定时器swoole_process::alarm(-1);}}); swoole_process::signal(SIGINT, function ($signo) {echo "我被ctrl+c了\n"; //退出主进程,不然将一直无法正常退出exit(0);});

https://www.cnblogs.com/jkko123/p/10918056.html

版权声明:博主文章,可以不经博主允许随意转载,随意修改,知识是用来传播的。

php+swoole多线程的操作

多个任务同时执行

比如,我们要对已知的用户数据进行判断,是否需要发送邮件和短信,如果需要发送则发送。

不使用多进程时,我们首先判断是否发送邮件,如果需要则发送;然后再判断是否需要发送短信,如果需要则发送。如果发送邮件耗时2s,发送短信耗时2s,那么我们完成任务大概需要4s左右的时间。

如果我们使用多线程的话,可以开两个线程,一个用于处理邮件,一个用于处理短信,则耗时一共需要2s左右,处理时间缩短了一半。

以下是代码实例:

复制代码

<?php
/**
 * Created by PhpStorm.
 * User: cyw0413
 * Date: 2018/10/20
 * Time: 10:37
 */
$info = array(
    “sendmail”=>1,
    “mailto”=>”12345@qq.com”,
    “sendsms”=>1,
    “smsto”=>”123456”
);
echo “start:”.date(“Y-m-d H:i:s”).PHP_EOL;
$mail_process = new swoole_process(‘sendMail’,true);
$mail_process->start();
$sms_process = new swoole_process(‘sendSMS’,true);
$sms_process->start();
//主进程输出子进程范围内容
echo $mail_process->read();
echo PHP_EOL;
echo $sms_process->read();
echo PHP_EOL;
echo “end:”.date(“Y-m-d H:i:s”).PHP_EOL;
//并行函数
function sendMail(swoole_process $worker){
    global $info;
    if($info[‘sendmail’]==1){
        sleep(2);
        $worker->write(“send mail to “.$info[‘mailto’]);
    }
}
function sendSMS(swoole_process $worker){
    global $info;
    if($info[‘sendmail’]==1){
        sleep(2);
        $worker->write(“send sms to “.$info[‘smsto’]);
    }
}

复制代码

大任务划分成多个小任务

假设我们现在有一个通过curl抓取网页内容的需求,需要抓取10个网页,url地址通过数组读取,每个curl耗时2s。如果我们通过for循环来抓取这10个网页,需要耗时20s,使用多进程我们可以将任务划分成5份,分别由5个进程执行,每个进程抓取2个url,并发执行,共耗时4s,效率提高5倍。

以下是代码实例

复制代码

<?php
/**
 * Created by PhpStorm.
 * User: cyw0413
 * Date: 2018/10/20
 * Time: 10:51
 */
$url_arr = array();
for ($i=0;$i<10;$i++){
    $url_arr[] = “www.baidu.com?wd=”.$i;
}
echo “start:”.date(“Y-m-d H:i:s”).PHP_EOL;
$workers = array();
for ($i=0;$i<5;$i++){
    $process = new swoole_process(‘getContents’,true);
    $process->start();
    $process->write($i);
    $workers[] = $process;
}
//主进程数据结果
foreach ($workers as $process){
    echo $process->read();
    echo PHP_EOL;
}
echo “end:”.date(“Y-m-d H:i:s”).PHP_EOL;
function getContents(swoole_process $worker){
    $i = $worker->read();
    global $url_arr;
    $res1 = execCurl($url_arr[($i*2)]);
    $res2 = execCurl($url_arr[($i*2+1)]);
    echo $res1.PHP_EOL.$res2;
}
function execCurl($url){
    sleep(2);
    return “handle “.$url.” finished”;
}

复制代码

总结

以上两种情况,本质上都是将逻辑上没有先后关系的任务,用多个进程程并发执行,提高效率。

php机制本身不提供多线程的操作,ptcl扩展提供了php操作linux多进程的接口。

Swoole| Swoole 中 Process

Version:1.0 StartHTML:000000201 EndHTML:000101815 StartFragment:000009570 EndFragment:000101777 StartSelection:000009570 EndSelection:000101777 SourceURL:https://www.jianshu.com/p/4b6326cdaaa7 Swoole| Swoole 中 Process – 简书

date: 2018-1-8 20:56:08
title: Swoole| Swoole 中 Process

这篇 blog 折腾了很久才写出来, 问题主要还是在 理解 上. 有时候就是这样,

理解了之后就很简单, 不理解就很难; 知道了就很简单, 不知道往往就很难. 所以 stay hungry stay foolish stay young 真的很重要

本来计划开发 swoft 框架 中的 Process 模块, 所以需要对 swoole 的 Process 模块要有比较深入的了解才行. 不过根据 swoole 官方 wiki 的实践过程中, 一直有未理解的部分. 之前虽然也做过多次 多进程编程, 但是当真正需要进行框架开发的时候, 就会发现以前学到的知识不够全面, 无法指导整体的设计. 好在一直在坚持, 奉上现在理解的程度.

内容一览:

  • 进程相关基础操作: fork/exit/kill/wait
  • 进程相关高级操作: 主进程退出子进程干完活后也退出; 子进程异常退出主进程自动重启
  • 进程间通信(IPC) – 管道(pipe)
  • 进程间通信(IPC) – 消息队列(message queue)
  • swoole process 模块提供的更多功能

进程相关基础操作

进程是什么: 进程是运行者的程序

先来看看一个最简单的例子:

<?php
echo posix_getpid(); // 获取当前进程的 pid
swoole_set_process_name('swoole process master'); // 修改所在进程的进程名
sleep(100); // 模拟一个持续运行 100s 的程序, 这样就可以在进程中查看到它, 而不是运行完了就结束

通过 ps aux 查看进程:

未设置进程名

设置进程名

再来看看 swoole 中使用子进程的基础操作:

use Swoole\Process;

$process = new Process(function (Process $worker) {
    if (Process::kill($worker->pid, 0)) { // kill操作常用来杀死进程, 传入 0 可以用来检测进程是否存在
        $worker->exit(); // 退出子进程
    }
});
$process->start(); // 启动子进程
Process::wait(); // 回收退出的子进程
  • new Process(): 通过回调函数来设置子进程将要执行的逻辑
  • $process->start(): 调用 fork() 系统调用, 来生成子进程
  • Process::kill(): kill操作给进程发送信号, 常用来杀死进程, 传入 0 可以用来检测进程是否存在
  • Process::wait(): 调用 wait() 系统调用, 回收子进程, 如果不回收, 子进程会编程 僵尸进程, 浪费系统资源
  • $worker->exit(): 子进程主动退出

我在这里有一个疑问:

主进程的生命周期是怎么样的? 子进程的生命周期是怎么样的?

有这样一个疑问也来自于我之前的思维惯性: 理解一个事物时从事物的生命周期进行理解. 结合 进程是运行着的程序 来一起理解:

  • new Process(): 只有回调函数的逻辑会在进程中执行
  • 除此之外的代码都是在主进程中执行

进程相关高级操作

  • 主进程退出子进程干完活后也退出
  • 子进程异常退出主进程自动重启

<?php

use Swoole\Process;

class MyProcess1
{
    public $mpid = 0; // master pid, 即当前程序的进程ID
    public $works = []; // 记录子进程的 pid
    public $maxProcessNum = 1;
    public $newIndex = 0;

    public function __construct()
    {
        try {
            swoole_set_process_name(__CLASS__. ' : master');
            $this->mpid = posix_getpid();
            $this->run();
            $this->processWait();
        } catch (\Exception $e) {
            die('Error: '. $e->getMessage());
        }
    }

    public function run()
    {
        for ($i=0; $i<$this->maxProcessNum; $i++) {
            $this->createProcess();
        }
    }

    public function createProcess($index = null)
    {
        if (is_null($index)) {
            $index = $this->newIndex;
            $this->newIndex++;
        }
        $process = new Process(function (Process $worker) use($index) { // 子进程创建后需要执行的函数
            swoole_set_process_name(__CLASS__. ": worker $index");
            for ($j=0; $j<3; $j++) { // 模拟子进程执行耗时任务
                $this->checkMpid($worker);
                echo "msg: {$j}\n";
                sleep(1);
            }
        }, false, false); // 不重定向输入输出; 不使用管道
        $pid = $process->start();
        $this->works[$index] = $pid;
        return $pid;
    }

    // 主进程异常退出, 子进程工作完后退出
    public function checkMpid(Process $worker) // demo中使用的引用, 引用表示传的参数可以被改变, 由于传入 $worker 是 \Swoole\Process 对象, 所以不用使用 &
    {
        if (!Process::kill($this->mpid, 0)) { // 0 可以用来检测进程是否存在
            $worker->exit();
            $msg = "master process exited, worker {$worker->pid} also quit\n"; // 需要写入到日志中
            file_put_contents('process.log', $msg, FILE_APPEND); // todo: 这句话没有执行
        }
    }

    // 重启子进程
    public function rebootProcess($pid)
    {
        $index = array_search($pid, $this->works);
        if ($index !== false) {
            $newPid = $this->createProcess($index);
            echo "rebootProcess: {$index}={$pid}->{$newPid} Done\n";
            return;
        }
        throw new \Exception("rebootProcess error: no pid {$pid}");
    }

    // 自动重启子进程
    public function processWait()
    {
        while (1) {
            if (count($this->works)) {
                $ret = Process::wait(); // 子进程退出
                if ($ret) {
                    $this->rebootProcess($ret['pid']);
                }
            } else {
                break;
            }
        }
    }
}

new MyProcess1();

说明以下几点:

  • 子进程运行结束后就会退出, 通过 Process::wait() 检测到子进程退出信号执行自动重启, 子进程就会一直执行下去
  • 关于函数参数传 引用/指针, 一个很好的理解方式是: 参数可以被修改

运行并模拟主进程异常退出:

模拟主进程异常退出

输出

进程间通信(IPC) – 管道(pipe)

管道的几个关键词:

  • 半双工: 数据单向流动, 一端只读, 一端只写.
  • 同步 vs 异步: 默认为同步阻塞模式, 可以使用 swoole_event_add() 添加管道到 swoole 的 event loop 中, 实现异步IO
  • 管道类型(数据格式): SOCK_STREAM, 流式, 需要用户自己处理数据的封包/解包; SOCK_DGRAM, 数据报, 每次收发都是一次完整的数据包 (DGRAM/STREAM)

注意, swoole wiki – process->write() 中提到 SOCK_DGRAM 并不会乱序丢包

先来看一个简单的例子, php从shell管道中读取数据:

// get pip data
$fp = fopen('php://stdin', 'r');
if ($fp) {
    while ($line = fgets($fp, 4096)) {
        echo "php get pip data: ". $line;
    }
    fclose($fp);
}

从shell管道读取数据

swoole process中的管道很强大, 支持 子进程写, 主进程读 以及 主进程写, 子进程读:

use Swoole\Process;

// 子进程写, 父进程读
$process = new Process(function (Process $worker) {
    $worker->write("worker");
});
$process->start();
$msg = $process->read();
echo "from process: $msg", "\n";

// 父进程写, 子进程读
$process = new Process(function (Process $worker) {
    $msg = $worker->read();
    echo "from master: $msg", "\n";
});
$process->start();
$process->write('master');

使用管道多次读写

注意区分 $worker->write()$process->write(), 之前一直错误的以为这 2 个是相同的, 其实就是把 $process 误以为是子进程, 从而相当于 $process->write() 就是子进程写管道 — 其实这里是主进程内执行的逻辑, 是主进程写数据到管道, 供子进程读取

swoole中其他管道相关操作:

  • 异步IO

use Swoole\Process;
use Swoole\Event;

// 异步IO
$process = new Process(function (Process $worker) {
    $GLOBALS['worker'] = $worker;
    Event::add($worker->pipe, function (int $pipe) { // 使用 swoole_event_add 添加管道到异步IO
        /** @var Process $worker */
        $worker = $GLOBALS['worker'];
        $msg = $worker->read();
        echo "from master: $msg \n";
        $worker->write("hello master");
        sleep(2);
        $worker->exit(0);
    });
});
$process->start();
$process->write("master msg 1");
$msg = $process->read();
echo "from process: $msg \n";

异步IO

  • 设置超时

use Swoole\Process;

// 设置管道超时
$process = new Process(function (Process $worker) {
    sleep(5);
});
$process->start();
$process->setTimeout(0.5);
$ret = $process->read();
var_dump($ret);
var_dump(swoole_errno());

管道超时

插播一个趣事, @thinkpc 看完 2017北京PHP开发者年会, 就知道为啥会点赞了

  • 关闭管道

// 关闭管道: 默认值0->关闭读写 1->关闭写 2->关闭读
$process->close();

进程间通信(IPC) – 消息队列(message queue)

消息队列:

  • 一系列保存在内核中的消息链表
  • 有一个 msgKey, 可以通过此访问不同的消息队列
  • 有数据大小限制, 默认 8192, 可以通过内核修改
  • 阻塞 vs 非阻塞: 阻塞模式下 pop()空消息队列/push()满消息队列会阻塞, 非阻塞模式可以直接返回

swoole 中使用消息队列:

  • 通信模式: 默认为争抢模式, 无法将消息投递给指定子进程
  • 新建消息队列后, 主进程就可以使用
  • 消息队列不可和管道一起使用, 也无法使用 swoole event loop
  • 主进程中要调用 wait(), 否则子进程中调用 pop()/push() 会报错

use Swoole\Process;
$process = new Process(function (Process $worker) {
    // $worker->push('worker');
    echo "from master: ". $worker->pop(). "\n";
    sleep(2);
    // $worker->exit();
}, false, false); // 关闭管道
// 参数一为 msgKey, 这里是默认值
// 参数二为 通信模式, 默认值 2 表示争抢模式, 这里还加上了 非阻塞
$process->useQueue(ftok(__FILE__, 1), 2| Process::IPC_NOWAIT);
$process->push('hello1'); // 使用 useQueue 后, 主进程就可以读写消息队列了
$process->push('hello2');
echo "from woker: ". $process->pop(). "\n";
// echo "from woker: ". $process->pop(). "\n";
$process->start(); // 启动子进程
// 消息队列状态
var_dump($process->statQueue());
// 删除队列, 如果不调用则不会在程序结束时清楚数据, 下次使用相同 msgKey 时还可以访问数据
$process->freeQueue();
var_dump(Process::wait()); // 要调用 wait(), 否则子进程中 push()/pop() 会报错

消息队列

swoole process 模块提供的更多功能

  • swoole_set_process_name(): 修改进程名, 不兼容 mac
  • swoole_process->exec(string $execfile, array $args) 执行外部程序

参数 $execfile 需要使用可执行文件的绝对路径, 参数 args 为参数数组

// 比如 python test.py 123
swoole_process->exec('/usr/bin/python', ['test.py', 123]);

// 更复杂的例子
swoole_process->exec(('/usr/local/bin/php', ['/var/www/project/yii-best-practice/cli/yii', 't/index', '-m=123', 'abc', 'xyz']);

// 父进程 exec 进程进行管道通信
use Swoole\Process;
$process = new Process(function (Process $worker) {
    $worker->exec('/bin/echo', ['hello']);
    $worker->write('hello');
}, true); // 需要启用标准输入输出重定向
$process->start();
echo "from exec: ". $process->read(). "\n";

父进程与exec进程通过管道通信

  • \Swoole\Process::kill($pid, $signo = SIGTERM): 向指定进程发送信号, 默认是终止进程, 传 0 可检测进程是否存在
  • \Swoole\Process::wait(): 回收子进程, 如果主进程不调用此方法, 子进程会变成 僵尸进程, 浪费系统资源
  • \Swoole\Process::signal(): 异步信号监听

use Swoole\Process;

// 异步信号监听 + wait
Process::signal(SIGCHLD, function ($signal) { // 监听子进程退出信号
    // 可能同时有多个子进程退出, 所以要while循环
    while ($ret = Process::wait(false)) { // false 表示不阻塞
        var_dump($ret);
    }
});

\Swoole\Process::daemon(): 将当前进程变为一个守护进程

use Swoole\Process;

// daemon
Process::daemon();
swoole_set_process_name('test daemon process');
sleep(100);

daemon-守护进程

  • \Swoole\Process::alarm(): 高精度定时器(微秒级), 对 setitimer 系统调用的封装, 可以配合 \Swoole\Process::signal() / pcntl_signal 使用

注意不可和 \Swoole\Timer 同时使用

// signal + alarm
// 第一个参数表示时间, 单位 us, -1 表示清除定时器
// 第二个参数表示类型 0->真实时间->SIGALAM 1->cpu时间->SIGVTALAM 2->用户态+内核态时间->SIGPROF
Process::alarm(100*1000); // 100ms
Process::signal(SIGALRM, function ($signal) {
    static $i = 0;
    echo "#$i \t alarm \n";
    $i++;
    if ($i>20) {
        Process::alarm(-1); // -1 表示清除
    }
});

alarm

  • \Swoole\Process::setaffinity(): 设置CPU亲和, 即将进程绑定到指定CPU核上

传值范围: [0, swoole_cpu_num())
CPU亲和: CPU的速度远远高于IO的速度, 所以CPU有多级缓存来解决IO等待的问题, 绑定指定CPU, 更容易命中CPU缓存

写在最后

资源推荐:

todo:

  • 使用输入输出重定向
  • 管道类型为 SOCK_STREAM 时的情况, 是否需要 封包/解包 处理, 即 swoole wiki – process->write() 中提到的 管道通信默认的方式是流式,write写入的数据在read可能会被底层合并
  • 多进程 + 异步IO 的注意事项

能理解 因为子进程会继承父进程的内存和IO句柄 这个会产生的影响, 但是给的示例并没有说明这个问题

use Swoole\Process;
use Swoole\Event;

// 多个子进程 + 异步IO
$workers = [];
$workerNum = 3;
for ($i=0; $i<$workerNum; $i++) {
    $process = new Process(function (Process $worker) {
        $worker->write($worker->pid);
        echo "worker: {$worker->pid} \n";
    });
    $pid = $process->start();
    $workers[$pid] = $process;
    // Event::add($process->pipe, function (int $pipe) use ($process) {
    //  $data = $process->read();
    //  echo "recv: $data \n";
    // });
}
foreach ($workers as $worker) {
    Event::add($worker->pipe, function (int $pipe) use ($worker) {
        $data = $worker->read();
        echo "recv: $data \n";
    });
}

多进程异步IO

作者:daydaygo
链接:https://www.jianshu.com/p/4b6326cdaaa7
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

通过 Swoole\Table 实现 Swoole 多进程数据共享

第三方存储媒介

前面我们介绍了基于 Swoole 的 ProcessProcess\Pool 模块在 PHP 中实现多进程管理,但是多进程模式下进程间是相互隔离的,无法共享数据和变量,即便是通过 global 定义的全局或超全局变量,也只是在所属进程中有效,如果要在 Swoole 实现的多进程间共享数据,需要借助第三方存储媒介实现:

  • 数据库:MySQL、MongoDB
  • 缓存:Redis、Memcached
  • 磁盘文件

但是这也会引入新的问题,多进程同时操作一条记录或一个文件存在并发访问问题,以数据库操作为例,两个进程可能会同时读取一条数据,或者一个进程对某条记录进行更新处理时,另一个进程也来读取这条记录并进行操作,会导致最终结果数据与预期不一致的情况,这个时候,我们就需要引入锁的概念,当一个进程(比如进程A)对某个记录进行写操作时,对该记录加锁,这样其它进程就无法操作该条记录, 直到进程 A 事务提交再释放这个锁,让其他进程可以进行操作。

内存共享

PHP 相关扩展

对于单机操作来说,除了这些第三方存储媒介之外,还可以通过共享内存的方式实现进程间数据读写操作,有多个 PHP 扩展可以支持共享内存数据操作:

  • Semaphore 扩展:可通过该扩展包提供的 shm_get_varshm_put_var 函数实现内存共享数据的读写操作;
  • Shmop 扩展:可通过该扩展包提供的 shmop_readshmop_write 函数实现内存共享数据的读写操作;
  • APCu(APC User Cache)扩展:可通过该扩展包提供的 apc_fetchapc_store 实现内存共享数据的读写操作。

Swoole Table

但是上述扩展要么不支持锁,要么高并发时性能比较差,所以 Swoole 自己实现了一个共享内存读写工具 —— Swoole\Table,该工具是一个基于共享内存和锁实现的高性能并发数据结构,可用于解决多进程/多线程数据共享和同步加锁问题:

  • 性能强悍,单线程每秒可读写200万次;
  • 应用代码无需加锁,内置行锁自旋锁,所有操作均是多线程/多进程安全,用户层完全不需要考虑数据同步问题;
  • 支持多进程,可用于多进程之间共享数据;
  • 使用行锁,而不是全局锁,仅当 2 个进程在同一 CPU 时间,并发读取同一条数据才会进行发生抢锁。

Swoole\Table 支持以 Key-Value 方式读写,使用起来非常简单:

<?php

// 初始化一个容量为 1024 的 Swoole Table
$table = new \Swoole\Table(1024);
// 在 Table 中新增 id 列
$table->column('id', \Swoole\Table::TYPE_INT);
// 在 Table 中新增 name 列,长度为 50
$table->column('name', \Swoole\Table::TYPE_STRING, 10);
// 在 Table 中新泽 score 列
$table->column('score', \Swoole\Table::TYPE_FLOAT);
// 创建这个 Swoole Table
$table->create();


// 设置 Key-Value 值
$table->set('student-1', ['id' => 1, 'name' => '学小君', 'score' => 80]);
$table->set('student-2', ['id' => 2, 'name' => '学院君', 'score' => 90]);

// 如果指定 Key 值存在则打印对应 Value 值
if ($table->exist('student-1')) {
    echo "Student-" . $table->get('student-1', 'id') . ':' . $table->get('student-1', 'name').":".
        $table->get('student-1', 'score') . "\n";
}

// 自增操作
$table->incr('student-2', 'score', 5);
// 自减操作
$table->decr('student-2', 'score', 5);

// 表中总记录数
$count = $table->count();

// 删除指定表记录
$table->del('student-1');

此外 Swoole\Table 类还实现了迭代器接口,支持通过 foreach 进行遍历。

在 Laravel 中使用 Swoole\Table

如果要在 Laravel 中集成 Swoole 使用 Swoole\Table,以 LaravelS 扩展包为例,首先要在配置文件 config/laravels.php 中定义 swoole_tables 配置项:

'swoole_tables'            => [
    'ws' => [ // 表名,会加上 Table 后缀,比如这里是 wsTable
        'size'   => 102400, //  表容量
        'column' => [ // 表字段,字段名为 value
            ['name' => 'value', 'type' => \Swoole\Table::TYPE_INT, 'size' => 8],
        ],
    ],
    ... // 还可以定义其它表
],

然后我们可以在代码中通过swoole实例上的wsTable属性访问 SwooleTable:

class WebSocketService implements WebSocketHandlerInterface
{
    ...

    // 连接建立时触发
    public function onOpen(Server $server, Request $request)
    {
        // 在触发 WebSocket 连接建立事件之前,Laravel 应用初始化的生命周期已经结束,你可以在这里获取 Laravel 请求和会话数据
        // 调用 push 方法向客户端推送数据,fd 是客户端连接标识字段
        Log::info('WebSocket 连接建立:' . $request->fd);
        app('swoole')->wsTable->set('fd:' . $request->fd, ['value' => $request->fd]);
        $server->push($request->fd, 'Welcome to WebSocket Server built on LaravelS');
    }

    // 收到消息时触发
    public function onMessage(Server $server, Frame $frame)
    {
        foreach (app('swoole')->wsTable as $key => $row) {
            if (strpos($key, 'fd:') === 0 && $server->exist($row['value'])) {
                Log::info('Receive message from client: ' . $row['value']);
                // 调用 push 方法向客户端推送数据
                $server->push($frame->fd, 'This is a message sent from WebSocket Server at ' . date('Y-m-d H:i:s'));
            }
        }
    }
    
    ...

}

然后我们参考在 Laravel 中集成 Swoole 实现 WebSocket 服务器这篇教程从客户端向 WebSocket 服务器发起请求,即可在最新日志文件中看到相应的日志信息:

[2019-06-19 22:09:03] local.INFO: WebSocket 连接建立:1  
[2019-06-19 22:09:07] local.INFO: Receive message from client: 1

ThinkPHP 事务处理 (事务回滚) 、异常处理

 使用事务处理的话,需要数据库引擎支持事务处理。比如 MySQL 的 MyISAM 不支持事务处理,需要使用 InnoDB 引擎。
 使用 transaction 方法操作数据库事务,当发生异常会自动回滚,例如:

 $trans_result = true;
                $trans = M();
                $trans->startTrans();   // 开启事务

                try {   // 异常处理
                    // 更新实施
                    $busbidList = M("busbid")->where($map)->select();
                    foreach($busbidList as $k => $v) {
                        $map['id'] = $busbidList[$k]['id'];
                        $result = M('busbid')->where($map)->data($data)->save();
                        if ($result === false) {
                            throw new Exception(“错误原因”);
                        }
                    }
                } catch (Exception $ex) {
                    $trans_result = false;
                    // 记录日志
                    Log::record("== xxx更新失败 ==", 'DEBUG'); 
                    Log::record($ex->getMessage(), 'DEBUG');
                }

                if ($trans_result === false) {
                    $trans->rollback();
                    // 更新失败
                    $array['status'] = 0;
                } else {
                    $trans->commit();
                    // 更新成功
                    $array['status'] = 1;
                }

 

 


方式二:

  
复制代码
M()->startTrans();  // 开启事务

if(操作失败) {
    M()->rollback();  // 回滚

}else {
    M()->commit();  // 提交
}
复制代码

方式三:

复制代码
1.引用TP5的think\Db类:
use think\Db;

2.下面为实现代码:

Db::startTrans();

//启动事务

try {

  这里写SQL语句

  Db::commit(); //提交事务

} catch (\PDOException $e) {

  Db::rollback(); //回滚事务

}
复制代码

tp5中的事务处理

例子1:
function shiwu(){
//开启事务
$this->startTrans();
// 进行相关的业务逻辑操作
$data[‘video_status’]=1;
$video_id=$this->insertGetId($data); // 保存用户信息
if ($video_id){
// 提交事务
$this->commit();
return $video_id;
}else{
// 事务回滚
$this->rollback();
}
}
123456789101112131415
例子2:
//事务处理
public function chuli(){
db::startTrans();
$res = db(‘blog’)->delete(’14’);
$res1 = db(‘blog’)->delete(’13’);
if ($res && $res1){
// 提交事务
db::commit();
echo “1”;
}else{
// 事务回滚
db::rollback();
echo “2”;
}

}

12345678910111213141516
例子3:
public function down(){
$user = session(‘usernames’);
if(isset($user)){
db::startTrans();//开启事务处理
$res1 = db(”)->del();
$res2 = db(”)->del();
if($res1&&$res2){
db::commit();
return json([‘status’=>’1′,’msg’=>’金币扣除5!’]);
}else{
db::rollback();
return json([‘status’=>’0′,’msg’=>’数据解析异常’]);
}

}else{
return json([‘status’=>’0′,’msg’=>’请先登录!’]);
}
}
————————————————
版权声明:本文为CSDN博主「梦缘雪痕」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_43181190/article/details/86698499

php swoole多进程/多线程例子

swoole的多线程其实就是多进程,进程创建太多切换的开销很大,如果能用上pthreads建议用pthreads,因为我用的是php7nts版本没办法用pthreads

swoole实例如下:

<?php
/**

  • 创建多进程
    */
    $worker_num = 6; // 默认进程数
    $workers = []; // 进程保存
    $redirect_stdout = false; // 重定向输出 ; 这个参数用途等会我们看效果
    for($i = 0; $i < $worker_num; $i++){
    $process = new swoole_process(‘callback_function’, $redirect_stdout); // 启用消息队列 int $msgkey = 0, int $mode = 2
    $process->useQueue(0, 2);
    $pid = $process->start(); // 管道写入内容
    $process->write(‘index:’.$i); $process->push(‘进程的消息队列内容’);
    // 将每一个进程的句柄存起来
    $workers[$pid] = $process;
    }

/**

  • 子进程回调
  • @param swoole_process $worker [description]
  • @return [type] [description]
    */
    function callback_function(swoole_process $worker)
    {
    $recv = $worker->pop();
    echo “子输出主内容: {$recv}”.PHP_EOL;
    //get guandao content
    $recv = $worker->read();
    $result = doTask(); echo PHP_EOL.$result.’===’.$worker->pid.’===’.$recv; $worker->exit(0);
    }

/**

  • 监控/回收子进程
    */
    while(1){
    $ret = swoole_process::wait();
    if ($ret){// $ret 是个数组 code是进程退出状态码,
    $pid = $ret[‘pid’];
    echo PHP_EOL.”Worker Exit, PID=” . $pid . PHP_EOL;
    }else{
    break;
    }
    }

/**

  • doTask
  • @return [type] [description]
    */
    function doTask()
    {
    sleep(2);
    return true;
    }
    ————————————————
    版权声明:本文为CSDN博主「fangdong88」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/fangdong88/article/details/78050135

PHP+swoole实现简单多人在线聊天群发

https://www.jb51.net/article/78316.htm

这篇文章主要介绍了PHP+swoole实现简单多人在线聊天群发 的相关资料,需要的朋友可以参考下

java

由于本文的能力有限,有好多聊天逻辑的细节没有实现,只实现了群发,具体代码如下所示:

php代码:

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576$serv = new swoole_websocket_server("127.0.0.1",3999);//服务的基本设置$serv->set(array('worker_num' => 2,'reactor_num'=>8,'task_worker_num'=>1,'dispatch_mode' => 2,'debug_mode'=> 1,'daemonize' => true,'log_file' => __DIR__.'/log/webs_swoole.log','heartbeat_check_interval' => 60,'heartbeat_idle_time' => 600,));$serv->on('connect', function ($serv,$fd){// echo "client:$fd Connect.".PHP_EOL;});//测试receive$serv->on("receive",function(swoole_server $serv,$fd,$from_id,$data){// echo "receive#{$from_id}: receive $data ".PHP_EOL;});$serv->on('open', function($server, $req) {// echo "server#{$server->worker_pid}: handshake success with fd#{$req->fd}".PHP_EOL;;// echo PHP_EOL;});$serv->on('message',function($server,$frame) {// echo "message: ".$frame->data.PHP_EOL;$msg=json_decode($frame->data,true);switch ($msg['type']){case 'login':$server->push($frame->fd,"欢迎欢迎~");break;default:break;}$msg['fd']=$frame->fd;$server->task($msg);});$serv->on("workerstart",function($server,$workerid){// echo "workerstart: ".$workerid.PHP_EOL;// echo PHP_EOL;});$serv->on("task","on_task");$serv->on("finish",function($serv,$task_id,$data){return ;});$serv->on('close', function($server,$fd,$from_id) {// echo "connection close: ".$fd.PHP_EOL;// echo PHP_EOL;});$serv->start();function on_task($serv,$task_id,$from_id,$data) {switch ($data['type']){case 'login':$send_msg="说:我来了~";break;default:$send_msg="说:{$data['msg']['speak']}";break;}foreach ($serv->connections as $conn){if ($conn!=$data['fd']){if (strpos($data['msg']['name'],"游客")===0){$name=$data['msg']['name']."_".$data['fd'];}else{$name=$data['msg']['name'];}}else{$name="我";}$serv->push($conn,$name.$send_msg);}return;}function on_finish($serv,$task_id,$data){return true;}

前端代码:

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"><html><head><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"><title>WebSocket测试</title> <script language="javascript"type="text/javascript" src="jquery-1.12.0.min.js"> </script></head><body><h2>WebSocket Test</h2> 昵称:<input type="text" id="name" size="5" value="游客"/> <input type="text" id="content"> <button onclick="speak_to_all()">发送</button><br/><br/><textarea id="message" style="overflow-x:hidden" rows="10" cols="50"></textarea> <div id="output"></div></body> <script language="javascript"type="text/javascript"> var wsUri ="ws://127.0.0.1:3999/"; var output; function init() { output = document.getElementById("output"); testWebSocket();}function testWebSocket() { websocket = new WebSocket(wsUri); websocket.onopen = function(evt) { onOpen(evt) }; websocket.onclose = function(evt) { onClose(evt) }; websocket.onmessage = function(evt) { onMessage(evt) }; websocket.onerror = function(evt) { onError(evt) }; }function get_speak_msg(){var name=document.getElementById("name").value;var speak=document.getElementById("content").value;var json_msg='{"name":"'+name+'","speak":\"'+speak+'"}';return json_msg;}function pack_msg(type,msg){return '{"type":"'+type+'","msg":'+msg+'}';}function onOpen(evt) {append_speak("已经联通服务器.........");speak_msg=get_speak_msg();send_msg=pack_msg("login",speak_msg);doSend(send_msg);}function onClose(evt) { append_speak("俺老孙去也!");} function onMessage(evt) {append_speak(evt.data);}function onError(evt) {alert(evt.data);}function doSend(message) { websocket.send(message);}function append_speak(new_msg){document.getElementById("message").value=document.getElementById("message").value+new_msg+"\n";document.getElementById('message').scrollTop = document.getElementById('message').scrollHeight;}function speak_to_all(){send_msg=pack_msg("speak",get_speak_msg());if(document.getElementById("content").value==""){return;}doSend(send_msg);document.getElementById("content").value="";}init();</script></html>

PHP 并发扣款,保证数据一致性(悲观锁和乐观锁)

PHP 并发扣款,保证数据一致性(悲观锁和乐观锁)

业务场景分析

用户购买商品的逻辑中,需要对用户钱包的余额进行查询和扣款

异常:如果同一用户并发执行多个业务进行” 查询 + 扣款” 的业务中有一定概率出现数据不一致

Tips:如果没有做限制单一接口请求频率,用户使用并发请求的手段也有概率出现数据不一致

扣款场景

Step1: 从数据库查询用户钱包余额

SELECT balance FROM user_wallet WHERE uid = $uid;+---------+| balance |+---------+| 100     |+---------+1 row in set (0.02 sec)

Step2: 业务逻辑

Tips: 文章分享处理同一用户并发扣款一致性,检查库存啥的逻辑略过

1. 查询商品价格,比如 70 元
2. 商品价格对比余额是否足够,足够时进行扣款提交订单逻辑

if(goodsPrice <= userBalance) {    $newUserBalance = userBalance - goodsPrice;  }else {    throw new UserWalletException(['msg' => '用户余额不足']);}

Step3: 将数据库的余额进行修改

UPDATE user_wallet SET balance=$newUserBalance WHERE uid = $uid

在没有并发的情况下,这个流程没有任何问题,原有余额 100,购买 70 元的商品,剩余 30 元

异常场景

Step1: 用户并发购买业务 A 和业务 B(不同实例 / 服务),一定概率并行查询余额是 100

step1

Step2: 业务 A 和业务 B 分别扣款逻辑处理,业务 A 商品 70 结果余额 30,业务 B 商品 80 结果余额 20

step1

Step3:

1 业务 A 先进行修改,修改余额为 30

step1

2 业务 A 后进行修改,修改余额为 20

step1

此时异常出现了,原余额 100 元,业务 A 和业务 B 的商品价格总和 150 元(70+80)都购买成功且余额还剩 20 元。

异常点:业务 A 和业务 B 并行查询余额为 100

解决方案

:lock:

悲观锁

使用 Redis 悲观锁,例如抢到一个 KEY 才能继续操作,否则禁止操作

封装了一个开箱即用的 RedisLock

<?php use Ar414\RedisLock; $redis = new \Redis();$redis->connect('127.0.0.1','6379'); $lockTimeOut = 5;$redisLock = new RedisLock($redis,$lockTimeOut); $lockKey    = 'lock:user:wallet:uid:1001';$lockExpire = $redisLock->getLock($lockKey); if($lockExpire) {    try {        //select user wallet balance for uid        $userBalance = 100;        //select goods price for goods_id        $goodsPrice = 80;         if($userBalance >= $goodsPrice) {            $newUserBalance = $userBalance - $goodsPrice;            //TODO set user balance in db        }else {            throw new Exception('user balance insufficient');        }        $redisLock->releaseLock($lockKey,$lockExpire);    } catch (\Throwable $throwable) {        $redisLock->releaseLock($lockKey,$lockExpire);        throw new Exception('Busy network');    }}

乐观锁

使用 CAS(Compare And Set)

在 set 写回的时候,加上初始状态的条件 compare, 只有初始状态不变的时候才允许 set 写回成功,保证数据一致性的方法

将:

UPDATE user_wallet SET balance=$newUserBalance WHERE uid = $uid

改为:

UPDATE user_wallet SET balance=$newUserBalance WHERE uid = $uid AND balance = $oldUserBalance

这样的话并发操作时只有一个是执行成功的,根据 affect rows 是否为 1 判断是否成功

结语

  • 解决方案有很多,这只是其中一种解决方案
  • 使用 Redis 悲观锁的方案会降低吞吐量

php 高并发

https://blog.csdn.net/wx_it/article/details/105827491

php 事务处理transaction

MySQL 事务主要用于处理操作量大,复杂度高的数据。比如说,在人员管理系统中,你删除一个人员,你即需要删除人员的基本资料,也要删除和该人员相关的信息,如信箱,文章等等,这样,这些数据库操作语句就构成一个事务!

  • 在MySQL中只有使用了Innodb数据库引擎的数据库或表才支持事务
  • 事务处理可以用来维护数据库的完整性,保证成批的SQL语句要么全部执行,要么全部不执行
  • 事务用来管理insert,update,delete语句

一般来说,事务是必须满足4个条件(ACID): Atomicity(原子性)、Consistency(稳定性)、Isolation(隔离性)、Durability(持久性)

  • 1、事务的原子性:一组事务,要么成功;要么撤回。
  • 2、稳定性 : 有非法数据(外键约束之类),事务撤回。
  • 3、隔离性:事务独立运行。一个事务处理后的结果,影响了其他事务,那么其他事务会撤回。事务的100%隔离,需要牺牲速度。
  • 4、持久性:一个事务一旦被提交,它对数据库中数据的改变就是永久的,接下来即使数据库发生故障也不应该对其有任何影响。

执行下面的程序

复制代码
$mysqli = new mysqli('localhost','root','mayi1991','mysqldemo');
if($mysqli->connect_error){
    die('数据库连接错误'.$mysqli->connect_error);
}

$sql1 = "update account set balance = balance - 2 where id = 1";
//这里故意写错指令中的balance1属性
$sql2 = "update account set balance1 = balance + 2 where id = 2";        
$result1 = $mysqli->query($sql1);
$result2 = $mysqli->query($sql2);

if(!$result1 || !$result2){
    die('操作错误'.$mysqli->error);
}else{
    die('操作成功');
}
$mysqli->close();
复制代码

虽然上面的代码有报错,但是在数据库中,id=1的balance已经改变;这样就会出现问题;

我们要的同时改变,如果有一个出错,就不改变;这个时候,我们就需要“事务控制”来保证“一致性”;

我们需要用到的方法autocommit()  commit();看下面的代码

复制代码
$mysqli = new mysqli('localhost','root','mayi1991','mysqldemo');
if($mysqli->connect_error){
    die('数据库连接错误'.$mysqli->connect_error);
}

//关闭数据库自动提交
$mysqli->autocommit(false);

$sql1 = "update account set balance = balance - 2 where id = 1";
//这里故意写错属性balance1
$sql2 = "update account set balance1 = balance + 2 where id = 2";
$result1 = $mysqli->query($sql1);
$result2 = $mysqli->query($sql2);

if(!$result1 || !$result2){
    die('操作错误'.$mysqli->error);
    $mysqli->rollback();    //事务回退
}else{
    //操作全部正确后再提交
    $mysqli->commit();
}
$mysqli->close();
复制代码

首先利用autocommit(false)方法,关闭数据库自动提交,然后当操作语句全部成功后,commit()提交到数据库;

如果操作失败,我们用rollback()方法回退。