消息队列

Tastphp 默认用的消息队列是beanstalkd

关于beanstalkd

job典型的生命周期

   put            reserve               delete
  -----> [READY] ---------> [RESERVED] --------> *poof*

job可能的状态迁移

   put with delay               release with delay
  ----------------> [DELAYED] <------------.
                        |                   |
                 kick   | (time passes)     |
                        |                   |
   put                  v     reserve       |       delete
  -----------------> [READY] ---------> [RESERVED] --------> *poof*
                       ^  ^                |  |
                       |   \  release      |  |
                       |    `-------------'   |
                       |                      |
                       | kick                 |
                       |                      |
                       |       bury           |
                    [BURIED] <---------------'
                       |
                       |  delete
                        `--------> *poof*

几个重要概念

Tubes

一个服务器有一个或者多个tubes,用来储存统一类型的job。每个tube由一个就绪队列与延迟队列组成。每个job所有的状态迁移在一个tube中完成。consumers消费者可以监控感兴趣的tube,通过发送watch指令。consumers消费者可以取消监控tube,通过发送ignore命令。通过watch list命令返回所有监控的tubes,当客户端预订一个job,此job可能来自任何一个它监控的tube。

指令说明

  • put:插入一个job到队列
  • watch:说明 添加监控的tube到watch list列表,reserve指令将会从监控的tube列表获取job
  • delete:说明 从队列中删除一个job
  • reserve:说明 取出(预订)job,待处理。它将返回一个新预订的job,如果没有job,beanstalkd将直到有job时才发送响应。一旦job状态迁移为reserved,取出job的client被限制在指定的时间(如果设置了ttr)完成,否则超时,job状态重装迁移为ready
  • use:说明 producer生产者使用,随后使用put命令
  • release:说明 release指令将一个reserved的job放回ready queue
  • bury:说明 将一个job的状态迁移为buried,通过kick命令唤醒
  • kick:说明 此指令应用在当前使用的tube中,它将job的状态迁移为ready或者delayed

更多信息参见 Beanstalkd中文协议

使用例子

前提安装完beanstalkd,已经启动beanstalkd

1、注册

AppKernel 注册QueueService:

$this->registerQueueService();

config/parameters.yml 添加相应配置

beanstalkd.host: 127.0.0.1
tube: tastphpframework

2、使用

HomeController为例,在indexAction里面添加:

  • 消息生产者:
$this->get('queue')->put($this->get('tube'), json_encode(['queue msg'=>"hi,tastphp framework"]));
  • 消息消费者

bin/queue/worker.php里面创建文件

#!/usr/bin/env php
<?php
require __DIR__ . '/../../web/bootstrap.php';

$app = new \TastPHP\App\AppKernel();
$client = $app['queue']->connect();
$client->watch($app['tube']);

while ($job = $client->reserve()) {
    $received = json_decode($job->getData(), true);

    if (!empty($received)) {
        $app['logger']::info("queue job done!", $received);
        $client->delete($job);
    }
}

执行(这边只是测试用,线上可以用类似supervisor之类的进程管理工具):

➜  tastphp-docs-demo git:(master) ✗ php bin/queue/worker.php

结果

查看var/logs/info.log文件,表示成功执行:

[2017-10-24 16:21:44] tastphp.logger.INFO: queue job done! {"queue msg":"hi,tastphp framework"} []

results matching ""

    No results matching ""