Version 1.0.0
This commit is contained in:
118
src/Command/Consume.php
Normal file
118
src/Command/Consume.php
Normal file
@@ -0,0 +1,118 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Diffhead\PHP\LaravelRabbitMQ\Command;
|
||||
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Dto\ConsumerParameters;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Connection;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Exchange;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\ExchangeDeclaration;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Queue;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\QueueBindings;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\QueueDeclaration;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Service\Configuration;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Service\Connector;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Service\Message;
|
||||
use Exception;
|
||||
use Illuminate\Console\Command;
|
||||
use PhpAmqpLib\Message\AMQPMessage;
|
||||
use Psr\Log\LoggerInterface;
|
||||
|
||||
class Consume extends Command
|
||||
{
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
protected $signature = 'rabbitmq:consume {--connection=} {--queue=} {--exchange=} {--exchange-type=} {--exchange-is-default} {--routing-key=} {--tag=}';
|
||||
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
protected $description = 'Consume messages from RabbitMQ';
|
||||
|
||||
private ?Connection $connection = null;
|
||||
|
||||
public function handle(
|
||||
Configuration $configuration,
|
||||
Connector $connector,
|
||||
Message $handler,
|
||||
LoggerInterface $logger
|
||||
): void {
|
||||
$params = ConsumerParameters::fromArray($this->options());
|
||||
$config = $configuration->get($params->connection->value() ?? 'default');
|
||||
|
||||
$queue = $this->getQueue($params);
|
||||
|
||||
$this->connection = $connector->connect($config, $queue);
|
||||
|
||||
$tag = $params->tag->value() ?? 'rabbitmq-laravel-consumer';
|
||||
|
||||
$consumer = function (AMQPMessage $message) use ($handler, $queue, $logger): void {
|
||||
try {
|
||||
$this->info(sprintf('Received message: %s', $message->getBody()));
|
||||
|
||||
$mergedQueue = $this->getMergedQueue($queue, $message);
|
||||
|
||||
$handler->handle($mergedQueue, $message);
|
||||
$message->ack();
|
||||
|
||||
$this->info('Message processed successfully.');
|
||||
} catch (Exception $e) {
|
||||
$message->nack();
|
||||
|
||||
$this->error(
|
||||
sprintf('Processing error: %s', $e->getMessage())
|
||||
);
|
||||
|
||||
$logger->error($e->getMessage());
|
||||
}
|
||||
};
|
||||
|
||||
$this->connection->channel()->basic_consume(
|
||||
queue: $queue->name(),
|
||||
consumer_tag: $tag,
|
||||
callback: $consumer
|
||||
);
|
||||
|
||||
$this->connection->channel()->consume();
|
||||
}
|
||||
|
||||
public function __destruct()
|
||||
{
|
||||
$this->connection?->channel()->close();
|
||||
$this->connection?->connection()->close();
|
||||
}
|
||||
|
||||
private function getQueue(ConsumerParameters $params): Queue
|
||||
{
|
||||
$routingKey = $params->routingKey->value() ?? '';
|
||||
|
||||
return new Queue(
|
||||
name: $params->queue->value() ?? 'default',
|
||||
exchange: new Exchange(
|
||||
name: $params->exchange->value() ?? 'amq.direct',
|
||||
type: $params->exchangeType->value() ?? 'direct',
|
||||
isDefault: $params->exchangeIsDefault->value(),
|
||||
declaration: new ExchangeDeclaration()
|
||||
),
|
||||
declaration: new QueueDeclaration(),
|
||||
bindings: new QueueBindings($routingKey),
|
||||
);
|
||||
}
|
||||
|
||||
private function getMergedQueue(Queue $queue, AMQPMessage $message): Queue
|
||||
{
|
||||
return new Queue(
|
||||
name: $queue->name(),
|
||||
exchange: $queue->exchange(),
|
||||
declaration: $queue->declaration(),
|
||||
bindings: new QueueBindings(
|
||||
routingKey: $message->getRoutingKey(),
|
||||
nowait: $queue->bindings()->nowait(),
|
||||
arguments: $queue->bindings()->arguments(),
|
||||
ticket: $queue->bindings()->ticket()
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user