PHP多线程解决方案

一、php模拟实现多线程的三种方法

1、linux下的php多线程

下面所讲的东西是源自php的pcntl_fork函数.因为这个函数依赖操作系统fork的实现,所以本文所讲的东西只适用于linux/unix。那么先看看这个函数的用法吧.php手册上是这么说的:

<?php
$pid = pcntl_fork();
if ($pid == -1) {
         die('could not fork');
} else if ($pid) {
         // we are the parent
         pcntl_wait($status); //Protect against Zombie children
} else {
         // we are the child
}
?>

通过pcntl_fork创建一个子进程,如果返回值是-1的话,那么说明子进程创建失败.创建成功的进程id会返回给父进程,0返回给子进程.不好理解吧,所以应该这样写:

<?php
$pid = pcntl_fork();
if($pid == -1){
         //创建失败咱就退出呗,没啥好说的
         die('could not fork');
}
else{
        if($pid){
                //从这里开始写的代码是父进程的,因为写的是系统程序,记得退出的时候给个返回值
                exit(0);
        }
        else{
                //从这里开始写的代码都是在新的进程里执行的,同样正常退出的话,最好也给一个返回值
                exit(0);
        }
}
?>

这样一改好理解多了,如果你父进程希望知道子进程正常退出的话,可以加上前面的pcntl_wait。

2.通过stream_socket_client 方式

function sendStream() { 
    $english_format_number = number_format($number, 4, '.', ''); 
  
    echo $english_format_number;  
    exit(); 
    $timeout = 10; 
    $result = array(); 
    $sockets = array(); 
    $convenient_read_block = 8192; 
    $host = "test.local.com"; 
    $sql = "select waybill_id,order_id from xm_waybill where status>40 order by update_time desc limit 1 ";  
    $data = Yii::app()->db->createCommand($sql)->queryAll(); 
    $id = 0; 
  
    foreach ($data as $k => $v) { 
      if ($k % 2 == 0) { 
        $send_data[$k]['body'] = NoticeOrder::getSendData($v['waybill_id']); 
  
      } else { 
        $send_data[$k]['body'] = array($v['order_id'] => array('extra' => 16));  
      }  
      $data = json_encode($send_data[$k]['body']); 
      $s = stream_socket_client($host . ":80", $errno, $errstr, $timeout, STREAM_CLIENT_ASYNC_CONNECT | STREAM_CLIENT_CONNECT); 
      if ($s) {  
        $sockets[$id++] = $s; 
        $http_message = "GET /php/test.php?data=" . $data . " HTTP/1.0\r\nHost:" . $host . "\r\n\r\n";  
        fwrite($s, $http_message); 
      } else {  
        echo "Stream " . $id . " failed to open correctly."; 
      }  
    } 
  
    while (count($sockets)) { 
  
      $read = $sockets; 
  
      stream_select($read, $w = null, $e = null, $timeout); 
       if (count($read)) {  
        /* stream_select generally shuffles $read, so we need to 
         compute from which socket(s) we're reading. */
        foreach ($read as $r) { 
  
          $id = array_search($r, $sockets); 
          $data = fread($r, $convenient_read_block); 
          if (strlen($data) == 0) { 
            echo "Stream " . $id . " closes at " . date('h:i:s') . ".<br>  "; 
            fclose($r); 
             unset($sockets[$id]); 
          } else { 
            $result[$id] = $data; 
          } 
        } 
      } else {  
        /* A time-out means that *all* streams have failed 
         to receive a response. */
        echo "Time-out!\n"; 
        break; 
      }  
    }  
    print_r($result); 
  
  }

3、通过多进程代替多线程

function daemon($func_name,$args,$number){ 
  while(true){ 
    $pid=pcntl_fork(); 
    if($pid==-1){ 
      echo "fork process fail"; 
      exit(); 
    }elseif($pid){//创建的子进程 
  
      static $num=0; 
      $num++; 
      if($num>=$number){ 
        //当进程数量达到一定数量时候,就对子进程进行回收。 
        pcntl_wait($status); 
  
        $num--; 
      }  
    }else{ //为0 则代表是子进程创建的,则直接进入工作状态 
  
      if(function_exists($func_name)){ 
        while (true) { 
          $ppid=posix_getpid(); 
          var_dump($ppid); 
          call_user_func_array($func_name,$args); 
          sleep(2); 
        } 
      }else{ 
        echo "function is not exists"; 
      } 
      exit();   
    } 
  } 
}  
function worker($args){  
  //do something 
  
}  
daemon('worker',array(1),2); 

二、真正实现php多线程的方法

php真正的多线程实现方式,通过安装php的扩展 pthread 可以做到。

点此下载 但是这个下载的是 版本3 也就是php 7 才能用的,我们需要使的是 版本2 

php扩展

然后刷新的页面如下,拖到最底部:

php多线程
下载文件

下一页找到版本2的

下载下来,这个v2 才是php5才可以使用的

下载下来,安装:

或者,您直接这样下载:

cd /tools  
   wget https://github.com/krakjoe/pthreads/archive/v2.0.10.zip  
   unzip   v2.0.10.zip  
   cd pthreads-2.0.10  
   /usr/local/php/bin/phpize  
   ./configure --with-php-config=/usr/local/php/bin/php-config    
   make  
   make install

注意:您的php 在编译的时候需要开启 –enable-maintainer-zts

./configure --prefix=/usr/local/php --disable-fileinfo   --enable-fpm --with-config-file-path=/etc --with-config-file-scan-dir=/etc/php.d --with-openssl --with-zlib --with-curl --enable-ftp --with-gd --with-xmlrpc  --with-jpeg-dir --with-png-dir --with-freetype-dir --enable-gd-native-ttf --enable-mbstring --with-mcrypt=/usr/local/libmcrypt --enable-zip --with-mysql=/usr/local/mysql --without-pear --enable-maintainer-zts 
vim /etc/php.ini 
添加
extension=pthreads.so
重启php  
/etc/init.d/php-fpm restart

