Compare commits

...

3 Commits

Author SHA1 Message Date
8320fb4a84 Version 1.2.0
- Event publishing without queue
- Broadcastable event can be non json serializable
2026-02-27 00:44:57 +04:00
c481c961fb Broadcastable event shouldnt be a JsonSerializable everytime 2026-02-27 00:43:06 +04:00
b81223a550 Publishing without queue ability 2026-02-27 00:40:58 +04:00
10 changed files with 332 additions and 323 deletions

View File

@@ -83,7 +83,7 @@ customizing concrete event or override this behaviour globally by changing
following env variables: following env variables:
* `RABBITMQ_EVENT_CONNECTION=default` * `RABBITMQ_EVENT_CONNECTION=default`
* `RABBITMQ_EVENT_QUEUE=default` * `RABBITMQ_EVENT_QUEUE=`
* `RABBITMQ_EVENT_EXCHANGE=amq.direct` * `RABBITMQ_EVENT_EXCHANGE=amq.direct`
* `RABBITMQ_EVENT_EXCHANGE_TYPE=direct` * `RABBITMQ_EVENT_EXCHANGE_TYPE=direct`
* `RABBITMQ_EVENT_EXCHANGE_IS_DEFAULT=true` * `RABBITMQ_EVENT_EXCHANGE_IS_DEFAULT=true`

View File

