Skip to content

Commit 503b32f

Browse files
james-zhou-inspire11 and tyler-dunkel
authored and committed
feat(dataproc): add workflow template
1 parent 5cf4683 commit 503b32f

16 files changed

Lines changed: 716 additions & 18 deletions

File tree

src/enums/schemasMap.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,6 @@ export default {
4343
[services.dataprocCluster]: 'gcpDataprocCluster',
4444
[services.dataprocAutoscalingPolicy]: 'gcpDataprocAutoscalingPolicy',
4545
[services.dataprocJob]: 'gcpDataprocJob',
46+
[services.dataprocWorkflowTemplate]: 'gcpDataprocWorkflowTemplate',
4647
tag: 'gcpTag',
4748
}

src/enums/serviceMap.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import GcpComputeProject from '../services/computeProject'
3939
import GcpDataprocCluster from '../services/dataprocCluster'
4040
import GcpDataprocAutoscalingPolicy from '../services/dataprocAutoscalingPolicy'
4141
import GcpDataprocJob from '../services/dataprocJob'
42-
42+
import GcpDataprocWorkflowTemplate from '../services/dataprocWorkflowTemplate'
4343
/**
4444
* serviceMap is an object that contains all currently supported services
4545
* serviceMap is used by the serviceFactory to produce instances of service classes
@@ -84,5 +84,6 @@ export default {
8484
[services.dataprocCluster]: GcpDataprocCluster,
8585
[services.dataprocAutoscalingPolicy]: GcpDataprocAutoscalingPolicy,
8686
[services.dataprocJob]: GcpDataprocJob,
87+
[services.dataprocWorkflowTemplate]: GcpDataprocWorkflowTemplate,
8788
tag: GcpTag,
8889
}

src/enums/services.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ export default {
3131
dataprocCluster: 'dataprocClusters',
3232
dataprocAutoscalingPolicy: 'dataprocAutoscalingPolicies',
3333
dataprocJob: 'dataprocJobs',
34+
dataprocWorkflowTemplate: 'dataprocWorkflowTemplates',
3435
// metastore: 'metastore',
3536
// pubsub: 'pubsub',
3637
// bigtable: 'bigtable',

src/services/dataprocCluster/connections.ts

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import isEmpty from 'lodash/isEmpty'
22
import { ServiceConnection } from '@cloudgraph/sdk'
33
import { RawGcpDataprocCluster } from './data'
44
import services from '../../enums/services'
5+
import { RawGcpDataprocWorkflowTemplate } from '../dataprocWorkflowTemplate/data'
56

67
export default ({
78
service,
@@ -14,7 +15,7 @@ export default ({
1415
}): {
1516
[property: string]: ServiceConnection[]
1617
} => {
17-
const { id } = service
18+
const { id, Labels } = service
1819
const connections: ServiceConnection[] = []
1920

2021
/**
@@ -26,7 +27,6 @@ export default ({
2627
} = data.find(({ name }) => name === services.dataprocJob)
2728

2829
if (jobs?.data?.[region]) {
29-
3030
const filtered = jobs.data[region].filter(
3131
({ placement }) => placement.clusterUuid === id
3232
)
@@ -42,6 +42,35 @@ export default ({
4242
}
4343
}
4444

45+
/**
46+
* Find Dataproc Workflow Template
47+
*/
48+
const flatClusterLabels = Object.keys(Labels || {}).map(key => `${key}:${Labels[key]}`)
49+
50+
const templates: {
51+
name: string
52+
data: { [property: string]: any[] }
53+
} = data.find(({ name }) => name === services.dataprocWorkflowTemplate)
54+
55+
if (templates?.data?.[region]) {
56+
const filtered = templates.data[region]
57+
.filter((template: RawGcpDataprocWorkflowTemplate) => !isEmpty(
58+
Object.keys(template?.placement?.clusterSelector?.clusterLabels || {})
59+
.map(key => `${key}:${template.placement.clusterSelector.clusterLabels[key]}`)
60+
.filter(cluster => flatClusterLabels.includes(cluster))
61+
)
62+
)
63+
64+
for (const { id } of filtered) {
65+
connections.push({
66+
id,
67+
resourceType: services.dataprocWorkflowTemplate,
68+
relation: 'child',
69+
field: 'dataprocWorkflowTemplates',
70+
})
71+
}
72+
}
73+
4574
const result = {
4675
[id]: connections,
4776
}

src/services/dataprocCluster/format.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ const formatClusterConfigGke = ({
172172
}
173173
}
174174