在php中判断一个字符串包含另一个字符串

方法一:用php的strpos() 函数判断字符串中是否包含某字符串的方法

if(strpos('Longway','way') !== false){ 
 echo '包含way';
}else{
 echo '不包含way';
}

方法二:使用了explode

用explode进行判断PHP判断字符串的包含代码如下:

<?php
  $url = "001a.gif";
  $str = "a";
  $con = explode($str,$url);
  if (count($con)>1):
  echo $url."中包含".$str;
  else:
  echo $url."中没有包含".$str; 
  endif;
?>

方法三:strstr

strstr() 函数搜索一个字符串在另一个字符串中的第一次出现。
该函数返回字符串的其余部分(从匹配点)。如果未找到所搜索的字符串,则返回 false。

代码如下:

<?php
  /*如手册上的举例*/
  $email = 'user@example.com';
  $domain = strstr($email, '@');
  echo $domain;
?>

方法四:stristr

stristr() 函数查找字符串在另一个字符串中第一次出现的位置。
如果成功,则返回字符串的其余部分(从匹配点)。如果没有找到该字符串,则返回 false。

它和strstr的使用方法完全一样.唯一的区别是stristr不区分大小写.

多线程编程 – PHP 实现

多线程

线程

首先说下线程:

线程(thread) 是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务.

使用多线程主要是因为它在执行效率上有很大优势。由于线程是操作系统能够进行调度的最小单位

  • 一个多线程程序比单线程程序被操作系统调度的概率更大,所以多线程程序一般会比单线程程序更高效;
  • 多线程程序的多个线程可以在多核 CPU 的多个核心同时运行,可以将完全发挥机器多核的优势;

同时对比多进程程序,多线程有以下特点:

  • 线程的创建和切换的系统开销都比进程要小,所以一定程度上会比多进程更高效;
  • 线程天生的共享内存空间,线程间的通信更简单,避免了进程IPC引入新的复杂度。

适用场景

多线程的优化是很多,可是无脑使用多线程并不能提升程序的执行效率,因为线程的创建和销毁、上下文切换、线程同步等也是有性能损耗的,耗费时间可能比顺序执行的代码还多。如:

sumSmall是一个从1累加到50000的函数。

上图是在主线程内执行了三次 sumSmall 和三个线程分别执行 sumSmall ,再将结果同步到一个线程的时间对比,我们会发现只在主线程执行的时间反而更短,三个线程创建、切换、同步的时间远远大过了线程异步执行节省的时间。

而函数 sumLarge 从1累加到5000000,下图同一线程执行三次和三个线程执行的耗时:

这次,多线程终于有效率优势了。

是否使用多线程还需要根据具体需求而定,一般考虑以下两种情况:

  • I/O 阻塞会使操作系统发生任务调度,阻塞当前任务,所以代码中 I/O 多的情况下,使用多线程时可以将代码并行。例如多次读整块的文件,或请求多个网络资源。
  • 多线程能充分利用 CPU,所以有多处大计算量代码时,也可以使用多线程使他们并行执行,例如上文中后一个例子。

PHP中的多线程

PHP 默认并不支持多线程,要使用多线程需要安装 pthread 扩展,而要安装 pthread 扩展,必须使用 --enable-maintainer-zts 参数重新编译 PHP,这个参数是指定编译 PHP 时使用线程安全方式。

线程安全

多线程是让程序变得不安分的一个因素,在使用多线程之前,首先要考虑线程安全问题:

线程安全:线程安全是编程中的术语,指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成。

在传统多线程中,由于多个线程共享变量,所以可能会导致出现如下问题:

  1. 存在一个全局数组$arr = array('a');;
  2. A 线程获取数组长度为1;
  3. B 线程获取数组长度为1;
  4. A 线程 pop 出数组元素 $a = array_pop($arr); $a = 'a';;
  5. B 线程也 pop 数组元素 $b = array_pop($arr); $a = null;;
  6. 此时 B 线程内就出现了灵异事件,明明数组长度大于0,或没有 pop 出东西;

PHP 实现

PHP 实现的线程安全主要是使用 TSRM 机制对 全局变量和静态变量进行了隔离,将全局变量和静态变量 给每个线程都复制了一份,各线程使用的都是主线程的一个备份,从而避免了变量冲突,也就不会出现线程安全问题。

PHP 对多线程的封装保证了线程安全,程序员不用考虑对全局变量加各种锁来避免读写冲突了,同时也减少了出错的机会,写出的代码更加安全。

但由此导致的是,子线程一旦开始运行,主线程便无法再对子线程运行细节进行调整了,线程一定程度上失去了线程之间通过全局变量进行消息传递的能力。

同时 PHP 开启线程安全选项后,使用 TSRM 机制分配和使用变量时也会有额外的损耗,所以在不需要多线程的 PHP 环境中,使用 PHP 的 ZTS (非线程安全) 版本就好。

类和方法

PHP 将线程 封装成了 Thread 类,线程的创建通过实例化一个线程对象来实现,由于类的封装性,变量的使用只能通过构造函数传入,而线程运算结果也需要通过类变量传出。

下面介绍几个常用的 Thread 类方法:

  • run():此方法是一个抽象方法,每个线程都要实现此方法,线程开始运行后,此方法中的代码会自动执行;
  • start():在主线程内调用此方法以开始运行一个线程;
  • join():各个线程相对于主线程都是异步执行,调用此方法会等待线程执行结束;
  • kill():强制线程结束;
  • isRunning():返回线程的运行状态,线程正在执行run()方法的代码时会返回 true;

因为线程安全的实现,PHP 的多线程开始运行后,无法再通过共享内存空间通信,线程也无法通过线程间通信复用,所以我认为 PHP 的“线程池”并没有什么意义。扩展内自带的Pool 类是一个对多线程分配管理的类,这里也不再多介绍了。


