Quay lại
Publish/Subscribe trong RabbitMQ

Trong phần hướng dẫn trước, chúng ta đã tạo một work queue. Giả định đằng sau một work queue là mỗi task được gửi đến chính xác một worker. Trong phần này, chúng ta sẽ thực hiện một công việc hoàn toàn khác -- chúng ta sẽ gửi một tin nhắn đến nhiều consumer. Mô hình này được biết đến với tên gọi là "publish/subscribe" (xuất bản/đăng ký).

Để minh họa mô hình, chúng ta sẽ xây dựng một hệ thống ghi log đơn giản. Nó sẽ bao gồm hai chương trình -- chương trình đầu tiên sẽ emit (phát ra) các message ghi log và chương trình thứ hai sẽ nhận và in chúng.

Trong hệ thống ghi nhật ký của chúng ta, mỗi bản copy running receiver program sẽ nhận các tin nhắn. Như vậy, chúng ta có thể chạy một receiver và chuyển hướng các log vào disk; và đồng thời, chúng ta cũng có thể chạy một receiver khác và xem logs trên screen.

Về cơ bản, các message ghi logs đã publish sẽ được broadcast (phát) đến tất cả các receivers.

Exchanges

Trong các phần trước của bài hướng dẫn, chúng ta đã gửi và nhận các messages tới và từ một queue. Bây giờ là lúc giới thiệu mô hình messaging(gửi, nhận) đầy đủ trong RabbitMQ.

Hãy đi qua nhanh những điều chúng ta đã thảo luận trong các bài hướng dẫn trước:

  • Một producer là một ứng dụng người dùng gửi các message.
  • Một hàng đợi (queue) (queue) là một buffer (bộ đệm) lưu trữ các message.
  • Một consumer là một ứng dụng người dùng nhận các message.

Ý tưởng cốt lõi trong mô hình messaging (gửi, nhận) trong RabbitMQ là producer không bao giờ gửi bất kỳ messages nào trực tiếp đến một hàng đợi (queue). Thực tế, nhiều khi producer thậm chí còn không biết liệu một message có được gửi đến bất kỳ hàng đợi (queue) nào hay không.

Thay vào đó, producer chỉ có thể gửi messages tới exchange. Exchange là một thứ rất đơn giản. Một bên nó nhận các message từ producer và bên kia đẩy chúng vào các hàng đợi (queue). Exchange nó sẽ biết chính xác phải làm gì với một message nó nhận được. Liệu nó message này nên được thêm vào một hàng đợi (queue) cụ thể? Hay message này nên được thêm vào nhiều hàng đợi (queue)? Hay message này có nên bị loại bỏ (trong trường hợp message không còn cần thiết hoặc không thể xử lý được). Các quy tắc cho việc đó được xác định bởi Exchange type.

Có một số loại exchange khả dụng: direct, topic, headersfanout. Chúng ta sẽ cùng nhau đi từng loại trong bài viết này, để các bạn có một cách hiểu về từng loại mình sẽ liệt kê dưới đây và sau đó chúng ta sẽ đi vào ví dụ cụ thể.

  1. Fanout Exchange:

    • Fanout exchange là loại exchange đơn giản nhất trong RabbitMQ.
    • Khi nhận được một message, exchange fanout sẽ phát tán (broadcast) message đó tới tất cả các hàng đợi (queue) mà nó biết.
    • Không quan tâm đến routing key hoặc binding key.
    • Mọi hàng đợi (queue) được kết nối với exchange fanout sẽ nhận được mỗi message mà exchange nhận được.
  2. Direct Exchange:

    • Direct exchange là loại exchange đơn giản nhưng mạnh mẽ trong RabbitMQ.
    • Khi gửi một message đến direct exchange, exchange sẽ chuyển message đến hàng đợi (queue) nào có binding key giống hoàn toàn với routing key của message.
    • Binding key trong direct exchange là định nghĩa rõ ràng với một giá trị cụ thể, không có kết hợp phức tạp như trong topic exchange.
    • Mỗi message chỉ được gửi đến một hàng đợi (queue) duy nhất, dựa trên routing key của nó.
  3. Topic Exchange:

    • Topic exchange cho phép phân phối message dựa trên một hoặc nhiều từ khóa (routing key).
    • Khi gửi một message đến một topic exchange, message sẽ được gửi đến các hàng đợi (queue) mà có binding key khớp với routing key của message.
    • Binding key có thể là một chuỗi với các từ khóa được phân tách bằng dấu chấm (ví dụ: "weather.usa.california").
    • Các hàng đợi (queue) có binding key phù hợp sẽ nhận được message.
  4. Headers Exchange:

    • Headers exchange cho phép phân phối message dựa trên các thuộc tính của message thay vì routing key.
    • Khi gửi một message đến headers exchange, message sẽ được gửi đến các hàng đợi (queue) mà có các thuộc tính khớp với các thuộc tính được chỉ định.
    • Thuộc tính của message được xác định trong header và có thể là bất kỳ giá trị nào.
    • Headers exchange yêu cầu binding có một bộ các đối số header để xác định các điều kiện phù hợp.

