#查看mysql的root账号的密码 grep 'temporary password' /var/log/mysqld.log #登录mysql mysql -uroot -p #修改密码 ALTER USER 'root'@'localhost' IDENTIFIED BY 'password'; #修改root用户可远程登录 GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'password' WITH GRANT OPTION; #刷新 flush privileges;
#查看mysql的root账号的密码 grep 'temporary password' /var/log/mysqld.log #登录mysql mysql -uroot -p #修改密码 ALTER USER 'root'@'localhost' IDENTIFIED BY 'password'; #修改root用户可远程登录 GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'password' WITH GRANT OPTION; #刷新 flush privileges;
<?phpclass HelloWorld extends Thread { public function __construct($world) { $this->world = $world; } public function run() { print_r(sprintf("Hello %s\n", $this->world)); }} $thread = new HelloWorld("World"); if ($thread->start()) { printf("Thread #%lu says: %s\n", $thread->getThreadId(), $thread->join());}?>
3. Worker 与 Stackable
<?phpclass SQLQuery extends Stackable { public function __construct($sql) { $this->sql = $sql; } public function run() { $dbh = $this->worker->getConnection(); $row = $dbh->query($this->sql); while($member = $row->fetch(PDO::FETCH_ASSOC)){ print_r($member); } } } class ExampleWorker extends Worker { public static $dbh; public function __construct($name) { } /* * The run method should just prepare the environment for the work that is coming ... */ public function run(){ self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456'); } public function getConnection(){ return self::$dbh; }} $worker = new ExampleWorker("My Worker Thread"); $work=new SQLQuery('select * from members order by id desc limit 5');$worker->stack($work); $table1 = new SQLQuery('select * from demousers limit 2');$worker->stack($table1); $worker->start();$worker->shutdown();?>
4. 互斥锁
什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。
下面我们举一个例子,一个简单的计数器程序,说明有无互斥锁情况下的不同。
<?php$counter = 0;//$handle=fopen("php://memory", "rw");//$handle=fopen("php://temp", "rw");$handle=fopen("/tmp/counter.txt", "w");fwrite($handle, $counter );fclose($handle); class CounterThread extends Thread {public function __construct($mutex = null){ose($this->handle);} public function run() {if($this->mutex)$locked=Mutex::lock($this->mutex); $counter = intval(fgets($this->handle));$counter++;rewind($this->handle);fputs($this->handle, $counter );printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter); if($this->mutex)Mutex::unlock($this->mutex); }} //没有互斥锁for ($i=0;$i<50;$i++){$threads[$i] = new CounterThread();$threads[$i]->start(); } //加入互斥锁$mutex = Mutex::create(true);for ($i=0;$i<50;$i++){$threads[$i] = new CounterThread($mutex);$threads[$i]->start(); } Mutex::unlock($mutex);for ($i=0;$i<50;$i++){$threads[$i]->join();}Mutex::destroy($mutex); ?>
<?phpclass Work extends Stackable { public function __construct() { } public function run() { $dbh = $this->worker->getConnection(); $sql = "select id,name from members order by id desc limit 50"; $row = $dbh->query($sql); while($member = $row->fetch(PDO::FETCH_ASSOC)){ print_r($member); } } } class ExampleWorker extends Worker { public static $dbh; public function __construct($name) { } /* * The run method should just prepare the environment for the work that is coming ... */ public function run(){ self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456'); } public function getConnection(){ return self::$dbh; }} $worker = new ExampleWorker("My Worker Thread"); $work=new Work();$worker->stack($work); $worker->start();$worker->shutdown();?>
8.2. Pool 与 PDO
在线程池中链接数据库
# cat pool.php<?phpclass ExampleWorker extends Worker { public function __construct(Logging $logger) {$this->logger = $logger;} protected $logger;} /* the collectable class implements machinery for Pool::collect */class Work extends Stackable {public function __construct($number) {$this->number = $number;}public function run() { $dbhost = 'db.example.com'; // 数据库服务器 $dbuser = 'example.com'; // 数据库用户名 $dbpw = 'password'; // 数据库密码 $dbname = 'example_real';$dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', PDO::MYSQL_ATTR_COMPRESS => true,PDO::ATTR_PERSISTENT => true ) );$sql = "select OPEN_TIME, `COMMENT` from MT4_TRADES where LOGIN='".$this->number['name']."' and CMD='6' and `COMMENT` = '".$this->number['order'].":DEPOSIT'";#echo $sql;$row = $dbh->query($sql);$mt4_trades = $row->fetch(PDO::FETCH_ASSOC);if($mt4_trades){ $row = null; $sql = "UPDATE db_example.accounts SET paystatus='成功', deposit_time='".$mt4_trades['OPEN_TIME']."' where `order` = '".$this->number['order']."';";$dbh->query($sql);.example.com'; // 数据库服务器$dbuser = 'example.com'; // 数据库用户名$dbpw = 'password'; // 数据库密码$dbname = 'db_example';$dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', PDO::MYSQL_ATTR_COMPRESS => true ) );$sql = "select `order`,name from accounts where deposit_time is null order by id desc"; $row = $dbh->query($sql);while($account = $row->fetch(PDO::FETCH_ASSOC)){ $pool->submit(new Work($account));} $pool->shutdown(); ?>
CREATE DEFINER=`dba`@`192.168.%` PROCEDURE `Table_Example`(IN `TICKET` INT, IN `LOGIN` INT, IN `CMD` INT, IN `VOLUME` INT)LANGUAGE SQLNOT DETERMINISTIC READS SQL DATASQL SECURITY DEFINER COMMENT '交易监控'BEGIN DECLARE Example CHAR(1) DEFAULT 'N'; IF CMD IN ('0','1') THEN IF VOLUME >=10 AND VOLUME <=90 THEN select coding into Example from example.members where username = LOGIN and coding = 'Y';IF Example = 'Y' THEN select zmq_client('tcp://192.168.2.15:5555', CONCAT(TICKET, ',', LOGIN, ',', VOLUME));END IF;END IF;END IF;END CREATE DEFINER=`dba`@`192.168.6.20` TRIGGER `Table_AFTER_INSERT` AFTER INSERT ON `MT4_TRADES` FOR EACH ROW BEGIN call Table_Example(NEW.TICKET,NEW.LOGIN,NEW.CMD,NEW.VOLUME);END
9.2. 数据处理端
<?phpclass ExampleWorker extends Worker { #public function __construct(Logging $logger) {#$this->logger = $logger;#} #protected $logger;protected static $dbh;public function __construct() { }public function run(){$dbhost = '192.168.2.1';// 数据库服务器$dbport = 3306; $dbuser = 'www'; // 数据库用户名 $dbpass = 'password'; // 数据库密码$dbname = 'example';// 数据库名 self::$dbh = new PDO("mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array(/* PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', */PDO::MYSQL_ATTR_COMPRESS => true,PDO::ATTR_PERSISTENT => true)); }protected function getInstance(){ return self::$dbh; } } /* the collectable class implements machinery for Pool::collect */class Fee extends Stackable {public function __construct($msg) {$trades = explode(",", $msg);$this->data = $trades;print_r($trades);} public function run() {#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() ); try {$dbh = $this->worker->getInstance(); $insert = "INSERT INTO coding_fee(ticket, login, volume, `status`) VALUES(:ticket, :login, :volume,'N')";$sth = $dbh->prepare($insert);$sth->bindValue(':ticket', $this->data[0]);$sth->bindValue(':login', $this->data[1]);$sth->bindValue(':volume', $this->data[2]);$sth->execute();//$sth = null;//$dbh = null; /* 业务实现在此处 */ $update = "UPDATE coding_fee SET `status` = 'Y' WHERE ticket = :ticket and `status` = 'N'";$sth = $dbh->prepare($update);$sth->bindValue(':ticket', $this->data[0]);$sth->execute();//echo $sth->queryString;}catch(PDOException $e) {$error = sprintf("%s,%s\n", $mobile, $id );file_put_contents("mobile_error.log", $error, FILE_APPEND);} #$dbh = null;//printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);#printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);}} class Example {/* config */const LISTEN = "tcp://192.168.2.15:5555";const MAXCONN = 100;const pidfile = __CLASS__;const uid= 80;const gid= 80; protected $pool = NULL;protected $zmq = NULL;public function __construct() {$this->pidfile = '/var/run/'.self::pidfile.'.pid';}private function daemon(){if (file_exists($this->pidfile)) {echo "The file $this->pidfile exists.\n";exit();} $pid = pcntl_fork();if ($pid == -1) { die('could not fork');} else if ($pid) { // we are the parent //pcntl_wait($status); //Protect against Zombie childrenexit($pid);} else {// we are the childfile_put_contents($this->pidfile, getmypid());posix_setuid(self::uid);posix_setgid(self::gid);return(getmypid());}}private function start(){$pid = $this->daemon();$this->pool = new Pool(self::MAXCONN, \ExampleWorker::class, []);$this->zmq = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);$this->zmq->bind(self::LISTEN); /* Loop receiving and echoing back */while ($message = $this->zmq->recv()) {if($message){$this->pool->submit(new Fee($message));$this->zmq->send('TRUE');}else{$this->zmq->send('FALSE');}}$pool->shutdown();}private function stop(){ if (file_exists($this->pidfile)) {$pid = file_get_contents($this->pidfile);posix_kill($pid, 9);unlink($this->pidfile);}}private function help($proc){printf("%s start | stop | help \n", $proc);}public function main($argv){if(count($argv) < 2){printf("please input help parameter\n");exit();}if($argv[1] === 'stop'){$this->stop();}else if($argv[1] === 'start'){$this->start();}else{$this->help($argv[0]);}}} $example = new Example();$example->main($argv);
使用方法
# php example.php start# php example.php stop# php example.php help