77use Flowpack \ElasticSearch \ContentRepositoryQueueIndexer \IndexingJob ;
88use Flowpack \ElasticSearch \ContentRepositoryQueueIndexer \LoggerTrait ;
99use Flowpack \ElasticSearch \ContentRepositoryQueueIndexer \UpdateAliasJob ;
10+ use Flowpack \ElasticSearch \Domain \Model \Mapping ;
11+ use Flowpack \JobQueue \Common \Exception ;
1012use Flowpack \JobQueue \Common \Job \JobManager ;
1113use Flowpack \JobQueue \Common \Queue \QueueManager ;
14+ use Neos \ContentRepository \Domain \Repository \WorkspaceRepository ;
1215use Neos \Flow \Annotations as Flow ;
1316use Neos \Flow \Cli \CommandController ;
1417use Neos \Flow \Persistence \PersistenceManagerInterface ;
15- use Neos \ContentRepository \Domain \Repository \WorkspaceRepository ;
1618use Neos \Utility \Files ;
1719
1820/**
1921 * Provides CLI features for index handling
20- *
2122 * @Flow\Scope("singleton")
2223 */
2324class NodeIndexQueueCommandController extends CommandController
@@ -73,14 +74,15 @@ class NodeIndexQueueCommandController extends CommandController
7374 * Index all nodes by creating a new index and when everything was completed, switch the index alias.
7475 *
7576 * @param string $workspace
77+ * @throws \Flowpack\JobQueue\Common\Exception
78+ * @throws \Neos\Flow\Mvc\Exception\StopActionException
7679 */
7780 public function buildCommand ($ workspace = null )
7881 {
7982 $ indexPostfix = time ();
8083 $ indexName = $ this ->createNextIndex ($ indexPostfix );
8184 $ this ->updateMapping ();
8285
83-
8486 $ this ->outputLine ();
8587 $ this ->outputLine ('<b>Indexing on %s ...</b> ' , [$ indexName ]);
8688
@@ -114,6 +116,7 @@ public function buildCommand($workspace = null)
114116 * @param int $limit If set, only the given amount of jobs are processed (successful or not) before the script exits
115117 * @param bool $verbose Output debugging information
116118 * @return void
119+ * @throws \Neos\Flow\Mvc\Exception\StopActionException
117120 */
118121 public function workCommand ($ queue = 'batch ' , $ exitAfter = null , $ limit = null , $ verbose = false )
119122 {
@@ -145,8 +148,8 @@ public function workCommand($queue = 'batch', $exitAfter = null, $limit = null,
145148 }
146149 try {
147150 $ message = $ this ->jobManager ->waitAndExecute ($ queueName , $ timeout );
148- } catch (JobQueueException $ exception ) {
149- $ numberOfJobExecutions ++;
151+ } catch (Exception $ exception ) {
152+ $ numberOfJobExecutions ++;
150153 $ this ->outputLine ('<error>%s</error> ' , [$ exception ->getMessage ()]);
151154 if ($ verbose && $ exception ->getPrevious () instanceof \Exception) {
152155 $ this ->outputLine (' Reason: %s ' , [$ exception ->getPrevious ()->getMessage ()]);
@@ -156,7 +159,7 @@ public function workCommand($queue = 'batch', $exitAfter = null, $limit = null,
156159 $ this ->quit (1 );
157160 }
158161 if ($ message !== null ) {
159- $ numberOfJobExecutions ++;
162+ $ numberOfJobExecutions ++;
160163 if ($ verbose ) {
161164 $ messagePayload = strlen ($ message ->getPayload ()) <= 50 ? $ message ->getPayload () : substr ($ message ->getPayload (), 0 , 50 ) . '... ' ;
162165 $ this ->outputLine ('<success>Successfully executed job "%s" (%s)</success> ' , [$ message ->getIdentifier (), $ messagePayload ]);
@@ -174,7 +177,6 @@ public function workCommand($queue = 'batch', $exitAfter = null, $limit = null,
174177 }
175178 $ this ->quit ();
176179 }
177-
178180 } while (true );
179181 }
180182
@@ -183,8 +185,12 @@ public function workCommand($queue = 'batch', $exitAfter = null, $limit = null,
183185 */
184186 public function flushCommand ()
185187 {
186- $ this ->queueManager ->getQueue (self ::BATCH_QUEUE_NAME )->flush ();
187- $ this ->outputSystemReport ();
188+ try {
189+ $ this ->queueManager ->getQueue (self ::BATCH_QUEUE_NAME )->flush ();
190+ $ this ->outputSystemReport ();
191+ } catch (Exception $ exception ) {
192+ $ this ->outputLine ('An error occurred: %s ' , [$ exception ->getMessage ()]);
193+ }
188194 $ this ->outputLine ();
189195 }
190196
@@ -198,7 +204,11 @@ protected function outputSystemReport()
198204 $ time = microtime (true ) - $ _SERVER ["REQUEST_TIME_FLOAT " ];
199205 $ this ->outputLine ('Execution time : %s seconds ' , [$ time ]);
200206 $ this ->outputLine ('Indexing Queue : %s ' , [self ::BATCH_QUEUE_NAME ]);
201- $ this ->outputLine ('Pending Jobs : %s ' , [$ this ->queueManager ->getQueue (self ::BATCH_QUEUE_NAME )->count ()]);
207+ try {
208+ $ this ->outputLine ('Pending Jobs : %s ' , [$ this ->queueManager ->getQueue (self ::BATCH_QUEUE_NAME )->count ()]);
209+ } catch (Exception $ exception ) {
210+ $ this ->outputLine ('Pending Jobs : Error, queue not found, %s ' , [$ exception ->getMessage ()]);
211+ }
202212 }
203213
204214 /**
@@ -251,6 +261,7 @@ protected function createNextIndex($indexPostfix)
251261 $ this ->nodeIndexer ->setIndexNamePostfix ($ indexPostfix );
252262 $ this ->nodeIndexer ->getIndex ()->create ();
253263 $ this ->log (sprintf ('action=indexing step=index-created index=%s ' , $ this ->nodeIndexer ->getIndexName ()), LOG_INFO );
264+
254265 return $ this ->nodeIndexer ->getIndexName ();
255266 }
256267
0 commit comments