vendor/blackbit/data-director/EventListener/AutomaticImportListener.php line 80

Open in your IDE?
  1. <?php
  2. /**
  3.  * Copyright Blackbit digital Commerce GmbH <info@blackbit.de>
  4.  *
  5.  * This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version.
  6.  *
  7.  * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  8.  *
  9.  * You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  10.  */
  11. namespace Blackbit\DataDirectorBundle\EventListener;
  12. use Blackbit\DataDirectorBundle\Controller\ImportController;
  13. use Blackbit\DataDirectorBundle\lib\Pim\Helper;
  14. use Blackbit\DataDirectorBundle\lib\Pim\Import\CallbackFunction;
  15. use Blackbit\DataDirectorBundle\lib\Pim\Item\ImporterInterface;
  16. use Blackbit\DataDirectorBundle\lib\Pim\Item\ItemMoldBuilder;
  17. use Blackbit\DataDirectorBundle\lib\Pim\RawData\Importer;
  18. use Blackbit\DataDirectorBundle\lib\Pim\Serializer;
  19. use Blackbit\DataDirectorBundle\model\Dataport;
  20. use Blackbit\DataDirectorBundle\model\DataportResource;
  21. use Blackbit\DataDirectorBundle\model\Fieldmapping;
  22. use Blackbit\DataDirectorBundle\model\ImportStatus;
  23. use Blackbit\DataDirectorBundle\model\PimcoreDbRepository;
  24. use Blackbit\DataDirectorBundle\model\Queue;
  25. use Blackbit\DataDirectorBundle\model\RawItem;
  26. use Blackbit\DataDirectorBundle\model\RawItemData;
  27. use Blackbit\DataDirectorBundle\Tools\Installer;
  28. use InvalidArgumentException;
  29. use Pimcore\Db;
  30. use Pimcore\Event\Model\AssetEvent;
  31. use Pimcore\Event\Model\ElementEventInterface;
  32. use Pimcore\Logger;
  33. use Pimcore\Model\AbstractModel;
  34. use Pimcore\Model\Asset;
  35. use Pimcore\Model\DataObject\AbstractObject;
  36. use Pimcore\Model\DataObject\Concrete;
  37. use Pimcore\Model\Element\DirtyIndicatorInterface;
  38. use Pimcore\Model\Element\ElementInterface;
  39. use Pimcore\Model\Element\Service;
  40. use Pimcore\Model\User;
  41. use Pimcore\Model\Version;
  42. use Pimcore\Tool;
  43. use Blackbit\DataDirectorBundle\lib\Pim\Cli;
  44. use Psr\Log\LoggerAwareInterface;
  45. use Psr\Log\LoggerAwareTrait;
  46. use Psr\Log\LoggerInterface;
  47. use Symfony\Component\HttpFoundation\Request;
  48. class AutomaticImportListener implements LoggerAwareInterface
  49. {
  50.     use LoggerAwareTrait;
  51.     private static $processCommandRegistered = [];
  52.     private static $dataports null;
  53.     /** @var ItemMoldBuilder */
  54.     private $itemMoldBuilder;
  55.     public function __construct(ItemMoldBuilder $itemMoldBuilder)
  56.     {
  57.         $this->itemMoldBuilder $itemMoldBuilder;
  58.     }
  59.     private static function getDataports() {
  60.         if(self::$dataports === null) {
  61.             self::$dataports Dataport::getInstance()->find([], 'name');
  62.             foreach(self::$dataports as &$dataport) {
  63.                 $dataport['sourceconfig'] = unserialize($dataport['sourceconfig'], ['allowed_classes' => false]);
  64.                 $dataport['targetconfig'] = unserialize($dataport['targetconfig'], ['allowed_classes' => false]);
  65.             }
  66.             unset($dataport);
  67.         }
  68.         return self::$dataports;
  69.     }
  70.     public function startImports(ElementEventInterface $e) {
  71.         if(\method_exists($e'getArgument')) {
  72.             try {
  73.                 $saveVersionOnly $e->getArgument('saveVersionOnly');
  74.                 if($saveVersionOnly) {
  75.                     return;
  76.                 }
  77.             } catch(\InvalidArgumentException $exception) {
  78.             }
  79.         }
  80.         $object $e->getElement();
  81.         $user Helper::getUser();
  82.         if($user instanceof User) {
  83.             PimcoreDbRepository::getInstance()->execute('UPDATE edit_lock SET date=? WHERE cid=? AND ctype=? AND userId=?', [time(), $object->getId(), Service::getElementType($object), $user->getId()]);
  84.         }
  85.         $dataports Dataport::getInstance();
  86.         $dataportResourceRepository DataportResource::getInstance();
  87.         $queue Queue::getInstance();
  88.         $queueAddItems = [];
  89.         foreach(self::getDataports() as $dataport) {
  90.             $originalLocale \Pimcore::getContainer()->get('pimcore.locale')->getLocale() ?? Tool::getDefaultLanguage();
  91.             try {
  92.                 $sourceConfig $dataport['sourceconfig'];
  93.                 $targetConfig $dataport['targetconfig'];
  94.                 if(!empty($sourceConfig['autoImport'])) {
  95.                     $parser $dataports->getParser($dataport['id']);
  96.                     if(method_exists($parser'disableLoggingNotFoundImportResource')) {
  97.                         $parser->disableLoggingNotFoundImportResource();
  98.                     }
  99.                     if(\method_exists($parser'setSourceFile') && \method_exists($parser'getFileConditionFromObject')) {
  100.                         if (empty($targetConfig['itemClass'])) {
  101.                             // Export
  102.                             if (empty($sourceConfig['incrementalExport'])) {
  103.                                 $dataportResources $dataportResourceRepository->find(['dataportId = ?' => $dataport['id']]);
  104.                             } else {
  105.                                 $dataportResources = [
  106.                                     $dataportResourceRepository->create(
  107.                                         [
  108.                                             'dataportId' => $dataport['id'],
  109.                                             'resource' => \json_encode([], JSON_UNESCAPED_SLASHES)
  110.                                         ]
  111.                                     )
  112.                                 ];
  113.                             }
  114.                             $dataportResourceIdsStillContainingObject = [];
  115.                             foreach ($dataportResources as $dataportResource) {
  116.                                 try {
  117.                                     $resourceSettings \json_decode($dataportResource['resource'], true);
  118.                                     $parser->setSourceFile($resourceSettings['file'] ?? null);
  119.                                     if (!empty($resourceSettings['locale'])) {
  120.                                         \Pimcore::getContainer()->get('pimcore.locale')->setLocale($resourceSettings['locale']);
  121.                                     } else {
  122.                                         $resourceSettings['locale'] = \Pimcore::getContainer()->get('pimcore.locale')->getLocale();
  123.                                         if ($resourceSettings['locale'] === null) {
  124.                                             $resourceSettings['locale'] = Tool::getDefaultLanguage();
  125.                                         }
  126.                                     }
  127.                                     $source $parser->getFileConditionFromObject($object);
  128.                                     if ($source !== null) {
  129.                                         $dataportResourceIdsStillContainingObject[$source][$resourceSettings['locale']][] = $dataportResource['id'];
  130.                                     } elseif (empty($sourceConfig['incrementalExport'])) {
  131.                                         $source $parser->getFileConditionFromObject($objectfalse);
  132.                                         if ($source !== null) {
  133.                                             $tmpSourceConfig $sourceConfig;
  134.                                             if (\method_exists($parser'setConfig')) {
  135.                                                 if (\method_exists($parser'getConfig')) {
  136.                                                     $tmpSourceConfig $parser->getConfig();
  137.                                                 }
  138.                                                 $tmpSourceConfig['file'] = '';
  139.                                                 $tmpSourceConfig['dataportId'] = $dataport['id'];
  140.                                                 $parser->setConfig($tmpSourceConfig);
  141.                                             }
  142.                                             $parser->setSourceFile($source);
  143.                                             $keyFields = [];
  144.                                             foreach ($tmpSourceConfig['fields'] as $fieldIndex => $field) {
  145.                                                 if (!empty($field['exportKey'])) {
  146.                                                     $keyFields[$fieldIndex] = $field;
  147.                                                 }
  148.                                             }
  149.                                             $hashs = [];
  150.                                             if ($dataport['sourcetype'] === 'pimcore') {
  151.                                                 $locales Tool::getValidLanguages();
  152.                                                 if ($dataportResource !== null) {
  153.                                                     $resource \json_decode($dataportResource['resource'], true);
  154.                                                     if ($resource['locale']) {
  155.                                                         $locales = [$resource['locale']];
  156.                                                     }
  157.                                                 }
  158.                                                 foreach ($locales as $language) {
  159.                                                     \Pimcore::getContainer()->get('pimcore.locale')->setLocale($language);
  160.                                                     foreach ($parser as $rawItemData) {
  161.                                                         if ($rawItemData === null) {
  162.                                                             continue;
  163.                                                         }
  164.                                                         $rawItemData array_filter(
  165.                                                             $rawItemData,
  166.                                                             static function ($fieldId) use ($keyFields) {
  167.                                                                 return isset($keyFields[$fieldId]);
  168.                                                             },
  169.                                                             ARRAY_FILTER_USE_KEY
  170.                                                         );
  171.                                                         $hashs[] = Importer::getHash($rawItemData);
  172.                                                     }
  173.                                                 }
  174.                                             } else {
  175.                                                 foreach ($parser as $rawItemData) {
  176.                                                     if ($rawItemData === null) {
  177.                                                         continue;
  178.                                                     }
  179.                                                     $rawItemData array_filter(
  180.                                                         $rawItemData,
  181.                                                         static function ($fieldId) use ($keyFields) {
  182.                                                             return isset($keyFields[$fieldId]);
  183.                                                         },
  184.                                                         ARRAY_FILTER_USE_KEY
  185.                                                     );
  186.                                                     $hashs[] = Importer::getHash($rawItemData);
  187.                                                 }
  188.                                             }
  189.                                             if(count($hashs) > 0) {
  190.                                                 $countDeletedRawItems Db::get()->executeUpdate('DELETE FROM '.Installer::TABLE_RAWITEM.' WHERE dataport_resource_id = ? AND hash IN ("'.implode('","'$hashs).'")', [$dataportResource['id']]);
  191.                                                 if ($countDeletedRawItems) {
  192.                                                     $this->queueProcessRawData($dataportResource['id'], $dataport['id']);
  193.                                                 }
  194.                                             }
  195.                                         }
  196.                                     }
  197.                                 } catch (\Throwable $e) {
  198.                                     if(!$e instanceof SkipTriggerAutomaticImportException) {
  199.                                         $error 'Check for automatic start for dataport #'.$dataport['id'].' failed: '.(string)$e;
  200.                                         if($this->logger) {
  201.                                             $this->logger->error($error);
  202.                                         } else {
  203.                                             Logger::error($error);
  204.                                         }
  205.                                     }
  206.                                 }
  207.                             }
  208.                             foreach($dataportResourceIdsStillContainingObject as $dataportResourceQuery => $dataportResourcesWithSameLocale) {
  209.                                 foreach($dataportResourcesWithSameLocale as $locale => $dataportResourceIds) {
  210.                                     if (empty($sourceConfig['incrementalExport'])) {
  211.                                         if ($dataport['sourcetype'] === 'pimcore' && $this->extractIds($dataportResourceQuery$match)) {
  212.                                             $ids $match[1];
  213.                                             sort($idsSORT_NUMERIC);
  214.                                             foreach ($ids as $id) {
  215.                                                 $queueAddItems[] = [
  216.                                                     'command' => 'data-director:extract '.$dataport['id'].' "'.str_replace([$match[0], '"''$''`'], ['='.$id'\\"''\\$''\\`'], $dataportResourceQuery).'"'.(!empty($locale) ? ' --locale='.$locale '').' --dataport-resource-id='.implode(','$dataportResourceIds),
  217.                                                     'triggered_by' => 'Element '.$object->getFullPath().' (#'.$object->getId().') got saved'.(($user instanceof User) ? ' by '.$user->getUsername() : ''),
  218.                                                     'worker_id' => $dataport['id']
  219.                                                 ];
  220.                                             }
  221.                                         } else {
  222.                                             $queueAddItems[] = [
  223.                                                 'command' => 'data-director:extract '.$dataport['id'].' "'.str_replace(['"''$''`'], ['\\"''\\$''\\`'], $dataportResourceQuery).'"'.(!empty($locale) ? ' --locale='.$locale '').' --dataport-resource-id='.implode(','$dataportResourceIds),
  224.                                                 'triggered_by' => 'Element '.$object->getFullPath().' (#'.$object->getId().') got saved'.(($user instanceof User) ? ' by '.$user->getUsername() : ''),
  225.                                                 'worker_id' => $dataport['id']
  226.                                             ];
  227.                                         }
  228.                                     } elseif ($dataport['sourcetype'] === 'pimcore' && ($ids $this->extractIds($dataportResourceQuery$match))) {
  229.                                         $ids $match[1];
  230.                                         sort($idsSORT_NUMERIC);
  231.                                         foreach ($ids as $id) {
  232.                                             $queueAddItems[] = [
  233.                                                 'command' => 'data-director:complete '.$dataport['id'].' "'.str_replace([$match[0], '"''$''`'], ['='.$id'\\"''\\$''\\`'], $dataportResourceQuery).'"',
  234.                                                 'triggered_by' => 'Element '.$object->getFullPath().' (#'.$object->getId().') got saved'.(($user instanceof User) ? ' by '.$user->getUsername() : ''),
  235.                                                 'worker_id' => $dataport['id']
  236.                                             ];
  237.                                         }
  238.                                     } else {
  239.                                         $queueAddItems[] = [
  240.                                             'command' => 'data-director:complete '.$dataport['id'].' "'.str_replace(['"''$''`'], ['\\"''\\$''\\`'], $dataportResourceQuery).'"',
  241.                                             'triggered_by' => 'Element '.$object->getFullPath().' (#'.$object->getId().') got saved'.(($user instanceof User) ? ' by '.$user->getUsername() : ''),
  242.                                             'worker_id' => $dataport['id']
  243.                                         ];
  244.                                     }
  245.                                 }
  246.                             }
  247.                         } else {
  248.                             try {
  249.                                 // Import
  250.                                 if (\method_exists($parser'getConfig')) {
  251.                                     $parser->setSourceFile($parser->getConfig()['file'] ?? null);
  252.                                 }
  253.                                 $source $parser->getFileConditionFromObject($object);
  254.                                 if ($source !== null) {
  255.                                     if ($dataport['sourcetype'] === 'pimcore' && preg_match('/ IN \(((\d+,?)+)\)/'$source$match)) {
  256.                                         $ids explode(','$match[1]);
  257.                                         sort($idsSORT_NUMERIC);
  258.                                         foreach ($ids as $id) {
  259.                                             $queueAddItems[] = [
  260.                                                 'command' => 'data-director:complete '.$dataport['id'].' "'.str_replace([$match[0], '"''$''`'], ['='.$id'\\"''\\$''\\`'], $source).'"'.(($user instanceof User) ? ' --user='.$user->getId() : ''),
  261.                                                 'triggered_by' => 'Element '.$object->getFullPath().' (#'.$object->getId().') got saved'.(($user instanceof User) ? ' by '.$user->getUsername() : ''),
  262.                                                 'worker_id' => $dataport['id']
  263.                                             ];
  264.                                         }
  265.                                     } else {
  266.                                         $queueAddItems[] = [
  267.                                             'command' => 'data-director:complete '.$dataport['id'].' "'.str_replace(['"''$''`'], ['\\"''\\$''\\`'], $source).'"',
  268.                                             'triggered_by' => 'Element '.$object->getFullPath().' (#'.$object->getId().') got saved'.(($user instanceof User) ? ' by '.$user->getUsername() : ''),
  269.                                             'worker_id' => $dataport['id']
  270.                                         ];
  271.                                     }
  272.                                 }
  273.                             } catch(SkipTriggerAutomaticImportException $e) {
  274.                             }
  275.                         }
  276.                     }
  277.                 }
  278.             } catch (\Exception $e) {
  279.                 if($this->logger) {
  280.                     $this->logger->error('Automatic processing of dataport #'.$dataport['id'].' "'.$dataport['name'].'" failed:'. (string)$e);
  281.                 } else {
  282.                     Logger::error('Automatic processing of dataport #'.$dataport['id'].' "'.$dataport['name'].'" failed:'.(string)$e);
  283.                 }
  284.             } finally {
  285.                 \Pimcore::getContainer()->get('pimcore.locale')->setLocale($originalLocale);
  286.             }
  287.         }
  288.         if (count($queueAddItems) > 0) {
  289.             $queue->create($queueAddItems);
  290.         }
  291.     }
  292.     public function deleteRawdata(ElementEventInterface $e)
  293.     {
  294.         if (\method_exists($e'getArgument')) {
  295.             try {
  296.                 $saveVersionOnly $e->getArgument('saveVersionOnly');
  297.                 if ($saveVersionOnly) {
  298.                     return;
  299.                 }
  300.             } catch (\InvalidArgumentException $exception) {
  301.             }
  302.         }
  303.         $object $e->getElement();
  304.         $dataports Dataport::getInstance();
  305.         $objectType Service::getElementType($object);
  306.         $commandPrefix '"'.Cli::getPhpCli().'" '.realpath(PIMCORE_PROJECT_ROOT.DIRECTORY_SEPARATOR.'bin'.DIRECTORY_SEPARATOR.'console');
  307.         foreach (self::getDataports() as $dataport) {
  308.             $sourceConfig $dataport['sourceconfig'];
  309.             $targetConfig $dataport['targetconfig'];
  310.             if (!empty($sourceConfig['autoImport']) && empty($targetConfig['itemClass'])) {
  311.                 $itemMold $this->itemMoldBuilder->getItemMoldByClassId($sourceConfig['sourceClass'] ?? null);
  312.                 if ($object instanceof $itemMold) {
  313.                     Cli::exec($commandPrefix.' data-director:delete-rawdata --dataport='.$dataport['id'].' --object-id='.$object->getId().' --object-type='.$objectType);
  314.                 }
  315.             }
  316.         }
  317.     }
  318.     private function queueProcessRawData($dataportResourceId$dataportId) {
  319.         if (!isset(self::$processCommandRegistered[$dataportResourceId])) {
  320.             self::$processCommandRegistered[$dataportResourceId] = true;
  321.             $queue Queue::getInstance();
  322.             $cmd 'data-director:process ' $dataportId ' --dataport-resource-id=' $dataportResourceId;
  323.             register_shutdown_function(
  324.                 static function () use ($queue$cmd$dataportId) {
  325.                     $queue->create([
  326.                         'command' => $cmd,
  327.                         'triggered_by' => 'An item got deleted from raw data -> new result document has to be generated',
  328.                         'worker_id' => $dataportId
  329.                     ]);
  330.                 }
  331.             );
  332.         }
  333.     }
  334.     private function extractIds($query, &$match) {
  335.         $startTerm ' IN (';
  336.         $endTerm ')';
  337.         $start strpos($query$startTerm);
  338.         if($start === false) {
  339.             return null;
  340.         }
  341.         $end strpos($query$endTerm$start);
  342.         if($end === false) {
  343.             return null;
  344.         }
  345.         $match = [
  346.             substr($query$start$end $start+strlen($endTerm)),
  347.             explode(','substr($query$start strlen($startTerm), $end $start strlen($startTerm)))
  348.         ];
  349.         return true;
  350.     }
  351. }