Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ac735d8bf5 | |||
| 6b044074ce | |||
| b81223a550 |
10
README.md
10
README.md
@@ -33,8 +33,9 @@ return [
|
|||||||
|
|
||||||
### 1. Creating regular events for publishing to a RabbitMQ
|
### 1. Creating regular events for publishing to a RabbitMQ
|
||||||
|
|
||||||
Create an event that implements the `Broadcast` interface. `Broadcast` interface
|
Create an event that implements the `Broadcast` interface.
|
||||||
extends `JsonSerializable` then your event should implements `jsonSerialize` method.
|
Your event should implements `JsonSerializable` interface if
|
||||||
|
default serializer is used.
|
||||||
|
|
||||||
```php
|
```php
|
||||||
namespace App\Events;
|
namespace App\Events;
|
||||||
@@ -42,8 +43,9 @@ namespace App\Events;
|
|||||||
use Diffhead\PHP\LaravelRabbitMQ\Event\Broadcast;
|
use Diffhead\PHP\LaravelRabbitMQ\Event\Broadcast;
|
||||||
use Diffhead\PHP\LaravelRabbitMQ\Trait\BroadcastEvent;
|
use Diffhead\PHP\LaravelRabbitMQ\Trait\BroadcastEvent;
|
||||||
use Illuminate\Foundation\Events\Dispatchable;
|
use Illuminate\Foundation\Events\Dispatchable;
|
||||||
|
use JsonSerializable;
|
||||||
|
|
||||||
class UserCreated implements Broadcast
|
class UserCreated implements Broadcast, JsonSerializable
|
||||||
{
|
{
|
||||||
use Dispatchable, BroadcastEvent;
|
use Dispatchable, BroadcastEvent;
|
||||||
|
|
||||||
@@ -83,7 +85,7 @@ customizing concrete event or override this behaviour globally by changing
|
|||||||
following env variables:
|
following env variables:
|
||||||
|
|
||||||
* `RABBITMQ_EVENT_CONNECTION=default`
|
* `RABBITMQ_EVENT_CONNECTION=default`
|
||||||
* `RABBITMQ_EVENT_QUEUE=default`
|
* `RABBITMQ_EVENT_QUEUE=`
|
||||||
* `RABBITMQ_EVENT_EXCHANGE=amq.direct`
|
* `RABBITMQ_EVENT_EXCHANGE=amq.direct`
|
||||||
* `RABBITMQ_EVENT_EXCHANGE_TYPE=direct`
|
* `RABBITMQ_EVENT_EXCHANGE_TYPE=direct`
|
||||||
* `RABBITMQ_EVENT_EXCHANGE_IS_DEFAULT=true`
|
* `RABBITMQ_EVENT_EXCHANGE_IS_DEFAULT=true`
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
"description": "A laravel package for events emitting between services using RabbitMQ as message broker.",
|
"description": "A laravel package for events emitting between services using RabbitMQ as message broker.",
|
||||||
"type": "library",
|
"type": "library",
|
||||||
"license": "MIT",
|
"license": "MIT",
|
||||||
"version": "1.1.0",
|
"version": "1.2.0",
|
||||||
"keywords": [
|
"keywords": [
|
||||||
"laravel", "rabbitmq", "event", "emit", "microservice",
|
"laravel", "rabbitmq", "event", "emit", "microservice",
|
||||||
"pipeline", "data exchanging", "message", "broker", "php8",
|
"pipeline", "data exchanging", "message", "broker", "php8",
|
||||||
|
|||||||
588
composer.lock
generated
588
composer.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -35,7 +35,7 @@ return [
|
|||||||
*/
|
*/
|
||||||
'defaults' => [
|
'defaults' => [
|
||||||
'connection' => env('RABBITMQ_EVENT_CONNECTION', 'default'),
|
'connection' => env('RABBITMQ_EVENT_CONNECTION', 'default'),
|
||||||
'queue' => env('RABBITMQ_EVENT_QUEUE', 'default'),
|
'queue' => env('RABBITMQ_EVENT_QUEUE', ''),
|
||||||
'exchange' => env('RABBITMQ_EVENT_EXCHANGE', 'amq.direct'),
|
'exchange' => env('RABBITMQ_EVENT_EXCHANGE', 'amq.direct'),
|
||||||
'exchange_type' => env('RABBITMQ_EVENT_EXCHANGE_TYPE', 'direct'),
|
'exchange_type' => env('RABBITMQ_EVENT_EXCHANGE_TYPE', 'direct'),
|
||||||
'exchange_is_default' => (bool) env('RABBITMQ_EVENT_EXCHANGE_IS_DEFAULT', true),
|
'exchange_is_default' => (bool) env('RABBITMQ_EVENT_EXCHANGE_IS_DEFAULT', true),
|
||||||
|
|||||||
@@ -46,27 +46,35 @@ class Consume extends Command
|
|||||||
);
|
);
|
||||||
|
|
||||||
$queue = $this->getQueue($params);
|
$queue = $this->getQueue($params);
|
||||||
|
$exchange = $this->getExchange($params);
|
||||||
|
|
||||||
$this->connection = $connector->connect($config, $queue);
|
$this->connection = $connector->connect($config, $exchange, $queue);
|
||||||
|
|
||||||
$tag = $params->tag->value() ?? 'rabbitmq-laravel-consumer';
|
$tag = $params->tag->value() ?? 'rabbitmq-laravel-consumer';
|
||||||
|
|
||||||
$consumer = function (AMQPMessage $message) use ($handler, $queue, $logger): void {
|
$consumer = function (AMQPMessage $message) use ($handler, $queue, $logger): void {
|
||||||
try {
|
try {
|
||||||
$this->info(sprintf('Received message: %s', $message->getBody()));
|
$this->info('Received message');
|
||||||
|
|
||||||
|
if ($message->getRoutingKey()) {
|
||||||
|
$this->info(
|
||||||
|
sprintf('Routing key: %s', $message->getRoutingKey())
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->info($message->getBody());
|
||||||
|
|
||||||
$mergedQueue = $this->getMergedQueue($queue, $message);
|
$mergedQueue = $this->getMergedQueue($queue, $message);
|
||||||
|
|
||||||
$handler->handle($mergedQueue, $message);
|
$handler->handle($mergedQueue, $message);
|
||||||
$message->ack();
|
$message->ack();
|
||||||
|
|
||||||
$this->info('Message processed successfully.');
|
$this->info('Message processed');
|
||||||
} catch (Exception $e) {
|
} catch (Exception $e) {
|
||||||
$message->nack();
|
$message->nack();
|
||||||
|
|
||||||
$this->error(
|
$this->error('Error');
|
||||||
sprintf('Processing error: %s', $e->getMessage())
|
$this->error($e->getMessage());
|
||||||
);
|
|
||||||
|
|
||||||
$logger->error($e->getMessage());
|
$logger->error($e->getMessage());
|
||||||
}
|
}
|
||||||
@@ -89,18 +97,20 @@ class Consume extends Command
|
|||||||
|
|
||||||
private function getQueue(ConsumerParameters $params): Queue
|
private function getQueue(ConsumerParameters $params): Queue
|
||||||
{
|
{
|
||||||
$routingKey = $params->routingKey->value() ?? '';
|
|
||||||
|
|
||||||
return new Queue(
|
return new Queue(
|
||||||
name: $params->queue->value() ?? 'default',
|
name: $params->queue->value() ?? 'default',
|
||||||
exchange: new Exchange(
|
|
||||||
name: $params->exchange->value() ?? 'amq.direct',
|
|
||||||
type: $params->exchangeType->value() ?? 'direct',
|
|
||||||
isDefault: $params->exchangeIsDefault->value(),
|
|
||||||
declaration: new ExchangeDeclaration()
|
|
||||||
),
|
|
||||||
declaration: new QueueDeclaration(),
|
declaration: new QueueDeclaration(),
|
||||||
bindings: new QueueBindings($routingKey),
|
bindings: new QueueBindings($params->routingKey->toString()),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private function getExchange(ConsumerParameters $params): Exchange
|
||||||
|
{
|
||||||
|
return new Exchange(
|
||||||
|
name: $params->exchange->toString() ?? 'amq.direct',
|
||||||
|
type: $params->exchangeType->toString() ?? 'direct',
|
||||||
|
isDefault: $params->exchangeIsDefault->toBool(),
|
||||||
|
declaration: new ExchangeDeclaration()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -108,7 +118,6 @@ class Consume extends Command
|
|||||||
{
|
{
|
||||||
return new Queue(
|
return new Queue(
|
||||||
name: $queue->name(),
|
name: $queue->name(),
|
||||||
exchange: $queue->exchange(),
|
|
||||||
declaration: $queue->declaration(),
|
declaration: $queue->declaration(),
|
||||||
bindings: new QueueBindings(
|
bindings: new QueueBindings(
|
||||||
routingKey: $message->getRoutingKey(),
|
routingKey: $message->getRoutingKey(),
|
||||||
|
|||||||
@@ -4,9 +4,7 @@ declare(strict_types=1);
|
|||||||
|
|
||||||
namespace Diffhead\PHP\LaravelRabbitMQ\Event;
|
namespace Diffhead\PHP\LaravelRabbitMQ\Event;
|
||||||
|
|
||||||
use JsonSerializable;
|
interface Broadcast
|
||||||
|
|
||||||
interface Broadcast extends JsonSerializable
|
|
||||||
{
|
{
|
||||||
public function getConnection(): string;
|
public function getConnection(): string;
|
||||||
public function getQueue(): string;
|
public function getQueue(): string;
|
||||||
|
|||||||
@@ -32,29 +32,34 @@ class PublishEvent implements ShouldQueue
|
|||||||
{
|
{
|
||||||
$config = $this->configuration->getConnection($event->getConnection());
|
$config = $this->configuration->getConnection($event->getConnection());
|
||||||
|
|
||||||
$queue = new Queue(
|
$exchange = new Exchange(
|
||||||
name: $event->getQueue(),
|
name: $event->getExchange(),
|
||||||
exchange: new Exchange(
|
type: $event->getExchangeType(),
|
||||||
name: $event->getExchange(),
|
isDefault: $event->getExchangeIsDefault(),
|
||||||
type: $event->getExchangeType(),
|
declaration: new ExchangeDeclaration()
|
||||||
isDefault: $event->getExchangeIsDefault(),
|
|
||||||
declaration: new ExchangeDeclaration()
|
|
||||||
),
|
|
||||||
declaration: new QueueDeclaration(),
|
|
||||||
bindings: new QueueBindings(
|
|
||||||
routingKey: $event->getRoutingKey()
|
|
||||||
),
|
|
||||||
);
|
);
|
||||||
|
|
||||||
$connection = $this->connector->connect($config, $queue);
|
$queue = null;
|
||||||
|
|
||||||
|
if ($event->getQueue()) {
|
||||||
|
$queue = new Queue(
|
||||||
|
name: $event->getQueue(),
|
||||||
|
declaration: new QueueDeclaration(),
|
||||||
|
bindings: new QueueBindings(
|
||||||
|
routingKey: $event->getRoutingKey()
|
||||||
|
),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
$connection = $this->connector->connect($config, $exchange, $queue);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$message = $this->serializer->serialize($event);
|
$message = $this->serializer->serialize($event);
|
||||||
|
|
||||||
$connection->channel()->basic_publish(
|
$connection->channel()->basic_publish(
|
||||||
msg: $message,
|
msg: $message,
|
||||||
exchange: $queue->exchange()->name(),
|
exchange: $exchange->name(),
|
||||||
routing_key: $queue->bindings()->routingKey(),
|
routing_key: $event->getRoutingKey() ?? '',
|
||||||
);
|
);
|
||||||
} catch (Exception $e) {
|
} catch (Exception $e) {
|
||||||
throw $e;
|
throw $e;
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ class Queue
|
|||||||
{
|
{
|
||||||
public function __construct(
|
public function __construct(
|
||||||
private string $name = 'default',
|
private string $name = 'default',
|
||||||
private Exchange $exchange,
|
|
||||||
private QueueDeclaration $declaration,
|
private QueueDeclaration $declaration,
|
||||||
private QueueBindings $bindings,
|
private QueueBindings $bindings,
|
||||||
) {}
|
) {}
|
||||||
@@ -18,11 +17,6 @@ class Queue
|
|||||||
return $this->name;
|
return $this->name;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function exchange(): Exchange
|
|
||||||
{
|
|
||||||
return $this->exchange;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function declaration(): QueueDeclaration
|
public function declaration(): QueueDeclaration
|
||||||
{
|
{
|
||||||
return $this->declaration;
|
return $this->declaration;
|
||||||
|
|||||||
@@ -6,12 +6,14 @@ namespace Diffhead\PHP\LaravelRabbitMQ\Service;
|
|||||||
|
|
||||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Connection;
|
use Diffhead\PHP\LaravelRabbitMQ\Object\Connection;
|
||||||
use Diffhead\PHP\LaravelRabbitMQ\Object\ConnectionWithChannel;
|
use Diffhead\PHP\LaravelRabbitMQ\Object\ConnectionWithChannel;
|
||||||
|
use Diffhead\PHP\LaravelRabbitMQ\Object\Exchange;
|
||||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Queue;
|
use Diffhead\PHP\LaravelRabbitMQ\Object\Queue;
|
||||||
|
use PhpAmqpLib\Channel\AMQPChannel;
|
||||||
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
||||||
|
|
||||||
class Connector
|
class Connector
|
||||||
{
|
{
|
||||||
public function connect(Connection $config, Queue $queue): ConnectionWithChannel
|
public function connect(Connection $config, Exchange $exchange, ?Queue $queue = null): ConnectionWithChannel
|
||||||
{
|
{
|
||||||
$connection = new AMQPStreamConnection(
|
$connection = new AMQPStreamConnection(
|
||||||
$config->host(),
|
$config->host(),
|
||||||
@@ -23,6 +25,35 @@ class Connector
|
|||||||
|
|
||||||
$channel = $connection->channel();
|
$channel = $connection->channel();
|
||||||
|
|
||||||
|
$this->declareNotDefaultExchange($channel, $exchange);
|
||||||
|
|
||||||
|
if ($queue) {
|
||||||
|
$this->declareQueue($channel, $queue);
|
||||||
|
$this->bindQueue($channel, $queue, $exchange);
|
||||||
|
}
|
||||||
|
|
||||||
|
return new ConnectionWithChannel($connection, $channel);
|
||||||
|
}
|
||||||
|
|
||||||
|
private function declareNotDefaultExchange(AMQPChannel $channel, Exchange $exchange): void
|
||||||
|
{
|
||||||
|
if (! $exchange->isDefault()) {
|
||||||
|
$channel->exchange_declare(
|
||||||
|
$exchange->name(),
|
||||||
|
$exchange->type(),
|
||||||
|
$exchange->declaration()->passive(),
|
||||||
|
$exchange->declaration()->durable(),
|
||||||
|
$exchange->declaration()->autoDelete(),
|
||||||
|
$exchange->declaration()->internal(),
|
||||||
|
$exchange->declaration()->nowait(),
|
||||||
|
$exchange->declaration()->arguments(),
|
||||||
|
$exchange->declaration()->ticket()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private function declareQueue(AMQPChannel $channel, Queue $queue): void
|
||||||
|
{
|
||||||
$channel->queue_declare(
|
$channel->queue_declare(
|
||||||
$queue->name(),
|
$queue->name(),
|
||||||
$queue->declaration()->passive(),
|
$queue->declaration()->passive(),
|
||||||
@@ -33,30 +64,17 @@ class Connector
|
|||||||
$queue->declaration()->arguments(),
|
$queue->declaration()->arguments(),
|
||||||
$queue->declaration()->ticket()
|
$queue->declaration()->ticket()
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
|
||||||
if (! $queue->exchange()->isDefault()) {
|
private function bindQueue(AMQPChannel $channel, Queue $queue, Exchange $exchange): void
|
||||||
$channel->exchange_declare(
|
{
|
||||||
$queue->exchange()->name(),
|
|
||||||
$queue->exchange()->type(),
|
|
||||||
$queue->exchange()->declaration()->passive(),
|
|
||||||
$queue->exchange()->declaration()->durable(),
|
|
||||||
$queue->exchange()->declaration()->autoDelete(),
|
|
||||||
$queue->exchange()->declaration()->internal(),
|
|
||||||
$queue->exchange()->declaration()->nowait(),
|
|
||||||
$queue->exchange()->declaration()->arguments(),
|
|
||||||
$queue->exchange()->declaration()->ticket()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
$channel->queue_bind(
|
$channel->queue_bind(
|
||||||
$queue->name(),
|
$queue->name(),
|
||||||
$queue->exchange()->name(),
|
$exchange->name(),
|
||||||
$queue->bindings()->routingKey(),
|
$queue->bindings()->routingKey(),
|
||||||
$queue->bindings()->nowait(),
|
$queue->bindings()->nowait(),
|
||||||
$queue->bindings()->arguments(),
|
$queue->bindings()->arguments(),
|
||||||
$queue->bindings()->ticket()
|
$queue->bindings()->ticket()
|
||||||
);
|
);
|
||||||
|
|
||||||
return new ConnectionWithChannel($connection, $channel);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,16 +11,16 @@ use PhpAmqpLib\Message\AMQPMessage;
|
|||||||
|
|
||||||
class Serializer implements SerializerInterface
|
class Serializer implements SerializerInterface
|
||||||
{
|
{
|
||||||
public function serialize(object $data): AMQPMessage
|
public function serialize(object $event): AMQPMessage
|
||||||
{
|
{
|
||||||
if ($data instanceof JsonSerializable) {
|
if ($event instanceof JsonSerializable) {
|
||||||
return new AMQPMessage(
|
return new AMQPMessage(
|
||||||
json_encode($data->jsonSerialize())
|
json_encode($event->jsonSerialize())
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new InvalidArgumentException(
|
throw new InvalidArgumentException(
|
||||||
'Data should be an instance of BroadcastEvent'
|
'Event should be an instance of JsonSerializable'
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user