Quay lại

Point-to-Point work queue trong Rabbitmq Chuyên mục RabbitMQ    2024-04-06    15 Lượt xem    12 Lượt thích    comment-3 Created with Sketch Beta. 0 Bình luận

Point-to-Point work queue trong Rabbitmq

Work queue là một loại cơ chế trong hệ thống phân phối tin nhắn (messaging system), nơi các công việc (tasks) hoặc các tin nhắn được đặt vào một hàng đợi (queue) để chờ xử lý bởi các worker. Trong một work queue, các công việc được đưa vào hàng đợi và sau đó được lấy ra và xử lý bởi các worker một cách tuần tự hoặc song song. Cơ chế này thường được sử dụng để phân phối công việc giữa các worker một cách hiệu quả, giúp tối ưu hóa tài nguyên và thời gian xử lý. RabbitMQ là một trong những hệ thống phân phối tin nhắn (message broker) phổ biến được sử dụng để triển khai các work queue.

Trong bài hướng dẫn đầu tiên, chúng ta đã viết các chương trình để gửi và nhận các tin nhắn từ một hàng đợi(queue) được đặt tên. Trong bài này, chúng ta sẽ tạo ra một Work Queue (Hàng đợi công việc) sẽ được sử dụng để phân phối các công việc tốn thời gian giữa nhiều worker.

Ý tưởng chính đằng sau Work Queue (còn được gọi là: Task Queue) là tránh thực hiện một công việc tốn tài nguyên ngay lập tức và phải chờ đợi cho đến khi nó hoàn thành. Thay vào đó, chúng ta lên lịch cho công việc đó được thực hiện sau. Chúng ta đóng gói một công việc thành một tin nhắn và gửi nó đến một hàng đợi. Một tiến trình worker chạy ở nền sẽ lấy các công việc và cuối cùng thực hiện công việc đó. Khi bạn chạy nhiều workers, các công việc sẽ được chia sẻ giữa chúng.

Khái niệm này đặc biệt hữu ích trong các ứng dụng web nơi không thể xử lý một công việc phức tạp trong một khoảng thời gian yêu cầu HTTP ngắn.

Chuẩn bị

Trong phần trước của bài hướng dẫn này, chúng ta đã gửi một tin nhắn chứa "Hello World!". Bây giờ chúng ta sẽ gửi các chuỗi đại diện cho các nhiệm vụ phức tạp. Chúng ta không có một nhiệm vụ thực tế, như ảnh cần được thay đổi kích thước hoặc tệp pdf cần được tạo ra, vì vậy hãy giả mạo bằng cách sử dụng hàm sleep(). Chúng ta sẽ lấy số lượng dấu chấm trong chuỗi làm độ phức tạp của nó; mỗi dấu chấm sẽ tương ứng với một giây "work". Ví dụ, một nhiệm vụ giả mạo được mô tả bằng "Hello..." sẽ mất ba giây.

Chúng ta sẽ điều chỉnh một chút mã send.php từ ví dụ trước của chúng ta, để cho phép các thông điệp tùy ý được gửi từ dòng lệnh. Chương trình này sẽ lên lịch cho các nhiệm vụ đến hàng đợi công việc của chúng ta, vì vậy hãy đặt tên nó là new_task.php:

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "Hello World!";
}
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, '', 'hello');

echo ' [x] Sent ', $data, "\n";

Kịch bản receive.php cũ của chúng ta cũng cần một số thay đổi: nó cần giả mạo một giây công việc cho mỗi dấu chấm trong nội dung tin nhắn. Nó sẽ lấy các tin nhắn từ hàng đợi và thực hiện công việc, vì vậy hãy gọi nó là worker.php:

