Skip to content

Commit fc7a179

Browse files
committed
TASK: Add RemovalJob to handle removal asynchronously
This complements the async handling of node indexing.
1 parent 5eabf7a commit fc7a179

2 files changed

Lines changed: 171 additions & 0 deletions

File tree

Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/Indexer/NodeIndexer.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use Flowpack\ElasticSearch\ContentRepositoryAdaptor;
55
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\Command\NodeIndexQueueCommandController;
66
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\IndexingJob;
7+
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\RemovalJob;
78
use Flowpack\JobQueue\Common\Job\JobManager;
89
use Neos\Flow\Annotations as Flow;
910
use Neos\Flow\Persistence\PersistenceManagerInterface;
@@ -50,4 +51,23 @@ public function indexNode(NodeInterface $node, $targetWorkspaceName = null)
5051
]);
5152
$this->jobManager->queue(NodeIndexQueueCommandController::LIVE_QUEUE_NAME, $indexingJob);
5253
}
54+
55+
/**
56+
* @param NodeInterface $node
57+
* @param string|null $targetWorkspaceName In case indexing is triggered during publishing, a target workspace name will be passed in
58+
*/
59+
public function removeNode(NodeInterface $node, $targetWorkspaceName = null)
60+
{
61+
if ($this->enableLiveAsyncIndexing !== true) {
62+
parent::removeNode($node, $targetWorkspaceName);
63+
return;
64+
}
65+
$removalJob = new RemovalJob($this->indexNamePostfix, $targetWorkspaceName, [
66+
[
67+
'nodeIdentifier' => $this->persistenceManager->getIdentifierByObject($node->getNodeData()),
68+
'dimensions' => $node->getDimensions()
69+
]
70+
]);
71+
$this->jobManager->queue(NodeIndexQueueCommandController::LIVE_QUEUE_NAME, $removalJob);
72+
}
5373
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
<?php
2+
namespace Flowpack\ElasticSearch\ContentRepositoryQueueIndexer;
3+
4+
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Indexer\NodeIndexer;
5+
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\Domain\Repository\NodeDataRepository;
6+
use Flowpack\JobQueue\Common\Job\JobInterface;
7+
use Flowpack\JobQueue\Common\Queue\Message;
8+
use Flowpack\JobQueue\Common\Queue\QueueInterface;
9+
use Neos\ContentRepository\Domain\Model\NodeData;
10+
use Neos\Flow\Annotations as Flow;
11+
use Neos\Flow\Utility\Algorithms;
12+
use Neos\ContentRepository\Domain\Factory\NodeFactory;
13+
use Neos\ContentRepository\Domain\Model\NodeInterface;
14+
use Neos\ContentRepository\Domain\Service\ContextFactoryInterface;
15+
16+
/**
17+
* Elasticsearch Node Removal Job
18+
*/
19+
class RemovalJob implements JobInterface
20+
{
21+
use LoggerTrait;
22+
23+
/**
24+
* @var NodeIndexer
25+
* @Flow\Inject
26+
*/
27+
protected $nodeIndexer;
28+
29+
/**
30+
* @var NodeDataRepository
31+
* @Flow\Inject
32+
*/
33+
protected $nodeDataRepository;
34+
35+
/**
36+
* @var NodeFactory
37+
* @Flow\Inject
38+
*/
39+
protected $nodeFactory;
40+
41+
/**
42+
* @var ContextFactoryInterface
43+
* @Flow\Inject
44+
*/
45+
protected $contextFactory;
46+
47+
/**
48+
* @var string
49+
*/
50+
protected $identifier;
51+
52+
/**
53+
* @var string
54+
*/
55+
protected $targetWorkspaceName;
56+
57+
/**
58+
* @var string
59+
*/
60+
protected $indexPostfix;
61+
62+
/**
63+
* @var array
64+
*/
65+
protected $nodes = [];
66+
67+
/**
68+
* @param string $indexPostfix
69+
* @param string $targetWorkspaceName In case indexing is triggered during publishing, a target workspace name will be passed in
70+
* @param array $nodes
71+
*/
72+
public function __construct($indexPostfix, $targetWorkspaceName, array $nodes)
73+
{
74+
$this->identifier = Algorithms::generateRandomString(24);
75+
$this->targetWorkspaceName = $targetWorkspaceName;
76+
$this->indexPostfix = $indexPostfix;
77+
$this->nodes = $nodes;
78+
}
79+
80+
/**
81+
* Execute the job removal of nodes.
82+
*
83+
* @param QueueInterface $queue
84+
* @param Message $message The original message
85+
* @return boolean TRUE if the job was executed successfully and the message should be finished
86+
*/
87+
public function execute(QueueInterface $queue, Message $message)
88+
{
89+
$this->nodeIndexer->withBulkProcessing(function () {
90+
$numberOfNodes = count($this->nodes);
91+
$startTime = microtime(true);
92+
foreach ($this->nodes as $node) {
93+
/** @var NodeData $nodeData */
94+
$nodeData = $this->nodeDataRepository->findByIdentifier($node['nodeIdentifier']);
95+
96+
// Skip this iteration if the nodedata can not be fetched (deleted node)
97+
if (!$nodeData instanceof NodeData) {
98+
$this->log(sprintf('action=removal step=failed node=%s message="Node data could not be loaded"', $node['nodeIdentifier']));
99+
continue;
100+
}
101+
102+
$context = $this->contextFactory->create([
103+
'workspaceName' => $this->targetWorkspaceName ?: $nodeData->getWorkspace()->getName(),
104+
'invisibleContentShown' => true,
105+
'removedContentShown' => true,
106+
'inaccessibleContentShown' => false,
107+
'dimensions' => $node['dimensions']
108+
]);
109+
$currentNode = $this->nodeFactory->createFromNodeData($nodeData, $context);
110+
111+
// Skip this iteration if the node can not be fetched from the current context
112+
if (!$currentNode instanceof NodeInterface) {
113+
$this->log(sprintf('action=removal step=failed node=%s message="Node could not be processed"', $node['nodeIdentifier']));
114+
continue;
115+
}
116+
117+
$this->nodeIndexer->setIndexNamePostfix($this->indexPostfix);
118+
$this->log(sprintf('action=removal step=started node=%s', $currentNode->getIdentifier()));
119+
120+
$this->nodeIndexer->removeNode($currentNode);
121+
}
122+
123+
$this->nodeIndexer->flush();
124+
$duration = microtime(true) - $startTime;
125+
$rate = $numberOfNodes / $duration;
126+
$this->log(sprintf('action=removal step=finished number_of_nodes=%d duration=%f nodes_per_second=%f', $numberOfNodes, $duration, $rate));
127+
});
128+
129+
return true;
130+
}
131+
132+
/**
133+
* Get an optional identifier for the job
134+
*
135+
* @return string A job identifier
136+
*/
137+
public function getIdentifier()
138+
{
139+
return $this->identifier;
140+
}
141+
142+
/**
143+
* Get a readable label for the job
144+
*
145+
* @return string A label for the job
146+
*/
147+
public function getLabel()
148+
{
149+
return sprintf('Elasticsearch Removal Job (%s)', $this->getIdentifier());
150+
}
151+
}

0 commit comments

Comments
 (0)