@@ -13,7 +13,6 @@
 
 use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception;
 use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Mapping\NodeTypeMappingBuilder;
-use Flowpack\ElasticSearch\Domain\Model\Client;
 use Flowpack\ElasticSearch\Domain\Model\Document as ElasticSearchDocument;
 use Flowpack\ElasticSearch\Domain\Model\Index;
 use TYPO3\Flow\Annotations as Flow;
@@ -259,16 +258,18 @@ protected function appendToBulkRequest(NodeInterface $node, ElasticSearchDocument $document)
         [
             'update' => [
                 '_type' => $document->getType()->getName(),
-                '_id' => $document->getId()
+                '_id' => $document->getId(),
+                '_index' => $this->getIndexName(),
+                '_retry_on_conflict' => 3
             ]
         ],
         // http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-update.html
         [
             'script' => [
                 'inline' => '
-                    fulltext = (ctx._source.containsKey("__fulltext") ? ctx._source.__fulltext : new LinkedHashMap());
-                    fulltextParts = (ctx._source.containsKey("__fulltextParts") ? ctx._source.__fulltextParts : new LinkedHashMap());
-                    ctx._source = newData;
+                    fulltext = (ctx._source.containsKey("__fulltext") ? ctx._source.__fulltext : new HashMap());
+                    fulltextParts = (ctx._source.containsKey("__fulltextParts") ? ctx._source.__fulltextParts : new HashMap());
+                    ctx._source = newData;
                     ctx._source.__fulltext = fulltext;
                     ctx._source.__fulltextParts = fulltextParts
                 ',
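
Each entry appended here travels through the bulk API as an action line plus a body line. A minimal sketch of what this hunk serializes, with placeholder index, type, and id values (the real ones come from the surrounding NodeIndexer):

<?php
// Sketch only: the two newline-delimited lines one scripted-update entry
// contributes to the _bulk body. All values are placeholders.
$actionAndMetadata = json_encode([
    'update' => [
        '_type' => 'acme_site-page',   // mapping name for the node type (placeholder)
        '_id' => 'some-document-id',   // placeholder for $document->getId()
        '_index' => 'typo3cr',         // placeholder for $this->getIndexName()
        '_retry_on_conflict' => 3      // re-run the update script on version conflicts
    ]
]);
$requestBody = json_encode([
    'script' => [
        'inline' => 'ctx._source = newData',   // shortened; the real script also restores the __fulltext* keys
        'params' => ['newData' => ['title' => 'Example']]
    ]
]);
// Every bulk entry is its action line followed by its body line.
echo $actionAndMetadata . "\n" . $requestBody . "\n";

The _retry_on_conflict flag matters here because the scripted update reads the current source and writes it back; under concurrent indexing this can hit version conflicts, and retrying the script is cheaper than failing the whole bulk request.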
@@ -308,7 +309,8 @@ protected function removeDuplicateDocuments($contextPath, $contextPathHash, NodeInterface $node)
     {
         $type = NodeTypeMappingBuilder::convertNodeTypeNameToMappingName($node->getNodeType()->getName());
         $this->logger->log(sprintf('NodeIndexer: Check duplicate nodes for %s (%s). ContentContextHash: %s', $contextPath, $type, $contextPathHash), LOG_DEBUG, null, 'ElasticSearch (CR)');
-        $result = $this->getIndex()->request('GET', '/_search?scroll=1m&search_type=scan', [], json_encode([
+        $result = $this->getIndex()->request('GET', '/_search?scroll=1m', [], json_encode([
+            'sort' => ['_doc'],
             'query' => [
                 'bool' => [
                     'must' => [
@@ -327,14 +329,33 @@ protected function removeDuplicateDocuments($contextPath, $contextPathHash, NodeInterface $node)
         $treatedContent = $result->getTreatedContent();
         $scrollId = $treatedContent['_scroll_id'];
 
-        $result = $this->getIndex()->request('GET', '/_search/scroll?scroll=1m', [], $scrollId, false);
-        $treatedContent = $result->getTreatedContent();
+
+        $mapHitToDeleteRequest = function ($hit) {
+            // Build one bulk action line deleting the duplicate document
+            return json_encode([
+                'delete' => [
+                    '_type' => $hit['_type'],
+                    '_id' => $hit['_id']
+                ]
+            ]);
+        };
+
+        $bulkRequest = [];
         while (isset($treatedContent['hits']['hits']) && $treatedContent['hits']['hits'] !== []) {
             $hits = $treatedContent['hits']['hits'];
-            $this->logger->log(sprintf('NodeIndexer: Check duplicate nodes for %s (%s), found %d document(s). ContentContextHash: %s', $contextPath, $type, count($hits), $contextPathHash), LOG_DEBUG, null, 'ElasticSearch (CR)');
+            $bulkRequest = array_merge($bulkRequest, array_map($mapHitToDeleteRequest, $hits));
             $result = $this->getIndex()->request('GET', '/_search/scroll?scroll=1m', [], $scrollId, false);
             $treatedContent = $result->getTreatedContent();
         }
+
+        $this->logger->log(sprintf('NodeIndexer: Check duplicate nodes for %s (%s), found %d document(s). ContentContextHash: %s', $contextPath, $type, count($bulkRequest), $contextPathHash), LOG_DEBUG, null, 'ElasticSearch (CR)');
+        if ($bulkRequest !== []) {
+            // The bulk body is newline-delimited and must end with a newline
+            $this->getIndex()->request('POST', '/_bulk', [], implode("\n", $bulkRequest) . "\n");
+        }
+        $this->searchClient->request('DELETE', '/_search/scroll', [], json_encode([
+            'scroll_id' => [
+                $scrollId
+            ]
+        ]));
     }
 
     /**
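
Elasticsearch 2.x removed search_type=scan, which is why the hunks above switch to a regular scroll sorted on _doc and explicitly clear the scroll context afterwards. A self-contained sketch of that pattern against a plain ES 2.x node, assuming a local instance on port 9200 and a placeholder index name (this uses raw curl, not the adaptor's own client API):

<?php
// Minimal scroll-and-iterate sketch for ES 2.x; placeholder index name.
function esRequest($method, $path, $body = '')
{
    $ch = curl_init('http://localhost:9200' . $path);
    curl_setopt_array($ch, [
        CURLOPT_CUSTOMREQUEST => $method,
        CURLOPT_RETURNTRANSFER => true,
        CURLOPT_POSTFIELDS => $body
    ]);
    $response = json_decode(curl_exec($ch), true);
    curl_close($ch);
    return $response;
}

// Open the scroll; unlike scan, the first response already contains hits.
$page = esRequest('GET', '/typo3cr/_search?scroll=1m', json_encode([
    'sort' => ['_doc'],
    'query' => ['match_all' => new \stdClass()]
]));
$scrollId = $page['_scroll_id'];

while (!empty($page['hits']['hits'])) {
    foreach ($page['hits']['hits'] as $hit) {
        // collect $hit['_id'] into a bulk delete request, as the diff does
    }
    $page = esRequest('GET', '/_search/scroll?scroll=1m', $scrollId);
}

// Free the server-side scroll context once iteration is done.
esRequest('DELETE', '/_search/scroll', json_encode(['scroll_id' => [$scrollId]]));

Sorting on _doc is the documented cheap ordering for scrolls, since it returns documents in index order without scoring.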
@@ -376,29 +397,21 @@ protected function updateFulltext(NodeInterface $node, array $fulltextIndexOfNode)
             // first, update the __fulltextParts, then re-generate the __fulltext from all __fulltextParts
             'script' => [
                 'inline' => '
-                    if (!ctx._source.containsKey("__fulltextParts")) {
-                        ctx._source.__fulltextParts = new LinkedHashMap();
-                    }
-                    ctx._source.__fulltextParts[identifier] = fulltext;
-                    ctx._source.__fulltext = new LinkedHashMap();
-
-                    Iterator<LinkedHashMap.Entry<String, LinkedHashMap>> fulltextByNode = ctx._source.__fulltextParts.entrySet().iterator();
-                    for (fulltextByNode; fulltextByNode.hasNext();) {
-                        Iterator<LinkedHashMap.Entry<String, String>> elementIterator = fulltextByNode.next().getValue().entrySet().iterator();
-                        for (elementIterator; elementIterator.hasNext();) {
-                            Map.Entry<String, String> element = elementIterator.next();
-                            String value;
-
-                            if (ctx._source.__fulltext.containsKey(element.key)) {
-                                value = ctx._source.__fulltext[element.key] + " " + element.value.trim();
-                            } else {
-                                value = element.value.trim();
-                            }
-
-                            ctx._source.__fulltext[element.key] = value;
-                        }
-                    }
-                ',
+                    ctx._source.__fulltext = new HashMap();
+                    if (!ctx._source.containsKey("__fulltextParts")) {
+                        ctx._source.__fulltextParts = new HashMap();
+                    }
+                    ctx._source.__fulltextParts[identifier] = fulltext;
+                    ctx._source.__fulltextParts.each { originNodeIdentifier, partContent -> partContent.each { bucketKey, content ->
+                        if (ctx._source.__fulltext.containsKey(bucketKey)) {
+                            value = ctx._source.__fulltext[bucketKey] + " " + content.trim();
+                        } else {
+                            value = content.trim();
+                        }
+                        ctx._source.__fulltext[bucketKey] = value;
+                    }
+                    }
+                ',
                 'params' => [
                     'identifier' => $node->getIdentifier(),
                     'fulltext' => $fulltextIndexOfNode
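
For readers untangling the nested Groovy .each closure above: it folds all per-node fulltext parts into one shared __fulltext map, concatenating entries that land in the same bucket. The same merge expressed in plain PHP, with made-up data purely for illustration:

<?php
// PHP rendering of the Groovy merge that runs inside Elasticsearch.
$fulltextParts = [
    'node-a' => ['text' => 'Hello', 'text2' => 'Tagline'],
    'node-b' => ['text' => 'World']
];

$fulltext = [];
foreach ($fulltextParts as $originNodeIdentifier => $partContent) {
    foreach ($partContent as $bucketKey => $content) {
        // Append to an existing bucket with a space, or start it fresh
        $fulltext[$bucketKey] = isset($fulltext[$bucketKey])
            ? $fulltext[$bucketKey] . ' ' . trim($content)
            : trim($content);
    }
}

// $fulltext is now ['text' => 'Hello World', 'text2' => 'Tagline']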