$callback = function ($msg) {
  echo ' [x] Received ', $msg->getBody(), "\n";
  sleep(substr_count($msg->getBody(), '.'));
  echo " [x] Done\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

Hãy mở 2 terminal và chạy 2 lệnh sau:

# shell 1
php worker.php
# shell 2
php new_task.php "A very hard task which takes two seconds.."

Round-robin dispatching

Một trong những lợi ích của việc sử dụng Hàng đợi công việc là khả năng song song hóa công việc một cách dễ dàng. Nếu chúng ta đang xây dựng một ngăn xếp công việc, chúng ta chỉ cần thêm nhiều workers hơn và như vậy, có thể mở rộng một cách dễ dàng.

Đầu tiên, hãy thử chạy hai tập lệnh worker.php cùng một lúc. Chúng sẽ cùng nhận các tin nhắn từ hàng đợi, nhưng cách thức là như thế nào? Hãy xem.

Bạn cần mở ba cửa sổ terminal. Hai cửa sổ sẽ chạy kịch bản worker.php. Các cửa sổ này sẽ là hai consumers của chúng ta - C1 và C2.

# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C

Trong cửa sổ thứ ba, chúng ta sẽ gửi các nhiệm vụ mới. Sau khi bạn đã bắt đầu các consumer, bạn có thể gửi một số tin nhắn:

# shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....

Hãy xem những gì được giao cho workers của chúng ta:

# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

Mặc định, RabbitMQ sẽ gửi mỗi tin nhắn cho consumer tiếp theo, theo thứ tự. Trung bình, mỗi consumer sẽ nhận cùng một số lượng tin nhắn. Cách phân phối tin nhắn này được gọi là round-robin. Hãy thử điều này với ba hoặc nhiều worker hơn.

Tóm lại:

Round-robin dispatching trong RabbitMQ là cách phân phối các tin nhắn từ một hàng đợi đến các consumer theo thứ tự tuần tự. Khi một tin nhắn được gửi đến hàng đợi, RabbitMQ sẽ gửi tin nhắn đó cho consumer tiếp theo trong danh sách các consumer đang chờ đợi. Nếu có nhiều consumer đang chờ đợi, mỗi tin nhắn sẽ được gửi đến consumer tiếp theo một cách lần lượt, giữa các consumer.

Điều này có nghĩa là mỗi consumer sẽ nhận được một phần của công việc, giúp phân phối công việc một cách công bằng giữa các consumer. Kỹ thuật round-robin dispatching giúp tối ưu hóa việc sử dụng tài nguyên và đảm bảo rằng không có consumer nào bị quá tải trong quá trình xử lý công việc từ hàng đợi.

Message acknowledgment

Việc thực hiện một nhiệm vụ có thể mất vài giây, bạn có thể tự hỏi điều gì sẽ xảy ra nếu một consumer bắt đầu một nhiệm long task và nó kết thúc trước khi hoàn thành. Với mã hiện tại của chúng ta, một khi RabbitMQ gửi một tin nhắn đến consumer, nó ngay lập tức đánh dấu đã xóa. Trong trường hợp này, nếu bạn kết thúc một worker, tin nhắn mà nó đang xử lý sẽ bị mất. Các tin nhắn đã được gửi đến worker cụ thể này nhưng chưa được xử lý xong cũng sẽ bị mất.

Nhưng chúng ta không muốn mất bất kỳ task nào. Nếu một worker chết, chúng ta muốn nhiệm vụ đó được gửi đến một worker khác.

Để đảm bảo một tin nhắn không bao giờ bị mất, RabbitMQ hỗ trợ các xác nhận tin nhắn. Một ack(nowledgement) được gửi lại bởi consumer để thông báo cho RabbitMQ biết một tin nhắn cụ thể đã được nhận, xử lý và rằng RabbitMQ có thể xóa nó.

Nếu một consumer chết (kênh của nó bị đóng, kết nối bị đóng, hoặc kết nối TCP bị mất) mà không gửi ack, RabbitMQ sẽ hiểu rằng một tin nhắn không được xử lý hoàn toàn và sẽ đưa lại nó vào hàng đợi. Nếu có các consumer khác đang trực tuyến cùng một lúc, nó sẽ nhanh chóng gửi lại nó cho một consumer khác. Như vậy, bạn có thể chắc chắn rằng không có tin nhắn nào bị mất, ngay cả khi các worker đôi khi chết.

Một thời gian chờ (30 phút mặc định) được áp dụng cho xác nhận giao nhận của consumer. Điều này giúp phát hiện các consumer lỗi (bị kẹt) không bao giờ xác nhận giao nhận. Bạn có thể tăng thời gian chờ này như mô tả trong Delivery Acknowledgement Timeout.

Các xác nhận tin nhắn trước đây đã được tắt bởi chính chúng ta. Đến lúc bật chúng bằng cách đặt tham số thứ tư của basic_consume thành false (true có nghĩa là không ack) và gửi một xác nhận chính xác từ worker, khi chúng ta đã hoàn thành một nhiệm vụ.

$callback = function ($msg) {
  echo ' [x] Received ', $msg->getBody(), "\n";
  sleep(substr_count($msg->getBody(), '.'));
  echo " [x] Done\n";
  $msg->ack();
};

$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

Sử dụng mã này, bạn có thể đảm bảo rằng ngay cả khi bạn kết thúc một worker bằng CTRL+C trong khi nó đang xử lý một tin nhắn, không có gì bị mất. Ngay sau khi worker kết thúc, tất cả các tin nhắn chưa được xác nhận(unacknowledged) sẽ được gửi lại.

Acknowledgement(Xác nhận) phải được gửi trên cùng một kênh đã nhận. Cố gắng acknowledge (xác nhận) bằng cách sử dụng một kênh khác sẽ dẫn đến một channel-level protocol exception. Xem hướng dẫn tài liệu về các xác nhận để biết thêm thông tin.

Message durability

Chúng ta đã học cách đảm bảo rằng ngay cả khi consumer chết, nhiệm vụ vẫn không bị mất. Nhưng các nhiệm vụ của chúng ta vẫn sẽ bị mất nếu máy chủ RabbitMQ dừng hoạt động.

Khi RabbitMQ thoát hoặc gặp sự cố, nó sẽ quên các hàng đợi và tin nhắn trừ khi bạn báo cho nó biết không làm như vậy. Cần hai điều để đảm bảo rằng các tin nhắn không bị mất: Chúng ta cần đánh dấu cả hàng đợi và các tin nhắn là durable.

Đầu tiên, chúng ta cần đảm bảo rằng hàng đợi sẽ tồn tại sau khi một node RabbitMQ khởi động lại. Để làm điều này, chúng ta cần khai báo nó như là durable. Để làm điều này, chúng ta truyền tham số thứ ba vào queue_declaretrue:

$channel->queue_declare('hello', false, true, false, false);

Mặc dù lệnh này là đúng trong bản thân nó, nhưng nó sẽ không hoạt động trong thiết lập hiện tại của chúng ta.

Điều đó là vì chúng ta đã xác định một hàng đợi có tên là "hello" mà không phải là durable. RabbitMQ không cho phép bạn định nghĩa lại một hàng đợi hiện có với các tham số khác nhau và sẽ trả về một lỗi cho bất kỳ chương trình nào cố gắng làm điều đó. Nhưng có một cách để làm điều này một cách nhanh chóng - hãy khai báo một hàng đợi với tên khác nhau, ví dụ như task_queue:

$channel->queue_declare('task_queue', false, true, false, false);

Cờ này được đặt thành true cần được áp dụng cho cả producer and consumer code.

Tại thời điểm này, chúng ta chắc chắn rằng hàng đợi task_queue sẽ không bị mất ngay cả khi RabbitMQ khởi động lại. Bây giờ chúng ta cần đánh dấu các tin nhắn của chúng ta là persistent bằng cách đặt thuộc tính delivery_mode = 2 cho tin nhắn, mà AMQPMessage nhận như một phần của mảng thuộc tính.

$msg = new AMQPMessage(
    $data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

Note on message persistence

Đánh dấu tin nhắn là bền vững không đảm bảo hoàn toàn rằng một tin nhắn sẽ không bị mất. Mặc dù nó cho biết cho RabbitMQ lưu tin nhắn vào đĩa, vẫn có một khoảng thời gian ngắn khi RabbitMQ đã chấp nhận một tin nhắn nhưng chưa lưu nó. Hơn nữa, RabbitMQ không thực hiện fsync(2) cho mỗi tin nhắn - có thể chỉ được lưu vào bộ nhớ cache và không thực sự được ghi vào đĩa. Các cam kết về tính bền vững không mạnh mẽ, nhưng đó là đủ cho hàng đợi nhiệm vụ đơn giản của chúng ta. Nếu bạn cần một cam kết mạnh mẽ hơn, bạn có thể sử dụng xác nhận từ publisher.

Fair dispatch

Bạn có thể đã nhận thấy rằng dispatching (phân phối) vẫn không hoạt động chính xác như chúng ta muốn. Ví dụ, trong một tình huống với hai workers, khi ở worker 1 cần phải xử lý những tin nhắn nặng nề và còn worker 2 thì xử lý những task nhẹ nhàng hơn, một worker sẽ liên tục bận rộn và worker kia sẽ gần như không làm việc gì. Nhưng, RabbitMQ không biết gì về điều đó và vẫn sẽ phân phối tin nhắn một cách đồng đều.

Điều này xảy ra vì RabbitMQ chỉ phân phối một tin nhắn khi tin nhắn nhập vào hàng đợi. Nó không xem xét số lượng tin nhắn chưa được xác nhận cho một consumer. Nó chỉ mù quáng phân phối mỗi tin nhắn thứ n cho consumer thứ n.

Để khắc phục điều đó, chúng ta có thể sử dụng phương thức basic_qos với cài đặt prefetch_count = 1. Điều này cho biết cho RabbitMQ không cung cấp hơn một tin nhắn cho một worker tại một thời điểm. Hoặc, nói cách khác, đừng phân phối một tin nhắn mới cho một worker cho đến khi nó đã xử lý và xác nhận tin nhắn trước đó. Thay vào đó, nó sẽ phân phối nó cho worker tiếp theo chưa bận.

$channel->basic_qos(null, 1, false);

Note về queue size

Nếu tất cả các workers đều bận rộn, hàng đợi của bạn có thể đầy. Bạn sẽ muốn theo dõi điều đó và có thể thêm nhiều workers hơn hoặc sử dụng một chiến lược khác.

Code

new_task.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "Hello World!";
}
$msg = new AMQPMessage(
    $data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

$channel->basic_publish($msg, '', 'task_queue');

echo ' [x] Sent ', $data, "\n";

$channel->close();
$connection->close();

worker.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] Received ', $msg->getBody(), "\n";
    sleep(substr_count($msg->getBody(), '.'));
    echo " [x] Done\n";
    $msg->ack();
};

$channel->basic_qos(null, 1, false);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

try {
    $channel->consume();
} catch (\Throwable $exception) {
    echo $exception->getMessage();
}

$channel->close();
$connection->close();

Sử dụng acknowledgments and prefetch, bạn có thể thiết lập một hàng đợi công việc. Các tùy chọn bền vững cho phép các nhiệm vụ tồn tại ngay cả khi RabbitMQ được khởi động lại.

 

Bình luận (0)

Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough

Bài viết liên quan

Learning English Everyday