175-
const formatClusterConfig = ({
175+
export const formatClusterConfig = ({
176176
configBucket,
177177
tempBucket,
178178
gceClusterConfig = {},

src/services/dataprocCluster/schema.graphql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,5 +197,6 @@ type gcpDataprocCluster implements gcpBaseResource
197197
hdfsMetrics: [gcpDataprocClusterMetric]
198198
yarnMetrics: [gcpDataprocClusterMetric]
199199
dataprocJobs: [gcpDataprocJob] @hasInverse(field: dataprocClusters)
200+
dataprocWorkflowTemplates: [gcpDataprocWorkflowTemplate] @hasInverse(field: dataprocClusters)
200201
project: [gcpProject] @hasInverse(field: dataprocClusters)
201202
}

src/services/dataprocJob/format.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import {
88
GcpDataprocJobYarnApplication,
99
GcpDataprocPigJob,
1010
GcpDataprocPrestoJob,
11-
GcpDataprocPysparkJob,
11+
GcpDataprocPySparkJob,
1212
GcpDataprocSparkJob,
13-
GcpDataprocspArkRJob,
13+
GcpDataprocSparkRJob,
1414
GcpDataprocSparkSqlJob,
1515
GcpKeyValue,
1616
} from '../../types/generated'
@@ -109,7 +109,7 @@ const formatPysparkJob = ({
109109
archiveUris = [],
110110
properties = {},
111111
loggingConfig = {},
112-
}: google.cloud.dataproc.v1.IPySparkJob): GcpDataprocPysparkJob => {
112+
}: google.cloud.dataproc.v1.IPySparkJob): GcpDataprocPySparkJob => {
113113
return {
114114
mainPythonFileUri,
115115
args,
@@ -167,7 +167,7 @@ const formatSparkRJob = ({
167167
archiveUris = [],
168168
properties = {},
169169
loggingConfig = {},
170-
}: google.cloud.dataproc.v1.ISparkRJob): GcpDataprocspArkRJob => {
170+
}: google.cloud.dataproc.v1.ISparkRJob): GcpDataprocSparkRJob => {
171171
return {
172172
mainRFileUri,
173173
args,
@@ -256,7 +256,7 @@ export default ({
256256
placementClusterLabels: formatKeyValueMap(placement?.clusterLabels || {}),
257257
hadoopJob: formatHadoopJob(hadoopJob),
258258
sparkJob: formatSparkJob(sparkJob),
259-
pysparkJob: formatPysparkJob(pysparkJob),
259+
pySparkJob: formatPysparkJob(pysparkJob),
260260
hiveJob: formatHiveJob( hiveJob),
261261
pigJob: formatPigJob(pigJob),
262262
sparkRJob: formatSparkRJob(sparkRJob),

src/services/dataprocJob/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import format from './format'
44
import getData from './data'
55
import mutation from './mutation'
66

7-
export default class GcpDataprocCluster extends BaseService implements Service {
7+
export default class GcpDataprocJob extends BaseService implements Service {
88
format = format.bind(this)
99

1010
getData = getData.bind(this)

src/services/dataprocJob/schema.graphql

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type gcpDataprocSparkJob
5858
loggingConfig: [gcpKeyValue]
5959
}
6060

61-
type gcpDataprocPysparkJob
61+
type gcpDataprocPySparkJob
6262
@generate(
6363
query: { get: true, query: true, aggregate: true }
6464
mutation: { add: true, delete: false }
@@ -103,7 +103,7 @@ type gcpDataprocPigJob
103103
loggingConfig: [gcpKeyValue]
104104
}
105105

106-
type gcpDataprocspArkRJob
106+
type gcpDataprocSparkRJob
107107
@generate(
108108
query: { get: true, query: true, aggregate: true }
109109
mutation: { add: true, delete: false }
@@ -158,10 +158,10 @@ type gcpDataprocJob implements gcpBaseResource
158158
placementClusterLabels: [gcpKeyValue]
159159
hadoopJob: gcpDataprocHadoopJob
160160
sparkJob: gcpDataprocSparkJob
161-
pysparkJob: gcpDataprocPysparkJob
161+
pySparkJob: gcpDataprocPySparkJob
162162
hiveJob: gcpDataprocHiveJob
163163
pigJob: gcpDataprocPigJob
164-
sparkRJob: gcpDataprocspArkRJob
164+
sparkRJob: gcpDataprocSparkRJob
165165
sparkSqlJob: gcpDataprocSparkSqlJob
166166
prestoJob: gcpDataprocPrestoJob
167167
status: gcpDataprocJobStatus
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import { WorkflowTemplateServiceClient } from '@google-cloud/dataproc'
2+
import CloudGraph from '@cloudgraph/sdk'
3+
import groupBy from 'lodash/groupBy'
4+
import { google } from '@google-cloud/dataproc/build/protos/protos'
5+
import gcpLoggerText from '../../properties/logger'
6+
import { GcpServiceInput } from '../../types'
7+
import { generateGcpErrorLog } from '../../utils'
8+
9+
const lt = { ...gcpLoggerText }
10+
const { logger } = CloudGraph
11+
const serviceName = 'Dataproc Job'
12+
13+
export interface RawGcpDataprocWorkflowTemplate extends Omit<google.cloud.dataproc.v1.IWorkflowTemplate, 'labels'> {
14+
region: string
15+
projectId: string
16+
Labels: { [key: string]: string }
17+
}
18+
19+
export default async ({
20+
regions,
21+
config,
22+
}: GcpServiceInput): Promise<{
23+
[region: string]: RawGcpDataprocWorkflowTemplate[]
24+
}> => {
25+
const workflowTemplateList: RawGcpDataprocWorkflowTemplate[] = []
26+
const { projectId } = config
27+
28+
for (const region of regions.split(',')) {
29+
/**
30+
* Get all the Dataproc Workflow Template
31+
*/
32+
33+
try {
34+
const dataprocClient = new WorkflowTemplateServiceClient({
35+
...config,
36+
apiEndpoint: `${region}-dataproc.googleapis.com`,
37+
projectId,
38+
})
39+
40+
const iterable = dataprocClient.listWorkflowTemplatesAsync({
41+
parent: `projects/${projectId}/regions/${region}`
42+
})
43+
for await (const { labels, ...response } of iterable) {
44+
if (response) {
45+
workflowTemplateList.push({
46+
...response,
47+
projectId,
48+
region,
49+
Labels: labels,
50+
})
51+
}
52+
}
53+
} catch (error) {
54+
generateGcpErrorLog(serviceName, 'dataprocWorkflowTemplate:listWorkflowTemplatesAsync', error)
55+
}
56+
}
57+
58+
logger.debug(lt.foundResources(serviceName, workflowTemplateList.length))
59+
return groupBy(workflowTemplateList, 'region')
60+
}
61+

0 commit comments

Comments (0)