Chúng ta sẽ tập trung vào loại đầu trước - fanout. Hãy tạo một Exchange loại này, và gọi nó là logs:

Exchange fanout rất đơn giản. Như bạn có thể đoán được từ tên, nó chỉ phát tất cả các message mà nó nhận được tới tất cả các hàng đợi (queue) mà nó biết. Và đó chính xác là điều chúng ta cần cho logger của chúng ta.

Default exchange

Trong các phần trước của hướng dẫn, chúng ta không biết gì về các exchange, nhưng vẫn có thể gửi message đến các hàng đợi (queue). Điều đó có thể là do chúng ta đang sử dụng một exchange mặc định, mà chúng ta nhận diện bằng chuỗi rỗng ("").

Nhớ lại cách chúng ta đã publish một message trước đó:

$channel->basic_publish($msg, '', 'hello');

Ở đây, chúng ta sử dụng exchange mặc định hoặc không có tên: Các message được định tuyến đến queue có tên được chỉ định bởi routing_key, nếu nó tồn tại. Routing key là đối số thứ ba của basic_publish.

Note: Trong RabbitMQ, khi bạn không chỉ định tên exchange cụ thể trong code của mình, message vẫn được gửi đi qua exchange mặc định. Sau đó, exchange mặc định sẽ định tuyến message đến hàng đợi (queue) dựa trên routing_key được chỉ định. Do đó, nếu bạn không chỉ định exchange, RabbitMQ sẽ dựa vào routing_key để định tuyến message đến hàng đợi (queue) tương ứng. Điều này giúp RabbitMQ quản lý và phân phối message một cách linh hoạt và hiệu quả giữa các ứng dụng khác nhau.

Bây giờ, chúng ta có thể publish đến exchange đã đặt tên của chúng ta thay vì exchange mặc định:

$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');

Trước đó chúng ta đã sử dụng các hàng đợi (queue) có tên cụ thể (nhớ lại hellotask_queue?). Việc có thể đặt tên cho một hàng đợi (queue) là rất quan trọng đối với chúng ta - chúng ta cần chỉ định các worker đến cùng một hàng đợi (queue). Việc đặt tên cho một hàng đợi (queue) là quan trọng khi bạn muốn chia sẻ hàng đợi (queue) giữa các producer và consumer.

Nhưng điều này không phải là trường hợp cho logger của chúng ta. Chúng ta muốn nghe về tất cả các message log, không chỉ một phần của chúng. Chúng ta cũng chỉ quan tâm đến những tin nhắn hiện đang được gửi chứ không phải những tin nhắn cũ. Để giải quyết vấn đề đó, chúng ta cần hai điều.

Thứ nhất, mỗi khi chúng ta kết nối với Rabbit, chúng ta cần một hàng đợi (queue) trống mới. Để làm điều này, chúng ta có thể tạo một hàng đợi (queue) với tên ngẫu nhiên, hoặc, tốt hơn nữa - để máy chủ chọn tên hàng đợi (queue) ngẫu nhiên cho chúng ta.

Thứ hai, sau khi chúng ta ngắt kết nối consumer, hàng đợi (queue) sẽ tự động bị xóa.

Trong client php-amqplib, khi chúng ta cung cấp tên hàng đợi (queue) là một chuỗi rỗng, chúng ta tạo một non-durable queue với một tên được tạo ra:

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

Khi phương thức trả về, biến $queue_name chứa tên hàng đợi (queue) ngẫu nhiên được tạo ra bởi RabbitMQ. Ví dụ, nó có thể trông như:

amq.gen-JzTY20BRgKO-HjmUJj0wLg.

khi consumer ngắt kết nối, hàng đợi (queue) sẽ bị xóa vì nó được khai báo là exclusive với trạng thái là true.

Đặt thuộc tính exclusive của queue về false, điều này có nghĩa là nhiều consumer có thể kết nối vào queue đó cùng một lúc, còn nếu là true có nghĩa là chỉ có một consumer duy nhất được phép kết nối với queue này và queue sẽ bị xóa khi consumer ngắt kết nối.

Bạn có thể tìm hiểu thêm về exclusive flag và các thuộc tính queue khác trong hướng dẫn về hàng đợi (queue).

Bindings

Chúng ta đã tạo một fanout exchange và một hàng đợi (queue). Bây giờ chúng ta cần thông báo cho exchange gửi các message đến hàng đợi (queue) của chúng ta. Mối quan hệ giữa sự exchange và một hàng đợi (queue) được gọi là một binding.

$channel->queue_bind($queue_name, 'logs');

