Added PublishEvent listener connection and queue configuration feature

This commit is contained in:
2025-12-31 16:39:48 +04:00
parent 7ae585ae29
commit 181d3de9f7
11 changed files with 347 additions and 319 deletions

View File

@@ -5,7 +5,7 @@ declare(strict_types=1);
namespace Diffhead\PHP\LaravelRabbitMQ\Command;
use Diffhead\PHP\LaravelRabbitMQ\Dto\ConsumerParameters;
use Diffhead\PHP\LaravelRabbitMQ\Object\Connection;
use Diffhead\PHP\LaravelRabbitMQ\Object\ConnectionWithChannel;
use Diffhead\PHP\LaravelRabbitMQ\Object\Exchange;
use Diffhead\PHP\LaravelRabbitMQ\Object\ExchangeDeclaration;
use Diffhead\PHP\LaravelRabbitMQ\Object\Queue;
@@ -31,7 +31,7 @@ class Consume extends Command
*/
protected $description = 'Consume messages from RabbitMQ';
private ?Connection $connection = null;
private ?ConnectionWithChannel $connection = null;
public function handle(
Configuration $configuration,
@@ -40,7 +40,10 @@ class Consume extends Command
LoggerInterface $logger
): void {
$params = ConsumerParameters::fromArray($this->options());
$config = $configuration->get($params->connection->value() ?? 'default');
$config = $configuration->getConnection(
$params->connection->value() ?? 'default'
);
$queue = $this->getQueue($params);

View File

@@ -13,11 +13,15 @@ use Diffhead\PHP\LaravelRabbitMQ\Object\QueueDeclaration;
use Diffhead\PHP\LaravelRabbitMQ\Service\Configuration;
use Diffhead\PHP\LaravelRabbitMQ\Service\Connector;
use Diffhead\PHP\LaravelRabbitMQ\Event\Broadcast;
use Diffhead\PHP\LaravelRabbitMQ\Object\Publishing;
use Exception;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Support\Facades\App;
class PublishEvent implements ShouldQueue
{
private Publishing $publishing;
public function __construct(
private Configuration $configuration,
private Connector $connector,
@@ -26,7 +30,7 @@ class PublishEvent implements ShouldQueue
public function handle(Broadcast $event): void
{
$config = $this->configuration->get($event->getConnection());
$config = $this->configuration->getConnection($event->getConnection());
$queue = new Queue(
name: $event->getQueue(),
@@ -59,4 +63,24 @@ class PublishEvent implements ShouldQueue
$connection->connection()->close();
}
}
public function viaConnection(): string
{
return $this->getOrCreatePublishing()->connection();
}
public function viaQueue(): string
{
return $this->getOrCreatePublishing()->queue();
}
private function getOrCreatePublishing(): Publishing
{
if (! isset($this->publishing)) {
$configuration = App::make(Configuration::class);
$this->publishing = $configuration->getPublishing();
}
return $this->publishing;
}
}

View File

@@ -1,41 +0,0 @@
<?php
declare(strict_types=1);
namespace Diffhead\PHP\LaravelRabbitMQ\Object;
class Configuration
{
public function __construct(
private string $host,
private int $port,
private string $user,
private string $password,
private string $vhost,
) {}
public function host(): string
{
return $this->host;
}
public function port(): int
{
return $this->port;
}
public function user(): string
{
return $this->user;
}
public function password(): string
{
return $this->password;
}
public function vhost(): string
{
return $this->vhost;
}
}

View File

@@ -4,23 +4,38 @@ declare(strict_types=1);
namespace Diffhead\PHP\LaravelRabbitMQ\Object;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class Connection
{
public function __construct(
private AMQPStreamConnection $connection,
private AMQPChannel $channel
private string $host,
private int $port,
private string $user,
private string $password,
private string $vhost,
) {}
public function connection(): AMQPStreamConnection
public function host(): string
{
return $this->connection;
return $this->host;
}
public function channel(): AMQPChannel
public function port(): int
{
return $this->channel;
return $this->port;
}
public function user(): string
{
return $this->user;
}
public function password(): string
{
return $this->password;
}
public function vhost(): string
{
return $this->vhost;
}
}

View File

@@ -0,0 +1,26 @@
<?php
declare(strict_types=1);
namespace Diffhead\PHP\LaravelRabbitMQ\Object;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConnectionWithChannel
{
public function __construct(
private AMQPStreamConnection $connection,
private AMQPChannel $channel
) {}
public function connection(): AMQPStreamConnection
{
return $this->connection;
}
public function channel(): AMQPChannel
{
return $this->channel;
}
}

23
src/Object/Publishing.php Normal file
View File

@@ -0,0 +1,23 @@
<?php
declare(strict_types=1);
namespace Diffhead\PHP\LaravelRabbitMQ\Object;
class Publishing
{
public function __construct(
private string $connection,
private string $queue,
) {}
public function connection(): string
{
return $this->connection;
}
public function queue(): string
{
return $this->queue;
}
}

View File

@@ -4,12 +4,13 @@ declare(strict_types=1);
namespace Diffhead\PHP\LaravelRabbitMQ\Service;
use Diffhead\PHP\LaravelRabbitMQ\Object\Configuration as ConfigurationObject;
use Diffhead\PHP\LaravelRabbitMQ\Object\Connection;
use Diffhead\PHP\LaravelRabbitMQ\Object\Publishing;
use RuntimeException;
class Configuration
{
public function get(string $connection = 'default'): ConfigurationObject
public function getConnection(string $connection = 'default'): Connection
{
$config = config(sprintf('rabbitmq.connections.%s', $connection));
@@ -19,7 +20,7 @@ class Configuration
);
}
return new ConfigurationObject(
return new Connection(
$config['host'],
(int) $config['port'],
$config['user'],
@@ -27,4 +28,12 @@ class Configuration
$config['vhost'],
);
}
public function getPublishing(): Publishing
{
return new Publishing(
config('rabbitmq.event.publishing.connection', 'sync'),
config('rabbitmq.event.publishing.queue', 'default'),
);
}
}

View File

@@ -4,14 +4,14 @@ declare(strict_types=1);
namespace Diffhead\PHP\LaravelRabbitMQ\Service;
use Diffhead\PHP\LaravelRabbitMQ\Object\Configuration;
use Diffhead\PHP\LaravelRabbitMQ\Object\Connection;
use Diffhead\PHP\LaravelRabbitMQ\Object\ConnectionWithChannel;
use Diffhead\PHP\LaravelRabbitMQ\Object\Queue;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class Connector
{
public function connect(Configuration $config, Queue $queue): Connection
public function connect(Connection $config, Queue $queue): ConnectionWithChannel
{
$connection = new AMQPStreamConnection(
$config->host(),
@@ -57,6 +57,6 @@ class Connector
$queue->bindings()->ticket()
);
return new Connection($connection, $channel);
return new ConnectionWithChannel($connection, $channel);
}
}