Quay lại
Point-to-Point trong Rabbitmq

Như mình đã trình bày ở bài trước:

Bạn có thể nghĩ về Rabbitmq như một văn phòng bưu điện: khi bạn đặt thư mà bạn muốn gửi vào một hòm thư, bạn có thể chắc chắn rằng người chuyển thư sẽ giao thư đến người nhận của bạn. Trong phần mình hoạ này, RabbitMQ giống như một hòm thư, một văn phòng bưu điện, và một người chuyển thư. Sự khác biệt chính giữa RabbitMQ và văn phòng bưu điện là nó không xử lý giấy, thay vào đó nó chấp nhận, lưu trữ, và chuyển tiếp các khối dữ liệu nhị phân - các tin nhắn

Trước khi vào thực hành chúng ta cần phải cài đặt Rabbitmq:

Installing RabbitMQ

Bản latest release của RabbitMQ là 3.13.1. Hãy xem change log trong release notes. Hoặc bạn cũng có thể cài đặt trên community Docker image:

Cài bằng docker:

# latest RabbitMQ 3.13
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management

Hoặc cài RabbitMQ Server : Hướng dẫn cài đặt

Thực Hành

Trước khi vào thực hành chúng ta cần phải biết một số khái niệm trong Rabbitmq:

  • Producing nghĩa là không gì khác ngoài việc gửi. Một chương trình gửi tin nhắn được gọi là một producer (P).
  • Hàng đợi (Queue) là tên gọi của hộp thư trong RabbitMQ. Mặc dù các tin nhắn lưu thông qua RabbitMQ và các ứng dụng của bạn, nhưng chúng chỉ có thể được lưu trữ trong một hàng đợi. Một Queue bị ràng buộc bởi memory & disk limits của máy chủ, nó về cơ bản là một bộ đệm tin nhắn lớn (large buffer that stores messages)
    • Nhiều producers có thể gửi các tin nhắn đi đến một hàng đợi, và nhiều consumers có thể cố gắng nhận dữ liệu từ một hàng đợi.

      Dưới đây là cách chúng ta biểu diễn một hàng đợi:

    • Tại sao nói một Queue chỉ bị ràng buộc bởi memory & disk limits của máy chủ?:
      • Memory Limits: Bộ nhớ của máy chủ sẽ ảnh hưởng đến khả năng lưu trữ các tin nhắn trong hàng đợi. Nếu bộ nhớ của máy chủ không đủ lớn, hàng đợi có thể không thể chứa được nhiều tin nhắn hoặc có thể gặp vấn đề về hiệu suất khi xử lý số lượng lớn tin nhắn.

      • Disk Limits: Nếu RabbitMQ được cấu hình để lưu trữ các tin nhắn trên đĩa, dung lượng ổ đĩa của máy chủ sẽ giới hạn số lượng tin nhắn mà hàng đợi có thể chứa được. Nếu dung lượng ổ đĩa không đủ lớn, hàng đợi có thể bị đầy và không thể tiếp nhận thêm tin nhắn mới.

        Việc nói rằng hàng đợi chỉ bị ràng buộc bởi giới hạn về bộ nhớ và ổ đĩa của máy chủ nhấn mạnh vào việc quản lý tài nguyên của hệ thống. Khi triển khai RabbitMQ trong môi trường sản xuất, cần phải đảm bảo rằng máy chủ được cấu hình với đủ tài nguyên để đáp ứng nhu cầu lưu trữ và xử lý tin nhắn của ứng dụng.

    • Buffer that stores messages
      • Trong ngữ cảnh của hệ thống thông điệp như RabbitMQ, "buffer" (bộ đệm) đề cập đến một vùng lưu trữ tạm thời dùng để chứa dữ liệu (trong trường hợp này là các tin nhắn) trước khi chúng được xử lý hoặc gửi đi. Trong một hàng đợi, các tin nhắn được lưu trữ trong bộ đệm cho đến khi chúng được tiêu thụ bởi các workers hoặc consumer hoặc được gửi đi đến người nhận. Điều này giúp giảm thiểu sự mất mát dữ liệu và duy trì trạng thái chờ đợi cho các quá trình xử lý.
  • Consuming có ý nghĩa tương tự như việc nhận. Một consumer là một chương trình chủ yếu đợi để nhận các tin nhắn (C).