Kể từ bây giờ, logs exchange sẽ gửi các message đến hàng đợi (queue) của chúng ta.

Liệt kê các binding Bạn có thể liệt kê các binding hiện có bằng cách sử dụng:

rabbitmqctl list_bindings

Test

Chương trình producer, làm nhiệm vụ phát ra các message log, không khác biệt nhiều so với hướng dẫn trước đó. Thay đổi quan trọng nhất là chúng ta muốn publish messages đến logs exchange của chúng ta thay vì là sự exchange không tên. Dưới đây là đoạn mã code cho tập lệnh emit_log.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "info: Hello World!";
}
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'logs');

echo ' [x] Sent ', $data, "\n";

$channel->close();
$connection->close();

Như bạn thấy, sau khi thiết lập kết nối, chúng ta đã khai báo exchange. Bước này là cần thiết vì việc publish đến một exchange không tồn tại là bị cấm.

Các message sẽ bị mất nếu chưa có hàng đợi (queue) nào được gắn với exchange, nhưng điều này không thành vấn đề đối với chúng ta; nếu không có consumer nào đang lắng nghe thì chúng ta có thể an toàn bỏ qua message đó.

Đoạn mã cho receive_logs.php là:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$channel->queue_bind($queue_name, 'logs');

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->getBody(), "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

try {
    $channel->consume();
} catch (\Throwable $exception) {
    echo $exception->getMessage();
}

$channel->close();
$connection->close();

Do cả producer và worker đều sử dụng cùng một exchange 'logs' với loại 'fanout', khi producer gửi một message đi, nó sẽ được phát tán (broadcast) tới tất cả các hàng đợi đang được gắn với exchange 'logs', bao gồm cả hàng đợi (queue) mà worker đang sử dụng.

Vậy tóm lại trong RabbitMQ, exchange loại fanout hoạt động như sau:

  1. Nhận message từ producer: Một producer gửi một message tới một exchange fanout.

  2. Phát tán message: Khi exchange fanout nhận được message từ producer, nó sẽ phát tán message đó tới tất cả các hàng đợi (queue) được gắn với nó, mà không quan tâm đến bất kỳ routing key nào. Điều này có nghĩa là mọi hàng đợi (queue) gắn với exchange fanout sẽ nhận được bản sao của message này.

  3. Gửi message tới các consumer: Các consumer đang lắng nghe (consume) trên các hàng đợi (queue) được gắn với exchange fanout sẽ nhận được bản sao của message. Mỗi consumer sẽ nhận được message độc lập từ các hàng đợi (queue) của mình.

Kết quả là mỗi message được gửi tới exchange fanout sẽ được phát tán (broadcast) tới tất cả các hàng đợi (queue) được gắn với nó, mà không quan tâm đến bất kỳ quy tắc định tuyến (routing rules) nào như trong các loại exchange khác. Điều này cho phép các consumer nhận được bản sao của mỗi message mà không cần xác định một routing key cụ thể.


Có thể bạn chưa biết?


- Khi sử dụng exchange loại fanout, các thông tin thống kê liên quan đến hàng đợi như số lượng message được chờ xử lý (queued), số lượng message đã sẵn sàng (ready), số lượng message chưa được xác nhận (unacked), và tổng số lượng message (total) có thể không hiển thị trên dashboard của RabbitMQ.

Nguyên nhân chính là do cách fanout exchange hoạt động. Fanout exchange không quan tâm đến các bindings và không xác định cụ thể các hàng đợi mà message sẽ được gửi đến. Thay vào đó, nó sẽ gửi message đến tất cả các hàng đợi đã được kết nối với nó, mà không cần xác định rõ các thông tin thống kê như số lượng message đã được gửi đến mỗi hàng đợi.

Do đó, khi sử dụng exchange fanout, các thông tin thống kê như queued, ready, unacked và total có thể không được hiển thị hoặc không có giá trị cụ thể trên dashboard của RabbitMQ.

Tuy nhiên, mặc dù không có các kết nối cụ thể hoặc hàng đợi được hiển thị trên dashboard, message vẫn được phân phối thành công đến các consumer như bình thường. Điều này là do cách hoạt động của exchange fanout trong RabbitMQ, nơi mà message được gửi tới tất cả các hàng đợi mà exchange này đang gắn kết mà không cần thiết lập các kết nối trực tiếp.


Khi queue name được tạo ra ngẫu nhiên, mỗi lần kết nối một consumer mới, một hàng đợi mới sẽ được tạo và các tin nhắn sẽ được gửi đến hàng đợi đó. Điều này giúp đảm bảo rằng mỗi tin nhắn sẽ chỉ được xử lý bởi một worker duy nhất, giảm nguy cơ mất tin nhắn khi một worker bị die.


 
 
 

Bình luận (0)

Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough

Bài viết liên quan

Learning English Everyday