122 lines
3.8 KiB
PHP
122 lines
3.8 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
namespace Diffhead\PHP\LaravelRabbitMQ\Command;
|
|
|
|
use Diffhead\PHP\LaravelRabbitMQ\Dto\ConsumerParameters;
|
|
use Diffhead\PHP\LaravelRabbitMQ\Object\ConnectionWithChannel;
|
|
use Diffhead\PHP\LaravelRabbitMQ\Object\Exchange;
|
|
use Diffhead\PHP\LaravelRabbitMQ\Object\ExchangeDeclaration;
|
|
use Diffhead\PHP\LaravelRabbitMQ\Object\Queue;
|
|
use Diffhead\PHP\LaravelRabbitMQ\Object\QueueBindings;
|
|
use Diffhead\PHP\LaravelRabbitMQ\Object\QueueDeclaration;
|
|
use Diffhead\PHP\LaravelRabbitMQ\Service\Configuration;
|
|
use Diffhead\PHP\LaravelRabbitMQ\Service\Connector;
|
|
use Diffhead\PHP\LaravelRabbitMQ\Service\Message;
|
|
use Exception;
|
|
use Illuminate\Console\Command;
|
|
use PhpAmqpLib\Message\AMQPMessage;
|
|
use Psr\Log\LoggerInterface;
|
|
|
|
/**
 * Artisan command that subscribes to a RabbitMQ queue and passes every
 * delivered message to the injected Message handler.
 *
 * The queue, exchange, routing key, connection name and consumer tag are read
 * from the command options; each of them has a fallback value.
 */
class Consume extends Command
{
    /**
     * @var string
     */
    protected $signature = 'rabbitmq:consume {--connection=} {--queue=} {--exchange=} {--exchange-type=} {--exchange-is-default} {--routing-key=} {--tag=}';

    /**
     * @var string
     */
    protected $description = 'Consume messages from RabbitMQ';

    /**
     * Connection/channel pair obtained in handle(); kept on the instance so
     * that the destructor can close it.
     */
    private ?ConnectionWithChannel $connection = null;

    /**
     * Connect, register the consumer callback and start consuming.
     *
     * Each message is acknowledged after the handler returns. If processing
     * throws, the failure is printed, logged and the message is negatively
     * acknowledged (nack() with its default arguments).
     *
     * @param Configuration   $configuration Source of the named connection settings.
     * @param Connector       $connector     Opens the connection/channel for the queue.
     * @param Message         $handler       Receives the queue description and the raw message.
     * @param LoggerInterface $logger        Receives processing failures.
     */
    public function handle(
        Configuration $configuration,
        Connector $connector,
        Message $handler,
        LoggerInterface $logger
    ): void {
        $params = ConsumerParameters::fromArray($this->options());

        $config = $configuration->getConnection(
            $params->connection->value() ?? 'default'
        );

        $queue = $this->getQueue($params);

        $this->connection = $connector->connect($config, $queue);

        $tag = $params->tag->value() ?? 'rabbitmq-laravel-consumer';

        $consumer = function (AMQPMessage $message) use ($handler, $queue, $logger): void {
            try {
                $this->info(sprintf('Received message: %s', $message->getBody()));

                $mergedQueue = $this->getMergedQueue($queue, $message);

                $handler->handle($mergedQueue, $message);
                $message->ack();

                $this->info('Message processed successfully.');
            } catch (\Throwable $e) {
                // Throwable rather than Exception: an Error raised by a handler
                // (for example a TypeError) must not leave the message unsettled
                // and must not terminate the consume loop.
                //
                // Report first, so the cause is recorded even when nack() itself
                // fails (for example on a broken channel).
                $this->error(
                    sprintf('Processing error: %s', $e->getMessage())
                );

                // PSR-3 reserves the "exception" context key for the throwable.
                $logger->error($e->getMessage(), ['exception' => $e]);

                $message->nack();
            }
        };

        $this->connection->channel()->basic_consume(
            queue: $queue->name(),
            consumer_tag: $tag,
            callback: $consumer
        );

        $this->connection->channel()->consume();
    }

    /**
     * Close the channel and then the connection, if one was opened.
     *
     * The connection is closed in a finally block so that a failure while
     * closing the channel does not leave the connection open.
     */
    public function __destruct()
    {
        try {
            $this->connection?->channel()->close();
        } finally {
            $this->connection?->connection()->close();
        }
    }

    /**
     * Build the queue description from the command options.
     *
     * Fallbacks: queue "default", exchange "amq.direct", exchange type
     * "direct", empty routing key.
     */
    private function getQueue(ConsumerParameters $params): Queue
    {
        $routingKey = $params->routingKey->value() ?? '';

        return new Queue(
            name: $params->queue->value() ?? 'default',
            exchange: new Exchange(
                name: $params->exchange->value() ?? 'amq.direct',
                type: $params->exchangeType->value() ?? 'direct',
                isDefault: $params->exchangeIsDefault->value(),
                declaration: new ExchangeDeclaration()
            ),
            declaration: new QueueDeclaration(),
            bindings: new QueueBindings($routingKey),
        );
    }

    /**
     * Copy the given queue description, replacing the routing key of its
     * bindings with the routing key carried by the message.
     *
     * All other binding settings (nowait, arguments, ticket) are taken from
     * the original queue.
     */
    private function getMergedQueue(Queue $queue, AMQPMessage $message): Queue
    {
        return new Queue(
            name: $queue->name(),
            exchange: $queue->exchange(),
            declaration: $queue->declaration(),
            bindings: new QueueBindings(
                routingKey: $message->getRoutingKey(),
                nowait: $queue->bindings()->nowait(),
                arguments: $queue->bindings()->arguments(),
                ticket: $queue->bindings()->ticket()
            ),
        );
    }
}
|