@@ -3,7 +3,7 @@
"description": "A laravel package for events emitting between services using RabbitMQ as message broker.", "description": "A laravel package for events emitting between services using RabbitMQ as message broker.",
"type": "library", "type": "library",
"license": "MIT", "license": "MIT",
"version": "1.1.0", "version": "1.2.0",
"keywords": [ "keywords": [
"laravel", "rabbitmq", "event", "emit", "microservice", "laravel", "rabbitmq", "event", "emit", "microservice",
"pipeline", "data exchanging", "message", "broker", "php8", "pipeline", "data exchanging", "message", "broker", "php8",

501
composer.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -35,7 +35,7 @@ return [
*/ */
'defaults' => [ 'defaults' => [
'connection' => env('RABBITMQ_EVENT_CONNECTION', 'default'), 'connection' => env('RABBITMQ_EVENT_CONNECTION', 'default'),
'queue' => env('RABBITMQ_EVENT_QUEUE', 'default'), 'queue' => env('RABBITMQ_EVENT_QUEUE', ''),
'exchange' => env('RABBITMQ_EVENT_EXCHANGE', 'amq.direct'), 'exchange' => env('RABBITMQ_EVENT_EXCHANGE', 'amq.direct'),
'exchange_type' => env('RABBITMQ_EVENT_EXCHANGE_TYPE', 'direct'), 'exchange_type' => env('RABBITMQ_EVENT_EXCHANGE_TYPE', 'direct'),
'exchange_is_default' => (bool) env('RABBITMQ_EVENT_EXCHANGE_IS_DEFAULT', true), 'exchange_is_default' => (bool) env('RABBITMQ_EVENT_EXCHANGE_IS_DEFAULT', true),

View File

@@ -46,27 +46,35 @@ class Consume extends Command
); );
$queue = $this->getQueue($params); $queue = $this->getQueue($params);
$exchange = $this->getExchange($params);
$this->connection = $connector->connect($config, $queue); $this->connection = $connector->connect($config, $exchange, $queue);
$tag = $params->tag->value() ?? 'rabbitmq-laravel-consumer'; $tag = $params->tag->value() ?? 'rabbitmq-laravel-consumer';
$consumer = function (AMQPMessage $message) use ($handler, $queue, $logger): void { $consumer = function (AMQPMessage $message) use ($handler, $queue, $logger): void {
try { try {
$this->info(sprintf('Received message: %s', $message->getBody())); $this->info('Received message');
if ($message->getRoutingKey()) {
$this->info(
sprintf('Routing key: %s', $message->getRoutingKey())
);
}
$this->info($message->getBody());
$mergedQueue = $this->getMergedQueue($queue, $message); $mergedQueue = $this->getMergedQueue($queue, $message);
$handler->handle($mergedQueue, $message); $handler->handle($mergedQueue, $message);
$message->ack(); $message->ack();
$this->info('Message processed successfully.'); $this->info('Message processed');
} catch (Exception $e) { } catch (Exception $e) {
$message->nack(); $message->nack();
$this->error( $this->error('Error');
sprintf('Processing error: %s', $e->getMessage()) $this->error($e->getMessage());
);
$logger->error($e->getMessage()); $logger->error($e->getMessage());
} }
@@ -89,18 +97,20 @@ class Consume extends Command
private function getQueue(ConsumerParameters $params): Queue private function getQueue(ConsumerParameters $params): Queue
{ {
$routingKey = $params->routingKey->value() ?? '';
return new Queue( return new Queue(
name: $params->queue->value() ?? 'default', name: $params->queue->value() ?? 'default',
exchange: new Exchange(
name: $params->exchange->value() ?? 'amq.direct',
type: $params->exchangeType->value() ?? 'direct',
isDefault: $params->exchangeIsDefault->value(),
declaration: new ExchangeDeclaration()
),
declaration: new QueueDeclaration(), declaration: new QueueDeclaration(),
bindings: new QueueBindings($routingKey), bindings: new QueueBindings($params->routingKey->toString()),
);
}
private function getExchange(ConsumerParameters $params): Exchange
{
return new Exchange(
name: $params->exchange->toString() ?? 'amq.direct',
type: $params->exchangeType->toString() ?? 'direct',
isDefault: $params->exchangeIsDefault->toBool(),
declaration: new ExchangeDeclaration()
); );
} }
@@ -108,7 +118,6 @@ class Consume extends Command
{ {
return new Queue( return new Queue(
name: $queue->name(), name: $queue->name(),
exchange: $queue->exchange(),
declaration: $queue->declaration(), declaration: $queue->declaration(),
bindings: new QueueBindings( bindings: new QueueBindings(
routingKey: $message->getRoutingKey(), routingKey: $message->getRoutingKey(),

View File

@@ -4,9 +4,7 @@ declare(strict_types=1);
namespace Diffhead\PHP\LaravelRabbitMQ\Event; namespace Diffhead\PHP\LaravelRabbitMQ\Event;
use JsonSerializable; interface Broadcast
interface Broadcast extends JsonSerializable
{ {
public function getConnection(): string; public function getConnection(): string;
public function getQueue(): string; public function getQueue(): string;

View File

@@ -32,29 +32,34 @@ class PublishEvent implements ShouldQueue
{ {
$config = $this->configuration->getConnection($event->getConnection()); $config = $this->configuration->getConnection($event->getConnection());
$queue = new Queue( $exchange = new Exchange(
name: $event->getQueue(), name: $event->getExchange(),
exchange: new Exchange( type: $event->getExchangeType(),
name: $event->getExchange(), isDefault: $event->getExchangeIsDefault(),
type: $event->getExchangeType(), declaration: new ExchangeDeclaration()
isDefault: $event->getExchangeIsDefault(),
declaration: new ExchangeDeclaration()
),
declaration: new QueueDeclaration(),
bindings: new QueueBindings(
routingKey: $event->getRoutingKey()
),
); );
$connection = $this->connector->connect($config, $queue); $queue = null;
if ($event->getQueue()) {
$queue = new Queue(
name: $event->getQueue(),
declaration: new QueueDeclaration(),
bindings: new QueueBindings(
routingKey: $event->getRoutingKey()
),
);
}
$connection = $this->connector->connect($config, $exchange, $queue);
try { try {
$message = $this->serializer->serialize($event); $message = $this->serializer->serialize($event);
$connection->channel()->basic_publish( $connection->channel()->basic_publish(
msg: $message, msg: $message,
exchange: $queue->exchange()->name(), exchange: $exchange->name(),
routing_key: $queue->bindings()->routingKey(), routing_key: $event->getRoutingKey() ?? '',
); );
} catch (Exception $e) { } catch (Exception $e) {
throw $e; throw $e;

View File

@@ -8,7 +8,6 @@ class Queue
{ {
public function __construct( public function __construct(
private string $name = 'default', private string $name = 'default',
private Exchange $exchange,
private QueueDeclaration $declaration, private QueueDeclaration $declaration,
private QueueBindings $bindings, private QueueBindings $bindings,
) {} ) {}
@@ -18,11 +17,6 @@ class Queue
return $this->name; return $this->name;
} }
public function exchange(): Exchange
{
return $this->exchange;
}
public function declaration(): QueueDeclaration public function declaration(): QueueDeclaration
{ {
return $this->declaration; return $this->declaration;

View File

@@ -6,12 +6,14 @@ namespace Diffhead\PHP\LaravelRabbitMQ\Service;
use Diffhead\PHP\LaravelRabbitMQ\Object\Connection; use Diffhead\PHP\LaravelRabbitMQ\Object\Connection;
use Diffhead\PHP\LaravelRabbitMQ\Object\ConnectionWithChannel; use Diffhead\PHP\LaravelRabbitMQ\Object\ConnectionWithChannel;
use Diffhead\PHP\LaravelRabbitMQ\Object\Exchange;
use Diffhead\PHP\LaravelRabbitMQ\Object\Queue; use Diffhead\PHP\LaravelRabbitMQ\Object\Queue;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Connection\AMQPStreamConnection;
class Connector class Connector
{ {
public function connect(Connection $config, Queue $queue): ConnectionWithChannel public function connect(Connection $config, Exchange $exchange, ?Queue $queue = null): ConnectionWithChannel
{ {
$connection = new AMQPStreamConnection( $connection = new AMQPStreamConnection(
$config->host(), $config->host(),
@@ -23,6 +25,35 @@ class Connector
$channel = $connection->channel(); $channel = $connection->channel();
$this->declareNotDefaultExchange($channel, $exchange);
if ($queue) {
$this->declareQueue($channel, $queue);
$this->bindQueue($channel, $queue, $exchange);
}
return new ConnectionWithChannel($connection, $channel);
}
private function declareNotDefaultExchange(AMQPChannel $channel, Exchange $exchange): void
{
if (! $exchange->isDefault()) {
$channel->exchange_declare(
$exchange->name(),
$exchange->type(),
$exchange->declaration()->passive(),
$exchange->declaration()->durable(),
$exchange->declaration()->autoDelete(),
$exchange->declaration()->internal(),
$exchange->declaration()->nowait(),
$exchange->declaration()->arguments(),
$exchange->declaration()->ticket()
);
}
}
private function declareQueue(AMQPChannel $channel, Queue $queue): void
{
$channel->queue_declare( $channel->queue_declare(
$queue->name(), $queue->name(),
$queue->declaration()->passive(), $queue->declaration()->passive(),
@@ -33,30 +64,17 @@ class Connector
$queue->declaration()->arguments(), $queue->declaration()->arguments(),
$queue->declaration()->ticket() $queue->declaration()->ticket()
); );
}
if (! $queue->exchange()->isDefault()) { private function bindQueue(AMQPChannel $channel, Queue $queue, Exchange $exchange): void
$channel->exchange_declare( {
$queue->exchange()->name(),
$queue->exchange()->type(),
$queue->exchange()->declaration()->passive(),
$queue->exchange()->declaration()->durable(),
$queue->exchange()->declaration()->autoDelete(),
$queue->exchange()->declaration()->internal(),
$queue->exchange()->declaration()->nowait(),
$queue->exchange()->declaration()->arguments(),
$queue->exchange()->declaration()->ticket()
);
}
$channel->queue_bind( $channel->queue_bind(
$queue->name(), $queue->name(),
$queue->exchange()->name(), $exchange->name(),
$queue->bindings()->routingKey(), $queue->bindings()->routingKey(),
$queue->bindings()->nowait(), $queue->bindings()->nowait(),
$queue->bindings()->arguments(), $queue->bindings()->arguments(),
$queue->bindings()->ticket() $queue->bindings()->ticket()
); );
return new ConnectionWithChannel($connection, $channel);
} }
} }

View File

@@ -11,16 +11,16 @@ use PhpAmqpLib\Message\AMQPMessage;
class Serializer implements SerializerInterface class Serializer implements SerializerInterface
{ {
public function serialize(object $data): AMQPMessage public function serialize(object $event): AMQPMessage
{ {
if ($data instanceof JsonSerializable) { if ($event instanceof JsonSerializable) {
return new AMQPMessage( return new AMQPMessage(
json_encode($data->jsonSerialize()) json_encode($event->jsonSerialize())
); );
} }
throw new InvalidArgumentException( throw new InvalidArgumentException(
'Data should be an instance of BroadcastEvent' 'Event should be an instance of JsonSerializable'
); );
} }
} }