Skip to content

Commit e0e309d

Browse files
committed
TASK: Add an internal work CLI command
1 parent d1f3d82 commit e0e309d

2 files changed

Lines changed: 63 additions & 1 deletion

File tree

Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/Command/NodeIndexQueueCommandController.php

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use TYPO3\Flow\Persistence\PersistenceManagerInterface;
1616
use TYPO3\Flow\Utility\Files;
1717
use TYPO3\TYPO3CR\Domain\Repository\WorkspaceRepository;
18+
use Flowpack\JobQueue\Common\Exception as JobQueueException;
1819

1920
/**
2021
* Provides CLI features for index handling
@@ -107,6 +108,64 @@ public function buildCommand($workspace = null)
107108
$this->outputLine();
108109
}
109110

111+
/**
112+
* @param int $exitAfter If set, this command will exit after the given amount of seconds
113+
* @param int $limit If set, only the given amount of jobs are processed (successful or not) before the script exits
114+
* @param bool $verbose Output debugging information
115+
* @return void
116+
*/
117+
public function workCommand($exitAfter = null, $limit = null, $verbose = false)
118+
{
119+
if ($verbose) {
120+
$this->output('Watching queue <b>"%s"</b>', [self::QUEUE_NAME]);
121+
if ($exitAfter !== null) {
122+
$this->output(' for <b>%d</b> seconds', [$exitAfter]);
123+
}
124+
$this->outputLine('...');
125+
}
126+
$startTime = time();
127+
$timeout = null;
128+
$numberOfJobExecutions = 0;
129+
do {
130+
$message = null;
131+
if ($exitAfter !== null) {
132+
$timeout = max(1, $exitAfter - (time() - $startTime));
133+
}
134+
try {
135+
$message = $this->jobManager->waitAndExecute(self::QUEUE_NAME, $timeout);
136+
} catch (JobQueueException $exception) {
137+
$numberOfJobExecutions ++;
138+
$this->outputLine('<error>%s</error>', [$exception->getMessage()]);
139+
if ($verbose && $exception->getPrevious() instanceof \Exception) {
140+
$this->outputLine(' Reason: %s', [$exception->getPrevious()->getMessage()]);
141+
}
142+
} catch (\Exception $exception) {
143+
$this->outputLine('<error>Unexpected exception during job execution: %s, aborting...</error>', [$exception->getMessage()]);
144+
$this->quit(1);
145+
}
146+
if ($message !== null) {
147+
$numberOfJobExecutions ++;
148+
if ($verbose) {
149+
$messagePayload = strlen($message->getPayload()) <= 50 ? $message->getPayload() : substr($message->getPayload(), 0, 50) . '...';
150+
$this->outputLine('<success>Successfully executed job "%s" (%s)</success>', [$message->getIdentifier(), $messagePayload]);
151+
}
152+
}
153+
if ($exitAfter !== null && (time() - $startTime) >= $exitAfter) {
154+
if ($verbose) {
155+
$this->outputLine('Quitting after %d seconds due to <i>--exit-after</i> flag', [time() - $startTime]);
156+
}
157+
$this->quit();
158+
}
159+
if ($limit !== null && $numberOfJobExecutions >= $limit) {
160+
if ($verbose) {
161+
$this->outputLine('Quitting after %d executed job%s due to <i>--limit</i> flag', [$numberOfJobExecutions, $numberOfJobExecutions > 1 ? 's' : '']);
162+
}
163+
$this->quit();
164+
}
165+
166+
} while (true);
167+
}
168+
110169
/**
111170
* Flush the index queue
112171
*/
@@ -117,6 +176,9 @@ public function flushCommand()
117176
$this->outputLine();
118177
}
119178

179+
/**
180+
* Output system report for CLI commands
181+
*/
120182
protected function outputSystemReport()
121183
{
122184
$this->outputLine();

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ How to process indexing job
1313

1414
You can use this CLI command to process indexing job:
1515

16-
flow job:work --queue Flowpack.ElasticSearch.ContentRepositoryQueueIndexer
16+
flow nodeindexqueue:work
1717

1818
You can use tools like ```supervisord``` to manage long runing process. Bellow you can
1919
found a basic configuration:

0 commit comments

Comments
 (0)