Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle index updates asynchronously #57

Merged
merged 5 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ valantic_elastica_bridge:
should_skip_failing_documents: false
```

## Queue

[Set up a worker](https://symfony.com/doc/current/messenger.html#consuming-messages-running-the-worker) to process `elastica_bridge_index`. Alternatively you can route the transport to use the `sync` handler: `framework.messenger.transports.elastica_bridge_index: 'sync'`.

## Indexing

### Bulk
Expand Down Expand Up @@ -96,6 +100,8 @@ The bridge automatically listens to Pimcore events and updates documents as need

This can be globally disabled by calling `\Valantic\ElasticaBridgeBundle\EventListener\Pimcore\ChangeListener::disableListener();`.

You can also dispatch a `Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElement` message to handle updates to related objects which are not triggered by the `ChangeListener`.

## Status

```
Expand Down
1 change: 1 addition & 0 deletions UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Remove deprecated options `valantic_elastica_bridge.client.host` and `valantic_elastica_bridge.client.port`. Use `valantic_elastica_bridge.client.dsn` instead, e.g. `http://localhost:9200`
- Renamed `valantic_elastica_bridge.client.addSentryBreadcrumbs` to `valantic_elastica_bridge.client.should_add_sentry_breadcrumbs`
- See [README#Queue](./README#queue) to set up the **required** Symfony Messenger workers.

## Upgrade from v2 to v3

Expand Down
2 changes: 1 addition & 1 deletion src/Command/Cleanup.php
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private function getIndices(): array

$indices = [];

foreach ($this->indexRepository->flattened() as $indexConfig) {
foreach ($this->indexRepository->flattenedAll() as $indexConfig) {
if ($indexConfig->usesBlueGreenIndices()) {
$indices[] = $indexConfig->getBlueGreenActiveElasticaIndex()->getName();
$indices[] = $indexConfig->getBlueGreenInactiveElasticaIndex()->getName();
Expand Down
20 changes: 4 additions & 16 deletions src/Command/Index.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@
use Symfony\Component\Console\Output\ConsoleOutput;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\HttpKernel\KernelInterface;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\LockInterface;
use Symfony\Component\Process\Process;
use Valantic\ElasticaBridgeBundle\Elastica\Client\ElasticsearchClient;
use Valantic\ElasticaBridgeBundle\Enum\IndexBlueGreenSuffix;
use Valantic\ElasticaBridgeBundle\Exception\Index\BlueGreenIndicesIncorrectlySetupException;
use Valantic\ElasticaBridgeBundle\Index\IndexInterface;
use Valantic\ElasticaBridgeBundle\Repository\ConfigurationRepository;
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;
use Valantic\ElasticaBridgeBundle\Service\LockService;
use Valantic\ElasticaBridgeBundle\Util\ElasticsearchResponse;

class Index extends BaseCommand
Expand All @@ -34,8 +32,7 @@ public function __construct(
private readonly IndexRepository $indexRepository,
private readonly ElasticsearchClient $esClient,
private readonly KernelInterface $kernel,
private readonly LockFactory $lockFactory,
private readonly ConfigurationRepository $configurationRepository,
private readonly LockService $lockService,
) {
parent::__construct();
}
Expand Down Expand Up @@ -73,7 +70,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
{
$skippedIndices = [];

foreach ($this->indexRepository->flattened() as $indexConfig) {
foreach ($this->indexRepository->flattenedAll() as $indexConfig) {
if (
is_array($this->input->getArgument(self::ARGUMENT_INDEX))
&& count($this->input->getArgument(self::ARGUMENT_INDEX)) > 0
Expand All @@ -84,7 +81,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
continue;
}

$lock = $this->getLock($indexConfig);
$lock = $this->lockService->getIndexingLock($indexConfig);

if (!$lock->acquire()) {
if ($this->input->getOption(self::OPTION_LOCK_RELEASE) === true) {
Expand Down Expand Up @@ -260,13 +257,4 @@ private function ensureCorrectBlueGreenIndexSetup(IndexInterface $indexConfig):

$this->output->writeln('<comment>-> Ensured indices are correctly set up with alias</comment>');
}

private function getLock(mixed $indexConfig): LockInterface
{
return $this->lockFactory
->createLock(
__METHOD__ . '->' . $indexConfig->getName(),
ttl: $this->configurationRepository->getIndexingLockTimeout()
);
}
}
2 changes: 1 addition & 1 deletion src/Command/PopulateIndex.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int

private function getIndex(): ?IndexInterface
{
foreach ($this->indexRepository->flattened() as $indexConfig) {
foreach ($this->indexRepository->flattenedAll() as $indexConfig) {
if ($indexConfig->getName() === $this->input->getOption(self::OPTION_CONFIG)) {
return $indexConfig;
}
Expand Down
12 changes: 10 additions & 2 deletions src/Command/Status.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int

$this->output->writeln('');

foreach ($this->indexRepository->flattened() as $indexConfig) {
foreach ($this->indexRepository->flattenedAll() as $indexConfig) {
$this->processBundleIndex($indexConfig);
}

Expand All @@ -67,11 +67,19 @@ protected function execute(InputInterface $input, OutputInterface $output): int
->setHeaderTitle('Indices (managed by this bundle)');
$table->render();

$otherIndexNames = [];

foreach ($this->esClient->getCluster()->getIndexNames() as $indexName) {
if (in_array($indexName, $this->skipOtherIndices, true) || !$this->shouldProcessNonBundleIndex($indexName)) {
continue;
}
$this->processOtherIndex($indexName);
$otherIndexNames[] = $indexName;
}

sort($otherIndexNames);

foreach ($otherIndexNames as $otherIndexName) {
$this->processOtherIndex($otherIndexName);
}

$this->output->writeln('');
Expand Down
7 changes: 4 additions & 3 deletions src/EventListener/Pimcore/ChangeListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
use Pimcore\Model\Document;
use Pimcore\Model\Element\AbstractElement;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Valantic\ElasticaBridgeBundle\Exception\EventListener\PimcoreElementNotFoundException;
use Valantic\ElasticaBridgeBundle\Service\PropagateChanges;
use Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElement;

/**
* An abstract listener for DataObject and Document listeners.
Expand All @@ -28,7 +29,7 @@ class ChangeListener implements EventSubscriberInterface
private static bool $isEnabled = true;

public function __construct(
private readonly PropagateChanges $propagateChanges,
private readonly MessageBusInterface $messageBus,
) {}

public function handle(AssetEvent|DataObjectEvent|DocumentEvent $event): void
Expand All @@ -45,7 +46,7 @@ public function handle(AssetEvent|DataObjectEvent|DocumentEvent $event): void
return;
}

$this->propagateChanges->handle($this->getFreshElement($element));
$this->messageBus->dispatch(new RefreshElement($this->getFreshElement($element)));
}

public static function enableListener(): void
Expand Down
45 changes: 45 additions & 0 deletions src/Messenger/Handler/AbstractRefreshHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Handler;

use Pimcore\Model\DataObject\Concrete;
use Pimcore\Model\Element\AbstractElement;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Valantic\ElasticaBridgeBundle\Exception\EventListener\PimcoreElementNotFoundException;
use Valantic\ElasticaBridgeBundle\Messenger\Message\AbstractRefresh;

/** @template TModel of AbstractElement */
#[AsMessageHandler]
abstract class AbstractRefreshHandler
{
protected function resolveElement(AbstractRefresh $message): AbstractElement
{
/** @var class-string<TModel> $className */
$className = $message->className;

try {
$element = $className::getById($message->id);
} catch (\Throwable) {
throw new PimcoreElementNotFoundException($message->id, $message->className);
}

if (!$element instanceof AbstractElement) {
// The element in question was deleted so we need a skeleton.
/** @var TModel $element */
$element = new ($className)();
$element->setId($message->id);

if ($element instanceof Concrete) {
$element->setPublished(false);
}
}

if ($element === null) {
throw new PimcoreElementNotFoundException($message->id, $message->className);
}

return $element;
}
}
28 changes: 28 additions & 0 deletions src/Messenger/Handler/RefreshElementHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Handler;

use Pimcore\Model\Element\AbstractElement;
use Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElement;
use Valantic\ElasticaBridgeBundle\Service\PropagateChanges;

/**
* @template TModel of AbstractElement
*
* @extends AbstractRefreshHandler<TModel>
*/
class RefreshElementHandler extends AbstractRefreshHandler
{
public function __construct(
private readonly PropagateChanges $propagateChanges,
) {}

public function __invoke(RefreshElement $message): void
{
$element = $this->resolveElement($message);

$this->propagateChanges->handle($element);
}
}
37 changes: 37 additions & 0 deletions src/Messenger/Handler/RefreshElementInIndexHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Handler;

use Pimcore\Model\Element\AbstractElement;
use Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElementInIndex;
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;
use Valantic\ElasticaBridgeBundle\Service\LockService;
use Valantic\ElasticaBridgeBundle\Service\PropagateChanges;

/**
* @template TModel of AbstractElement
*
* @extends AbstractRefreshHandler<TModel>
*/
class RefreshElementInIndexHandler extends AbstractRefreshHandler
{
public function __construct(
private readonly PropagateChanges $propagateChanges,
private readonly LockService $lockService,
private readonly IndexRepository $indexRepository,
) {}

public function __invoke(RefreshElementInIndex $message): void
{
$index = $this->indexRepository->flattenedGet($message->index);
$element = $this->resolveElement($message);

if ($index->usesBlueGreenIndices() && !$this->lockService->getIndexingLock($index)->acquire()) {
$this->propagateChanges->handleIndex($element, $index, $index->getBlueGreenInactiveElasticaIndex());
}

$this->propagateChanges->handleIndex($element, $index);
}
}
20 changes: 20 additions & 0 deletions src/Messenger/Message/AbstractRefresh.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Message;

use Pimcore\Model\Element\ElementInterface;

abstract class AbstractRefresh
{
/** @var class-string<ElementInterface> */
public string $className;
public int $id;

protected function setElement(ElementInterface $element): void
{
$this->className = $element::class;
$this->id = $element->getId() ?? throw new \InvalidArgumentException('Pimcore ID is null.');
}
}
15 changes: 15 additions & 0 deletions src/Messenger/Message/RefreshElement.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Message;

use Pimcore\Model\Element\ElementInterface;

class RefreshElement extends AbstractRefresh
{
public function __construct(ElementInterface $element)
{
$this->setElement($element);
}
}
17 changes: 17 additions & 0 deletions src/Messenger/Message/RefreshElementInIndex.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Message;

use Pimcore\Model\Element\ElementInterface;

class RefreshElementInIndex extends AbstractRefresh
{
public function __construct(
ElementInterface $element,
public readonly string $index,
) {
$this->setElement($element);
}
}
14 changes: 13 additions & 1 deletion src/Repository/IndexRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Valantic\ElasticaBridgeBundle\Repository;

use Valantic\ElasticaBridgeBundle\Exception\Repository\ItemNotFoundInRepositoryException;
use Valantic\ElasticaBridgeBundle\Index\IndexInterface;
use Valantic\ElasticaBridgeBundle\Index\TenantAwareInterface;

Expand All @@ -17,7 +18,7 @@ class IndexRepository extends AbstractRepository
/**
* @return \Generator<string,IndexInterface,void,void>
*/
public function flattened(): \Generator
public function flattenedAll(): \Generator
{
foreach ($this->all() as $indexConfig) {
if ($indexConfig instanceof TenantAwareInterface) {
Expand All @@ -32,4 +33,15 @@ public function flattened(): \Generator
}
}
}

public function flattenedGet(string $key): IndexInterface
{
foreach ($this->flattenedAll() as $candidateKey => $index) {
if ($candidateKey === $key) {
return $index;
}
}

throw new ItemNotFoundInRepositoryException($key);
}
}
2 changes: 2 additions & 0 deletions src/Resources/config/pimcore/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
imports:
- { resource: messenger.yaml }
8 changes: 8 additions & 0 deletions src/Resources/config/pimcore/messenger.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
framework:
messenger:
enabled: true
transports:
elastica_bridge_index: 'doctrine://default?queue_name=elastica_bridge_index'
routing:
Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElement: elastica_bridge_index
Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElementInIndex: elastica_bridge_index
5 changes: 5 additions & 0 deletions src/Resources/config/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,8 @@ services:
Valantic\ElasticaBridgeBundle\Repository\DocumentRepository:
arguments:
- !tagged_iterator valantic.elastica_bridge.document

Valantic\ElasticaBridgeBundle\Messenger\Handler\:
resource: '../../Messenger/Handler/*'
tags:
- { name: messenger.message_handler }
Loading
Loading