Skip to content

Commit bc0c8f8

Browse files
authored
Merge pull request #151 from dfeyer/task-improve-errorhandling
TASK: Improve Error Handling during Bulk Indexing
2 parents 3c9996a + e04e145 commit bc0c8f8

9 files changed

Lines changed: 313 additions & 9 deletions

File tree

Classes/Command/NodeIndexCommandController.php

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111
* source code.
1212
*/
1313

14+
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Indexer\Error\ErrorInterface;
1415
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\LoggerInterface;
1516
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Mapping\NodeTypeMappingBuilder;
17+
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Service\ErrorHandlingService;
1618
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Service\IndexWorkspaceTrait;
1719
use Flowpack\ElasticSearch\Domain\Model\Mapping;
1820
use Flowpack\ElasticSearch\Transfer\Exception\ApiException;
@@ -39,6 +41,12 @@ class NodeIndexCommandController extends CommandController
3941
{
4042
use IndexWorkspaceTrait;
4143

44+
/**
45+
* @Flow\Inject
46+
* @var ErrorHandlingService
47+
*/
48+
protected $errorHandlingService;
49+
4250
/**
4351
* @Flow\Inject
4452
* @var NodeIndexerInterface
@@ -258,7 +266,17 @@ public function buildCommand($limit = null, $update = false, $workspace = null,
258266

259267
$this->nodeIndexingManager->flushQueues();
260268

261-
$this->logger->log('Done. (indexed ' . $count . ' nodes)', LOG_INFO);
269+
if ($this->errorHandlingService->hasError()) {
270+
$this->outputLine();
271+
/** @var ErrorInterface $error */
272+
foreach ($this->errorHandlingService as $error) {
273+
$this->outputLine('<error>Error</error> ' . $error->message());
274+
}
275+
$this->outputLine();
276+
$this->outputLine('<error>Check your logs for more information</error>');
277+
} else {
278+
$this->logger->log('Done. (indexed ' . $count . ' nodes)', LOG_INFO);
279+
}
262280
$this->nodeIndexer->getIndex()->refresh();
263281