实例代码

下面是一个线程类,用来请求某一接口。接下来根据它写两个多线程的应用实例:

class Request extends Thread {    public $url;    public $response;    public function __construct($url) {        $this->url = $url;    }    public function run() {        $this->response = file_get_contents($this->url);    }}

异步请求

将同步的请求拆分为多个线程异步调用,以提升程序的运行效率。

$chG = new Request("www.google.com");$chB = new Request("www.baidu.com");$chG ->start();$chB ->start();$chG->join();$chB->join(); $gl = $chG->response;$bd = $chB->response;

超时控制

偶然间发现公司网站某一网页上的一块内容时有时无,不知道具体实现,但这给了我使用多线程的灵感:利用线程异步实现快速失败和超时控制。

我们在使用 curl 请求某个地址时,可以通过 CURLOPT_CONNECTTIMEOUT / CURLOPT_TIMEOUT 参数分别设置 curl 的连接超时时间和读取数据超时时间,但总的超时时间不好控制。而且在进行数据库查询时的超时时间无法设置(鸟哥博客:为MySQL设置查询超时)。

这时我们便可以借用多线程来实现此功能:在执行线程类的 start() 方法后,不调用 join() 方法,使线程一直处于异步状态,不阻塞主线程的执行。

此时主线程相当于旗舰,而各子线程相当于巡航舰,旗舰到达某地后不必要一直等待巡航舰也归来,等待一段时间后离开即可,从而避免巡航舰意外时旗舰白白空等。

代码:

$chG = new Request("www.google.com");$chB = new Request("www.baidu.com");$chG->start();$chB->start();$chB->join();// 此处不对chG执行join方法 sleep(1); // sleep一个能接受的超时时间$gl = $chG->response;$bd = $chB->response;$bd->kill();if (!$gl) {    $gl = ""; // 处理异常,或在线程类内给$gl一个默认值}

用php将任何格式视频转为flv的代码

用php将任何格式视频转为flv的代码_php技巧_脚本之家 <?
define(“ROOT_DIR”,dirname(__FILE__));
class EcodeFlv {
var $fromFile; //上传来的文件
var $toFilePath; //保存文件路径
var $toPicPath; //保存图片路径
var $mpeg; //ffmpeg.exe文件的路径
var $mencode; //mencode.exe文件的路径
var $cmdToFile; //转换文件命令
var $cmdToPic; //转换图片命令
var $toFileName; //转换后的文件名
var $mpegComm; //ffmpeg.exe的转换命令
var $mencodeComm; //mencode.exe的命令
var $mpegType;
var $mencodeType;
var $midi; //mdi.exe的路径
var $cmdMidi; //mdi.exe的命令
//初始化类
function EcodeFlv($fromFile,$toFilePath,$toPicPath,$mpeg,$mencode,$midi) {
$this->mpegComm = false;
$this->mencodeComm = false;
$this->fromFile = $fromFile;
$this->toFilePath = $toFilePath;
$this->toPicPath = ROOT_DIR.”/”.$toPicPath;
$this->mpeg = ROOT_DIR.$mpeg;
$this->mencode = ROOT_DIR.$mencode;
$this->midi = ROOT_DIR.$midi;
$this->mpegType=array (
“audio/x-mpeg”=>”.mp3″,
“video/mpeg”=>”.mpeg”,
“video/3gpp”=>”.3gp”,
“video/x-ms-asf”=>”.asf”,
“video/x-msvideo”=>”.avi”
);
$this->mencodeType = array(
“application/vnd.rn-realmedia”=>”.rm”,
“audio/x-pn-realaudio”=>”.rmvb”,
“audio/x-ms-wmv”=>”.wmv”,
);
}

//检查文件类型

function checkType() {
if(function_exists(mime_content_type)){
return false;
}else{
//$contentType = mime_content_type($this->fromFile);
$exe = “D:\server\php\extras\magic”;
$handel = new finfo(FILEINFO_MIME, $exe);
$contentType = $handel->file($this->fromFile);
}
foreach($this->mpegType as $index=>$key){
if($contentType == $index){
$name = md5(date(“Ymd”).tiime());
$this->toFileName = $name;
$this->$mpegComm = true;
return true;
}
}
foreach($this->mencodeType as $index=>$key){
if($contentType == $index){
$name = md5(date(“Ymd”).time());
$this->toFileName = $name;
$this->mencodeComm = true;
return true;
}else{
return false;
}
}
}

//设置文件,图片大小
function setSize($flvSize,$picSize) {
$flvWidth = $flvSize[0];
$flvHeight = $flvSize[1];
$picWidth = $picSize[0];
$picHeight = $picSize[1];
$picName = $this->toPicPath.$this->toFileName.”.jpg”;
$flvName = $this->toFilePath.$this->toFileName.”.flv”;
$toMdi = ROOT_DIR.”/”.$flvName;
$size = $picWidth.”x”.$picHeight;
if($this->mpegComm){
$this->cmdToFile= “$this->mpeg -i $this->fromFile -y -ab 56 -ar 22050 -b 500 -r 15 -s $flvWith*$flvHeight $flvName”;
}
elseif($this->mencodeComm){
$this->cmdToFile = “$this->mencode $this->fromFile -vf scale=$flvWidth:$flvHeight -ffourcc FLV1 -of lavf -ovc lavc -lavcopts vcodec=flv:vbitrate=70:acodec=mp3:abitrate=56:dia=-1 -ofps 25 -srate 22050 -oac mp3lame -o $flvName”;
}
$this->cmdToPic = “$this->mpeg -i $toMdi -y -f image2 -ss 8 -t 0.003 -s $size $picName”;
$this->cmdMidi = “$this->midi $toMdi /k”;
echo $this->cmdToPic;
}

//开始转换
function toEcode() {
set_time_limit(0);
exec($this->cmdToFile,$flvStatus)
exec($this->cmdToPic,$picStatus);
exec($this->cmdMidi,$mStatus);
}

}
?>

