workerman异步任务实现
运行task进程:
use \Workerman\Worker;
use \Workerman\Connection\TcpConnection;
// task worker,使用Text协议
$task_worker = new Worker('Text://0.0.0.0:12345');
// task进程数可以根据需要多开一些
$task_worker->count = 10;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
echo "接收到任务: " . $task_data . PHP_EOL;
// 假设发来的是json数据
$task_data = json_decode($task_data, true);
// 根据task_data处理相应的任务逻辑.... 得到结果,这里省略....
// $task_result = ......
sleep(3);
$task_result = array(
'task_id' => $task_data['task_id'],
'result' => 'task result',
);
// 发送结果
$connection->send(json_encode($task_result));
};
Worker::runAll();
调用:
$worker = new Worker();
$worker->onWorkerStart = function($worker) {
for($i = 0; $i < 100; $i++) {
// 与远程task服务建立异步连接,ip为远程task服务的ip,如果是本机就是127.0.0.1,如果是集群就是lvs的ip
$task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
// 任务及参数数据
$task_data = array(
'function' => 'send_mail',
'args' => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
);
// 发送数据
$task_connection->send(json_encode($task_data));
$task_connection->onMessage = function(AsyncTcpConnection $con, $data) {
echo "接收到的消息: " . $data . PHP_EOL;
$con->close();
};
$task_connection->onError = function(AsyncTcpConnection $con, $code, $msg) {
echo "报错消息: {$code} " . $msg . PHP_EOL;
};
$task_connection->onClose = function(AsyncTcpConnection $con) {
echo "连接关闭: " . $con->id . PHP_EOL;
};
$task_connection->connect();
}
};
Worker::runAll();