Added PublishEvent listener connection and queue configuration feature
This commit is contained in:
@@ -5,7 +5,7 @@ declare(strict_types=1);
|
||||
namespace Diffhead\PHP\LaravelRabbitMQ\Command;
|
||||
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Dto\ConsumerParameters;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Connection;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\ConnectionWithChannel;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Exchange;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\ExchangeDeclaration;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Queue;
|
||||
@@ -31,7 +31,7 @@ class Consume extends Command
|
||||
*/
|
||||
protected $description = 'Consume messages from RabbitMQ';
|
||||
|
||||
private ?Connection $connection = null;
|
||||
private ?ConnectionWithChannel $connection = null;
|
||||
|
||||
public function handle(
|
||||
Configuration $configuration,
|
||||
@@ -40,7 +40,10 @@ class Consume extends Command
|
||||
LoggerInterface $logger
|
||||
): void {
|
||||
$params = ConsumerParameters::fromArray($this->options());
|
||||
$config = $configuration->get($params->connection->value() ?? 'default');
|
||||
|
||||
$config = $configuration->getConnection(
|
||||
$params->connection->value() ?? 'default'
|
||||
);
|
||||
|
||||
$queue = $this->getQueue($params);
|
||||
|
||||
|
||||
@@ -13,11 +13,15 @@ use Diffhead\PHP\LaravelRabbitMQ\Object\QueueDeclaration;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Service\Configuration;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Service\Connector;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Event\Broadcast;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Publishing;
|
||||
use Exception;
|
||||
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||
use Illuminate\Support\Facades\App;
|
||||
|
||||
class PublishEvent implements ShouldQueue
|
||||
{
|
||||
private Publishing $publishing;
|
||||
|
||||
public function __construct(
|
||||
private Configuration $configuration,
|
||||
private Connector $connector,
|
||||
@@ -26,7 +30,7 @@ class PublishEvent implements ShouldQueue
|
||||
|
||||
public function handle(Broadcast $event): void
|
||||
{
|
||||
$config = $this->configuration->get($event->getConnection());
|
||||
$config = $this->configuration->getConnection($event->getConnection());
|
||||
|
||||
$queue = new Queue(
|
||||
name: $event->getQueue(),
|
||||
@@ -59,4 +63,24 @@ class PublishEvent implements ShouldQueue
|
||||
$connection->connection()->close();
|
||||
}
|
||||
}
|
||||
|
||||
public function viaConnection(): string
|
||||
{
|
||||
return $this->getOrCreatePublishing()->connection();
|
||||
}
|
||||
|
||||
public function viaQueue(): string
|
||||
{
|
||||
return $this->getOrCreatePublishing()->queue();
|
||||
}
|
||||
|
||||
private function getOrCreatePublishing(): Publishing
|
||||
{
|
||||
if (! isset($this->publishing)) {
|
||||
$configuration = App::make(Configuration::class);
|
||||
$this->publishing = $configuration->getPublishing();
|
||||
}
|
||||
|
||||
return $this->publishing;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,41 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Diffhead\PHP\LaravelRabbitMQ\Object;
|
||||
|
||||
class Configuration
|
||||
{
|
||||
public function __construct(
|
||||
private string $host,
|
||||
private int $port,
|
||||
private string $user,
|
||||
private string $password,
|
||||
private string $vhost,
|
||||
) {}
|
||||
|
||||
public function host(): string
|
||||
{
|
||||
return $this->host;
|
||||
}
|
||||
|
||||
public function port(): int
|
||||
{
|
||||
return $this->port;
|
||||
}
|
||||
|
||||
public function user(): string
|
||||
{
|
||||
return $this->user;
|
||||
}
|
||||
|
||||
public function password(): string
|
||||
{
|
||||
return $this->password;
|
||||
}
|
||||
|
||||
public function vhost(): string
|
||||
{
|
||||
return $this->vhost;
|
||||
}
|
||||
}
|
||||
@@ -4,23 +4,38 @@ declare(strict_types=1);
|
||||
|
||||
namespace Diffhead\PHP\LaravelRabbitMQ\Object;
|
||||
|
||||
use PhpAmqpLib\Channel\AMQPChannel;
|
||||
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
||||
|
||||
class Connection
|
||||
{
|
||||
public function __construct(
|
||||
private AMQPStreamConnection $connection,
|
||||
private AMQPChannel $channel
|
||||
private string $host,
|
||||
private int $port,
|
||||
private string $user,
|
||||
private string $password,
|
||||
private string $vhost,
|
||||
) {}
|
||||
|
||||
public function connection(): AMQPStreamConnection
|
||||
public function host(): string
|
||||
{
|
||||
return $this->connection;
|
||||
return $this->host;
|
||||
}
|
||||
|
||||
public function channel(): AMQPChannel
|
||||
public function port(): int
|
||||
{
|
||||
return $this->channel;
|
||||
return $this->port;
|
||||
}
|
||||
|
||||
public function user(): string
|
||||
{
|
||||
return $this->user;
|
||||
}
|
||||
|
||||
public function password(): string
|
||||
{
|
||||
return $this->password;
|
||||
}
|
||||
|
||||
public function vhost(): string
|
||||
{
|
||||
return $this->vhost;
|
||||
}
|
||||
}
|
||||
|
||||
26
src/Object/ConnectionWithChannel.php
Normal file
26
src/Object/ConnectionWithChannel.php
Normal file
@@ -0,0 +1,26 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Diffhead\PHP\LaravelRabbitMQ\Object;
|
||||
|
||||
use PhpAmqpLib\Channel\AMQPChannel;
|
||||
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
||||
|
||||
class ConnectionWithChannel
|
||||
{
|
||||
public function __construct(
|
||||
private AMQPStreamConnection $connection,
|
||||
private AMQPChannel $channel
|
||||
) {}
|
||||
|
||||
public function connection(): AMQPStreamConnection
|
||||
{
|
||||
return $this->connection;
|
||||
}
|
||||
|
||||
public function channel(): AMQPChannel
|
||||
{
|
||||
return $this->channel;
|
||||
}
|
||||
}
|
||||
23
src/Object/Publishing.php
Normal file
23
src/Object/Publishing.php
Normal file
@@ -0,0 +1,23 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Diffhead\PHP\LaravelRabbitMQ\Object;
|
||||
|
||||
class Publishing
|
||||
{
|
||||
public function __construct(
|
||||
private string $connection,
|
||||
private string $queue,
|
||||
) {}
|
||||
|
||||
public function connection(): string
|
||||
{
|
||||
return $this->connection;
|
||||
}
|
||||
|
||||
public function queue(): string
|
||||
{
|
||||
return $this->queue;
|
||||
}
|
||||
}
|
||||
@@ -4,12 +4,13 @@ declare(strict_types=1);
|
||||
|
||||
namespace Diffhead\PHP\LaravelRabbitMQ\Service;
|
||||
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Configuration as ConfigurationObject;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Connection;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Publishing;
|
||||
use RuntimeException;
|
||||
|
||||
class Configuration
|
||||
{
|
||||
public function get(string $connection = 'default'): ConfigurationObject
|
||||
public function getConnection(string $connection = 'default'): Connection
|
||||
{
|
||||
$config = config(sprintf('rabbitmq.connections.%s', $connection));
|
||||
|
||||
@@ -19,7 +20,7 @@ class Configuration
|
||||
);
|
||||
}
|
||||
|
||||
return new ConfigurationObject(
|
||||
return new Connection(
|
||||
$config['host'],
|
||||
(int) $config['port'],
|
||||
$config['user'],
|
||||
@@ -27,4 +28,12 @@ class Configuration
|
||||
$config['vhost'],
|
||||
);
|
||||
}
|
||||
|
||||
public function getPublishing(): Publishing
|
||||
{
|
||||
return new Publishing(
|
||||
config('rabbitmq.event.publishing.connection', 'sync'),
|
||||
config('rabbitmq.event.publishing.queue', 'default'),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,14 +4,14 @@ declare(strict_types=1);
|
||||
|
||||
namespace Diffhead\PHP\LaravelRabbitMQ\Service;
|
||||
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Configuration;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Connection;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\ConnectionWithChannel;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Queue;
|
||||
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
||||
|
||||
class Connector
|
||||
{
|
||||
public function connect(Configuration $config, Queue $queue): Connection
|
||||
public function connect(Connection $config, Queue $queue): ConnectionWithChannel
|
||||
{
|
||||
$connection = new AMQPStreamConnection(
|
||||
$config->host(),
|
||||
@@ -57,6 +57,6 @@ class Connector
|
||||
$queue->bindings()->ticket()
|
||||
);
|
||||
|
||||
return new Connection($connection, $channel);
|
||||
return new ConnectionWithChannel($connection, $channel);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user