Skip to content

Commit 683174a

Browse files
committed
TASK: Implement bulk indexing support
1 parent 6da5966 commit 683174a

2 files changed

Lines changed: 129 additions & 119 deletions

File tree

Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/IndexingJob.php

Lines changed: 126 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -17,122 +17,131 @@
1717
/**
1818
* ElasticSearch Indexing Job Interface
1919
*/
20-
class IndexingJob implements JobInterface {
21-
22-
/**
23-
* @var NodeIndexer
24-
* @Flow\Inject
25-
*/
26-
protected $nodeIndexer;
27-
28-
/**
29-
* @var NodeDataRepository
30-
* @Flow\Inject
31-
*/
32-
protected $nodeDataRepository;
33-
34-
/**
35-
* @var NodeFactory
36-
* @Flow\Inject
37-
*/
38-
protected $nodeFactory;
39-
40-
/**
41-
* @var ContextFactory
42-
* @Flow\Inject
43-
*/
44-
protected $contextFactory;
45-
46-
/**
47-
* @var SystemLoggerInterface
48-
* @Flow\Inject
49-
*/
50-
protected $logger;
51-
52-
/**
53-
* @var string
54-
*/
55-
protected $identifier;
56-
57-
/**
58-
* @var string
59-
*/
60-
protected $workspaceName;
61-
62-
/**
63-
* @var string
64-
*/
65-
protected $indexPostfix;
66-
67-
/**
68-
* @var array
69-
*/
70-
protected $nodes = [];
71-
72-
/**
73-
* @param string $indexPostfix
74-
* @param string $workspaceName
75-
* @param array $nodes
76-
*/
77-
public function __construct($indexPostfix, $workspaceName, array $nodes) {
78-
$this->identifier = Algorithms::generateRandomString(24);
79-
$this->workspaceName = $workspaceName;
80-
$this->indexPostfix = $indexPostfix;
81-
$this->nodes = $nodes;
82-
}
83-
84-
/**
85-
* Execute the job
86-
* A job should finish itself after successful execution using the queue methods.
87-
*
88-
* @param QueueInterface $queue
89-
* @param Message $message The original message
90-
* @return boolean TRUE if the job was executed successfully and the message should be finished
91-
*/
92-
public function execute(QueueInterface $queue, Message $message) {
93-
foreach ($this->nodes as $node) {
94-
/** @var NodeData $nodeData */
95-
$nodeData = $this->nodeDataRepository->findByIdentifier($node['nodeIdentifier']);
96-
$context = $this->contextFactory->create([
97-
'workspaceName' => $this->workspaceName,
98-
'invisibleContentShown' => true,
99-
'inaccessibleContentShown' => false,
100-
'dimensions' => $node['dimensions']
101-
]);
102-
$currentNode = $this->nodeFactory->createFromNodeData($nodeData, $context);
103-
104-
// Skip this iteration if the node can not be fetched from the current context
105-
if (!$currentNode instanceof NodeInterface) {
106-
$this->logger->log(sprintf('Node with identifier %s could not be processed', $node['nodeIdentifier']));
107-
continue;
108-
}
109-
110-
$this->nodeIndexer->setIndexNamePostfix($this->indexPostfix);
111-
$this->logger->log(sprintf('Process indexing job for %s', $currentNode));
112-
$this->nodeIndexer->indexNode($currentNode);
113-
}
114-
115-
$this->nodeIndexer->flush();
116-
117-
return TRUE;
118-
}
119-
120-
/**
121-
* Get an optional identifier for the job
122-
*
123-
* @return string A job identifier
124-
*/
125-
public function getIdentifier() {
126-
return $this->identifier;
127-
}
128-
129-
/**
130-
* Get a readable label for the job
131-
*
132-
* @return string A label for the job
133-
*/
134-
public function getLabel() {
135-
return sprintf('ElasticSearch Indexing Job (%s)', $this->getIdentifier());
136-
}
20+
class IndexingJob implements JobInterface
21+
{
22+
23+
/**
24+
* @var NodeIndexer
25+
* @Flow\Inject
26+
*/
27+
protected $nodeIndexer;
28+
29+
/**
30+
* @var NodeDataRepository
31+
* @Flow\Inject
32+
*/
33+
protected $nodeDataRepository;
34+
35+
/**
36+
* @var NodeFactory
37+
* @Flow\Inject
38+
*/
39+
protected $nodeFactory;
40+
41+
/**
42+
* @var ContextFactory
43+
* @Flow\Inject
44+
*/
45+
protected $contextFactory;
46+
47+
/**
48+
* @var SystemLoggerInterface
49+
* @Flow\Inject
50+
*/
51+
protected $logger;
52+
53+
/**
54+
* @var string
55+
*/
56+
protected $identifier;
57+
58+
/**
59+
* @var string
60+
*/
61+
protected $workspaceName;
62+
63+
/**
64+
* @var string
65+
*/
66+
protected $indexPostfix;
67+
68+
/**
69+
* @var array
70+
*/
71+
protected $nodes = [];
72+
73+
/**
74+
* @param string $indexPostfix
75+
* @param string $workspaceName
76+
* @param array $nodes
77+
*/
78+
public function __construct($indexPostfix, $workspaceName, array $nodes)
79+
{
80+
$this->identifier = Algorithms::generateRandomString(24);
81+
$this->workspaceName = $workspaceName;
82+
$this->indexPostfix = $indexPostfix;
83+
$this->nodes = $nodes;
84+
}
85+
86+
/**
87+
* Execute the job
88+
* A job should finish itself after successful execution using the queue methods.
89+
*
90+
* @param QueueInterface $queue
91+
* @param Message $message The original message
92+
* @return boolean TRUE if the job was executed successfully and the message should be finished
93+
*/
94+
public function execute(QueueInterface $queue, Message $message)
95+
{
96+
$this->nodeIndexer->withBulkProcessing(function () {
97+
foreach ($this->nodes as $node) {
98+
/** @var NodeData $nodeData */
99+
$nodeData = $this->nodeDataRepository->findByIdentifier($node['nodeIdentifier']);
100+
$context = $this->contextFactory->create([
101+
'workspaceName' => $this->workspaceName,
102+
'invisibleContentShown' => true,
103+
'inaccessibleContentShown' => false,
104+
'dimensions' => $node['dimensions']
105+
]);
106+
$currentNode = $this->nodeFactory->createFromNodeData($nodeData, $context);
107+
108+
// Skip this iteration if the node can not be fetched from the current context
109+
if (!$currentNode instanceof NodeInterface) {
110+
$this->logger->log(sprintf('Node with identifier %s could not be processed', $node['nodeIdentifier']));
111+
continue;
112+
}
113+
114+
$this->nodeIndexer->setIndexNamePostfix($this->indexPostfix);
115+
$this->logger->log(sprintf('Process indexing job for %s', $currentNode));
116+
117+
$this->nodeIndexer->indexNode($currentNode);
118+
119+
}
120+
121+
$this->nodeIndexer->flush();
122+
});
123+
124+
return true;
125+
}
126+
127+
/**
128+
* Get an optional identifier for the job
129+
*
130+
* @return string A job identifier
131+
*/
132+
public function getIdentifier()
133+
{
134+
return $this->identifier;
135+
}
136+
137+
/**
138+
* Get a readable label for the job
139+
*
140+
* @return string A label for the job
141+
*/
142+
public function getLabel()
143+
{
144+
return sprintf('ElasticSearch Indexing Job (%s)', $this->getIdentifier());
145+
}
137146

138147
}

composer.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
"description": "Neos CMS ElasticSearch indexer based on beanstalkd (job queue)",
55
"license": "MIT",
66
"require": {
7-
"typo3/flow": "*",
8-
"typo3/jobqueue-beanstalkd": "*"
7+
"typo3/jobqueue-beanstalkd": "*",
8+
"flowpack/elasticsearch-contentrepositoryadaptor": "^2.0.2",
9+
"typo3/typo3cr-search": "^2.1.1"
910
},
1011
"autoload": {
1112
"psr-0": {

0 commit comments

Comments
 (0)