Skip to content

Commit 60a8ad3

Browse files
committed
#696 fix affinities for argo workflows
1 parent 6308d91 commit 60a8ad3

4 files changed

Lines changed: 105 additions & 13 deletions

File tree

libraries/cloudharness-common/cloudharness/workflows/operations.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,11 @@ def add_on_exit_notify_handler(self, spec):
178178
def modify_template(self, template):
179179
"""Hook to modify templates (e.g. add volumes)"""
180180

181-
template["metadata"] = {"labels": {c.key: c.value for c in self.pod_contexts}}
181+
if 'metadata' not in template:
182+
template['metadata'] = dict()
183+
if 'labels' not in template["metadata"]:
184+
template["metadata"]["labels"] = dict()
185+
template["metadata"]["labels"] = template["metadata"]["labels"] | {c.key: c.value for c in self.pod_contexts}
182186

183187
if self.volumes:
184188
if 'container' in template:

libraries/cloudharness-common/cloudharness/workflows/tasks.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from . import argo
22

33
from cloudharness.utils.env import get_cloudharness_variables, get_image_full_tag
4-
from .utils import WORKFLOW_NAME_VARIABLE_NAME, PodExecutionContext, affinity_spec, is_accounts_present, volume_mount_template
4+
from .utils import WORKFLOW_NAME_VARIABLE_NAME, PodExecutionContext, affinity_spec, is_accounts_present, volume_mount_template, volume_requires_affinity
55

66
SERVICE_ACCOUNT = 'argo-workflows'
77

@@ -16,9 +16,25 @@ def __init__(self, name, resources={}, volume_mounts=[], **env_args):
1616
self.resources = resources
1717
self.__envs = get_cloudharness_variables()
1818
self.volume_mounts = volume_mounts
19+
self.external_volumes = [
20+
v.split(':')[0] for v in self.volume_mounts if volume_requires_affinity(v)]
1921
for k in env_args:
2022
self.__envs[k] = str(env_args[k])
2123

