Skip to content

Commit bac64c8

Browse files
committed
[TASK] Use bulk indexing in IndexingJob
This change registers 100 nodes per IndexingJob to enable bulk indexing and faster ES index processing.
1 parent e8dfef2 commit bac64c8

2 files changed

Lines changed: 53 additions & 49 deletions

File tree

Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/Command/NodeIndexQueueCommandController.php

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,21 +107,22 @@ public function updateAliasCommand($indexPostfix) {
107107
*/
108108
protected function indexWorkspace($workspaceName, $indexPostfix) {
109109
$offset = 0;
110-
$batchSize = 1000;
110+
$batchSize = 100;
111111
while (TRUE) {
112-
$result = $this->nodeDataRepository->findAllBySiteAndWorkspace($workspaceName, $offset, 1000);
112+
$result = $this->nodeDataRepository->findAllBySiteAndWorkspace($workspaceName, $offset, $batchSize);
113113
if ($result === array()) {
114114
break;
115115
}
116+
$jobData = [];
116117
foreach ($result as $data) {
117-
$indexingJob = new IndexingJob(
118-
$indexPostfix,
119-
$data['nodeIdentifier'],
120-
$workspaceName,
121-
$data['dimensions']
122-
);
123-
$this->jobManager->queue('Flowpack.ElasticSearch.ContentRepositoryQueueIndexer', $indexingJob);
118+
$jobData[] = [
119+
'nodeIdentifier' => $data['nodeIdentifier'],
120+
'dimensions' => $data['dimensions']
121+
122+
];
124123
}
124+
$indexingJob = new IndexingJob($indexPostfix, $workspaceName, $jobData);
125+
$this->jobManager->queue('Flowpack.ElasticSearch.ContentRepositoryQueueIndexer', $indexingJob);
125126
$this->output('.');
126127
$offset += $batchSize;
127128
}

Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/IndexingJob.php

Lines changed: 43 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Indexer\NodeIndexer;
1515
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\LoggerInterface;
1616
use TYPO3\Flow\Annotations as Flow;
17+
use TYPO3\Flow\Utility\Algorithms;
1718
use TYPO3\Jobqueue\Common\Job\JobInterface;
1819
use TYPO3\Jobqueue\Common\Queue\Message;
1920
use TYPO3\Jobqueue\Common\Queue\QueueInterface;
@@ -29,66 +30,65 @@
2930
class IndexingJob implements JobInterface {
3031

3132
/**
32-
* @var string
33+
* @var NodeIndexer
34+
* @Flow\Inject
3335
*/
34-
protected $indexPostfix;
36+
protected $nodeIndexer;
3537

3638
/**
37-
* @var string
39+
* @var NodeDataRepository
40+
* @Flow\Inject
3841
*/
39-
protected $nodeIdentifier;
42+
protected $nodeDataRepository;
4043

4144
/**
42-
* @var string
45+
* @var NodeFactory
46+
* @Flow\Inject
4347
*/
44-
protected $workspaceName;
48+
protected $nodeFactory;
4549

4650
/**
47-
* @var string
51+
* @var ContextFactory
52+
* @Flow\Inject
4853
*/
49-
protected $dimensions;
54+
protected $contextFactory;
5055

5156
/**
57+
* @var LoggerInterface
5258
* @Flow\Inject
53-
* @var NodeIndexer
5459
*/
55-
protected $nodeIndexer;
60+
protected $logger;
5661

5762
/**
58-
* @Flow\Inject
59-
* @var NodeDataRepository
63+
* @var string
6064
*/
61-
protected $nodeDataRepository;
65+
protected $identifier;
6266

6367
/**
64-
* @Flow\Inject
65-
* @var NodeFactory
68+
* @var string
6669
*/
67-
protected $nodeFactory;
70+
protected $workspaceName;
6871

6972
/**
70-
* @Flow\Inject
71-
* @var ContextFactory
73+
* @var string
7274
*/
73-
protected $contextFactory;
75+
protected $indexPostfix;
7476

7577
/**
76-
* @Flow\Inject
77-
* @var LoggerInterface
78+
* @var array
7879
*/
79-
protected $logger;
80+
protected $nodes = [];
8081

8182
/**
8283
* @param string $indexPostfix
83-
* @param string $nodeIdentifier
8484
* @param string $workspaceName
85-
* @param array $dimensions
85+
* @param array $nodes
8686
*/
87-
public function __construct($indexPostfix, $nodeIdentifier, $workspaceName, array $dimensions = array()) {
88-
$this->indexPostfix = $indexPostfix;
89-
$this->nodeIdentifier = $nodeIdentifier;
87+
public function __construct($indexPostfix, $workspaceName, array $nodes) {
88+
$this->identifier = Algorithms::generateRandomString(24);
9089
$this->workspaceName = $workspaceName;
91-
$this->dimensions = $dimensions;
90+
$this->indexPostfix = $indexPostfix;
91+
$this->nodes = $nodes;
9292
}
9393

9494
/**
@@ -100,18 +100,21 @@ public function __construct($indexPostfix, $nodeIdentifier, $workspaceName, arra
100100
* @return boolean TRUE if the job was executed successfully and the message should be finished
101101
*/
102102
public function execute(QueueInterface $queue, Message $message) {
103-
/** @var NodeData $nodeData */
104-
$nodeData = $this->nodeDataRepository->findByIdentifier($this->nodeIdentifier);
105-
$context = $this->contextFactory->create([
106-
'workspaceName' => $this->workspaceName,
107-
'dimensions' => $this->dimensions
108-
]);
109-
$currentNode = $this->nodeFactory->createFromNodeData($nodeData, $context);
110-
if (!$currentNode instanceof NodeInterface) {
111-
return TRUE;
103+
foreach ($this->nodes as $node) {
104+
/** @var NodeData $nodeData */
105+
$nodeData = $this->nodeDataRepository->findByIdentifier($node['nodeIdentifier']);
106+
$context = $this->contextFactory->create([
107+
'workspaceName' => $this->workspaceName,
108+
'dimensions' => $node['dimensions']
109+
]);
110+
$currentNode = $this->nodeFactory->createFromNodeData($nodeData, $context);
111+
if (!$currentNode instanceof NodeInterface) {
112+
return TRUE;
113+
}
114+
$this->nodeIndexer->setIndexNamePostfix($this->indexPostfix);
115+
$this->nodeIndexer->indexNode($currentNode);
112116
}
113-
$this->nodeIndexer->setIndexNamePostfix($this->indexPostfix);
114-
$this->nodeIndexer->indexNode($currentNode);
117+
115118
$this->nodeIndexer->flush();
116119

117120
return TRUE;
@@ -123,7 +126,7 @@ public function execute(QueueInterface $queue, Message $message) {
123126
* @return string A job identifier
124127
*/
125128
public function getIdentifier() {
126-
return $this->nodeIdentifier;
129+
return $this->identifier;
127130
}
128131

129132
/**

0 commit comments

Comments
 (0)