Compare commits
3 Commits
v1.1.0
...
8320fb4a84
| Author | SHA1 | Date | |
|---|---|---|---|
| 8320fb4a84 | |||
| c481c961fb | |||
| b81223a550 |
@@ -83,7 +83,7 @@ customizing concrete event or override this behaviour globally by changing
|
||||
following env variables:
|
||||
|
||||
* `RABBITMQ_EVENT_CONNECTION=default`
|
||||
* `RABBITMQ_EVENT_QUEUE=default`
|
||||
* `RABBITMQ_EVENT_QUEUE=`
|
||||
* `RABBITMQ_EVENT_EXCHANGE=amq.direct`
|
||||
* `RABBITMQ_EVENT_EXCHANGE_TYPE=direct`
|
||||
* `RABBITMQ_EVENT_EXCHANGE_IS_DEFAULT=true`
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
"description": "A laravel package for events emitting between services using RabbitMQ as message broker.",
|
||||
"type": "library",
|
||||
"license": "MIT",
|
||||
"version": "1.1.0",
|
||||
"version": "1.2.0",
|
||||
"keywords": [
|
||||
"laravel", "rabbitmq", "event", "emit", "microservice",
|
||||
"pipeline", "data exchanging", "message", "broker", "php8",
|
||||
|
||||
501
composer.lock
generated
501
composer.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -35,7 +35,7 @@ return [
|
||||
*/
|
||||
'defaults' => [
|
||||
'connection' => env('RABBITMQ_EVENT_CONNECTION', 'default'),
|
||||
'queue' => env('RABBITMQ_EVENT_QUEUE', 'default'),
|
||||
'queue' => env('RABBITMQ_EVENT_QUEUE', ''),
|
||||
'exchange' => env('RABBITMQ_EVENT_EXCHANGE', 'amq.direct'),
|
||||
'exchange_type' => env('RABBITMQ_EVENT_EXCHANGE_TYPE', 'direct'),
|
||||
'exchange_is_default' => (bool) env('RABBITMQ_EVENT_EXCHANGE_IS_DEFAULT', true),
|
||||
|
||||
@@ -46,27 +46,35 @@ class Consume extends Command
|
||||
);
|
||||
|
||||
$queue = $this->getQueue($params);
|
||||
$exchange = $this->getExchange($params);
|
||||
|
||||
$this->connection = $connector->connect($config, $queue);
|
||||
$this->connection = $connector->connect($config, $exchange, $queue);
|
||||
|
||||
$tag = $params->tag->value() ?? 'rabbitmq-laravel-consumer';
|
||||
|
||||
$consumer = function (AMQPMessage $message) use ($handler, $queue, $logger): void {
|
||||
try {
|
||||
$this->info(sprintf('Received message: %s', $message->getBody()));
|
||||
$this->info('Received message');
|
||||
|
||||
if ($message->getRoutingKey()) {
|
||||
$this->info(
|
||||
sprintf('Routing key: %s', $message->getRoutingKey())
|
||||
);
|
||||
}
|
||||
|
||||
$this->info($message->getBody());
|
||||
|
||||
$mergedQueue = $this->getMergedQueue($queue, $message);
|
||||
|
||||
$handler->handle($mergedQueue, $message);
|
||||
$message->ack();
|
||||
|
||||
$this->info('Message processed successfully.');
|
||||
$this->info('Message processed');
|
||||
} catch (Exception $e) {
|
||||
$message->nack();
|
||||
|
||||
$this->error(
|
||||
sprintf('Processing error: %s', $e->getMessage())
|
||||
);
|
||||
$this->error('Error');
|
||||
$this->error($e->getMessage());
|
||||
|
||||
$logger->error($e->getMessage());
|
||||
}
|
||||
@@ -89,18 +97,20 @@ class Consume extends Command
|
||||
|
||||
private function getQueue(ConsumerParameters $params): Queue
|
||||
{
|
||||
$routingKey = $params->routingKey->value() ?? '';
|
||||
|
||||
return new Queue(
|
||||
name: $params->queue->value() ?? 'default',
|
||||
exchange: new Exchange(
|
||||
name: $params->exchange->value() ?? 'amq.direct',
|
||||
type: $params->exchangeType->value() ?? 'direct',
|
||||
isDefault: $params->exchangeIsDefault->value(),
|
||||
declaration: new ExchangeDeclaration()
|
||||
),
|
||||
declaration: new QueueDeclaration(),
|
||||
bindings: new QueueBindings($routingKey),
|
||||
bindings: new QueueBindings($params->routingKey->toString()),
|
||||
);
|
||||
}
|
||||
|
||||
private function getExchange(ConsumerParameters $params): Exchange
|
||||
{
|
||||
return new Exchange(
|
||||
name: $params->exchange->toString() ?? 'amq.direct',
|
||||
type: $params->exchangeType->toString() ?? 'direct',
|
||||
isDefault: $params->exchangeIsDefault->toBool(),
|
||||
declaration: new ExchangeDeclaration()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -108,7 +118,6 @@ class Consume extends Command
|
||||
{
|
||||
return new Queue(
|
||||
name: $queue->name(),
|
||||
exchange: $queue->exchange(),
|
||||
declaration: $queue->declaration(),
|
||||
bindings: new QueueBindings(
|
||||
routingKey: $message->getRoutingKey(),
|
||||
|
||||
@@ -4,9 +4,7 @@ declare(strict_types=1);
|
||||
|
||||
namespace Diffhead\PHP\LaravelRabbitMQ\Event;
|
||||
|
||||
use JsonSerializable;
|
||||
|
||||
interface Broadcast extends JsonSerializable
|
||||
interface Broadcast
|
||||
{
|
||||
public function getConnection(): string;
|
||||
public function getQueue(): string;
|
||||
|
||||
@@ -32,29 +32,34 @@ class PublishEvent implements ShouldQueue
|
||||
{
|
||||
$config = $this->configuration->getConnection($event->getConnection());
|
||||
|
||||
$queue = new Queue(
|
||||
name: $event->getQueue(),
|
||||
exchange: new Exchange(
|
||||
name: $event->getExchange(),
|
||||
type: $event->getExchangeType(),
|
||||
isDefault: $event->getExchangeIsDefault(),
|
||||
declaration: new ExchangeDeclaration()
|
||||
),
|
||||
declaration: new QueueDeclaration(),
|
||||
bindings: new QueueBindings(
|
||||
routingKey: $event->getRoutingKey()
|
||||
),
|
||||
$exchange = new Exchange(
|
||||
name: $event->getExchange(),
|
||||
type: $event->getExchangeType(),
|
||||
isDefault: $event->getExchangeIsDefault(),
|
||||
declaration: new ExchangeDeclaration()
|
||||
);
|
||||
|
||||
$connection = $this->connector->connect($config, $queue);
|
||||
$queue = null;
|
||||
|
||||
if ($event->getQueue()) {
|
||||
$queue = new Queue(
|
||||
name: $event->getQueue(),
|
||||
declaration: new QueueDeclaration(),
|
||||
bindings: new QueueBindings(
|
||||
routingKey: $event->getRoutingKey()
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
$connection = $this->connector->connect($config, $exchange, $queue);
|
||||
|
||||
try {
|
||||
$message = $this->serializer->serialize($event);
|
||||
|
||||
$connection->channel()->basic_publish(
|
||||
msg: $message,
|
||||
exchange: $queue->exchange()->name(),
|
||||
routing_key: $queue->bindings()->routingKey(),
|
||||
exchange: $exchange->name(),
|
||||
routing_key: $event->getRoutingKey() ?? '',
|
||||
);
|
||||
} catch (Exception $e) {
|
||||
throw $e;
|
||||
|
||||
@@ -8,7 +8,6 @@ class Queue
|
||||
{
|
||||
public function __construct(
|
||||
private string $name = 'default',
|
||||
private Exchange $exchange,
|
||||
private QueueDeclaration $declaration,
|
||||
private QueueBindings $bindings,
|
||||
) {}
|
||||
@@ -18,11 +17,6 @@ class Queue
|
||||
return $this->name;
|
||||
}
|
||||
|
||||
public function exchange(): Exchange
|
||||
{
|
||||
return $this->exchange;
|
||||
}
|
||||
|
||||
public function declaration(): QueueDeclaration
|
||||
{
|
||||
return $this->declaration;
|
||||
|
||||
@@ -6,12 +6,14 @@ namespace Diffhead\PHP\LaravelRabbitMQ\Service;
|
||||
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Connection;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\ConnectionWithChannel;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Exchange;
|
||||
use Diffhead\PHP\LaravelRabbitMQ\Object\Queue;
|
||||
use PhpAmqpLib\Channel\AMQPChannel;
|
||||
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
||||
|
||||
class Connector
|
||||
{
|
||||
public function connect(Connection $config, Queue $queue): ConnectionWithChannel
|
||||
public function connect(Connection $config, Exchange $exchange, ?Queue $queue = null): ConnectionWithChannel
|
||||
{
|
||||
$connection = new AMQPStreamConnection(
|
||||
$config->host(),
|
||||
@@ -23,6 +25,35 @@ class Connector
|
||||
|
||||
$channel = $connection->channel();
|
||||
|
||||
$this->declareNotDefaultExchange($channel, $exchange);
|
||||
|
||||
if ($queue) {
|
||||
$this->declareQueue($channel, $queue);
|
||||
$this->bindQueue($channel, $queue, $exchange);
|
||||
}
|
||||
|
||||
return new ConnectionWithChannel($connection, $channel);
|
||||
}
|
||||
|
||||
private function declareNotDefaultExchange(AMQPChannel $channel, Exchange $exchange): void
|
||||
{
|
||||
if (! $exchange->isDefault()) {
|
||||
$channel->exchange_declare(
|
||||
$exchange->name(),
|
||||
$exchange->type(),
|
||||
$exchange->declaration()->passive(),
|
||||
$exchange->declaration()->durable(),
|
||||
$exchange->declaration()->autoDelete(),
|
||||
$exchange->declaration()->internal(),
|
||||
$exchange->declaration()->nowait(),
|
||||
$exchange->declaration()->arguments(),
|
||||
$exchange->declaration()->ticket()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private function declareQueue(AMQPChannel $channel, Queue $queue): void
|
||||
{
|
||||
$channel->queue_declare(
|
||||
$queue->name(),
|
||||
$queue->declaration()->passive(),
|
||||
@@ -33,30 +64,17 @@ class Connector
|
||||
$queue->declaration()->arguments(),
|
||||
$queue->declaration()->ticket()
|
||||
);
|
||||
}
|
||||
|
||||
if (! $queue->exchange()->isDefault()) {
|
||||
$channel->exchange_declare(
|
||||
$queue->exchange()->name(),
|
||||
$queue->exchange()->type(),
|
||||
$queue->exchange()->declaration()->passive(),
|
||||
$queue->exchange()->declaration()->durable(),
|
||||
$queue->exchange()->declaration()->autoDelete(),
|
||||
$queue->exchange()->declaration()->internal(),
|
||||
$queue->exchange()->declaration()->nowait(),
|
||||
$queue->exchange()->declaration()->arguments(),
|
||||
$queue->exchange()->declaration()->ticket()
|
||||
);
|
||||
}
|
||||
|
||||
private function bindQueue(AMQPChannel $channel, Queue $queue, Exchange $exchange): void
|
||||
{
|
||||
$channel->queue_bind(
|
||||
$queue->name(),
|
||||
$queue->exchange()->name(),
|
||||
$exchange->name(),
|
||||
$queue->bindings()->routingKey(),
|
||||
$queue->bindings()->nowait(),
|
||||
$queue->bindings()->arguments(),
|
||||
$queue->bindings()->ticket()
|
||||
);
|
||||
|
||||
return new ConnectionWithChannel($connection, $channel);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,16 +11,16 @@ use PhpAmqpLib\Message\AMQPMessage;
|
||||
|
||||
class Serializer implements SerializerInterface
|
||||
{
|
||||
public function serialize(object $data): AMQPMessage
|
||||
public function serialize(object $event): AMQPMessage
|
||||
{
|
||||
if ($data instanceof JsonSerializable) {
|
||||
if ($event instanceof JsonSerializable) {
|
||||
return new AMQPMessage(
|
||||
json_encode($data->jsonSerialize())
|
||||
json_encode($event->jsonSerialize())
|
||||
);
|
||||
}
|
||||
|
||||
throw new InvalidArgumentException(
|
||||
'Data should be an instance of BroadcastEvent'
|
||||
'Event should be an instance of JsonSerializable'
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user