Compare commits

..

3 Commits

Author SHA1 Message Date
ac735d8bf5 Version 1.2.0
- Event publishing without queue
- Broadcastable event can be non json serializable
2026-02-27 01:16:02 +04:00
6b044074ce Broadcastable event shouldnt be a JsonSerializable everytime 2026-02-27 01:14:21 +04:00
b81223a550 Publishing without queue ability 2026-02-27 00:40:58 +04:00
10 changed files with 376 additions and 374 deletions

View File

@@ -33,8 +33,9 @@ return [
### 1. Creating regular events for publishing to a RabbitMQ
Create an event that implements the `Broadcast` interface. `Broadcast` interface
extends `JsonSerializable` then your event should implements `jsonSerialize` method.
Create an event that implements the `Broadcast` interface.
Your event should implements `JsonSerializable` interface if
default serializer is used.
```php
namespace App\Events;
@@ -42,8 +43,9 @@ namespace App\Events;
use Diffhead\PHP\LaravelRabbitMQ\Event\Broadcast;
use Diffhead\PHP\LaravelRabbitMQ\Trait\BroadcastEvent;
use Illuminate\Foundation\Events\Dispatchable;
use JsonSerializable;
class UserCreated implements Broadcast
class UserCreated implements Broadcast, JsonSerializable
{
use Dispatchable, BroadcastEvent;
@@ -83,7 +85,7 @@ customizing concrete event or override this behaviour globally by changing
following env variables:
* `RABBITMQ_EVENT_CONNECTION=default`
* `RABBITMQ_EVENT_QUEUE=default`
* `RABBITMQ_EVENT_QUEUE=`
* `RABBITMQ_EVENT_EXCHANGE=amq.direct`
* `RABBITMQ_EVENT_EXCHANGE_TYPE=direct`
* `RABBITMQ_EVENT_EXCHANGE_IS_DEFAULT=true`

View File

@@ -3,7 +3,7 @@
"description": "A laravel package for events emitting between services using RabbitMQ as message broker.",
"type": "library",
"license": "MIT",
"version": "1.1.0",
"version": "1.2.0",
"keywords": [
"laravel", "rabbitmq", "event", "emit", "microservice",
"pipeline", "data exchanging", "message", "broker", "php8",

588
composer.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -35,7 +35,7 @@ return [
*/
'defaults' => [
'connection' => env('RABBITMQ_EVENT_CONNECTION', 'default'),
'queue' => env('RABBITMQ_EVENT_QUEUE', 'default'),
'queue' => env('RABBITMQ_EVENT_QUEUE', ''),
'exchange' => env('RABBITMQ_EVENT_EXCHANGE', 'amq.direct'),
'exchange_type' => env('RABBITMQ_EVENT_EXCHANGE_TYPE', 'direct'),
'exchange_is_default' => (bool) env('RABBITMQ_EVENT_EXCHANGE_IS_DEFAULT', true),

View File

@@ -46,27 +46,35 @@ class Consume extends Command
);
$queue = $this->getQueue($params);
$exchange = $this->getExchange($params);
$this->connection = $connector->connect($config, $queue);
$this->connection = $connector->connect($config, $exchange, $queue);
$tag = $params->tag->value() ?? 'rabbitmq-laravel-consumer';
$consumer = function (AMQPMessage $message) use ($handler, $queue, $logger): void {
try {
$this->info(sprintf('Received message: %s', $message->getBody()));
$this->info('Received message');
if ($message->getRoutingKey()) {
$this->info(
sprintf('Routing key: %s', $message->getRoutingKey())
);
}
$this->info($message->getBody());
$mergedQueue = $this->getMergedQueue($queue, $message);
$handler->handle($mergedQueue, $message);
$message->ack();
$this->info('Message processed successfully.');
$this->info('Message processed');
} catch (Exception $e) {
$message->nack();
$this->error(
sprintf('Processing error: %s', $e->getMessage())
);
$this->error('Error');
$this->error($e->getMessage());
$logger->error($e->getMessage());
}
@@ -89,18 +97,20 @@ class Consume extends Command
private function getQueue(ConsumerParameters $params): Queue
{
$routingKey = $params->routingKey->value() ?? '';
return new Queue(
name: $params->queue->value() ?? 'default',
exchange: new Exchange(
name: $params->exchange->value() ?? 'amq.direct',
type: $params->exchangeType->value() ?? 'direct',
isDefault: $params->exchangeIsDefault->value(),
declaration: new ExchangeDeclaration()
),
declaration: new QueueDeclaration(),
bindings: new QueueBindings($routingKey),
bindings: new QueueBindings($params->routingKey->toString()),
);
}
private function getExchange(ConsumerParameters $params): Exchange
{
return new Exchange(
name: $params->exchange->toString() ?? 'amq.direct',
type: $params->exchangeType->toString() ?? 'direct',
isDefault: $params->exchangeIsDefault->toBool(),
declaration: new ExchangeDeclaration()
);
}
@@ -108,7 +118,6 @@ class Consume extends Command
{
return new Queue(
name: $queue->name(),
exchange: $queue->exchange(),
declaration: $queue->declaration(),
bindings: new QueueBindings(
routingKey: $message->getRoutingKey(),

View File

@@ -4,9 +4,7 @@ declare(strict_types=1);
namespace Diffhead\PHP\LaravelRabbitMQ\Event;
use JsonSerializable;
interface Broadcast extends JsonSerializable
interface Broadcast
{
public function getConnection(): string;
public function getQueue(): string;

View File

@@ -32,29 +32,34 @@ class PublishEvent implements ShouldQueue
{
$config = $this->configuration->getConnection($event->getConnection());
$queue = new Queue(
name: $event->getQueue(),
exchange: new Exchange(
name: $event->getExchange(),
type: $event->getExchangeType(),
isDefault: $event->getExchangeIsDefault(),
declaration: new ExchangeDeclaration()
),
declaration: new QueueDeclaration(),
bindings: new QueueBindings(
routingKey: $event->getRoutingKey()
),
$exchange = new Exchange(
name: $event->getExchange(),
type: $event->getExchangeType(),
isDefault: $event->getExchangeIsDefault(),
declaration: new ExchangeDeclaration()
);
$connection = $this->connector->connect($config, $queue);
$queue = null;
if ($event->getQueue()) {
$queue = new Queue(
name: $event->getQueue(),
declaration: new QueueDeclaration(),
bindings: new QueueBindings(
routingKey: $event->getRoutingKey()
),
);
}
$connection = $this->connector->connect($config, $exchange, $queue);
try {
$message = $this->serializer->serialize($event);
$connection->channel()->basic_publish(
msg: $message,
exchange: $queue->exchange()->name(),
routing_key: $queue->bindings()->routingKey(),
exchange: $exchange->name(),
routing_key: $event->getRoutingKey() ?? '',
);
} catch (Exception $e) {
throw $e;

View File

@@ -8,7 +8,6 @@ class Queue
{
public function __construct(
private string $name = 'default',
private Exchange $exchange,
private QueueDeclaration $declaration,
private QueueBindings $bindings,
) {}
@@ -18,11 +17,6 @@ class Queue
return $this->name;
}
public function exchange(): Exchange
{
return $this->exchange;
}
public function declaration(): QueueDeclaration
{
return $this->declaration;

View File

@@ -6,12 +6,14 @@ namespace Diffhead\PHP\LaravelRabbitMQ\Service;
use Diffhead\PHP\LaravelRabbitMQ\Object\Connection;
use Diffhead\PHP\LaravelRabbitMQ\Object\ConnectionWithChannel;
use Diffhead\PHP\LaravelRabbitMQ\Object\Exchange;
use Diffhead\PHP\LaravelRabbitMQ\Object\Queue;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class Connector
{
public function connect(Connection $config, Queue $queue): ConnectionWithChannel
public function connect(Connection $config, Exchange $exchange, ?Queue $queue = null): ConnectionWithChannel
{
$connection = new AMQPStreamConnection(
$config->host(),
@@ -23,6 +25,35 @@ class Connector
$channel = $connection->channel();
$this->declareNotDefaultExchange($channel, $exchange);
if ($queue) {
$this->declareQueue($channel, $queue);
$this->bindQueue($channel, $queue, $exchange);
}
return new ConnectionWithChannel($connection, $channel);
}
private function declareNotDefaultExchange(AMQPChannel $channel, Exchange $exchange): void
{
if (! $exchange->isDefault()) {
$channel->exchange_declare(
$exchange->name(),
$exchange->type(),
$exchange->declaration()->passive(),
$exchange->declaration()->durable(),
$exchange->declaration()->autoDelete(),
$exchange->declaration()->internal(),
$exchange->declaration()->nowait(),
$exchange->declaration()->arguments(),
$exchange->declaration()->ticket()
);
}
}
private function declareQueue(AMQPChannel $channel, Queue $queue): void
{
$channel->queue_declare(
$queue->name(),
$queue->declaration()->passive(),
@@ -33,30 +64,17 @@ class Connector
$queue->declaration()->arguments(),
$queue->declaration()->ticket()
);
}
if (! $queue->exchange()->isDefault()) {
$channel->exchange_declare(
$queue->exchange()->name(),
$queue->exchange()->type(),
$queue->exchange()->declaration()->passive(),
$queue->exchange()->declaration()->durable(),
$queue->exchange()->declaration()->autoDelete(),
$queue->exchange()->declaration()->internal(),
$queue->exchange()->declaration()->nowait(),
$queue->exchange()->declaration()->arguments(),
$queue->exchange()->declaration()->ticket()
);
}
private function bindQueue(AMQPChannel $channel, Queue $queue, Exchange $exchange): void
{
$channel->queue_bind(
$queue->name(),
$queue->exchange()->name(),
$exchange->name(),
$queue->bindings()->routingKey(),
$queue->bindings()->nowait(),
$queue->bindings()->arguments(),
$queue->bindings()->ticket()
);
return new ConnectionWithChannel($connection, $channel);
}
}

View File

@@ -11,16 +11,16 @@ use PhpAmqpLib\Message\AMQPMessage;
class Serializer implements SerializerInterface
{
public function serialize(object $data): AMQPMessage
public function serialize(object $event): AMQPMessage
{
if ($data instanceof JsonSerializable) {
if ($event instanceof JsonSerializable) {
return new AMQPMessage(
json_encode($data->jsonSerialize())
json_encode($event->jsonSerialize())
);
}
throw new InvalidArgumentException(
'Data should be an instance of BroadcastEvent'
'Event should be an instance of JsonSerializable'
);
}
}