uawdijnntqw1x1x1
IP : 18.117.90.244
Hostname : axolotl
Kernel : Linux axolotl 4.9.0-13-amd64 #1 SMP Debian 4.9.228-1 (2020-07-05) x86_64
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,
OS : Linux
PATH:
/
var
/
www
/
axolotl
/
data
/
www
/
kirov.axolotls.ru
/
bitrix
/
modules
/
pull
/
lib
/
protobuftransport.php
/
/
<?php namespace Bitrix\Pull; use Bitrix\Main\Config\Option; use Bitrix\Main\SystemException; use Bitrix\Main\Text\BinaryString; use Bitrix\Main\Type\DateTime; use Bitrix\Main\Web\HttpClient; use Bitrix\Pull\Protobuf; use Protobuf\MessageCollection; class ProtobufTransport { protected $hits = 0; protected $bytes = 0; /** * @param array $messages Messages to send to the pull server. */ public static function sendMessages(array $messages) { if(!Config::isProtobufUsed()) { throw new SystemException("Sending messages in protobuf format is not supported by queue server"); } $protobufMessages = static::convertMessages($messages); $requests = static::createRequests($protobufMessages); $requestBatches = static::createRequestBatches($requests); $queueServerUrl = \CHTTP::urlAddParams(Config::getPublishUrl(), ["binaryMode" => "true"]); foreach ($requestBatches as $requestBatch) { $urlWithSignature = $queueServerUrl; $httpClient = new HttpClient(["streamTimeout" => 1]); $bodyStream = $requestBatch->toStream(); if(\CPullOptions::IsServerShared()) { $signature = \CPullChannel::GetSignature($bodyStream->getContents()); $urlWithSignature = \CHTTP::urlAddParams($urlWithSignature, ["signature" => $signature]); } $httpClient->disableSslVerification(); $httpClient->query(HttpClient::HTTP_POST, $urlWithSignature, $bodyStream); } return true; } /** * Returns online status for each known channel in the list of private channel ids. * @param array $channels Array of private channel ids. * @return array Return online status for known channels in format [channelId => bool]. */ public static function getOnlineChannels(array $channels) { $result = []; $maxChannelsPerRequest = \CPullOptions::GetMaxChannelsPerRequest(); $channelBatches = []; $currentChannelBatch = 0; $requestsInChannelBatch = 0; foreach ($channels as $channelId) { $channel = new Protobuf\ChannelId(); $channel->setId(hex2bin($channelId)); $channel->setIsPrivate(true); $requestsInChannelBatch++; if($requestsInChannelBatch >= $maxChannelsPerRequest) { $currentChannelBatch++; $requestsInChannelBatch = 1; } $channelBatches[$currentChannelBatch][] = $channel; } $requests = []; foreach ($channelBatches as $channelBatchNumber => $channelBatch) { $channelsStatsRequest = new Protobuf\ChannelStatsRequest(); $channelsStatsRequest->setChannelsList(new MessageCollection($channelBatch)); $request = new Protobuf\Request(); $request->setChannelStats($channelsStatsRequest); $requests[] = $request; } $queueServerUrl = \CHTTP::urlAddParams(Config::getPublishUrl(), ["binaryMode" => "true"]); $requestBatches = static::createRequestBatches($requests); foreach ($requestBatches as $requestBatch) { $http = new HttpClient(); $http->disableSslVerification(); $urlWithSignature = $queueServerUrl; $bodyStream = $requestBatch->toStream(); if(\CPullOptions::IsServerShared()) { $signature = \CPullChannel::GetSignature($bodyStream->getContents()); $urlWithSignature = \CHTTP::urlAddParams($urlWithSignature, ["signature" => $signature]); } $binaryResponse = $http->post($urlWithSignature, $bodyStream); if($http->getStatus() != 200) { return []; } if(BinaryString::getLength($binaryResponse) == 0) { return []; } try { $responseBatch = Protobuf\ResponseBatch::fromStream($binaryResponse); } catch (\Exception $e) { return []; } $responses = $responseBatch->getResponsesList(); $response = $responses[0]; if(!($response instanceof Protobuf\Response)) { return[]; } if ($response->hasChannelStats()) { $stats = $response->getChannelStats(); /** @var Protobuf\ChannelStats $channel */ foreach ($stats->getChannelsList() as $channel) { if($channel->getIsOnline()) { $channelId = bin2hex($channel->getId()); $result[$channelId] = true; } } } } return $result; } /** * @param array $messages * @return Protobuf\IncomingMessage[] */ protected static function convertMessages(array $messages) { $result = []; foreach ($messages as $message) { $event = $message['event']; if(!is_array($message['channels']) || count($message['channels']) == 0 || !isset($event['module_id']) || !isset($event['command'])) { continue; } $result = array_merge($result, static::convertMessage($message['channels'], $event)); } return $result; } /** * @param array $channels * @param array $event * * @return Protobuf\IncomingMessage[] */ protected static function convertMessage(array $channels, array $event) { $result = []; $extra = is_array($event['extra']) ? $event['extra'] : []; $extra['server_time'] = $extra['server_time'] ?: new DateTime(); $extra['server_time_unix'] = $extra['server_time_unix'] ?: microtime(true); $extra['server_name'] = Option::get('main', 'server_name', $_SERVER['SERVER_NAME']); $extra['revision_web'] = PULL_REVISION_WEB; $extra['revision_mobile'] = PULL_REVISION_MOBILE; $body = Common::jsonEncode(array( 'module_id' => $event['module_id'], 'command' => $event['command'], 'params' => $event['params'] ?: [], 'extra' => $extra )); // for statistics $messageType = "{$event['module_id']}_{$event['command']}"; $messageType = preg_replace("/[^\w]/", "", $messageType); $maxChannelsPerRequest = \CPullOptions::GetMaxChannelsPerRequest(); $receivers = []; foreach ($channels as $channel) { $receiver = new Protobuf\Receiver(); $receiver->setIsPrivate(true); $receiver->setId(hex2bin($channel)); $receivers[] = $receiver; if(count($receivers) === $maxChannelsPerRequest) { $message = new Protobuf\IncomingMessage(); $message->setReceiversList(new MessageCollection($receivers)); $message->setExpiry($event['expiry']); $message->setBody($body); $message->setType($messageType); // for statistics $result[] = $message; $receivers = []; } } if(count($receivers) > 0) { $message = new Protobuf\IncomingMessage(); $message->setReceiversList(new MessageCollection($receivers)); $message->setExpiry($event['expiry']); $message->setBody($body); $result[] = $message; } return $result; } /** * @param Protobuf\Request[] $requests * @return Protobuf\RequestBatch[] */ protected static function createRequestBatches(array $requests) { $result = []; foreach ($requests as $request) { $batch = new Protobuf\RequestBatch(); $batch->addRequests($request); $result[] = $batch; } return $result; } /** * @param Protobuf\IncomingMessage[] $messages * @return Protobuf\Request[] */ protected static function createRequests(array $messages) { $result = []; $maxPayload = \CPullOptions::GetMaxPayload() - 200; $maxMessages = \CPullOptions::GetMaxMessagesPerRequest(); $currentMessageBatch = []; $currentBatchSize = 0; foreach ($messages as $message) { $messageSize = static::getMessageSize($message); if($currentBatchSize + $messageSize >= $maxPayload || count($currentMessageBatch) >= $maxMessages) { // finalize current request and start a new one $incomingMessagesRequest = new Protobuf\IncomingMessagesRequest(); $incomingMessagesRequest->setMessagesList(new MessageCollection($currentMessageBatch)); $request = new Protobuf\Request(); $request->setIncomingMessages($incomingMessagesRequest); $result[] = $request; $currentMessageBatch = []; $messageSize = 0; } // add the request to the current batch $currentMessageBatch[] = $message; $currentBatchSize += $messageSize; } if(count($currentMessageBatch) > 0) { $incomingMessagesRequest = new Protobuf\IncomingMessagesRequest(); $incomingMessagesRequest->setMessagesList(new MessageCollection($currentMessageBatch)); $request = new Protobuf\Request(); $request->setIncomingMessages($incomingMessagesRequest); $result[] = $request; } return $result; } /** * @param Protobuf\IncomingMessage $message * @param $maxReceivers * @return Protobuf\IncomingMessage[] */ protected static function splitReceivers(Protobuf\IncomingMessage $message, $maxReceivers) { $receivers = $message->getReceiversList(); if(count($receivers) <= $maxReceivers) { return [$message]; } $result = []; $currentReceivers = []; foreach ($receivers as $receiver) { if(count($currentReceivers) == $maxReceivers) { $subMessage = new Protobuf\IncomingMessage(); $subMessage->setBody($message->getBody()); $subMessage->setExpiry($message->getExpiry()); $subMessage->setReceiversList(new MessageCollection($currentReceivers)); $result[] = $subMessage; $currentReceivers = []; } $currentReceivers[] = $receiver; } if(count($currentReceivers) > 0) { $subMessage = new Protobuf\IncomingMessage(); $subMessage->setBody($message->getBody()); $subMessage->setExpiry($message->getExpiry()); $subMessage->setReceiversList(new MessageCollection($currentReceivers)); $result[] = $subMessage; } return $result; } protected static function getMessageSize(Protobuf\IncomingMessage $message) { $config = \Protobuf\Configuration::getInstance(); return $message->serializedSize($config->createComputeSizeContext()); } }
/var/www/axolotl/data/www/kirov.axolotls.ru/bitrix/modules/pull/lib/protobuftransport.php