PHP生成指定范围内的N个不重复的随机数

PHP生成指定范围内的N个不重复的随机数_php技巧_脚本之家

将生成的随机数存入数组,再在数组中去除重复的值,即可生成一定数量的不重复随机数。

在PHP网站开发中,有时候我们需要生成指定范围内一定数量的不重复随机数,具体怎么设计这个生产随机数的函数呢?我们可以将随机产生的数存入数组,但在存入的同时去除重复的值,即可生成一定数量的不重复随机数。当然也可以把指定范围内的数值存进数组,再使用shuffle($array)打乱这个数组,然后再截取其中一定数量的值。但后面的一种做法在指定的随机数范围太大的时候会产生一个较大的数组。

下面给出第一种做法的代码,第二种做法更简单,大家可以尝试下,其实也差不多?

<?php
/** array unique_rand( int $min, int $max, int $num )* 生成一定数量的不重复随机数,指定的范围内整数的数量必须* 比要生成的随机数数量大* $min 和 $max: 指定随机数的范围* $num: 指定生成数量*/function unique_rand($min, $max, $num) {$count = 0;$return = array();while ($count < $num) {$return[] = mt_rand($min, $max);$return = array_flip(array_flip($return));$count = count($return);}//打乱数组,重新赋予数组新的下标shuffle($return);return $return;} //生成10个1到100范围内的不重复随机数$arr = unique_rand(1, 100, 10);echo implode($arr, ",");?>

<?php /* * array unique_rand( int $min, int $max, int $num ) * 生成一定数量的不重复随机数,指定的范围内整数的数量必须 * 比要生成的随机数数量大 * $min 和 $max: 指定随机数的范围 * $num: 指定生成数量 */ function unique_rand($min, $max, $num) { $count = 0; $return = array(); while ($count < $num) { $return[] = mt_rand($min, $max); $return = array_flip(array_flip($return)); $count = count($return); } //打乱数组,重新赋予数组新的下标 shuffle($return); return $return; } //生成10个1到100范围内的不重复随机数 $arr = unique_rand(1, 100, 10); echo implode($arr, “,”); ?>

程序运行如下:

48,5,19,36,63,72,82,77,46,16

补充几点说明:

1、生成随机数时我们用了 mt_rand() 函数。这个函数生成随机数的平均速度要比 rand() 快几倍。

2、去除数组中的重复值时用了“翻转法”,就是用 array_flip() 把数组的 key 和 value 交换两次。这种做法在去除数组重复值的同时效率也比用 array_unique() 快得多。

3、返回数组前,先使用 shuffle() 为数组赋予新的键名,保证键名是 0-n 连续的数字。如果不进行此步骤,可能在删除重复值时造成键名不连续,如果用for遍历的时候会有问题,但如果用foreach或不需要遍历的时候可以不需要shuffle。

PHP实现类似百度搜索自动完成(代码简单)

一、效果图:

二、HTML代码

jQuery UI 自动完成(Autocomplete) – 默认功能
标签:

三、PHP代码
prepare(“select title from article where title like :title”);
$result->execute(array(‘title’ => “%” . $keyword . “%”));
$data = $result->fetchall(PDO::FETCH_ASSOC);
//将二维数组转化为一维数组(自动补全插件要求的是一个一维数组)
foreach ($data as $k => $v) {
$datas[] = $v[‘title’];
}
return $datas;
}

//获取输入框的内容
//注:jquery-ui的自动补全ajax 当我们输入一个c时,Autocomplete实际发送的请求路径为at.php?term=c
$keyword = $_GET[‘term’];

//根据用户输入值查询相关数据
$data = test($keyword);
//输出json字符串
echo json_encode($data); //输出查询的结果(json格式输出)

?>
 

备注:HTML部分引入的css,js源代码:

文章参考:http://www.365mini.com/page/jquery-ui-autocomplete.htm

更多功能可参考: http://www.runoob.com/jqueryui/example-autocomplete.html

如果js和css源代码地址找不到,到这里下载

总结:以上是结合mysql 和 jquery-ui实现的自动提示,实际上如果数据库数据量较大的情况,整体对数据库开销也比较大。

这样,也可以尝试使用全文检索工具 xunsearch 或 sphinx 来实现。好处是减少了mysql数据库的查询压力,提高了检索速度。

xunSearch的使用:https://blog.csdn.net/m_nanle_xiaobudiu/article/details/81663636
————————————————
版权声明:本文为CSDN博主「m_nanle_xiaobudiu」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/m_nanle_xiaobudiu/article/details/79343394

PHP 高级编程之多线程-消息队列

1. 多线程环境安装

1.1. PHP 5.5.9

安装PHP 5.5.9

https://github.com/oscm/shell/blob/master/php/5.5.9.sh
./configure --prefix=/srv/php-5.5.9 \--with-config-file-path=/srv/php-5.5.9/etc \--with-config-file-scan-dir=/srv/php-5.5.9/etc/conf.d \--enable-fpm \--with-fpm-user=www \--with-fpm-group=www \--with-pear \--with-curl \--with-gd \--with-jpeg-dir \--with-png-dir \--with-freetype-dir \--with-zlib-dir \--with-iconv \--with-mcrypt \--with-mhash \--with-pdo-mysql \--with-mysql-sock=/var/lib/mysql/mysql.sock \--with-openssl \--with-xsl \--with-recode \--enable-sockets \--enable-soap \--enable-mbstring \--enable-gd-native-ttf \--enable-zip \--enable-xml \--enable-bcmath \--enable-calendar \--enable-shmop \--enable-dba \--enable-wddx \--enable-sysvsem \--enable-sysvshm \--enable-sysvmsg \--enable-opcache \--enable-pcntl \--enable-maintainer-zts \--disable-debug

编译必须启用zts支持否则无法安装 pthreads(–enable-maintainer-zts)

1.2. 安装 pthreads 扩展

安装https://github.com/oscm/shell/blob/master/php/pecl/pthreads.sh

# curl -s https://raw.github.com/oscm/shell/master/php/pecl/pthreads.sh | bash

查看pthreads是否已经安装

# php -m | grep pthreads

2. Thread

<?phpclass HelloWorld extends Thread {    public function __construct($world) {       $this->world = $world;    }     public function run() {        print_r(sprintf("Hello %s\n", $this->world));    }} $thread = new HelloWorld("World"); if ($thread->start()) {    printf("Thread #%lu says: %s\n", $thread->getThreadId(), $thread->join());}?>

3. Worker 与 Stackable

<?phpclass SQLQuery extends Stackable {         public function __construct($sql) {                $this->sql = $sql;        }         public function run() {                $dbh  = $this->worker->getConnection();                $row = $dbh->query($this->sql);                while($member = $row->fetch(PDO::FETCH_ASSOC)){                        print_r($member);                }        } } class ExampleWorker extends Worker {        public static $dbh;        public function __construct($name) {        }         /*        * The run method should just prepare the environment for the work that is coming ...        */        public function run(){                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');        }        public function getConnection(){                return self::$dbh;        }} $worker = new ExampleWorker("My Worker Thread"); $work=new SQLQuery('select * from members order by id desc limit 5');$worker->stack($work); $table1 = new SQLQuery('select * from demousers limit 2');$worker->stack($table1); $worker->start();$worker->shutdown();?>

4. 互斥锁

什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。

下面我们举一个例子,一个简单的计数器程序,说明有无互斥锁情况下的不同。

<?php$counter = 0;//$handle=fopen("php://memory", "rw");//$handle=fopen("php://temp", "rw");$handle=fopen("/tmp/counter.txt", "w");fwrite($handle, $counter );fclose($handle); class CounterThread extends Thread {public function __construct($mutex = null){ose($this->handle);}    public function run() {if($this->mutex)$locked=Mutex::lock($this->mutex); $counter = intval(fgets($this->handle));$counter++;rewind($this->handle);fputs($this->handle, $counter );printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter); if($this->mutex)Mutex::unlock($this->mutex);    }} //没有互斥锁for ($i=0;$i<50;$i++){$threads[$i] = new CounterThread();$threads[$i]->start(); } //加入互斥锁$mutex = Mutex::create(true);for ($i=0;$i<50;$i++){$threads[$i] = new CounterThread($mutex);$threads[$i]->start(); } Mutex::unlock($mutex);for ($i=0;$i<50;$i++){$threads[$i]->join();}Mutex::destroy($mutex); ?>

我们使用文件/tmp/counter.txt保存计数器值,每次打开该文件将数值加一,然后写回文件。当多个线程同时操作一个文件的时候,就会线程运行先后取到的数值不同,写回的数值也不同,最终计数器的数值会混乱。

没有加入锁的结果是计数始终被覆盖,最终结果是2

而加入互斥锁后,只有其中的一个进程完成加一工作并释放锁,其他线程才能得到解锁信号,最终顺利完成计数器累加操作

上面例子也可以通过对文件加锁实现,这里主要讲的是多线程锁,后面会涉及文件锁。

4.1. 多线程与共享内存

在共享内存的例子中,没有使用任何锁,仍然可能正常工作,可能工作内存操作本身具备锁的功能。

<?php$tmp = tempnam(__FILE__, 'PHP');$key = ftok($tmp, 'a'); $shmid = shm_attach($key);$counter = 0;shm_put_var( $shmid, 1, $counter ); class CounterThread extends Thread {public function __construct($shmid){        $this->shmid = $shmid;    }    public function run() { $counter = shm_get_var( $this->shmid, 1 );$counter++;shm_put_var( $this->shmid, 1, $counter ); printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);    }} for ($i=0;$i<100;$i++){$threads[] = new CounterThread($shmid);}for ($i=0;$i<100;$i++){$threads[$i]->start(); } for ($i=0;$i<100;$i++){$threads[$i]->join();}shm_remove( $shmid );shm_detach( $shmid );?>

5. 线程同步

有些场景我们不希望 thread->start() 就开始运行程序,而是希望线程等待我们的命令。

$thread->wait();测作用是 thread->start()后线程并不会立即运行,只有收到 $thread->notify(); 发出的信号后才运行

<?php$tmp = tempnam(__FILE__, 'PHP');$key = ftok($tmp, 'a'); $shmid = shm_attach($key);$counter = 0;shm_put_var( $shmid, 1, $counter ); class CounterThread extends Thread {public function __construct($shmid){        $this->shmid = $shmid;    }    public function run() {         $this->synchronized(function($thread){            $thread->wait();        }, $this); $counter = shm_get_var( $this->shmid, 1 );$counter++;shm_put_var( $this->shmid, 1, $counter ); printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);    }} for ($i=0;$i<100;$i++){$threads[] = new CounterThread($shmid);}for ($i=0;$i<100;$i++){$threads[$i]->start(); } for ($i=0;$i<100;$i++){$threads[$i]->synchronized(function($thread){$thread->notify();}, $threads[$i]);} for ($i=0;$i<100;$i++){$threads[$i]->join();}shm_remove( $shmid );shm_detach( $shmid );?>

6. 线程池

6.1. 线程池

自行实现一个Pool类

<?phpclass Update extends Thread {     public $running = false;    public $row = array();    public function __construct($row) { $this->row = $row;        $this->sql = null;    }     public function run() { if(strlen($this->row['bankno']) > 100 ){$bankno = safenet_decrypt($this->row['bankno']);}else{$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);file_put_contents("bankno_error.log", $error, FILE_APPEND);} if( strlen($bankno) > 7 ){$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']); $this->sql = $sql;} printf("%s\n",$this->sql);    } } class Pool {public $pool = array();public function __construct($count) {$this->count = $count;}public function push($row){if(count($this->pool) < $this->count){$this->pool[] = new Update($row);return true;}else{return false;}}public function start(){foreach ( $this->pool as $id => $worker){$this->pool[$id]->start();}}public function join(){foreach ( $this->pool as $id => $worker){               $this->pool[$id]->join();}}public function clean(){foreach ( $this->pool as $id => $worker){if(! $worker->isRunning()){            unset($this->pool[$id]);            }}}} try {$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',PDO::MYSQL_ATTR_COMPRESS => true)); $sql  = "select id,bankno from members order by id desc";$row = $dbh->query($sql);$pool = new Pool(5);while($member = $row->fetch(PDO::FETCH_ASSOC)){ while(true){if($pool->push($member)){ //压入任务到池中break;}else{ //如果池已经满,就开始启动线程$pool->start();$pool->join();$pool->clean();}}}$pool->start();    $pool->join(); $dbh = null; } catch (Exception $e) {    echo '[' , date('H:i:s') , ']', '系统错误', $e->getMessage(), "\n";}?>

6.2. 动态队列线程池

上面的例子是当线程池满后执行start统一启动,下面的例子是只要线程池中有空闲便立即创建新线程。

<?phpclass Update extends Thread {     public $running = false;    public $row = array();    public function __construct($row) { $this->row = $row;        $this->sql = null;//print_r($this->row);    }     public function run() { if(strlen($this->row['bankno']) > 100 ){$bankno = safenet_decrypt($this->row['bankno']);}else{$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);file_put_contents("bankno_error.log", $error, FILE_APPEND);} if( strlen($bankno) > 7 ){$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']); $this->sql = $sql;} printf("%s\n",$this->sql);    } }   try {$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',PDO::MYSQL_ATTR_COMPRESS => true)); $sql     = "select id,bankno from members order by id desc limit 50"; $row = $dbh->query($sql);$pool = array();while($member = $row->fetch(PDO::FETCH_ASSOC)){$id = $member['id'];while (true){if(count($pool) < 5){$pool[$id] = new Update($member);$pool[$id]->start();break;}else{foreach ( $pool as $name => $worker){if(! $worker->isRunning()){unset($pool[$name]);}}}} } $dbh = null; } catch (Exception $e) {    echo '【' , date('H:i:s') , '】', '【系统错误】', $e->getMessage(), "\n";}?>

6.3. pthreads Pool类

pthreads 提供的 Pool class 例子

<?php class WebWorker extends Worker { public function __construct(SafeLog $logger) {$this->logger = $logger;} protected $loger;} class WebWork extends Stackable { public function isComplete() {return $this->complete;} public function run() {$this->worker->logger->log("%s executing in Thread #%lu",  __CLASS__, $this->worker->getThreadId());$this->complete = true;} protected $complete;} class SafeLog extends Stackable { protected function log($message, $args = []) {$args = func_get_args(); if (($message = array_shift($args))) {echo vsprintf("{$message}\n", $args);}}}  $pool = new Pool(8, \WebWorker::class, [new SafeLog()]); $pool->submit($w=new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->shutdown(); $pool->collect(function($work){return $work->isComplete();}); var_dump($pool);

7. 多线程文件安全读写(文件锁)

文件所种类。

LOCK_SH 取得共享锁定(读取的程序)。LOCK_EX 取得独占锁定(写入的程序。LOCK_UN 释放锁定(无论共享或独占)。LOCK_NB 如果不希望 flock() 在锁定时堵塞

共享锁例子

<?php $fp = fopen("/tmp/lock.txt", "r+"); if (flock($fp, LOCK_EX)) {  // 进行排它型锁定    ftruncate($fp, 0);      // truncate file    fwrite($fp, "Write something here\n");    fflush($fp);            // flush output before releasing the lock    flock($fp, LOCK_UN);    // 释放锁定} else {    echo "Couldn't get the lock!";} fclose($fp); ?>

共享锁例子2

<?php$fp = fopen('/tmp/lock.txt', 'r+'); /* Activate the LOCK_NB option on an LOCK_EX operation */if(!flock($fp, LOCK_EX | LOCK_NB)) {    echo 'Unable to obtain lock';    exit(-1);} /* ... */ fclose($fp);?>

8. 多线程与数据连接

pthreads 与 pdo 同时使用是,需要注意一点,需要静态声明public static $dbh;并且通过单例模式访问数据库连接。

8.1. Worker 与 PDO

<?phpclass Work extends Stackable {         public function __construct() {        }         public function run() {                $dbh  = $this->worker->getConnection();                $sql     = "select id,name from members order by id desc limit 50";                $row = $dbh->query($sql);                while($member = $row->fetch(PDO::FETCH_ASSOC)){                        print_r($member);                }        } } class ExampleWorker extends Worker {        public static $dbh;        public function __construct($name) {        }         /*        * The run method should just prepare the environment for the work that is coming ...        */        public function run(){                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');        }        public function getConnection(){                return self::$dbh;        }} $worker = new ExampleWorker("My Worker Thread"); $work=new Work();$worker->stack($work); $worker->start();$worker->shutdown();?>

8.2. Pool 与 PDO

在线程池中链接数据库

# cat pool.php<?phpclass ExampleWorker extends Worker { public function __construct(Logging $logger) {$this->logger = $logger;} protected $logger;} /* the collectable class implements machinery for Pool::collect */class Work extends Stackable {public function __construct($number) {$this->number = $number;}public function run() {                $dbhost = 'db.example.com';               // 数据库服务器                $dbuser = 'example.com';                 // 数据库用户名                $dbpw = 'password';                               // 数据库密码                $dbname = 'example_real';$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',                        PDO::MYSQL_ATTR_COMPRESS => true,PDO::ATTR_PERSISTENT => true                        )                );$sql = "select OPEN_TIME, `COMMENT` from MT4_TRADES where LOGIN='".$this->number['name']."' and CMD='6' and `COMMENT` = '".$this->number['order'].":DEPOSIT'";#echo $sql;$row = $dbh->query($sql);$mt4_trades  = $row->fetch(PDO::FETCH_ASSOC);if($mt4_trades){ $row = null; $sql = "UPDATE db_example.accounts SET paystatus='成功', deposit_time='".$mt4_trades['OPEN_TIME']."' where `order` = '".$this->number['order']."';";$dbh->query($sql);.example.com';                      // 数据库服务器$dbuser = 'example.com';                 // 数据库用户名$dbpw = 'password';                               // 数据库密码$dbname = 'db_example';$dbh    = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',                        PDO::MYSQL_ATTR_COMPRESS => true                        )                );$sql = "select `order`,name from accounts where deposit_time is null order by id desc"; $row = $dbh->query($sql);while($account = $row->fetch(PDO::FETCH_ASSOC)){        $pool->submit(new Work($account));} $pool->shutdown(); ?>

进一步改进上面程序,我们使用单例模式 $this->worker->getInstance(); 全局仅仅做一次数据库连接,线程使用共享的数据库连接

<?phpclass ExampleWorker extends Worker { #public function __construct(Logging $logger) {#$this->logger = $logger;#} #protected $logger;protected  static $dbh;public function __construct() { }public function run(){$dbhost = 'db.example.com';// 数据库服务器    $dbuser = 'example.com';        // 数据库用户名        $dbpw = 'password';             // 数据库密码$dbname = 'example';// 数据库名 self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',PDO::MYSQL_ATTR_COMPRESS => true,PDO::ATTR_PERSISTENT => true)); }protected function getInstance(){        return self::$dbh;    } } /* the collectable class implements machinery for Pool::collect */class Work extends Stackable {public function __construct($data) {$this->data = $data;#print_r($data);} public function run() {#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() ); try {$dbh  = $this->worker->getInstance();#print_r($dbh);               $id = $this->data['id'];$mobile = safenet_decrypt($this->data['mobile']);#printf("%d, %s \n", $id, $mobile);if(strlen($mobile) > 11){$mobile = substr($mobile, -11);}if($mobile == 'null'){#$sql = "UPDATE members_digest SET mobile = '".$mobile."' where id = '".$id."'";#printf("%s\n",$sql);#$dbh->query($sql);$mobile = '';$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";}else{$sql = "UPDATE members_digest SET mobile = md5(:mobile) where id = :id";}$sth = $dbh->prepare($sql);$sth->bindValue(':mobile', $mobile);$sth->bindValue(':id', $id);$sth->execute();#echo $sth->debugDumpParams();}catch(PDOException $e) {$error = sprintf("%s,%s\n", $mobile, $id );file_put_contents("mobile_error.log", $error, FILE_APPEND);} #$dbh = null;printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);#printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);}} $pool = new Pool(100, \ExampleWorker::class, []); #foreach (range(0, 100) as $number) {#$pool->submit(new Work($number));#} $dbhost = 'db.example.com';                     // 数据库服务器$dbuser = 'example.com';                 // 数据库用户名$dbpw = 'password';                             // 数据库密码$dbname = 'example';$dbh    = new PDO("mysql:host=$dbhost;port=3307;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',                        PDO::MYSQL_ATTR_COMPRESS => true                        )                );#print_r($dbh); #$sql = "select id, mobile from members where id < :id";#$sth = $dbh->prepare($sql);#$sth->bindValue(':id',300);#$sth->execute();#$result = $sth->fetchAll();#print_r($result);##$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";#$sth = $dbh->prepare($sql);#$sth->bindValue(':mobile', 'aa');#$sth->bindValue(':id','272');#echo $sth->execute();#echo $sth->queryString;#echo $sth->debugDumpParams();  $sql = "select id, mobile from members order by id asc"; // limit 1000";$row = $dbh->query($sql);while($members = $row->fetch(PDO::FETCH_ASSOC)){        #$order =  $account['order'];        #printf("%s\n",$order);        //print_r($members);        $pool->submit(new Work($members));#unset($account['order']);} $pool->shutdown(); ?>

8.3. 多线程中操作数据库总结

总的来说 pthreads 仍然处在发展中,仍有一些不足的地方,我们也可以看到pthreads的git在不断改进这个项目

数据库持久链接很重要,否则每个线程都会开启一次数据库连接,然后关闭,会导致很多链接超时。

<?php$dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, array(    PDO::ATTR_PERSISTENT => true));?>

9. Thread And ZeroMQ

应用场景,我使用触发器监控数据库某个表,一旦发现有改变就通知程序处理数据

9.1. 数据库端

首先安装ZeroMQ 与 MySQL UDF https://github.com/netkiller/mysql-zmq-plugin, 然后创建触发器。

CREATE DEFINER=`dba`@`192.168.%` PROCEDURE `Table_Example`(IN `TICKET` INT, IN `LOGIN` INT, IN `CMD` INT, IN `VOLUME` INT)LANGUAGE SQLNOT DETERMINISTIC READS SQL DATASQL SECURITY DEFINER COMMENT '交易监控'BEGIN DECLARE Example CHAR(1) DEFAULT 'N'; IF CMD IN ('0','1') THEN IF VOLUME >=10 AND VOLUME <=90 THEN select coding into Example from example.members where username = LOGIN and coding = 'Y';IF Example = 'Y' THEN select zmq_client('tcp://192.168.2.15:5555', CONCAT(TICKET, ',', LOGIN, ',', VOLUME));END IF;END IF;END IF;END CREATE DEFINER=`dba`@`192.168.6.20` TRIGGER `Table_AFTER_INSERT` AFTER INSERT ON `MT4_TRADES` FOR EACH ROW BEGIN call Table_Example(NEW.TICKET,NEW.LOGIN,NEW.CMD,NEW.VOLUME);END

9.2. 数据处理端

<?phpclass ExampleWorker extends Worker { #public function __construct(Logging $logger) {#$this->logger = $logger;#} #protected $logger;protected  static $dbh;public function __construct() { }public function run(){$dbhost = '192.168.2.1';// 数据库服务器$dbport = 3306;    $dbuser = 'www';        // 数据库用户名        $dbpass = 'password';             // 数据库密码$dbname = 'example';// 数据库名 self::$dbh  = new PDO("mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array(/* PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', */PDO::MYSQL_ATTR_COMPRESS => true,PDO::ATTR_PERSISTENT => true)); }protected function getInstance(){        return self::$dbh;    } } /* the collectable class implements machinery for Pool::collect */class Fee extends Stackable {public function __construct($msg) {$trades = explode(",", $msg);$this->data = $trades;print_r($trades);} public function run() {#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() ); try {$dbh  = $this->worker->getInstance(); $insert = "INSERT INTO coding_fee(ticket, login, volume, `status`) VALUES(:ticket, :login, :volume,'N')";$sth = $dbh->prepare($insert);$sth->bindValue(':ticket', $this->data[0]);$sth->bindValue(':login', $this->data[1]);$sth->bindValue(':volume', $this->data[2]);$sth->execute();//$sth = null;//$dbh = null; /* 业务实现在此处 */ $update = "UPDATE coding_fee SET `status` = 'Y' WHERE ticket = :ticket and `status` = 'N'";$sth = $dbh->prepare($update);$sth->bindValue(':ticket', $this->data[0]);$sth->execute();//echo $sth->queryString;}catch(PDOException $e) {$error = sprintf("%s,%s\n", $mobile, $id );file_put_contents("mobile_error.log", $error, FILE_APPEND);} #$dbh = null;//printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);#printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);}} class Example {/* config */const LISTEN = "tcp://192.168.2.15:5555";const MAXCONN = 100;const pidfile = __CLASS__;const uid= 80;const gid= 80; protected $pool = NULL;protected $zmq = NULL;public function __construct() {$this->pidfile = '/var/run/'.self::pidfile.'.pid';}private function daemon(){if (file_exists($this->pidfile)) {echo "The file $this->pidfile exists.\n";exit();} $pid = pcntl_fork();if ($pid == -1) { die('could not fork');} else if ($pid) { // we are the parent //pcntl_wait($status); //Protect against Zombie childrenexit($pid);} else {// we are the childfile_put_contents($this->pidfile, getmypid());posix_setuid(self::uid);posix_setgid(self::gid);return(getmypid());}}private function start(){$pid = $this->daemon();$this->pool = new Pool(self::MAXCONN, \ExampleWorker::class, []);$this->zmq = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);$this->zmq->bind(self::LISTEN); /* Loop receiving and echoing back */while ($message = $this->zmq->recv()) {if($message){$this->pool->submit(new Fee($message));$this->zmq->send('TRUE');}else{$this->zmq->send('FALSE');}}$pool->shutdown();}private function stop(){ if (file_exists($this->pidfile)) {$pid = file_get_contents($this->pidfile);posix_kill($pid, 9);unlink($this->pidfile);}}private function help($proc){printf("%s start | stop | help \n", $proc);}public function main($argv){if(count($argv) < 2){printf("please input help parameter\n");exit();}if($argv[1] === 'stop'){$this->stop();}else if($argv[1] === 'start'){$this->start();}else{$this->help($argv[0]);}}} $example = new Example();$example->main($argv);

使用方法

# php example.php start# php example.php stop# php example.php help

此程序涉及守候进程实现$this->daemon()运行后转到后台运行,进程ID保存,进程的互斥(不允许同时启动两个进程),线程池连接数以及线程任务等等

转载自http://my.oschina.net/neochen/blog/294354

php判断当前是 http还是 https

** * 判断是否SSL协议
 * @return boolean */
function is_ssl() 
{ 
if(isset($_SERVER['HTTPS']) && ('1' == $_SERVER['HTTPS'] || 'on' == strtolower($_SERVER['HTTPS']))){
 return true; 
}elseif(isset($_SERVER['SERVER_PORT']) && ('443' == $_SERVER['SERVER_PORT'] )) { return true; 
} 
return false; 
}

使用

$http = 'http://'; $http =is_ssl()?'https://':$http;

报错:The Apache Tomcat installation at this directory is version 8.5.27. A Tomcat 8.0 installation is

今天在eclipse中配置tomcat时,遇到了一个报错,如下所示:

  这里我的Tomcat的版本是8.5.27,报这个错的原因是ellipse里面限制Tomcat的最高版本是8.0的,我用的tomcat的版本明显高于eclipse的要求;具体的改法如下:

  1.首先找到Tomcat的本地安装路径;

  2.然后找到lib文件夹中的Catalina.jar包,用解压软件打开这个jar包;

  3.依次找到并且双击打开catalina.jar\org\apache\catalina\util\ServerInfo.properties文件,如下所示:

  4.将文件中server.info=Apache Tomcat/8.5.27中的8.5.27改成8.0.0即可;

修改完成后重新配置Tomcat不在报错

  修改完成!

thinkphp5 join使用注意

A表有id,name,time等字段,
B表有id,type,uid,email,address等字段。
A表中的id和B表中的uid对应。

    Db::table(A表)->alias('a')
                ->join('B表 b', 'a.id = b.uid')
                ->find();

这样是把B表中的所有字段都给返回了,B表的字段会覆盖A中的同名字段,
比如最终返回的结果中id是B表中的id

这时要注意指定字段->field(‘a.*,b.email,b.adderss’)