Lưu ý rằng producer, consumer và broker không cần phải đặt trên cùng một máy chủ; thực tế, trong hầu hết các ứng dụng, chúng không đặt trên cùng một máy chủ. Một ứng dụng cũng có thể vừa là producer và consumer.


Sau khi chúng ta đã hiểu một số khái niệm cơ bản chúng ta cùng nhau thực hành với một ví dụ đơn giản.

Trong phần hướng dẫn này, chúng ta sẽ viết hai chương trình bằng PHP giao tiếp bằng RabbitMQ. Hướng dẫn này sử dụng client library yêu cầu PHP 7.x hoặc 8.x.

Chương trình đầu tiên sẽ là một producer gửi một tin nhắn duy nhất, và chương trình thứ hai sẽ là một consumer nhận các tin nhắn và in chúng ra. Chúng ta sẽ bỏ qua một số chi tiết trong API php-amqplib, tập trung vào việc đơn giản này chỉ để bắt đầu. Đó là một "Hello World" của messaging.

Trong sơ đồ dưới đây, "P" là producer của chúng ta và "C" là consumer của chúng ta. Hộp ở giữa là một hàng đợi (Queue) - một bộ đệm tin nhắn mà RabbitMQ giữ cho consumer.

php-amqplib client library

RabbitMQ hỗ trợ nhiều giao thức khác nhau. Hướng dẫn này tập trung vào AMQP 0-9-1, đó là một giao thức mở, đa mục đích cho messaging. Có nhiều client library cho RabbitMQ trong nhiều ngôn ngữ khác nhau. Chúng ta sẽ sử dụng php-amqplib trong hướng dẫn này, và Composer cho quản lý các phụ thuộc.

Thêm một tệp composer.json vào dự án của bạn:

{ "require": { "php-amqplib/php-amqplib": "^3.6" } }

Miễn là bạn đã cài đặt và sử dụng Composer thành công, bạn có thể chạy lệnh sau:

php composer.phar install

Cũng có một cài đặt Composer cho Windows.

Bây giờ chúng ta đã cài đặt thư viện php-amqplib, chúng ta có thể viết một số mã code.

Sending

Chúng ta sẽ gọi chương trình gửi tin nhắn (người gửi) là send.php và chương trình nhận tin nhắn là receive.php. Người gửi sẽ kết nối đến RabbitMQ, gửi một tin nhắn duy nhất, sau đó thoát.

Trong send.php, chúng ta cần bao gồm thư viện và sử dụng các class cần thiết:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

?>

Sau đó chúng ta có thể tạo kết nối đến máy chủ:

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

Kết nối trừu tượng hóa kết nối socket và đảm bảo việc thương thảo phiên bản giao thức và xác thực và cả việc này đều được thực hiện tự động. Ở đây, chúng ta kết nối đến một node RabbitMQ trên máy cục bộ (local machine) - do đó là localhost. Nếu chúng ta muốn kết nối đến một node trên một máy khác hoặc đến một máy chủ đang chứa một proxy recommended for PHP clients, chúng ta chỉ cần chỉ định tên máy chủ hoặc địa chỉ IP tại đây.

Tiếp theo, chúng ta tạo một kênh (channel), nơi mà hầu hết các API cho việc thực hiện các công việc được thực hiện.

Để gửi, chúng ta phải khai báo một hàng đợi để gửi đến; sau đó chúng ta có thể publish ( xuất bản) một tin nhắn đến hàng đợi:

$channel->queue_declare('hello', false, false, false, false);

$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

Việc khai báo một hàng đợi là idempotent - nó chỉ được tạo ra nếu nó chưa tồn tại. Nội dung của tin nhắn là một mảng byte, vì vậy bạn có thể mã hóa bất kỳ thứ gì bạn muốn ở đó.

Cuối cùng, chúng ta đóng kênh và kết nối:

$channel->close();
$connection->close();