264282
// TODO: smoke tests
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
<?php
2+
namespace Flowpack\ElasticSearch\ContentRepositoryAdaptor\Indexer\Error;
3+
4+
/*
5+
* This file is part of the Flowpack.ElasticSearch.ContentRepositoryAdaptor package.
6+
*
7+
* (c) Contributors of the Neos Project - www.neos.io
8+
*
9+
* This package is Open Source Software. For the full copyright and license
10+
* information, please view the LICENSE file which was distributed with this
11+
* source code.
12+
*/
13+
14+
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\LoggerInterface;
15+
use Neos\Flow\Annotations as Flow;
16+
17+
/**
18+
* Handle Bulk Indexing Error and build human readable output for analysis
19+
*/
20+
class BulkIndexingError implements ErrorInterface
21+
{
22+
/**
23+
* @Flow\Inject
24+
* @var LoggerInterface
25+
*/
26+
protected $logger;
27+
28+
/**
29+
* @var string
30+
*/
31+
protected $message;
32+
33+
/**
34+
* @var string
35+
*/
36+
protected $filename;
37+
38+
/**
39+
* @var array
40+
*/
41+
protected $currentBulkRequest;
42+
43+
/**
44+
* @var array
45+
*/
46+
protected $errors;
47+
48+
/**
49+
* @param array $currentBulkRequest
50+
* @param array $errors
51+
*/
52+
public function __construct(array $currentBulkRequest, array $errors)
53+
{
54+
$this->currentBulkRequest = $currentBulkRequest;
55+
$this->errors = json_decode($errors, true);
56+
57+
if (!file_exists(FLOW_PATH_DATA . 'Logs/Elasticsearch')) {
58+
mkdir(FLOW_PATH_DATA . 'Logs/Elasticsearch');
59+
}
60+
61+
$referenceCode = date('YmdHis', $_SERVER['REQUEST_TIME']) . substr(md5(rand()), 0, 6);
62+
63+
$this->filename = FLOW_PATH_DATA . 'Logs/Elasticsearch/' . $referenceCode . '.txt';
64+
$this->message = sprintf('Bulk indexing errors detected - See also: Data/Logs/Elasticsearch/%s on host: %s', basename($this->filename), gethostname());
65+
}
66+
67+
/**
68+
* Log the error message
69+
*
70+
* @return void
71+
*/
72+
public function log()
73+
{
74+
if (file_exists(FLOW_PATH_DATA . 'Logs/Elasticsearch') && is_dir(FLOW_PATH_DATA . 'Logs/Elasticsearch') && is_writable(FLOW_PATH_DATA . 'Logs/Elasticsearch')) {
75+
file_put_contents($this->filename, $this->renderErrors());
76+
$this->logger->log($this->message, LOG_ERR, [], 'Flowpack.ElasticSearch.ContentRepositoryAdaptor', __CLASS__, __FUNCTION__);
77+
} else {
78+
$this->logger->log(sprintf('Could not write indexing errors backtrace into %s because the directory could not be created or is not writable.', FLOW_PATH_DATA . 'Logs/Elasticsearch/'), LOG_WARNING, [], 'Flowpack.ElasticSearch.ContentRepositoryAdaptor', __CLASS__, __FUNCTION__);
79+
}
80+
}
81+
82+
/**
83+
* @return string
84+
*/
85+
public function message()
86+
{
87+
return $this->message;
88+
}
89+
90+
91+
/**
92+
* @return string
93+
*/
94+
protected function renderErrors()
95+
{
96+
$bulkRequest = json_encode($this->currentBulkRequest, JSON_PRETTY_PRINT);
97+
$errors = json_encode($this->errors, JSON_PRETTY_PRINT);
98+
return sprintf("Payload:\n========\n\n%s\n\nErrors:\n=======\n\n%s\n\n", $bulkRequest, $errors);
99+
}
100+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
namespace Flowpack\ElasticSearch\ContentRepositoryAdaptor\Indexer\Error;
3+
4+
/*
5+
* This file is part of the Flowpack.ElasticSearch.ContentRepositoryAdaptor package.
6+
*
7+
* (c) Contributors of the Neos Project - www.neos.io
8+
*
9+
* This package is Open Source Software. For the full copyright and license
10+
* information, please view the LICENSE file which was distributed with this
11+
* source code.
12+
*/
13+
14+
/**
15+
* Error Interface
16+
*/
17+
interface ErrorInterface
18+
{
19+
/**
20+
* Log the error message
21+
*
22+
* @return void
23+
*/
24+
public function log();
25+
26+
/**
27+
* Get a short log message for reporting
28+
*
29+
* @return string
30+
*/
31+
public function message();
32+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
<?php
2+
namespace Flowpack\ElasticSearch\ContentRepositoryAdaptor\Indexer\Error;
3+
4+
/*
5+
* This file is part of the Flowpack.ElasticSearch.ContentRepositoryAdaptor package.
6+
*
7+
* (c) Contributors of the Neos Project - www.neos.io
8+
*
9+
* This package is Open Source Software. For the full copyright and license
10+
* information, please view the LICENSE file which was distributed with this
11+
* source code.
12+
*/
13+
14+
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\LoggerInterface;
15+
use Neos\Flow\Annotations as Flow;
16+
17+
/**
18+
* Handle malformed bulk request error
19+
*/
20+
class MalformedBulkRequestError implements ErrorInterface
21+
{
22+
/**
23+
* @Flow\Inject
24+
* @var LoggerInterface
25+
*/
26+
protected $logger;
27+
28+
/**
29+
* @var string
30+
*/
31+
protected $message;
32+
33+
/**
34+
* @var array
35+
*/
36+
protected $tuple;
37+
38+
/**
39+
* @param string $message
40+
* @param array $tuple
41+
*/
42+
public function __construct($message, array $tuple)
43+
{
44+
$this->message = $message;
45+
$this->tuple = $tuple;
46+
}
47+
48+
/**
49+
* Log the error message
50+
*
51+
* @return void
52+
*/
53+
public function log()
54+
{
55+
$this->logger->log($this->message(), LOG_ERR, $this->tuple);
56+
}
57+
58+
/**
59+
* Get a short log message for reporting
60+
*
61+
* @return string
62+
*/
63+
public function message()
64+
{
65+
return $this->message;
66+
}
67+
}

Classes/Indexer/NodeIndexer.php

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Driver\SystemDriverInterface;
1919
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\ElasticSearchClient;
2020
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception;
21+
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Indexer\Error\BulkIndexingError;
22+
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Indexer\Error\MalformedBulkRequestError;
2123
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Mapping\NodeTypeMappingBuilder;
24+
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Service\ErrorHandlingService;
2225
use Flowpack\ElasticSearch\Domain\Model\Document as ElasticSearchDocument;
2326
use Flowpack\ElasticSearch\Domain\Model\Index;
2427
use Flowpack\ElasticSearch\Transfer\Exception\ApiException;
@@ -46,6 +49,12 @@ class NodeIndexer extends AbstractNodeIndexer implements BulkNodeIndexerInterfac
4649
*/
4750
protected $indexNamePostfix = '';
4851

52+
/**
53+
* @Flow\Inject
54+
* @var ErrorHandlingService
55+
*/
56+
protected $errorHandlingService;
57+
4958
/**
5059
* @Flow\Inject
5160
* @var ElasticSearchClient
@@ -315,7 +324,9 @@ public function flush()
315324
foreach ($bulkRequestTuple as $bulkRequestItem) {
316325
$itemAsJson = json_encode($bulkRequestItem);
317326
if ($itemAsJson === false) {
318-
$this->logger->log('NodeIndexer: Bulk request item could not be encoded as JSON - ' . json_last_error_msg(), LOG_ERR, $bulkRequestItem, 'ElasticSearch (CR)');
327+
$this->errorHandlingService->log(
328+
new MalformedBulkRequestError('Indexing Error: Bulk request item could not be encoded as JSON - ' . json_last_error_msg(), $bulkRequestItem)
329+
);
319330
continue 2;
320331
}
321332
$tupleAsJson .= $itemAsJson . chr(10);
@@ -327,7 +338,9 @@ public function flush()
327338
$response = $this->requestDriver->bulk($this->getIndex(), $content);
328339
foreach ($response as $responseLine) {
329340
if (isset($response['errors']) && $response['errors'] !== false) {
330-
$this->logger->log('NodeIndexer: ' . json_encode($responseLine), LOG_ERR, null, 'ElasticSearch (CR)');
341+
$this->errorHandlingService->log(
342+
new BulkIndexingError($this->currentBulkRequest, $responseLine)
343+
);
331344
}
332345
}
333346
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?php
2+
namespace Flowpack\ElasticSearch\ContentRepositoryAdaptor\Service;
3+
4+
/*
5+
* This file is part of the Flowpack.ElasticSearch.ContentRepositoryAdaptor package.
6+
*
7+
* (c) Contributors of the Neos Project - www.neos.io
8+
*
9+
* This package is Open Source Software. For the full copyright and license
10+
* information, please view the LICENSE file which was distributed with this
11+
* source code.
12+
*/
13+
14+
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Indexer\Error\ErrorInterface;
15+
use Neos\Flow\Annotations as Flow;
16+
17+
/**
18+
* Error Handling Service
19+
*
20+
* @Flow\Scope("singleton")
21+
*/
22+
class ErrorHandlingService implements \IteratorAggregate
23+
{
24+
/**
25+
* @var array
26+
*/
27+
protected $errors = [];
28+
29+
/**
30+
* @param ErrorInterface $error
31+
* @return void
32+
*/
33+
public function log(ErrorInterface $error)
34+
{
35+
$this->errors[] = $error;
36+
$error->log();
37+
}
38+
39+
/**
40+
* @return bool
41+
*/
42+
public function hasError()
43+
{
44+
return count($this->errors) > 0;
45+
}
46+
47+
/**
48+
* @return \ArrayIterator
49+
*/
50+
public function getIterator()
51+
{
52+
return new \ArrayIterator($this->errors);
53+
}
54+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
Neos:
2+
ContentRepository:
3+
Search:
4+
elasticSearch:
5+
log:
6+
backendOptions:
7+
fileBackend:
8+
logFileURL: '%FLOW_PATH_DATA%Logs/ElasticSearch_Development.log'
9+
severityThreshold: '%LOG_DEBUG%'

0 commit comments

Comments
 (0)