2022-10-17 21:25:03 +02:00
< ? php declare ( strict_types = 1 );
/*
 * This file is part of the Monolog package.
 *
 * (c) Jordi Boggiano <j.boggiano@seld.be>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */
namespace Monolog\Handler ;
2023-01-18 00:17:49 +01:00
use Monolog\Logger ;
2022-10-17 21:25:03 +02:00
use Monolog\Formatter\FormatterInterface ;
use Monolog\Formatter\JsonFormatter ;
use PhpAmqpLib\Message\AMQPMessage ;
use PhpAmqpLib\Channel\AMQPChannel ;
use AMQPExchange ;
2023-01-18 00:17:49 +01:00
/**
 * Handler that publishes log records to an AMQP exchange.
 *
 * Two backends are supported and selected by the type of the object given
 * to the constructor: an AMQPExchange (php AMQP extension) or an
 * AMQPChannel (PhpAmqpLib).
 *
 * @phpstan-import-type Record from \Monolog\Logger
 */
class AmqpHandler extends AbstractProcessingHandler
{
    /**
     * @var AMQPExchange|AMQPChannel $exchange
     */
    protected $exchange;

    /**
     * Exchange name handed to the PhpAmqpLib publish calls. It is only
     * assigned by the constructor when an AMQPChannel is used.
     *
     * @var string
     */
    protected $exchangeName;

    /** @var array<string, mixed> */
    private $extraAttributes = [];

    /**
     * @return array<string, mixed>
     */
    public function getExtraAttributes(): array
    {
        return $this->extraAttributes;
    }

    /**
     * Configure extra attributes to pass to the AMQPExchange (if you are using the amqp extension)
     *
     * These attributes are only read on the AMQPExchange code path of write();
     * the AMQPChannel paths build their messages without them.
     *
     * @param array<string, mixed> $extraAttributes One of content_type, content_encoding,
     *                                              message_id, user_id, app_id, delivery_mode,
     *                                              priority, timestamp, expiration, type
     *                                              or reply_to, headers.
     * @return AmqpHandler
     */
    public function setExtraAttributes(array $extraAttributes): self
    {
        $this->extraAttributes = $extraAttributes;

        return $this;
    }

    /**
     * $level and $bubble are forwarded unchanged to the parent constructor.
     *
     * @param AMQPExchange|AMQPChannel $exchange     AMQPExchange (php AMQP ext) or PHP AMQP lib channel, ready for use
     * @param string|null              $exchangeName Optional exchange name, for AMQPChannel (PhpAmqpLib) only
     *
     * @throws \InvalidArgumentException If $exchange is neither an AMQPChannel nor an AMQPExchange
     */
    public function __construct($exchange, ?string $exchangeName = null, $level = Logger::DEBUG, bool $bubble = true)
    {
        if ($exchange instanceof AMQPChannel) {
            // A null name is normalised to the empty string for PhpAmqpLib.
            $this->exchangeName = (string) $exchangeName;
        } elseif (!$exchange instanceof AMQPExchange) {
            throw new \InvalidArgumentException('PhpAmqpLib\Channel\AMQPChannel or AMQPExchange instance required');
        } elseif ($exchangeName) {
            // An AMQPExchange already carries its own name, so a name passed
            // here is not used; emit a (suppressed) deprecation notice.
            @trigger_error('The $exchangeName parameter can only be passed when using PhpAmqpLib, if using an AMQPExchange instance configure it beforehand', E_USER_DEPRECATED);
        }

        $this->exchange = $exchange;

        parent::__construct($level, $bubble);
    }

    /**
     * {@inheritDoc}
     *
     * Publishes the already formatted record, using the routing key computed
     * by getRoutingKey().
     */
    protected function write(array $record): void
    {
        // The key must be exactly 'formatted' (no surrounding whitespace);
        // presumably it is populated by the parent handler before write()
        // is invoked - confirm against AbstractProcessingHandler.
        $data = $record['formatted'];
        $routingKey = $this->getRoutingKey($record);

        if ($this->exchange instanceof AMQPExchange) {
            $attributes = [
                'delivery_mode' => 2,
                'content_type' => 'application/json',
            ];
            if ($this->extraAttributes) {
                // User-supplied attributes win over the defaults on key clashes.
                $attributes = array_merge($attributes, $this->extraAttributes);
            }

            $this->exchange->publish(
                $data,
                $routingKey,
                0,
                $attributes
            );
        } else {
            $this->exchange->basic_publish(
                $this->createAmqpMessage($data),
                $this->exchangeName,
                $routingKey
            );
        }
    }

    /**
     * {@inheritDoc}
     *
     * With an AMQPExchange the parent implementation is used as is. With an
     * AMQPChannel every handled record is queued through
     * batch_basic_publish() and the queue is sent with one publish_batch() call.
     */
    public function handleBatch(array $records): void
    {
        if ($this->exchange instanceof AMQPExchange) {
            parent::handleBatch($records);

            return;
        }

        foreach ($records as $record) {
            if (!$this->isHandling($record)) {
                continue;
            }

            /** @var Record $record */
            $record = $this->processRecord($record);
            $data = $this->getFormatter()->format($record);

            $this->exchange->batch_basic_publish(
                $this->createAmqpMessage($data),
                $this->exchangeName,
                $this->getRoutingKey($record)
            );
        }

        $this->exchange->publish_batch();
    }

    /**
     * Gets the routing key for the AMQP exchange
     *
     * The key is "<level_name>.<channel>" converted to lower case.
     *
     * @phpstan-param Record $record
     */
    protected function getRoutingKey(array $record): string
    {
        $routingKey = sprintf('%s.%s', $record['level_name'], $record['channel']);

        return strtolower($routingKey);
    }

    /**
     * Wraps an encoded payload in a PhpAmqpLib message, using the same
     * delivery_mode and content_type defaults as the AMQPExchange path.
     */
    private function createAmqpMessage(string $data): AMQPMessage
    {
        return new AMQPMessage(
            $data,
            [
                'delivery_mode' => 2,
                'content_type' => 'application/json',
            ]
        );
    }

    /**
     * {@inheritDoc}
     */
    protected function getDefaultFormatter(): FormatterInterface
    {
        // NOTE(review): the second argument (false) presumably disables the
        // trailing newline - confirm against JsonFormatter's constructor.
        return new JsonFormatter(JsonFormatter::BATCH_MODE_JSON, false);
    }
}