Việc đóng kết nối và kênh là một phần quan trọng của việc quản lý tài nguyên trong ứng dụng của bạn. Dưới đây là một số lý do:

  1. Giải phóng tài nguyên: Khi bạn hoàn thành việc sử dụng kết nối và kênh, việc đóng chúng giúp giải phóng tài nguyên hệ thống như bộ nhớ và các tài nguyên mạng. Điều này giúp giảm tải cho máy chủ RabbitMQ và cung cấp tài nguyên cho các ứng dụng khác sử dụng máy chủ.

  2. Đảm bảo tính ổn định: Việc đóng kết nối và kênh sau khi hoàn thành công việc giúp đảm bảo rằng không có tài nguyên bị lãng phí và mọi kết nối được giữ trong trạng thái ổn định.

  3. Ngăn chặn rò rỉ bộ nhớ: Trong một số trường hợp, việc không đóng kết nối và kênh có thể dẫn đến rò rỉ bộ nhớ, khiến cho ứng dụng của bạn sử dụng nguồn tài nguyên mà không cần thiết.

  4. Phản hồi ngược lại cho RabbitMQ: Bằng cách đóng kết nối và kênh một cách hợp lý, bạn cung cấp thông báo cho RabbitMQ rằng việc sử dụng kết nối và kênh đã hoàn tất, cho phép RabbitMQ thực hiện các tác vụ quản lý tài nguyên phù hợp.

Đây là toàn bộ lớp send.php.

Gửi không hoạt động!

Nếu đây là lần đầu tiên bạn sử dụng RabbitMQ và bạn không thấy thông báo "Sent", bạn có thể cảm thấy bối rối và tự hỏi có gì không ổn. Có thể máy chủ đã được khởi động mà không có đủ không gian đĩa trống (mặc định cần ít nhất 200 MB trống) và do đó từ chối nhận các tin nhắn. Kiểm tra tệp log của máy chủ để xác nhận và giảm giới hạn nếu cần thiết. Bạn cần phải thiết lập disk_free_limit.

Receiving

Đó là tất cả về người gửi của chúng ta. Người nhận của chúng ta lắng nghe các tin nhắn từ RabbitMQ, vì vậy khác với người gửi, người nhận sẽ tiếp tục chạy để lắng nghe các tin nhắn và in chúng ra.

Đoạn mã (trong receive.php) gần như có cùng các hàm include và sử dụng như send:

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

Thiết lập giống như người gửi; chúng ta mở một kết nối và một kênh, và khai báo hàng đợi mà chúng ta sẽ consume. Lưu ý rằng điều này khớp với hàng đợi mà người gửi đã publishes.

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

Lưu ý rằng chúng ta cũng khai báo hàng đợi ở đây. Bởi vì chúng ta có thể khởi động consumer trước người gửi, chúng ta muốn đảm bảo rằng hàng đợi tồn tại trước khi chúng ta cố gắng consume (tiêu thụ) các tin nhắn từ nó.

Chúng ta cần phải request server gửi các tin nhắn từ hàng đợi cho chúng ta. Chúng ta sẽ định nghĩa một PHP callable sẽ nhận các tin nhắn được gửi từ máy chủ. Hãy nhớ rằng các tin nhắn được gửi không đồng bộ từ máy chủ đến các client.

$callback = function ($msg) {
  echo ' [x] Received ', $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

try {
    $channel->consume();
} catch (\Throwable $exception) {
    echo $exception->get
}

Phương thức basic_consume() để thiết lập một consumer trên hàng đợi có tên là "hello". Khi consumer này nhận được một tin nhắn mới từ hàng đợi, nó sẽ gọi hàm callback $callback như chúng ta đã định nghĩa để xử lý.

Phương thức $channel->consume(); trong RabbitMQ được sử dụng để bắt đầu quá trình consume (tiêu thụ) tin nhắn từ một hàng đợi cụ thể. Khi gọi phương thức này, kênh sẽ chờ đợi và lắng nghe các tin nhắn mới từ hàng đợi mà bạn đã chỉ định trong phương thức basic_consume().

Khi có tin nhắn mới được gửi đến hàng đợi, kênh sẽ gọi hàm callback mà bạn đã đặt trong phương thức basic_consume(), cho phép bạn xử lý tin nhắn đó.

Tóm lại, $channel->consume(); là một phần quan trọng trong việc thiết lập và bắt đầu quá trình tiêu thụ tin nhắn từ hàng đợi, và nó giữ cho ứng dụng của bạn chạy và lắng nghe các tin nhắn mới từ RabbitMQ.

Dưới đây là toàn bộ class receive.php:

Test

Bây giờ chúng ta có thể chạy cả hai tập lệnh. Trong một terminal,

chạy consumer (người nhận - receiver):

php receive.php

sau đó, chạy publisher (người gửi - sender):

php send.php

 

Bình luận (0)

Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough
Michael Gough

Bài viết liên quan

Learning English Everyday