Skip to content

Commit a7d4354

Browse files
authored
Merge pull request #799 from MetaCell/feature/CH-55-61
CH-55 CH-61 add retry policy and customization to tasks
2 parents e04c2bd + f10dd86 commit a7d4354

File tree

10 files changed

+115
-50
lines changed

10 files changed

+115
-50
lines changed

applications/workflows/server/workflows_api/service/workflow_service.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
from cloudharness.workflows import argo
1+
from cloudharness.workflows import argo_service
22
from workflows_api.models import OperationSearchResult, Operation, SearchResultData
33

4-
OperationNotFound = argo.WorkflowNotFound
5-
OperationException = argo.WorkflowException
6-
BadParam = argo.BadParam
4+
OperationNotFound = argo_service.WorkflowNotFound
5+
OperationException = argo_service.WorkflowException
6+
BadParam = argo_service.BadParam
77

88

9-
def argo_workflow_to_operation(workflow: argo.Workflow):
9+
def argo_workflow_to_operation(workflow: argo_service.Workflow):
1010
return Operation(name=workflow.name,
1111
status=workflow.status,
1212
create_time=workflow.create_time,
@@ -15,12 +15,12 @@ def argo_workflow_to_operation(workflow: argo.Workflow):
1515

1616
def delete_operation(name):
1717
"""deletes operation by id"""
18-
argo.delete_workflow(name)
18+
argo_service.delete_workflow(name)
1919

2020

2121
def get_operation(name):
2222
"""get operation by id"""
23-
return argo_workflow_to_operation(argo.get_workflow(name))
23+
return argo_workflow_to_operation(argo_service.get_workflow(name))
2424

2525

2626
def list_operations(status=None, continue_token=None, limit=None) -> OperationSearchResult:
@@ -38,7 +38,7 @@ def list_operations(status=None, continue_token=None, limit=None) -> OperationSe
3838
:rtype: OperationSearchResult
3939
"""
4040

41-
argo_raw_result = argo.get_workflows(status, limit=limit, continue_token=continue_token)
41+
argo_raw_result = argo_service.get_workflows(status, limit=limit, continue_token=continue_token)
4242
result = OperationSearchResult()
4343
result.items = tuple(argo_workflow_to_operation(item) for item in argo_raw_result.items)
4444
result.meta = SearchResultData(continue_token=argo_raw_result.continue_token)
@@ -51,4 +51,4 @@ def log_operation(name: str) -> str:
5151
:rtype: str
5252
"""
5353

54-
return argo.get_workflow_logs(name)
54+
return argo_service.get_workflow_logs(name)

docs/applications/development/workflows-api.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,18 @@ ttl_strategy={
219219
op = operations.ParallelOperation(..., ttl_strategy=ttl_strategy)
220220
```
221221

222+
## Retry strategy
223+
224+
By default, workflow tasks are retried 10 times. To set the value to something else, use
225+
the `retry_limit` parameter.
226+
227+
```python
228+
from cloudharness.workflows import operations, tasks
229+
230+
my_task = tasks.CustomTask('my-task-retry', 'myapp-mytask', retry_limit=2)
231+
op = operations.SingleTaskOperation('my-op-retry-', my_task)
232+
```
233+
222234
## Notify on exit
223235

224236
The parameter `on_exit_notify` adds an additional task to the workflow that notifies its completion in the events queue.
@@ -235,6 +247,32 @@ op = operations.ParallelOperation(..., on_exit_notify=on_exit_notify)
235247

236248
Synchronous operation types use this mechanism to wait for the result and get the value.
237249

250+
To customize the onExit strategy, an additional `image` parameter can be specified.
251+
252+
```Python
253+
import json
254+
on_exit_notify={
255+
'queue': 'my_queue',
256+
'payload': json.dumps({'insert': 1}),
257+
'image': "my-image"
258+
}
259+
op = operations.ParallelOperation(..., on_exit_notify=on_exit_notify)
260+
```
261+
262+
## Free template customization
263+
264+
To customize the task beyond the Task API, use
265+
the `template_overrides` parameter.
266+
267+
```python
268+
from cloudharness.workflows import operations, tasks
269+
270+
my_task = tasks.CustomTask('my-task-retry', 'myapp-mytask', retry_limit=2, template_overrides=tasks.V1alpha1Template(
271+
memoize=True,
272+
))
273+
op = operations.SingleTaskOperation('my-op-retry-', my_task)
274+
```
275+
238276
## Workflows query service api
239277

240278
Workflows can be queried and retrieved through the Python api

libraries/cloudharness-common/cloudharness/utils/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import collections
12

2-
def dict_merge(dct, merge_dct, add_keys=True):
3+
4+
def dict_merge(dct, merge_dct, add_keys=True, merge_none=True):
35
""" Recursive dict merge. Inspired by :meth:``dict.update()``, instead of
46
updating only top-level keys, dict_merge recurses down into dicts nested
57
to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
@@ -31,7 +33,7 @@ def dict_merge(dct, merge_dct, add_keys=True):
3133
if (k in dct and isinstance(dct[k], dict) and
3234
isinstance(merge_dct[k], collections.abc.Mapping)):
3335
dct[k] = dict_merge(dct[k], merge_dct[k], add_keys=add_keys)
34-
else:
36+
elif merge_none or (merge_dct[k] is not None):
3537
dct[k] = merge_dct[k]
3638

3739
return dct
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from . import argo_service as argo
2+
from . import argo_service

libraries/cloudharness-common/cloudharness/workflows/argo.py renamed to libraries/cloudharness-common/cloudharness/workflows/argo_service.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515
from cloudharness import log, applications
1616

1717
ch_conf = conf.get_configuration()
18-
namespace = conf.get_namespace()
18+
try:
19+
namespace = conf.get_namespace()
20+
except Exception as e:
21+
log.error("Error getting namespace: %s", e)
22+
namespace = 'default'
1923

2024

2125
class WorkflowException(Exception):

libraries/cloudharness-common/cloudharness/workflows/operations.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from cloudharness import log
1010
from cloudharness.events.client import EventClient
1111
from cloudharness.utils import env, config
12-
from . import argo
12+
from . import argo_service
1313
from .tasks import Task, SendResultTask, CustomTask
1414
from .utils import PodExecutionContext, affinity_spec, is_accounts_present, name_from_path, volume_mount_template
1515
from argo.workflows.client import V1Toleration
@@ -60,7 +60,7 @@ class ContainerizedOperation(ManagedOperation):
6060
"""
6161

6262
def __init__(self, basename: str, pod_context: Union[PodExecutionContext, list, tuple] = None,
63-
shared_directory=None, *args, **kwargs):
63+
shared_directory=None, pod_gc=None, *args, **kwargs):
6464
"""
6565
:param basename:
6666
:param pod_context: PodExecutionContext - represents affinity with other pods in the system
@@ -76,6 +76,7 @@ def __init__(self, basename: str, pod_context: Union[PodExecutionContext, list,
7676

7777
self.persisted = None
7878
shared_path = None
79+
self.pod_gc = pod_gc
7980
if shared_directory:
8081
if shared_directory is True:
8182
self.volumes = ['/mnt/shared']
@@ -126,6 +127,10 @@ def spec(self):
126127
'tolerations': [V1Toleration(key='cloudharness/temporary-job', operator='Equal', value='true').to_dict()],
127128
'serviceAccountName': SERVICE_ACCOUNT,
128129
'imagePullSecrets': [{'name': config.CloudharnessConfig.get_registry_secret()}],
130+
'podGC': self.pod_gc or {
131+
'strategy': 'OnWorkflowSuccess',
132+
'deleteDelayDuration': self.ttl_strategy['secondsAfterSuccess'] if self.ttl_strategy else "600s",
133+
},
129134
'volumes': [{
130135
# mount allvalues so we can use the cloudharness Python library
131136
'name': 'cloudharness-allvalues',
@@ -168,7 +173,7 @@ def add_on_exit_notify_handler(self, spec):
168173
payload = self.on_exit_notify['payload']
169174
exit_task = CustomTask(
170175
name="exit-handler",
171-
image_name='workflows-notify-queue',
176+
image_name=self.on_exit_notify.get('image', 'workflows-notify-queue'),
172177
workflow_result='{{workflow.status}}',
173178
queue_name=queue,
174179
payload=payload
@@ -206,7 +211,7 @@ def submit(self):
206211
log.debug("Submitting workflow\n" + yaml.dump(op))
207212

208213
# TODO use rest api for that? Include this into cloudharness.workflows?
209-
self.persisted = argo.submit_workflow(op)
214+
self.persisted = argo_service.submit_workflow(op)
210215
return self.persisted
211216

212217
def is_running(self):
@@ -216,7 +221,7 @@ def is_running(self):
216221
return False
217222

218223
def refresh(self):
219-
self.persisted = argo.get_workflow(self.persisted.name)
224+
self.persisted = argo_service.get_workflow(self.persisted.name)
220225

221226
def is_error(self):
222227
if self.persisted:
@@ -313,7 +318,7 @@ def execute(self, timeout=None):
313318
while not self.persisted.is_finished():
314319
time.sleep(POLLING_WAIT_SECONDS)
315320
log.debug(f"Polling argo workflow {self.persisted.name}")
316-
self.persisted = argo.get_workflow(self.persisted.name)
321+
self.persisted = argo_service.get_workflow(self.persisted.name)
317322
log.debug(f"Polling succeeded for \
318323
{self.persisted.name}. Current phase: {self.persisted.status}")
319324
if timeout and time.time() - start_time > timeout:

libraries/cloudharness-common/cloudharness/workflows/tasks.py

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,29 @@
1-
from . import argo
2-
1+
import argo.workflows
2+
from cloudharness.utils import dict_merge
3+
from . import argo_service
4+
from argo.workflows.client import V1alpha1Template
35
from cloudharness.utils.env import get_cloudharness_variables, get_image_full_tag
46
from .utils import WORKFLOW_NAME_VARIABLE_NAME, PodExecutionContext, affinity_spec, is_accounts_present, volume_mount_template, volume_requires_affinity
57

68
SERVICE_ACCOUNT = 'argo-workflows'
79

810

9-
class Task(argo.ArgoObject):
11+
class Task(argo_service.ArgoObject):
1012
"""
1113
Abstract interface for a task.
1214
"""
1315

14-
def __init__(self, name, resources={}, volume_mounts=[], **env_args):
16+
def __init__(self, name, resources=None, volume_mounts=None, retry_limit=10, template_overrides: V1alpha1Template = None, **env_args):
1517
self.name = name.replace(' ', '-').lower()
16-
self.resources = resources
18+
self.resources = resources or {}
1719
self.__envs = get_cloudharness_variables()
18-
self.volume_mounts = volume_mounts
20+
self.volume_mounts = volume_mounts or []
1921
self.external_volumes = [
2022
v.split(':')[0] for v in self.volume_mounts if volume_requires_affinity(v)]
2123
for k in env_args:
2224
self.__envs[k] = str(env_args[k])
25+
self.retry_limit = retry_limit
26+
self.template_overrides = template_overrides.to_dict() if template_overrides else {}
2327

2428
def metadata_spec(self):
2529
return {
@@ -85,14 +89,14 @@ def volumes_mounts_spec(self):
8589

8690
class ContainerizedTask(Task):
8791

88-
def __init__(self, name, resources={}, image_pull_policy='IfNotPresent', command=None, **env_args):
89-
super().__init__(name, resources, **env_args)
92+
def __init__(self, name, resources={}, image_pull_policy='IfNotPresent', command=None, retry_limit=10, template_overrides: V1alpha1Template = None, **env_args):
93+
super().__init__(name, resources, retry_limit=retry_limit, template_overrides=template_overrides, **env_args)
9094
self.image_pull_policy = image_pull_policy
9195
self.command = command
9296

9397
def spec(self):
9498

95-
spec = {
99+
spec = dict_merge({
96100
'container': {
97101
'image': self.image_name,
98102
'env': self.envs,
@@ -104,9 +108,12 @@ def spec(self):
104108
'metadata': self.metadata_spec(),
105109
'name': self.name,
106110
'outputs': {},
107-
'affinity': self.affinity_spec()
111+
'affinity': self.affinity_spec(),
112+
'retryStrategy': {
113+
'limit': self.retry_limit
114+
}
108115

109-
}
116+
}, self.template_overrides, merge_none=False)
110117
if self.command is not None:
111118
spec['container']['command'] = self.command
112119
return spec
@@ -123,7 +130,7 @@ def __init__(self, name, source, **kwargs):
123130

124131
def spec(self):
125132

126-
return {
133+
return dict_merge({
127134
'name': self.name,
128135
'affinity': self.affinity_spec(),
129136
'metadata': self.metadata_spec(),
@@ -135,18 +142,18 @@ def spec(self):
135142
'volumeMounts': self.volumes_mounts_spec(),
136143
'command': [self.command]
137144
}
138-
}
145+
}, self.template_overrides, merge_none=False)
139146

140147
@property
141148
def command(self):
142149
raise NotImplemented
143150

144151

145152
class PythonTask(InlinedTask):
146-
def __init__(self, name, func):
153+
def __init__(self, name, func, **kwargs):
147154
import inspect
148155
super().__init__(name, (inspect.getsource(
149-
func) + f"\n{func.__name__}()").strip())
156+
func) + f"\n{func.__name__}()").strip(), **kwargs)
150157

151158
@property
152159
def image_name(self):

libraries/cloudharness-common/tests/test_applications.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
from cloudharness.utils.config import CloudharnessConfig, ConfigObject
21
from cloudharness.applications import ApplicationConfiguration, get_configuration
3-
from .test_env import set_default_environment
4-
5-
set_default_environment()
2+
from cloudharness.utils.config import CloudharnessConfig, ConfigObject
3+
from .test_env import set_test_environment
4+
set_test_environment()
65

76

87
conf_1 = {

libraries/cloudharness-common/tests/test_env.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
from cloudharness.utils.config import CloudharnessConfig as conf
21
from cloudharness.utils.env import set_default_environment
3-
import pytest
2+
from cloudharness.utils.config import CloudharnessConfig as conf
43
import os
54
import yaml
6-
75
HERE = os.path.dirname(os.path.realpath(__file__))
86
os.environ["CH_VALUES_PATH"] = os.path.join(HERE, "values.yaml")
97

0 commit comments

Comments
 (0)