diff --git a/README.md b/README.md index 25b1334..f732872 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,7 @@ customizing concrete event or override this behaviour globally by changing following env variables: * `RABBITMQ_EVENT_CONNECTION=default` -* `RABBITMQ_EVENT_QUEUE=default` +* `RABBITMQ_EVENT_QUEUE=` * `RABBITMQ_EVENT_EXCHANGE=amq.direct` * `RABBITMQ_EVENT_EXCHANGE_TYPE=direct` * `RABBITMQ_EVENT_EXCHANGE_IS_DEFAULT=true` diff --git a/config/rabbitmq.php b/config/rabbitmq.php index 7e1f57d..a5f2c9a 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -35,7 +35,7 @@ return [ */ 'defaults' => [ 'connection' => env('RABBITMQ_EVENT_CONNECTION', 'default'), - 'queue' => env('RABBITMQ_EVENT_QUEUE', 'default'), + 'queue' => env('RABBITMQ_EVENT_QUEUE', ''), 'exchange' => env('RABBITMQ_EVENT_EXCHANGE', 'amq.direct'), 'exchange_type' => env('RABBITMQ_EVENT_EXCHANGE_TYPE', 'direct'), 'exchange_is_default' => (bool) env('RABBITMQ_EVENT_EXCHANGE_IS_DEFAULT', true), diff --git a/src/Command/Consume.php b/src/Command/Consume.php index 4c7b602..7ec28d8 100644 --- a/src/Command/Consume.php +++ b/src/Command/Consume.php @@ -46,27 +46,35 @@ class Consume extends Command ); $queue = $this->getQueue($params); + $exchange = $this->getExchange($params); - $this->connection = $connector->connect($config, $queue); + $this->connection = $connector->connect($config, $exchange, $queue); $tag = $params->tag->value() ?? 'rabbitmq-laravel-consumer'; $consumer = function (AMQPMessage $message) use ($handler, $queue, $logger): void { try { - $this->info(sprintf('Received message: %s', $message->getBody())); + $this->info('Received message'); + + if ($message->getRoutingKey()) { + $this->info( + sprintf('Routing key: %s', $message->getRoutingKey()) + ); + } + + $this->info($message->getBody()); $mergedQueue = $this->getMergedQueue($queue, $message); $handler->handle($mergedQueue, $message); $message->ack(); - $this->info('Message processed successfully.'); + $this->info('Message processed'); } catch (Exception $e) { $message->nack(); - $this->error( - sprintf('Processing error: %s', $e->getMessage()) - ); + $this->error('Error'); + $this->error($e->getMessage()); $logger->error($e->getMessage()); } @@ -89,18 +97,20 @@ class Consume extends Command private function getQueue(ConsumerParameters $params): Queue { - $routingKey = $params->routingKey->value() ?? ''; - return new Queue( name: $params->queue->value() ?? 'default', - exchange: new Exchange( - name: $params->exchange->value() ?? 'amq.direct', - type: $params->exchangeType->value() ?? 'direct', - isDefault: $params->exchangeIsDefault->value(), - declaration: new ExchangeDeclaration() - ), declaration: new QueueDeclaration(), - bindings: new QueueBindings($routingKey), + bindings: new QueueBindings($params->routingKey->toString()), + ); + } + + private function getExchange(ConsumerParameters $params): Exchange + { + return new Exchange( + name: $params->exchange->toString() ?? 'amq.direct', + type: $params->exchangeType->toString() ?? 'direct', + isDefault: $params->exchangeIsDefault->toBool(), + declaration: new ExchangeDeclaration() ); } @@ -108,7 +118,6 @@ class Consume extends Command { return new Queue( name: $queue->name(), - exchange: $queue->exchange(), declaration: $queue->declaration(), bindings: new QueueBindings( routingKey: $message->getRoutingKey(), diff --git a/src/Listener/PublishEvent.php b/src/Listener/PublishEvent.php index 39d8720..3fa1efa 100644 --- a/src/Listener/PublishEvent.php +++ b/src/Listener/PublishEvent.php @@ -32,29 +32,34 @@ class PublishEvent implements ShouldQueue { $config = $this->configuration->getConnection($event->getConnection()); - $queue = new Queue( - name: $event->getQueue(), - exchange: new Exchange( - name: $event->getExchange(), - type: $event->getExchangeType(), - isDefault: $event->getExchangeIsDefault(), - declaration: new ExchangeDeclaration() - ), - declaration: new QueueDeclaration(), - bindings: new QueueBindings( - routingKey: $event->getRoutingKey() - ), + $exchange = new Exchange( + name: $event->getExchange(), + type: $event->getExchangeType(), + isDefault: $event->getExchangeIsDefault(), + declaration: new ExchangeDeclaration() ); - $connection = $this->connector->connect($config, $queue); + $queue = null; + + if ($event->getQueue()) { + $queue = new Queue( + name: $event->getQueue(), + declaration: new QueueDeclaration(), + bindings: new QueueBindings( + routingKey: $event->getRoutingKey() + ), + ); + } + + $connection = $this->connector->connect($config, $exchange, $queue); try { $message = $this->serializer->serialize($event); $connection->channel()->basic_publish( msg: $message, - exchange: $queue->exchange()->name(), - routing_key: $queue->bindings()->routingKey(), + exchange: $exchange->name(), + routing_key: $event->getRoutingKey() ?? '', ); } catch (Exception $e) { throw $e; diff --git a/src/Object/Queue.php b/src/Object/Queue.php index 44c01ee..3fbea9a 100644 --- a/src/Object/Queue.php +++ b/src/Object/Queue.php @@ -8,7 +8,6 @@ class Queue { public function __construct( private string $name = 'default', - private Exchange $exchange, private QueueDeclaration $declaration, private QueueBindings $bindings, ) {} @@ -18,11 +17,6 @@ class Queue return $this->name; } - public function exchange(): Exchange - { - return $this->exchange; - } - public function declaration(): QueueDeclaration { return $this->declaration; diff --git a/src/Service/Connector.php b/src/Service/Connector.php index 7e7dec8..9a267da 100644 --- a/src/Service/Connector.php +++ b/src/Service/Connector.php @@ -6,12 +6,14 @@ namespace Diffhead\PHP\LaravelRabbitMQ\Service; use Diffhead\PHP\LaravelRabbitMQ\Object\Connection; use Diffhead\PHP\LaravelRabbitMQ\Object\ConnectionWithChannel; +use Diffhead\PHP\LaravelRabbitMQ\Object\Exchange; use Diffhead\PHP\LaravelRabbitMQ\Object\Queue; +use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AMQPStreamConnection; class Connector { - public function connect(Connection $config, Queue $queue): ConnectionWithChannel + public function connect(Connection $config, Exchange $exchange, ?Queue $queue = null): ConnectionWithChannel { $connection = new AMQPStreamConnection( $config->host(), @@ -23,6 +25,35 @@ class Connector $channel = $connection->channel(); + $this->declareNotDefaultExchange($channel, $exchange); + + if ($queue) { + $this->declareQueue($channel, $queue); + $this->bindQueue($channel, $queue, $exchange); + } + + return new ConnectionWithChannel($connection, $channel); + } + + private function declareNotDefaultExchange(AMQPChannel $channel, Exchange $exchange): void + { + if (! $exchange->isDefault()) { + $channel->exchange_declare( + $exchange->name(), + $exchange->type(), + $exchange->declaration()->passive(), + $exchange->declaration()->durable(), + $exchange->declaration()->autoDelete(), + $exchange->declaration()->internal(), + $exchange->declaration()->nowait(), + $exchange->declaration()->arguments(), + $exchange->declaration()->ticket() + ); + } + } + + private function declareQueue(AMQPChannel $channel, Queue $queue): void + { $channel->queue_declare( $queue->name(), $queue->declaration()->passive(), @@ -33,30 +64,17 @@ class Connector $queue->declaration()->arguments(), $queue->declaration()->ticket() ); + } - if (! $queue->exchange()->isDefault()) { - $channel->exchange_declare( - $queue->exchange()->name(), - $queue->exchange()->type(), - $queue->exchange()->declaration()->passive(), - $queue->exchange()->declaration()->durable(), - $queue->exchange()->declaration()->autoDelete(), - $queue->exchange()->declaration()->internal(), - $queue->exchange()->declaration()->nowait(), - $queue->exchange()->declaration()->arguments(), - $queue->exchange()->declaration()->ticket() - ); - } - + private function bindQueue(AMQPChannel $channel, Queue $queue, Exchange $exchange): void + { $channel->queue_bind( $queue->name(), - $queue->exchange()->name(), + $exchange->name(), $queue->bindings()->routingKey(), $queue->bindings()->nowait(), $queue->bindings()->arguments(), $queue->bindings()->ticket() ); - - return new ConnectionWithChannel($connection, $channel); } }