Spaces:
No application file
No application file
| namespace Mautic\ChannelBundle\Model; | |
| use Doctrine\ORM\EntityManagerInterface; | |
| use Mautic\ChannelBundle\ChannelEvents; | |
| use Mautic\ChannelBundle\Entity\MessageQueue; | |
| use Mautic\ChannelBundle\Event\MessageQueueBatchProcessEvent; | |
| use Mautic\ChannelBundle\Event\MessageQueueEvent; | |
| use Mautic\ChannelBundle\Event\MessageQueueProcessEvent; | |
| use Mautic\CoreBundle\Helper\CoreParametersHelper; | |
| use Mautic\CoreBundle\Helper\UserHelper; | |
| use Mautic\CoreBundle\Model\FormModel; | |
| use Mautic\CoreBundle\Security\Permissions\CorePermissions; | |
| use Mautic\CoreBundle\Translation\Translator; | |
| use Mautic\LeadBundle\Entity\Lead; | |
| use Mautic\LeadBundle\Model\CompanyModel; | |
| use Mautic\LeadBundle\Model\LeadModel; | |
| use Psr\Log\LoggerInterface; | |
| use Symfony\Component\EventDispatcher\EventDispatcherInterface; | |
| use Symfony\Component\Routing\Generator\UrlGeneratorInterface; | |
| use Symfony\Contracts\EventDispatcher\Event; | |
| /** | |
| * @extends FormModel<MessageQueue> | |
| */ | |
| class MessageQueueModel extends FormModel | |
| { | |
| /** | |
| * @var string A default message reschedule interval | |
| */ | |
| public const DEFAULT_RESCHEDULE_INTERVAL = 'PT15M'; | |
| public function __construct( | |
| protected LeadModel $leadModel, | |
| protected CompanyModel $companyModel, | |
| CoreParametersHelper $coreParametersHelper, | |
| EntityManagerInterface $em, | |
| CorePermissions $security, | |
| EventDispatcherInterface $dispatcher, | |
| UrlGeneratorInterface $router, | |
| Translator $translator, | |
| UserHelper $userHelper, | |
| LoggerInterface $mauticLogger | |
| ) { | |
| parent::__construct($em, $security, $dispatcher, $router, $translator, $userHelper, $mauticLogger, $coreParametersHelper); | |
| } | |
| /** | |
| * @return \Mautic\ChannelBundle\Entity\MessageQueueRepository | |
| */ | |
| public function getRepository() | |
| { | |
| return $this->em->getRepository(MessageQueue::class); | |
| } | |
| /** | |
| * @param int $attempts | |
| * @param int $priority | |
| * @param mixed $messageQueue | |
| * @param string $statTableName | |
| * @param string $statContactColumn | |
| * @param string $statSentColumn | |
| */ | |
| public function processFrequencyRules( | |
| array &$leads, | |
| $channel, | |
| $channelId, | |
| $campaignEventId = null, | |
| $attempts = 3, | |
| $priority = MessageQueue::PRIORITY_NORMAL, | |
| $messageQueue = null, | |
| $statTableName = 'email_stats', | |
| $statContactColumn = 'lead_id', | |
| $statSentColumn = 'date_sent' | |
| ): array { | |
| $leadIds = array_keys($leads); | |
| $leadIds = array_combine($leadIds, $leadIds); | |
| /** @var \Mautic\LeadBundle\Entity\FrequencyRuleRepository $frequencyRulesRepo */ | |
| $frequencyRulesRepo = $this->em->getRepository(\Mautic\LeadBundle\Entity\FrequencyRule::class); | |
| $defaultFrequencyNumber = $this->coreParametersHelper->get($channel.'_frequency_number'); | |
| $defaultFrequencyTime = $this->coreParametersHelper->get($channel.'_frequency_time'); | |
| $dontSendTo = $frequencyRulesRepo->getAppliedFrequencyRules( | |
| $channel, | |
| $leadIds, | |
| $defaultFrequencyNumber, | |
| $defaultFrequencyTime, | |
| $statTableName, | |
| $statContactColumn, | |
| $statSentColumn | |
| ); | |
| $queuedContacts = []; | |
| foreach ($dontSendTo as $frequencyRuleMet) { | |
| // We only deal with date intervals here (no time intervals) so it's safe to use 'P' | |
| $scheduleInterval = new \DateInterval('P1'.substr($frequencyRuleMet['frequency_time'], 0, 1)); | |
| if ($messageQueue && isset($messageQueue[$frequencyRuleMet['lead_id']])) { | |
| $this->reschedule($messageQueue[$frequencyRuleMet['lead_id']], $scheduleInterval); | |
| } else { | |
| // Queue this message to be processed by frequency and priority | |
| $this->queue( | |
| [$leads[$frequencyRuleMet['lead_id']]], | |
| $channel, | |
| $channelId, | |
| $scheduleInterval, | |
| $attempts, | |
| $priority, | |
| $campaignEventId | |
| ); | |
| } | |
| $queuedContacts[$frequencyRuleMet['lead_id']] = $frequencyRuleMet['lead_id']; | |
| unset($leads[$frequencyRuleMet['lead_id']]); | |
| } | |
| return $queuedContacts; | |
| } | |
| /** | |
| * Adds messages to the queue. | |
| * | |
| * @param array $leads | |
| * @param string $channel | |
| * @param int $channelId | |
| * @param int $maxAttempts | |
| * @param int $priority | |
| * @param int|null $campaignEventId | |
| * @param array $options | |
| */ | |
| public function queue( | |
| $leads, | |
| $channel, | |
| $channelId, | |
| \DateInterval $scheduledInterval, | |
| $maxAttempts = 1, | |
| $priority = 1, | |
| $campaignEventId = null, | |
| $options = [] | |
| ): bool { | |
| $messageQueues = []; | |
| $scheduledDate = (new \DateTime())->add($scheduledInterval); | |
| foreach ($leads as $lead) { | |
| $leadId = (is_array($lead)) ? $lead['id'] : $lead->getId(); | |
| if (!empty($this->getRepository()->findMessage($channel, $channelId, $leadId))) { | |
| continue; | |
| } | |
| $messageQueue = new MessageQueue(); | |
| if ($campaignEventId) { | |
| $messageQueue->setEvent($this->em->getReference(\Mautic\CampaignBundle\Entity\Event::class, $campaignEventId)); | |
| } | |
| $messageQueue->setChannel($channel); | |
| $messageQueue->setChannelId($channelId); | |
| $messageQueue->setDatePublished(new \DateTime()); | |
| $messageQueue->setMaxAttempts($maxAttempts); | |
| $messageQueue->setLead( | |
| ($lead instanceof Lead) ? $lead : $this->em->getReference(Lead::class, $leadId) | |
| ); | |
| $messageQueue->setPriority($priority); | |
| $messageQueue->setScheduledDate($scheduledDate); | |
| $messageQueue->setOptions($options); | |
| $messageQueues[] = $messageQueue; | |
| } | |
| if ($messageQueues) { | |
| $this->saveEntities($messageQueues); | |
| $messageQueueRepository = $this->getRepository(); | |
| $messageQueueRepository->detachEntities($messageQueues); | |
| } | |
| return true; | |
| } | |
| public function sendMessages($channel = null, $channelId = null): int | |
| { | |
| // Note when the process started for batch purposes | |
| $processStarted = new \DateTime(); | |
| $limit = 50; | |
| $counter = 0; | |
| foreach ($this->getRepository()->getQueuedMessages($limit, $processStarted, $channel, $channelId) as $queue) { | |
| $counter += $this->processMessageQueue($queue); | |
| $event = $queue->getEvent(); | |
| $lead = $queue->getLead(); | |
| if ($event) { | |
| $this->em->detach($event); | |
| } | |
| $this->em->detach($lead); | |
| $this->em->detach($queue); | |
| } | |
| return $counter; | |
| } | |
| public function processMessageQueue($queue): int | |
| { | |
| if (!is_array($queue)) { | |
| if (!$queue instanceof MessageQueue) { | |
| throw new \InvalidArgumentException('$queue must be an instance of '.MessageQueue::class); | |
| } | |
| $queue = [$queue->getId() => $queue]; | |
| } | |
| $counter = 0; | |
| $contacts = []; | |
| $byChannel = []; | |
| // Lead entities will not have profile fields populated due to the custom field use - therefore to optimize resources, | |
| // get a list of leads to fetch details all at once along with company details for dynamic email content, etc | |
| /** @var MessageQueue $message */ | |
| foreach ($queue as $message) { | |
| if ($message->getLead()) { | |
| $contacts[$message->getId()] = $message->getLead()->getId(); | |
| } | |
| } | |
| if (!empty($contacts)) { | |
| $contactData = $this->leadModel->getRepository()->getContacts($contacts); | |
| foreach ($contacts as $messageId => $contactId) { | |
| $queue[$messageId]->getLead()->setFields($contactData[$contactId]); | |
| } | |
| } | |
| // Group queue by channel and channel ID - this make it possible for processing listeners to batch process such as | |
| // sending emails in batches to 3rd party transactional services via HTTP APIs | |
| foreach ($queue as $key => $message) { | |
| if (MessageQueue::STATUS_SENT == $message->getStatus()) { | |
| unset($queue[$key]); | |
| continue; | |
| } | |
| $messageChannel = $message->getChannel(); | |
| $messageChannelId = $message->getChannelId(); | |
| if (!$messageChannelId) { | |
| $messageChannelId = 0; | |
| } | |
| if (!isset($byChannel[$messageChannel])) { | |
| $byChannel[$messageChannel] = []; | |
| } | |
| if (!isset($byChannel[$messageChannel][$messageChannelId])) { | |
| $byChannel[$messageChannel][$messageChannelId] = []; | |
| } | |
| $byChannel[$messageChannel][$messageChannelId][] = $message; | |
| } | |
| // First try to batch process each channel | |
| foreach ($byChannel as $messageChannel => $channelMessages) { | |
| foreach ($channelMessages as $messageChannelId => $messages) { | |
| $event = new MessageQueueBatchProcessEvent($messages, $messageChannel, $messageChannelId); | |
| $ignore = null; | |
| $this->dispatchEvent('process_batch_message_queue', $ignore, false, $event); | |
| } | |
| } | |
| unset($byChannel); | |
| // Now check to see if the message was processed by the listener and if not | |
| // send it through a single process event listener | |
| foreach ($queue as $message) { | |
| if (!$message->isProcessed()) { | |
| $event = new MessageQueueProcessEvent($message); | |
| $this->dispatchEvent('process_message_queue', $message, false, $event); | |
| } | |
| if ($message->isSuccess()) { | |
| ++$counter; | |
| $message->setSuccess(); | |
| $message->setLastAttempt(new \DateTime()); | |
| $message->setDateSent(new \DateTime()); | |
| $message->setStatus(MessageQueue::STATUS_SENT); | |
| } elseif ($message->isFailed()) { | |
| // Failure such as email delivery issue or something so retry in a short time | |
| $this->reschedule($message, new \DateInterval(self::DEFAULT_RESCHEDULE_INTERVAL)); | |
| } // otherwise assume the listener did something such as rescheduling the message | |
| } | |
| // add listener | |
| $this->saveEntities($queue); | |
| return $counter; | |
| } | |
| /** | |
| * @param bool $persist | |
| */ | |
| public function reschedule($message, \DateInterval $rescheduleInterval, $leadId = null, $channel = null, $channelId = null, $persist = false): void | |
| { | |
| if (!$message instanceof MessageQueue && $leadId && $channel && $channelId) { | |
| $message = $this->getRepository()->findMessage($channel, $channelId, $leadId); | |
| $persist = true; | |
| } | |
| if (!$message) { | |
| return; | |
| } | |
| $message->setAttempts($message->getAttempts() + 1); | |
| $message->setLastAttempt(new \DateTime()); | |
| $rescheduleTo = clone $message->getScheduledDate(); | |
| $rescheduleTo->add($rescheduleInterval); | |
| $message->setScheduledDate($rescheduleTo); | |
| $message->setStatus(MessageQueue::STATUS_RESCHEDULED); | |
| if ($persist) { | |
| $this->saveEntity($message); | |
| } | |
| // Mark as processed for listeners | |
| $message->setProcessed(); | |
| } | |
| /** | |
| * @deprecated to be removed in 3.0; use reschedule method instead | |
| * | |
| * @param string $rescheduleInterval | |
| * @param bool $persist | |
| */ | |
| public function rescheduleMessage($message, $rescheduleInterval = null, $leadId = null, $channel = null, $channelId = null, $persist = false): void | |
| { | |
| $rescheduleInterval = null == $rescheduleInterval ? self::DEFAULT_RESCHEDULE_INTERVAL : ('P'.$rescheduleInterval); | |
| $this->reschedule($message, new \DateInterval($rescheduleInterval), $leadId, $channel, $channelId, $persist); | |
| } | |
| /** | |
| * @param array $channelIds | |
| */ | |
| public function getQueuedChannelCount($channel, $channelIds = []): int | |
| { | |
| return $this->getRepository()->getQueuedChannelCount($channel, $channelIds); | |
| } | |
| /** | |
| * @throws \Symfony\Component\HttpKernel\Exception\MethodNotAllowedHttpException | |
| */ | |
| protected function dispatchEvent($action, &$entity, $isNew = false, Event $event = null): ?Event | |
| { | |
| switch ($action) { | |
| case 'process_message_queue': | |
| $name = ChannelEvents::PROCESS_MESSAGE_QUEUE; | |
| break; | |
| case 'process_batch_message_queue': | |
| $name = ChannelEvents::PROCESS_MESSAGE_QUEUE_BATCH; | |
| break; | |
| case 'post_save': | |
| $name = ChannelEvents::MESSAGE_QUEUED; | |
| break; | |
| default: | |
| return null; | |
| } | |
| if ($this->dispatcher->hasListeners($name)) { | |
| if (empty($event)) { | |
| $event = new MessageQueueEvent($entity, $isNew); | |
| $event->setEntityManager($this->em); | |
| } | |
| $this->dispatcher->dispatch($event, $name); | |
| return $event; | |
| } else { | |
| return null; | |
| } | |
| } | |
| } | |