RabbitMQ na VPS — exchanges, queues i PHP consumer
Opublikowano: 10 kwietnia 2026 · Kategoria: VPS
Wysyłka emaili podczas żądania HTTP sprawia że user czeka kilka sekund. Generowanie raportu PDF blokuje PHP na minutę. Importowanie 10 000 rekordów z CSV zajmuje pamięć procesu webowego. Rozwiązaniem jest kolejkowanie zadań przez message broker: aplikacja odkłada zadanie do kolejki i odpowiada natychmiast, a osobny worker przetwarza je asynchronicznie. RabbitMQ to jeden z najpopularniejszych message brokerów — dojrzały, niezawodny, z rozbudowanym management UI. Ten artykuł pokazuje kompletną instalację i integrację z PHP przez bibliotekę amqplib.
Instalacja RabbitMQ na Ubuntu 22.04 / 24.04
# Instalacja przez oficjalny skrypt packagecloud curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.deb.sh | sudo bash sudo apt update sudo apt install erlang-base erlang-asn1 erlang-crypto erlang-eldap \ erlang-ftp erlang-inets erlang-mnesia erlang-os-mon erlang-parsetools \ erlang-public-key erlang-runtime-tools erlang-snmp erlang-ssl \ erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl -y sudo apt install rabbitmq-server -y # Start i autostart sudo systemctl enable --now rabbitmq-server sudo systemctl status rabbitmq-server # Wlacz management UI (port 15672) sudo rabbitmq-plugins enable rabbitmq_management # Sprawdz wersje sudo rabbitmqctl status | grep -E "RabbitMQ|Erlang"
Użytkownicy, vhosty i uprawnienia
# Domyslny uzytkownik "guest" dziala TYLKO z localhost # Stworz uzytkownika admina dla management UI sudo rabbitmqctl add_user admin StrongPassword123 sudo rabbitmqctl set_user_tags admin administrator sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" # Stworz uzytkownika aplikacji z ograniczonymi uprawnieniami sudo rabbitmqctl add_user appuser AppPassword456 # Parametry: pattern dla configure, write, read sudo rabbitmqctl set_permissions -p / appuser "^app\." "^app\." "^app\." # Virtual hosty — separacja srodowisk sudo rabbitmqctl add_vhost production sudo rabbitmqctl add_vhost staging sudo rabbitmqctl set_permissions -p production appuser ".*" ".*" ".*" # Lista uzytkownikow i vhostow sudo rabbitmqctl list_users sudo rabbitmqctl list_vhosts
Exchanges — typy i zastosowania
| Exchange | Routing | Kiedy używać |
|---|---|---|
| direct | routing_key = binding_key (dokładny match) | Task queues, jeden typ zadania → jedna kolejka |
| topic | Wzorzec z * (1 słowo) i # (0+) | Multi-tenant routing, filtrowanie po kategoriach |
| fanout | Ignoruje routing_key, wszystkie queues | Broadcast: inwalidacja cache, notyfikacje push |
| headers | Nagłówki AMQP (x-match: all/any) | Złożone filtrowanie, rzadko używane |
| default | routing_key = nazwa kolejki | Prostota — bezpośredni publish do queue |
# Tworzenie exchange, queue i binding przez management HTTP API
# Direct exchange dla emaili
curl -u admin:StrongPassword123 -X PUT http://localhost:15672/api/exchanges/%2F/emails \
-H "content-type: application/json" \
-d '{"type":"direct","durable":true}'
# Utwórz kolejke durable z DLX i TTL
curl -u admin:StrongPassword123 -X PUT http://localhost:15672/api/queues/%2F/email-transactional \
-H "content-type: application/json" \
-d '{
"durable": true,
"arguments": {
"x-dead-letter-exchange": "emails-dlx",
"x-message-ttl": 86400000,
"x-max-length": 100000
}
}'
# Binding: exchange "emails" → queue przez klucz "transactional"
curl -u admin:StrongPassword123 -X POST \
http://localhost:15672/api/bindings/%2F/e/emails/q/email-transactional \
-H "content-type: application/json" \
-d '{"routing_key":"transactional"}' PHP AMQP — producent i konsument
Instalacja biblioteki php-amqplib przez Composer:
composer require php-amqplib/php-amqplib
<?php
// producer.php — wysyla zadania do kolejki
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'appuser', 'AppPassword456');
$channel = $connection->channel();
// Deklaracja exchange (idempotentna)
$channel->exchange_declare('emails', 'direct', false, true, false);
$payload = json_encode([
'to' => '[email protected]',
'subject' => 'Potwierdzenie zamowienia #12345',
'template' => 'order_confirmation',
'order_id' => 12345,
]);
$message = new AMQPMessage($payload, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, // Zapis na dysk
'content_type' => 'application/json',
]);
// Publisher Confirms — czekaj na potwierdzenie od brokera
$channel->confirm_select();
$channel->basic_publish($message, 'emails', 'transactional');
$channel->wait_for_pending_acks(5.0);
echo "Wiadomosc wyslana\n";
$channel->close();
$connection->close(); <?php
// consumer.php — worker przetwarzajacy wiadomosci
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'appuser', 'AppPassword456');
$channel = $connection->channel();
// prefetch=1: pobierz jedna wiadomosc na raz (fair dispatch)
$channel->basic_qos(0, 1, false);
$callback = function (AMQPMessage $msg): void {
$data = json_decode($msg->getBody(), true);
try {
echo "Przetwarzam email do: {$data['to']}\n";
// ... logika wysylki emaila ...
$msg->ack(); // Potwierdzenie — wiadomosc usunieta z kolejki
echo "Email wyslany, ACK\n";
} catch (\Exception $e) {
echo "Blad: {$e->getMessage()}\n";
$msg->nack(false); // NACK, requeue=false → trafi do DLX
}
};
$channel->basic_consume('email-transactional', '', false, false, false, false, $callback);
echo "Worker gotowy, czekam na wiadomosci...\n";
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close(); Dead Letter Exchange — obsługa błędów i TTL
# Utwórz DLX exchange i dead-letter queue
curl -u admin:StrongPassword123 -X PUT http://localhost:15672/api/exchanges/%2F/emails-dlx \
-H "content-type: application/json" \
-d '{"type":"direct","durable":true}'
curl -u admin:StrongPassword123 -X PUT http://localhost:15672/api/queues/%2F/email-dead-letter \
-H "content-type: application/json" \
-d '{"durable":true}'
curl -u admin:StrongPassword123 -X POST \
http://localhost:15672/api/bindings/%2F/e/emails-dlx/q/email-dead-letter \
-H "content-type: application/json" \
-d '{"routing_key":"transactional"}'
# Wiadomosci odrzucone (nack requeue=false) lub po TTL laduja w email-dead-letter Monitoring przez CLI i management UI
# Monitoring kolejek (liczba wiadomosci oczekujacych i niepotwierdzonych)
sudo rabbitmqctl list_queues name messages messages_ready messages_unacknowledged
# Tunel SSH do management UI
ssh -L 15672:localhost:15672 user@vps-ip
# Nastepnie: http://localhost:15672
# Podejrzyj wiadomosc bez usuwania (dla debugowania)
curl -u admin:StrongPassword123 -X POST \
http://localhost:15672/api/queues/%2F/email-dead-letter/get \
-H "content-type: application/json" \
-d '{"count":5,"ackmode":"ack_requeue_true","encoding":"auto"}'
# Zuzycie pamieci i dysku
sudo rabbitmq-diagnostics memory_breakdown Klastrowanie — 3 węzły z Quorum Queues
# Na WSZYSTKICH węzłach — ustaw /etc/hosts
echo "10.0.0.1 rabbit1" | sudo tee -a /etc/hosts
echo "10.0.0.2 rabbit2" | sudo tee -a /etc/hosts
echo "10.0.0.3 rabbit3" | sudo tee -a /etc/hosts
# Skopiuj Erlang cookie z węzła 1 na pozostałe (musi byc identyczny)
sudo cat /var/lib/rabbitmq/.erlang.cookie
# Skopiuj wynik do /var/lib/rabbitmq/.erlang.cookie na rabbit2 i rabbit3
# sudo chmod 400 /var/lib/rabbitmq/.erlang.cookie
# Na rabbit2 i rabbit3 — dolacz do klastra
sudo rabbitmqctl stop_app
sudo rabbitmqctl reset
sudo rabbitmqctl join_cluster rabbit@rabbit1
sudo rabbitmqctl start_app
# Sprawdz status klastra
sudo rabbitmqctl cluster_status
# Quorum queue (HA z automatyczna replikacja — zalecane zamiast classic mirrored)
curl -u admin:StrongPassword123 -X PUT http://localhost:15672/api/queues/%2F/tasks-ha \
-H "content-type: application/json" \
-d '{
"durable": true,
"arguments": {
"x-queue-type": "quorum",
"x-dead-letter-exchange": "emails-dlx"
}
}' Dobre praktyki
- Zawsze używaj durable queues i persistent messages — inaczej restart RabbitMQ wyczyści wszystkie wiadomości bez ostrzeżenia.
- prefetch_count=1 dla CPU-intensive tasks — fair dispatch zapobiega sytuacji gdzie jeden worker dostaje 1000 zadań, a pozostałe stoją bezczynnie.
- DLX jest obowiązkowy — bez niego błędnie przetworzone wiadomości giną bezpowrotnie lub zapętlają się w nieskończoność.
- Quorum queues zamiast classic mirrored — od RabbitMQ 3.10+ quorum queues są domyślnie zalecane w klastrach (lepsza spójność danych).
- Firewall dla portów AMQP — port 5672 i management UI (15672) powinny być dostępne tylko z sieci prywatnej lub przez tunel SSH.