24+
def metadata_spec(self):
25+
return {
26+
'labels': {
27+
'usesvolume': self.external_volumes[0],
28+
**{f'usesvolume-{v}': 'true' for v in self.external_volumes}
29+
}
30+
} if self.external_volumes else {}
31+
32+
def affinity_spec(self):
33+
return affinity_spec(([PodExecutionContext('usesvolume', self.external_volumes[0], True)] if self.external_volumes else []) + [
34+
PodExecutionContext(f'usesvolume-{v}', 'true', True)
35+
for v in self.external_volumes
36+
] )
37+
2238
@property
2339
def image_name(self):
2440
raise NotImplemented
@@ -75,6 +91,7 @@ def __init__(self, name, resources={}, image_pull_policy='IfNotPresent', command
7591
self.command = command
7692

7793
def spec(self):
94+
7895
spec = {
7996
'container': {
8097
'image': self.image_name,
@@ -84,11 +101,10 @@ def spec(self):
84101
'volumeMounts': self.volumes_mounts_spec(),
85102
},
86103
'inputs': {},
87-
'metadata': {},
104+
'metadata': self.metadata_spec(),
88105
'name': self.name,
89106
'outputs': {},
90-
'affinity': affinity_spec([PodExecutionContext('usesvolume', v.split(':')[0], True) for v in self.volume_mounts if
91-
':' in v])
107+
'affinity': self.affinity_spec()
92108

93109
}
94110
if self.command is not None:
@@ -106,12 +122,11 @@ def __init__(self, name, source, **kwargs):
106122
self.source = source
107123

108124
def spec(self):
125+
109126
return {
110127
'name': self.name,
111-
'affinity': affinity_spec([
112-
PodExecutionContext('usesvolume', v.split(':')[0], True)
113-
for v in self.volume_mounts if ':' in v
114-
]),
128+
'affinity': self.affinity_spec(),
129+
'metadata': self.metadata_spec(),
115130
'script':
116131
{
117132
'image': self.image_name,

libraries/cloudharness-common/cloudharness/workflows/utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ def get_workflow_name():
2525
remove = name.split("-")[-1]
2626
return name[0:-len(remove) - 1]
2727

28+
def volume_requires_affinity(v):
29+
return ':' in v and 'rwx' not in v[-4:]
2830

2931
def get_shared_directory():
3032
return os.getenv(SHARED_DIRECTORY_VARIABLE_NAME)

libraries/cloudharness-common/tests/test_workflow.py

Lines changed: 75 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
set_test_environment()
99

10-
from cloudharness.workflows import operations, tasks
10+
from cloudharness.workflows import operations, tasks, utils
1111
from cloudharness import set_debug
1212
from cloudharness.workflows import argo
1313
from cloudharness.utils.config import CloudharnessConfig
@@ -23,7 +23,12 @@ def check_wf(wf):
2323

2424
assert wf["kind"] == "Workflow"
2525
assert "spec" in wf
26-
26+
27+
def test_volume_affinity_check():
28+
assert not utils.volume_requires_affinity("a")
29+
assert utils.volume_requires_affinity("a:b")
30+
assert utils.volume_requires_affinity("a:b:ro")
31+
assert not utils.volume_requires_affinity("a:b:rwx")
2732

2833
def test_sync_workflow():
2934
def f():
@@ -119,9 +124,44 @@ def test_single_task_shared():
119124
if accounts_offset == 1:
120125
assert wf['spec']['volumes'][1]['secret']['secretName'] == 'accounts'
121126
assert len(wf['spec']['templates'][0]['container']['volumeMounts']) == 2 + accounts_offset
127+
assert wf['spec']['templates'][0]['metadata']['labels']['usesvolume'] == 'myclaim'
128+
129+
affinity_glob = \
130+
wf['spec']['affinity']['podAffinity']['requiredDuringSchedulingIgnoredDuringExecution'][0]['labelSelector'][
131+
'matchExpressions'][0]
132+
assert affinity_glob['key'] == 'usesvolume'
133+
assert affinity_glob['values'][0] == 'myclaim'
134+
122135
if execute:
123136
print(op.execute())
124137

138+
def test_pipeline_shared():
139+
shared_directory = 'myclaim:/mnt/shared'
140+
task_write = operations.CustomTask('download-file', 'workflows-extract-download',
141+
url='https://raw.githubusercontent.com/openworm/org.geppetto/master/README.md')
142+
task_script = tasks.BashTask('print-file', source="ls -la")
143+
op = operations.PipelineOperation('test-custom-connected-op-', [task_write, task_script],
144+
shared_directory=shared_directory, shared_volume_size=100)
145+
wf = op.to_workflow()
146+
147+
accounts_offset = 1 if is_accounts_present() else 0
148+
assert len(op.volumes) == 1
149+
assert len(wf['spec']['volumes']) == 2 + accounts_offset
150+
assert wf['spec']['volumes'][1+accounts_offset]['persistentVolumeClaim']['claimName'] == 'myclaim'
151+
if accounts_offset == 1:
152+
assert wf['spec']['volumes'][1]['secret']['secretName'] == 'accounts'
153+
assert len(wf['spec']['templates'][1]['container']['volumeMounts']) == 2 + accounts_offset
154+
assert wf['spec']['templates'][1]['metadata']['labels']['usesvolume'] == 'myclaim'
155+
assert wf['spec']['templates'][2]['metadata']['labels']['usesvolume'] == 'myclaim'
156+
affinity_glob = \
157+
wf['spec']['affinity']['podAffinity']['requiredDuringSchedulingIgnoredDuringExecution'][0]['labelSelector'][
158+
'matchExpressions'][0]
159+
assert affinity_glob['key'] == 'usesvolume'
160+
assert affinity_glob['values'][0] == 'myclaim'
161+
162+
if execute:
163+
print(op.execute())
164+
125165
def test_single_task_shared_rwx():
126166
shared_directory = 'myclaim:/mnt/shared:rwx'
127167
task_write = operations.CustomTask('download-file', 'workflows-extract-download',
@@ -139,9 +179,39 @@ def test_single_task_shared_rwx():
139179
assert wf['spec']['volumes'][1]['secret']['secretName'] == 'accounts'
140180
assert len(wf['spec']['templates'][0]['container']['volumeMounts']) == 2 + accounts_offset
141181

182+
183+
184+
142185
assert not 'affinity' in wf['spec'], "Pod affinity should not be added for rwx volumes"
143-
186+
144187
def test_single_task_volume_notshared():
188+
189+
task_write = operations.CustomTask('download-file', 'workflows-extract-download', volume_mounts=["a:b"],
190+
url='https://raw.githubusercontent.com/openworm/org.geppetto/master/README.md')
191+
op = operations.SingleTaskOperation('test-custom-connected-op-', task_write, shared_volume_size=100)
192+
wf = op.to_workflow()
193+
194+
accounts_offset = 1 if is_accounts_present() else 0
195+
assert len(op.volumes) == 0
196+
assert len(wf['spec']['volumes']) == 2 + accounts_offset
197+
assert wf['spec']['volumes'][1+accounts_offset]['persistentVolumeClaim']['claimName'] == 'a'
198+
if accounts_offset == 1:
199+
assert wf['spec']['volumes'][1]['secret']['secretName'] == 'accounts'
200+
assert len(wf['spec']['templates'][0]['container']['volumeMounts']) == 2 + accounts_offset
201+
202+
203+
assert 'affinity' not in wf['spec']
204+
205+
affinity_tpl = \
206+
wf['spec']['templates'][0]['affinity']['podAffinity']['requiredDuringSchedulingIgnoredDuringExecution'][0]['labelSelector'][
207+
'matchExpressions'][0]
208+
assert affinity_tpl['key'] == 'usesvolume'
209+
assert affinity_tpl['values'][0] == 'a'
210+
assert wf['spec']['templates'][0]['metadata']['labels']['usesvolume'] == 'a'
211+
if execute:
212+
print(op.execute())
213+
214+
def test_single_task_volumes_notshared():
145215
shared_directory = 'myclaim:/mnt/shared'
146216
task_write = operations.CustomTask('download-file', 'workflows-extract-download', volume_mounts=["a:b"],
147217
url='https://raw.githubusercontent.com/openworm/org.geppetto/master/README.md')
@@ -167,7 +237,7 @@ def test_single_task_volume_notshared():
167237
'matchExpressions'][0]
168238
assert affinity_tpl['key'] == 'usesvolume'
169239
assert affinity_tpl['values'][0] == 'a'
170-
240+
assert wf['spec']['templates'][0]['metadata']['labels']['usesvolume']
171241
if execute:
172242
print(op.execute())
173243

@@ -202,6 +272,7 @@ def test_single_task_shared_multiple():
202272
print(op.execute())
203273

204274

275+
205276
def test_single_task_shared_script():
206277
shared_directory = 'myclaim:/mnt/shared'
207278
task_write = tasks.BashTask('download-file', source="ls -la")

0 commit comments

Comments
 (0)