Quay lại

Routing - Direct exchange trong RabbitMQ Chuyên mục RabbitMQ    2024-04-07    22 Lượt xem    21 Lượt thích    comment-3 Created with Sketch Beta. 0 Bình luận

Routing - Direct exchange trong RabbitMQ

Trong bài hướng dẫn trước đó, chúng ta đã xây dựng một hệ thống ghi log đơn giản. Chúng ta đã có thể phát sóng (broadcast) các message log đến nhiều receivers.

Trong bài hướng dẫn này, chúng ta sẽ thêm một tính năng mới vào hệ thống - chúng ta sẽ làm cho việc subscribe chỉ dựa trên một phần nhỏ của các message trở thành khả thi. Ví dụ, chúng ta sẽ có thể chuyển chỉ các message lỗi quan trọng (critical error) đến tệp log (để tiết kiệm không gian đĩa), trong khi vẫn có thể in tất cả các message log trên cửa sổ console.

Bindings

Trong các ví dụ trước đó, chúng ta đã tạo các ràng buộc (bindings) rồi. Bạn có thể nhớ mã code như sau:

$channel->queue_bind($queue_name, 'logs');

Một ràng buộc là một mối quan hệ giữa một exchange và một hàng đợi (queue). Điều này có thể đơn giản đọc như sau: hàng đợi (queue) quan tâm đến các message từ exchange này.

Ràng buộc có thể có thêm tham số routing_key. Để tránh nhầm lẫn với một tham số của $channel::basic_publish, chúng ta sẽ gọi nó là binding key. Đây là cách chúng ta có thể tạo một ràng buộc với một key:

$binding_key = 'black'; 
$channel->queue_bind($queue_name, $exchange_name, $binding_key);

Ý nghĩa của một binding key phụ thuộc vào loại exchange. Các exchange loại fanout, mà chúng ta đã sử dụng trước đó, đơn giản là bỏ qua giá trị của nó.

Direct exchange

Hệ thống ghi log của chúng ta từ bài hướng dẫn trước đó phát sóng (broadcast) tất cả các message đến tất cả các consumers. Chúng ta muốn mở rộng chức năng đó để cho phép lọc các message dựa trên mức độ nghiêm trọng của chúng. Ví dụ, chúng ta có thể muốn kịch bản là ghi các message log vào ổ đĩa (disk) nhưng chỉ nhận các critical errors, và không lãng phí không gian disk cho các warning or info log messages.

Chúng ta đã sử dụng một exchange loại fanout, mà không cung cấp nhiều linh hoạt - nó chỉ có khả năng phát sóng (broadcast) vô thức.

Thay vào đó, chúng ta sẽ sử dụng một exchange loại direct. Thuật toán định tuyến phía sau một exchange direct rất đơn giản - một message sẽ được gửi đến các hàng đợi (queue) có binding key chính xác trùng khớp với routing key của message.

Để minh họa điều đó, hãy xem xét cài đặt sau:

Trong cài đặt này, chúng ta có thể thấy exchange direct X với hai hàng đợi (queue) được ràng buộc với nó. hàng đợi (queue) đầu tiên được ràng buộc với binding keyorange, và hàng đợi (queue) thứ hai có hai ràng buộc, một với binding keyblack và một với green.

Trong một cài đặt như vậy, một message được xuất bản (publish) đến exchange với một routing keyorange sẽ được định tuyến đến hàng đợi (queue) Q1. Các message với routing key là black hoặc green sẽ đi đến Q2. Tất cả các message khác sẽ bị loại bỏ.

Multiple bindings

Việc ràng buộc nhiều hàng đợi (queue) với cùng một khóa ràng buộc là hoàn toàn hợp lệ. Trong ví dụ của chúng ta, chúng ta có thể thêm một ràng buộc giữa XQ1 với khóa ràng buộc là black. Trong trường hợp đó, direct exchange sẽ hoạt động giống như fanout và sẽ phát sóng (broadcast) message đến tất cả các hàng đợi (queue) phù hợp. Một message có routing key là black sẽ được gửi đến cả Q1Q2.

Emitting logs

Chúng ta sẽ sử dụng mô hình này cho hệ thống ghi log của chúng ta. Thay vì sử dụng fanout, chúng ta sẽ gửi các message đến một direct exchange. Chúng ta sẽ cung cấp mức độ nghiêm trọng (severity) của log làm routing key. Điều này sẽ cho phép script nhận message có mức độ nghiêm trọng mà nó muốn nhận. Hãy tập trung vào việc phát log trước tiên.

Như thường lệ, chúng ta cần tạo một exchange trước:

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

Và chúng ta đã sẵn sàng gửi một message:

$channel->exchange_declare('direct_logs', 'direct', false, false, false); 
$channel->basic_publish($msg, 'direct_logs', $severity);

Để đơn giản hóa việc, chúng ta sẽ giả định rằng '$severity có thể là 'info', 'warning', 'error'.

Subscribing

Nhận các message sẽ hoạt động giống như trong hướng dẫn trước đó, với một ngoại lệ - chúng ta sẽ tạo một binding mới cho mỗi mức độ nghiêm trọng mà chúng ta quan tâm.

foreach ($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

Test

emit_log_direct.php class:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'direct_logs', $severity);

echo ' [x] Sent ', $severity, ':', $data, "\n";

$channel->close();
$connection->close();

receive_logs_direct.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$severities = array_slice($argv, 1);
if (empty($severities)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

foreach ($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->getRoutingKey(), ':', $msg->getBody(), "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

try {
    $channel->consume();
} catch (\Throwable $exception) {
    echo $exception->getMessage();
}

$channel->close();
$connection->close();

Nếu bạn muốn lưu trữ chỉ các thông báo log 'warning' và 'error' (không bao gồm 'info') vào một file, chỉ cần mở một cửa sổ dòng lệnh và gõ:

php receive_logs_direct.php warning error > logs_from_rabbit.log

Nếu bạn muốn xem tất cả các thông báo log trên màn hình của bạn, hãy mở một cửa sổ dòng lệnh mới và thực hiện:

php receive_logs_direct.php info warning error
# => [*] Đang chờ các thông báo log. Để thoát nhấn CTRL+C​

Và, ví dụ, để gửi một thông báo log lỗi, chỉ cần gõ:

php emit_log_direct.php error "Run. Run. Or it will explode."
# => [x] Gửi 'error':'Run. Run. Or it will explode.'

Bình luận (0)

Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough

Bài viết liên quan

Learning English Everyday