RabbitMQ + PHP 教程一:简单队列 Hello World

摘要:一个生产者,一个消费者,生产者发送消息,然后消费者获取队列中的消息并处理。

系列文档

RabbitMQ + PHP 教程一:简单队列 Hello World(本文)

RabbitMQ + PHP 教程二:工作队列(任务队列)

RabbitMQ + PHP 教程三:发布 / 订阅

RabbitMQ + PHP 教程四:路由模式

RabbitMQ + PHP 教程五:主题模式


RabbitMQ(一)简单队列 Hello World

本教程中我们主要使用两个 php 文件,一个生产者 send.php 和消费者 receive.php 文件,发送单个消息的生产者和接收消息并打印出来的消费者。主要用于演示 php-amqplib 依赖包的一些 api 细节。在下图其中 p 是生产者,c 是消费者,中间的红框是一个消息队列,代表消息使用者保留的消息缓冲区。


RabbitMQ(一)简单队列 Hello World


开发前提:安装 php-amqplib 依赖包

php 操作 rabbitmq 需要安装一个依赖包,我们直接使用 composer 安装 php-amqplib 依赖包。安装教程在 rabbitmq 开始篇和 php 配置,安装好的目录如下所示:

[root@localhost rabbitmq]pwd
/srv/www/rabbitmq/
[root@localhost rabbitmq]# ll
-rw-r--r-- 1 root root   70 1月   4 22:02 composer.json
-rw-r--r-- 1 root root 6711 1月   4 22:02 composer.lock
drwxr-xr-x 5 root root   74 1月   4 22:02 vendor


示例完整代码如下

<?php
/**
 * 生产者 send.php
 */

// 引入类库
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 创建 client 到 server 连接,该连接使用套接字连接,并使用身份验证
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建一个通道,方便调用 api
$channel = $connection->channel();

// 声明要发送到的队列 hello
$channel->queue_declare('hello', false, false, false, false);

// 指定要发送的消息内容 hello world
$msg = new AMQPMessage('hello world');
// 将消息内容发动到指定的 hello 队列,如果这里找不到对应的队列,那么消息是无法到达消费者那里的,此时我们使用的是默认交换类型
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

// 关闭通道和连接
$channel->close();
$connection->close();
<?php
/**
 * 消费者 receive.php
 */

// 引入类库
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

// 创建 client 到 server 连接,该连接使用套接字连接,并使用身份验证
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建一个通道,方便调用 api
$channel = $connection->channel();

// 声明要消费的队列 hello,此处要和 send.php 消费者指定的队列一样
$channel->queue_declare('hello', false, false, false, false);

// 控制台提示
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

/**
 * 只有生产才会有消费。但是有可能我们会在生产者之前去启动消费者,所以我们要确保这个消费队列存在,才会尝试从中消费消息
 * 我们将告诉 rabbitmq 服务器,如果队列存在且有消息,要将队列中的消息传递给我们
 * 我们将定一个 callback 回调函数,它将接受服务器发送的消息
 * 请记住,消息是从服务器异步发送给客户端的
 * @param $msg
 */
$callback = function($msg) {
    echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

// 当 $channel 有回调时,我们的代码会被阻塞,每当我们收到一条消息,我们的 $callback 函数都会传递给我们生产者发送的消息
while ($channel->is_consuming()) {
    $channel->wait();
}

// 关闭通道和连接
$channel->close();
$connection->close();

我们在 shell 命令终端打开会话 shell 1(消费者)和 shell 2(生产者) 进行测试。

# 消费者 shell 1

[root@localhost rabbitmq]# php receive.php 
 [*] Waiting for messages. To exit press CTRL+C
# 生产者 shell 2

[root@localhost rabbitmq]# php send.php 
 [x] Sent 'Hello World!'

当我们在控制台输入 php send.php 命令后,就会发现 shell 1(消费者)控制台会打印出一条消费消息。

# 消费者 shell 1
[root@localhost rabbitmq]# php receive.php 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received hello world


查看消息队列

当我们想查看 rabbitmq 中有哪些队列,以及队列中有多少条消息时,可以使用 rabbitmqctl list_queues 命令查看。

[root@localhost rabbitmq]# /usr/local/rabbitmq/sbin/rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
hello   0


文中涉及 demo 的源码地址

send.php    https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/php/send.php
receive.php https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/php/receive.php


参考资料:

https://www.rabbitmq.com/tutorials/tutorial-one-php.html

结束语:感谢您对本网站文章的浏览,欢迎您的分享和转载,但转载请说明文章出处。
Top