后端开发|php教程
php
后端开发-php教程
本篇文章给大家带来的内容是关于php如何使用命令行实现异步多进程模式的任务处理(代码),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。
汇丰源码,ubuntu 搜狗不跟随,宝宝爬虫图片大全,php sheets,seo公司网lzw
用PHP来实现异步任务一直是个难题,现有的解决方案中:PHP知名的异步框架有 swoole 和 Workerman,但都是无法在 web 环境中直接使用的,即便强行搭建 web 环境,异步调用也是使用多进程模式实现的。但有时真的不需要用启动服务的方式,让服务端一直等待客户端消息,何况中间还不能改动服务端代码。本文就介绍一下不使用任何框架和第三方库的情况下,在 CLI 环境中如何实现多进程以及在web环境中的异步调用。
java框架源码,vscode创建网页文件,ubuntu 4屏,tomcat v7.0,发热贴爬虫,php app接口加密,站长工具seo综合查询91,php网站qq互联lzw
在 web 环境的异步调用
装修报价系统源码,vscode 去掉换行,ubuntu 捕捉屏幕,tomcat8 密码,sqlite低版本号,下列关于网络爬虫描述中正确的有,iis php5配置,镇江seo推广公司价格,网站群系统,用户登陆注册页面模板lzw
常用的方式有两种
1. 使用 socket 连接
这种方式就是典型的C/S架构,需要有服务端支持。
// 1. 创建socket套接字$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);// 2. 进行socket连接socket_connect($socket, 127.0.0.1, 3939);//socket_set_nonblock($socket); // 以非阻塞模式运行,由于在客户端不实用,所以这里不考虑// 3. 向服务端发送请求socket_write($socket, $request, strlen($request));// 4. 接受服务端的回应消息(忽略非阻塞的情况,如果服务端不是提供异步服务,那这一步可以省略)$recv = socket_read($socket, 2048);// 5. 关闭socket连接socket_close($socket);
2. 使用 popen 打开进程管道
这种方式是使用操作系统命令,由操作系统直接执行。
本文讨论的异步调用就是使用这种方式。
$sf = /path/to/cli_async_task.php; //要执行的脚本文件$op = call; //脚本文件接收的参数1$data = base64_encode(serialize([TestTask, arg1, arg2])); //脚本文件接收的参数2pclose(popen("php $sf --op $op --data $data &", )); //打开之后接着就关闭进程管道,让该进程以守护模式运行echo PHP_EOL.异步任务已执行。.PHP_EOL;
这种方式的优点就是:一步解决,当前进程不需要任何开销。
缺点也很明显:无法跟踪任务脚本的运行状态。
所以重头戏会是在执行任务的脚本文件上,下面就介绍任务处理和多进程的实现方式。
在CLI
环境的多进程任务处理
注意:多进程模式仅支持Linux,不支持Windows!!
这里会从0开始(未使用任何框架和类库)介绍每一个步骤,最后会附带一份完整的代码。
1. 创建脚本
任何脚本不可忽视的地方就是错误处理。所以写一个任务处理脚本首先就是写错误处理方式。
在PHP中就是调用 set_exception_handler set_error_handler register_shutdown_function 这三个函数,然后写上自定义的处理方法。
接着是定义自动加载函数 spl_autoload_register 免去每使用一个新类都要 require / include 的烦恼。
定义日志操作方法。
定义任务处理方法。
读取来自命令行的参数,开始执行任务。
2. 多进程处理
PHP 创建多进程是使用 pcntl_fork 函数,该函数会 fork 一份当前进程(影分身术),于是就有了两个进程,当前进程是主进程(本体),fork 出的进程是子进程(影分身)。需要注意的是两个进程代码环境是一样的,两个进程都是执行到了 pcntl_fork 函数位置。区别就是 getmypid 获得的进程号不一样,最重要的区分是当调用 pcntl_fork函数时,子进程获得的返回值是 0,而主进程获得的是子进程的进程号 pid。
好了,当我们知道谁是子进程后,就可以让该子进程执行任务了。
那么主进程是如何得知子进程的状态呢?
使用 pcntl_wait。该函数有两个参数 $status 和 $options ,$status 是引用类型,用来存储子进程的状态,$options 有两个可选常量WNOHANG| WUNTRACED,分别表示不等待子进程结束立即返回和等待子进程结束。很明显使用WUNTRACED会阻塞主进程。(也可以使用 pcntl_waitpid 函数获取特定 pid 子进程状态)
在多进程中,主进程要做的就是管理每个子进程的状态,否则子进程很可能无法退出而变成僵尸进程。
关于多进程间的消息通信
这一块需要涉及具体的业务逻辑,所以只能简单的提一下。不考虑使用第三方比如redis
等服务的情况下,PHP原生可以实现就是管道通信和共享内存等方式。实现起来都比较简单,缺点就是可使用的数据容量有限,只能用简单文本协议交换数据。
如何手动结束所有进程任务
如果多进程处理不当,很可能导致进程任务卡死,甚至占用过多系统资源,此时只能手动结束进程。
除了一个个的根据进程号来结束,还有一个快速的方法是首先在任务脚本里自定义进程名称,就是调用cli_set_process_title函数,然后在命令行输入:ps aux|grep cli_async_worker |grep -v grep|awk ‘{print $2}’|xargs kill -9 (里面的 cli_async_worker 就是自定义的进程名称),这样就可以快速结束多进程任务了。
以下是完整的任务执行脚本代码:
可能无法直接使用,需要修改的地方有:
脚本目录和日志目录常量
自动加载任务类的方法(默认是加载脚本目录中以Task
结尾的文件)
其他的如:错误和日志处理方式和文本格式就随意吧…
如果命名管道文件设置有错误,可能导致进程假死,你可能需要手动删除进程管道通信的代码。
多进程的例子:execAsyncTask(‘multi’, [ ‘test’ => [‘a’, ‘b’, ‘c’], ‘grab’ => [[‘url’ => ‘’, ‘callback’ => ‘http://localhost’]] ]);。执行情况可以在日志文件中查看。execAsyncTask函数参考【__使用popen打开进程管道__】。
<?phperror_reporting(E_ALL ^ E_NOTICE ^ E_USER_WARNING);@ini_set(display_errors, 0);@ini_set(date.timezone, PRC);chdir(__DIR__);/* 任务脚本目录 */defined(TASK_PATH) or define(TASK_PATH, realpath(__DIR__ ./tasks));/* 任务日志目录 */defined(TASK_LOGS_PATH) or define(TASK_LOGS_PATH, __DIR__ ./tasks/logs);if (!is_dir(TASK_LOGS_PATH)) @mkdir(TASK_LOGS_PATH, 0777, true);set_exception_handler(function($e) { $time = date(H:i:s, time()); $msg = sprintf(\.
[%s] %s (%s)
. "\n".%s
,
$time, $e->getMessage(), $e->getCode(), $e->getTraceAsString()
);
file_put_contents(TASK_LOGS_PATH ./exception-.date(Ymd)..log, $msg.PHP_EOL, FILE_APPEND|LOCK_EX);
});
set_error_handler(function($errno, $errmsg, $filename, $line) {
if (!(error_reporting() & $errno)) return;
ob_start();
debug_print_backtrace();
$backtrace = ob_get_contents(); ob_end_clean();
$datetime = date(Y-m-d H:i:s, time());
$msg = << $header) {
if (!is_numeric($_k))
$header = sprintf(\%s: %s, $_k, $header);
$_headers .= $header . "\r\n";
}
}
$headers = "Connection: close\r\n" . $_headers;
$opts = array(
http => array(
method => strtoupper(@$job[method] ?: get),
content => @$job[data] ?: null,
header => $headers,
user_agent => @$job[args][user_agent] ?: HTTPGRAB/1.0 (compatible),
proxy => @$job[args][proxy] ?: null,
imeout => intval(@$job[args][ imeout] ?: 120),
protocol_version => @$job[args][protocol_version] ?: 1.1,
max_redirects => 3,
ignore_errors => true
)
);
$ret = @file_get_contents($url, false, stream_context_create($opts));
//debug_log($url. -->.strlen($ret));
if ($ret and isset($job[callback])) {
$postdata = http_build_query(array(
msg_id => @$job[msg_id] ?: 0,
url => @$job[url],
esult => $ret
));
$opts = array(
http => array(
method => POST,
header => Content-type:application/x-www-form-urlencoded. "\r\n",
content => $postdata,
imeout => 30
)
);
file_get_contents($job[callback], false, stream_context_create($opts));
//debug_log(json_encode(@$http_response_header));
//debug_log($job[callback]. -->.$ret2);
}
return $ret;
}
function clean($tmpdirs, $expires=3600*24*7) {
$ret = [];
foreach ((array)$tmpdirs as $tmpdir) {
$ret[$tmpdir] = 0;
foreach (glob($tmpdir.DIRECTORY_SEPARATOR.*) as $_file) {
if (fileatime($_file) open($file, \ZipArchive::CREATE)) {
return false;
}
_backup_dir($zip, $dest);
$zip->close();
return $file;
}
function _backup_dir($zip, $dest, $sub=\) {
$dest = rtrim($dest, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR;
$sub = rtrim($sub, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR;
$dir = opendir($dest);
if (!$dir) return false;
while (false !== ($file = readdir($dir))) {
if (is_file($dest . $file)) {
$zip->addFile($dest . $file, $sub . $file);
} else {
if ($file != . and $file != .. and is_dir($dest . $file)) {
//$zip->addEmptyDir($sub . $file . DIRECTORY_SEPARATOR);
_backup_dir($zip, $dest . $file, $file);
}
}
}
closedir($dir);
return true;
}
function execute_task($op, $data) {
debug_log(Start...);
$t1 = microtime(true);
switch($op) {
case call: //执行任务脚本类
$cmd = $data;
if (is_string($cmd) and class_exists($cmd)) $cmd = new $cmd;
elseif (is_array($cmd)) {
if (is_string($cmd[0]) and class_exists($cmd[0])) $cmd[0] = new $cmd[0];
}
$ret = call($cmd);
break;
case grab: //抓取网页
if (is_string($data)) $data = [url => $data];
if (is_array($data)) $ret = grab($data);
else throw new \Exception(无效的命令参数!);
break;
case clean: //清理缓存文件夹:dirs 需要清理的文件夹列表,expires 过期时间(秒,默认7天)
if (isset($data[dirs])) {
$ret = clean($data[dirs], @$data[expires]);
} else {
$ret = clean($data);
}
break;
case ackup: //备份文件:zip 备份到哪个zip文件,dest 需要备份的文件夹
if (isset($data[zip]) and is_dir($data[dest]))
$ret = backup($data[zip], $data[dest]);
else
throw new \Exception(没有指定需要备份的文件!);
break;
case equire: //加载脚本文件
if (is_file($data)) $ret = require($data);
else throw new \Exception(不是可请求的文件!);
break;
case est:
sleep(rand(1, 5));
$ret = ucfirst(strval($data)). .PID:. getmypid();
break;
case multi: //多进程处理模式
$results = $childs = [];
$fifo = TASK_LOGS_PATH . DIRECTORY_SEPARATOR . pipe.. posix_getpid();
if (!file_exists($fifo)) {
if (!posix_mkfifo($fifo, 0666)) { //开启进程数据通信管道
throw new Exception(make pipe failed!);
}
}
//$shmid = shmop_open(ftok(__FILE__, h), c, 0644, 4096); //共享内存
//shmop_write($shmid, serialize([]), 0);
//$data = unserialize(shmop_read($shmid, 0, 4096));
//shmop_delete($shmid);
//shmop_close($shmid);
foreach($data as $_op => $_datas) {
$_datas = (array)$_datas; //data 格式为数组表示一个 op 有多个执行数据
foreach($_datas as $_data) {
$pid = pcntl_fork();
if ($pid == 0) { //子进程中执行任务
$_ret = execute_task($_op, $_data);
$_pid = getmypid();
$pipe = fopen($fifo, w); //写
//stream_set_blocking($pipe, false);
$_ret = serialize([pid => $_pid, op => $_op, args => $_data, esult => $_ret]);
if (strlen($_ret) > 4096) //写入管道的数据最大4K
$_ret = serialize([pid => $_pid, op => $_op, args => $_data, esult => [RESPONSE_TOO_LONG]]);
//debug_log(write pipe: .$_ret);
fwrite($pipe, $_ret.PHP_EOL);
fflush($pipe);
fclose($pipe);
exit(0); //退出子进程
} elseif ($pid > 0) { //主进程中记录任务
$childs[] = $pid;
$results[$pid] = 0;
debug_log(fork by child: .$pid);
//pcntl_wait($status, WNOHANG);
} elseif ($pid == -1) {
throw new Exception(could not fork at . getmygid());
}
}
}
$pipe = fopen($fifo, +); //读
stream_set_blocking($pipe, true); //阻塞模式,PID与读取的管道数据可能会不一致。
$n = 0;
while(count($childs) > 0) {
foreach($childs as $i => $pid) {
$res = pcntl_waitpid($pid, $status, WNOHANG);
if (-1 == $res || $res > 0) {
$_ret = @unserialize(fgets($pipe)); //读取管道数据
$results[$pid] = $_ret;
unset($childs[$i]);
debug_log( ead child: .$pid . - . json_encode($_ret, 64|256));
}
if ($n > 1000) posix_kill($pid, SIGTERM); //超时(10分钟)结束子进程
}
usleep(200000); $n++;
}
debug_log(child process completed.);
@fclose($pipe);
@unlink($fifo);
$ret = json_encode($results, 64|256);
break;
default:
throw new \Exception(没有可执行的任务!);
break;
}
$t2 = microtime(true);
$times = round(($t2 - $t1) * 1000, 2);
$log = sprintf([%s] %s --> (%s) %sms, strtoupper($op),
@json_encode($data, 64|256), @strlen($ret)<65?$ret:@strlen($ret), $times); debug_log($log); return $ret;}// 读取 CLI 命令行参数$params = getopt(\, array(op:, data:));$op = $params[op];$data = unserialize(base64_decode($params[data]));// 开始执行任务execute_task($op, $data);function __autoload($classname) { $parts = explode(\\\, ltrim($classname, \\\)); if (false !== strpos(end($parts), \_)) { array_splice($parts, -1, 1, explode(\_, current($parts))); } $filename = implode(DIRECTORY_SEPARATOR, $parts) . .php; if ($filename = stream_resolve_include_path($filename)) { include $filename; } else if (preg_match(/.*Task$/, $classname)) { //查找以Task结尾的任务脚本类 include TASK_PATH . DIRECTORY_SEPARATOR . $classname . .php; } else { return false; }}