
Commit a50fe18

james-zhou-inspire11 authored and tyler-dunkel committed

feat(dataproc): add dataproc job

1 parent d9e606c commit a50fe18

10 files changed: 656 additions & 0 deletions


src/enums/schemasMap.ts

Lines changed: 1 addition & 0 deletions
@@ -42,5 +42,6 @@ export default {
   [services.computeProject]: 'gcpComputeProject',
   [services.dataprocCluster]: 'gcpDataprocCluster',
   [services.dataprocAutoscalingPolicy]: 'gcpDataprocAutoscalingPolicy',
+  [services.dataprocJob]: 'gcpDataprocJob',
   tag: 'gcpTag',
 }

src/enums/serviceMap.ts

Lines changed: 2 additions & 0 deletions
@@ -38,6 +38,7 @@ import GcpApiKey from '../services/apiKey'
 import GcpComputeProject from '../services/computeProject'
 import GcpDataprocCluster from '../services/dataprocCluster'
 import GcpDataprocAutoscalingPolicy from '../services/dataprocAutoscalingPolicy'
+import GcpDataprocJob from '../services/dataprocJob'
 
 /**
  * serviceMap is an object that contains all currently supported services
@@ -82,5 +83,6 @@ export default {
   [services.computeProject]: GcpComputeProject,
   [services.dataprocCluster]: GcpDataprocCluster,
   [services.dataprocAutoscalingPolicy]: GcpDataprocAutoscalingPolicy,
+  [services.dataprocJob]: GcpDataprocJob,
   tag: GcpTag,
 }

src/enums/services.ts

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ export default {
   // dataflow: 'dataflow',
   dataprocCluster: 'dataprocClusters',
   dataprocAutoscalingPolicy: 'dataprocAutoscalingPolicies',
+  dataprocJob: 'dataprocJobs',
   // metastore: 'metastore',
   // pubsub: 'pubsub',
   // bigtable: 'bigtable',
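Taken together, the three enum edits above register the new service end to end: services.ts defines the dataprocJob key, schemasMap.ts maps that key to the gcpDataprocJob schema type, and serviceMap.ts binds it to the service class. A minimal lookup sketch of what this presumably enables, assuming code running inside src (the import paths and no-argument instantiation are illustrative, not taken from this commit):

import serviceMap from './enums/serviceMap'
import services from './enums/services'

// Resolve the newly registered class by its service key and instantiate it;
// assumes the service classes take no constructor arguments.
const ServiceClass = serviceMap[services.dataprocJob]
const dataprocJobService = new ServiceClass()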

src/services/dataprocJob/data.ts

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
+import { JobControllerClient } from '@google-cloud/dataproc'
+import CloudGraph from '@cloudgraph/sdk'
+import groupBy from 'lodash/groupBy'
+import { google } from '@google-cloud/dataproc/build/protos/protos'
+import gcpLoggerText from '../../properties/logger'
+import { GcpServiceInput } from '../../types'
+import { generateGcpErrorLog } from '../../utils'
+
+const lt = { ...gcpLoggerText }
+const { logger } = CloudGraph
+const serviceName = 'Dataproc Job'
+
+export interface RawGcpDataprocJob extends Omit<google.cloud.dataproc.v1.IJob, 'jobUuid' | 'labels'> {
+  id: string
+  region: string
+  projectId: string
+  Labels: { [key: string]: string }
+}
+
+export default async ({
+  regions,
+  config,
+}: GcpServiceInput): Promise<{
+  [region: string]: RawGcpDataprocJob[]
+}> => {
+  const jobList: RawGcpDataprocJob[] = []
+  const { projectId } = config
+
+  for (const region of regions.split(',')) {
+    /**
+     * Get all the Dataproc jobs
+     */
+    try {
+      const dataprocClient = new JobControllerClient({
+        ...config,
+        apiEndpoint: `${region}-dataproc.googleapis.com`,
+        projectId,
+      })
+
+      const iterable = dataprocClient.listJobsAsync({ projectId, region })
+      for await (const { jobUuid, labels, ...response } of iterable) {
+        if (response) {
+          jobList.push({
+            ...response,
+            id: jobUuid,
+            projectId,
+            region,
+            Labels: labels,
+          })
+        }
+      }
+    } catch (error) {
+      generateGcpErrorLog(serviceName, 'dataprocJob:listJobsAsync', error)
+    }
+  }
+
+  logger.debug(lt.foundResources(serviceName, jobList.length))
+  return groupBy(jobList, 'region')
+}
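For context, a hedged sketch of calling this collector directly, outside the CloudGraph CLI. The config fields shown are assumptions about what GcpServiceInput carries; only projectId is confirmed by the code above.

import getDataprocJobs from './data'

// Illustrative only: fetch jobs for two regions and inspect the
// region-grouped result returned by groupBy(jobList, 'region').
const run = async (): Promise<void> => {
  const grouped = await getDataprocJobs({
    regions: 'us-central1,us-east1',
    config: { projectId: 'my-gcp-project' }, // assumed minimal config
  })

  // e.g. { 'us-central1': [{ id, region, projectId, Labels, ... }] }
  for (const [region, jobs] of Object.entries(grouped)) {
    console.log(`${region}: ${jobs.length} job(s)`)
  }
}

run().catch(console.error)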

src/services/dataprocJob/format.ts

Lines changed: 275 additions & 0 deletions
@@ -0,0 +1,275 @@
+import cuid from 'cuid'
+import { google } from '@google-cloud/dataproc/build/protos/protos'
+import {
+  GcpDataprocHadoopJob,
+  GcpDataprocHiveJob,
+  GcpDataprocJob,
+  GcpDataprocJobStatus,
+  GcpDataprocJobYarnApplication,
+  GcpDataprocPigJob,
+  GcpDataprocPrestoJob,
+  GcpDataprocPysparkJob,
+  GcpDataprocSparkJob,
+  GcpDataprocspArkRJob,
+  GcpDataprocSparkSqlJob,
+  GcpKeyValue,
+} from '../../types/generated'
+import { RawGcpDataprocJob } from './data'
+import { toISOString } from '../../utils/dateutils'
+import { enumKeyToString, formatKeyValueMap, formatLabelsFromMap } from '../../utils/format'
+
+const formatLoggingConfig = (loggingConfig: { [k: string]: google.cloud.dataproc.v1.LoggingConfig.Level }): GcpKeyValue[] => {
+  return Object.keys(loggingConfig || {}).map(key => ({
+    id: cuid(),
+    key,
+    value: enumKeyToString(google.cloud.dataproc.v1.LoggingConfig.Level, loggingConfig[key]),
+  }))
+}
+
+const formatStatus = ({
+  state,
+  details,
+  stateStartTime,
+  substate,
+}: google.cloud.dataproc.v1.IJobStatus): GcpDataprocJobStatus => {
+  return {
+    id: cuid(),
+    state: enumKeyToString(google.cloud.dataproc.v1.JobStatus.State, state),
+    details,
+    stateStartTime: toISOString(stateStartTime?.seconds?.toString()) || '',
+    substate: enumKeyToString(google.cloud.dataproc.v1.JobStatus.Substate, substate),
+  }
+}
+
+const formatYarnApplication = ({
+  name,
+  state,
+  progress,
+  trackingUrl,
+}: google.cloud.dataproc.v1.IYarnApplication): GcpDataprocJobYarnApplication => {
+  return {
+    id: cuid(),
+    name,
+    state: enumKeyToString(google.cloud.dataproc.v1.YarnApplication.State, state),
+    progress,
+    trackingUrl,
+  }
+}
+
+const formatHadoopJob = ({
+  mainJarFileUri,
+  mainClass,
+  args,
+  jarFileUris,
+  fileUris,
+  archiveUris,
+  properties = {},
+  loggingConfig = {},
+}: google.cloud.dataproc.v1.IHadoopJob): GcpDataprocHadoopJob => {
+  return {
+    mainJarFileUri,
+    mainClass,
+    args,
+    jarFileUris,
+    fileUris,
+    archiveUris,
+    properties: formatKeyValueMap(properties),
+    loggingConfig: formatLoggingConfig(loggingConfig?.driverLogLevels || {}),
+  }
+}
+
+const formatSparkJob = ({
+  mainJarFileUri,
+  mainClass,
+  args,
+  jarFileUris,
+  fileUris,
+  archiveUris,
+  properties = {},
+  loggingConfig = {},
+}: google.cloud.dataproc.v1.ISparkJob): GcpDataprocSparkJob => {
+  return {
+    mainJarFileUri,
+    mainClass,
+    args,
+    jarFileUris,
+    fileUris,
+    archiveUris,
+    properties: formatKeyValueMap(properties || {}),
+    loggingConfig: formatLoggingConfig(loggingConfig?.driverLogLevels || {}),
+  }
+}
+
+const formatPysparkJob = ({
+  mainPythonFileUri,
+  args = [],
+  pythonFileUris = [],
+  jarFileUris = [],
+  fileUris = [],
+  archiveUris = [],
+  properties = {},
+  loggingConfig = {},
+}: google.cloud.dataproc.v1.IPySparkJob): GcpDataprocPysparkJob => {
+  return {
+    mainPythonFileUri,
+    args,
+    pythonFileUris,
+    jarFileUris,
+    fileUris,
+    archiveUris,
+    properties: formatKeyValueMap(properties || {}),
+    loggingConfig: formatLoggingConfig(loggingConfig?.driverLogLevels || {}),
+  }
+}
+
+const formatHiveJob = ({
+  queryFileUri,
+  queryList = {},
+  continueOnFailure = false,
+  scriptVariables = {},
+  properties = {},
+  jarFileUris = [],
+}: google.cloud.dataproc.v1.IHiveJob): GcpDataprocHiveJob => {
+  return {
+    queryFileUri,
+    queryList: queryList?.queries || [],
+    continueOnFailure,
+    scriptVariables: formatKeyValueMap(scriptVariables || {}),
+    properties: formatKeyValueMap(properties || {}),
+    jarFileUris,
+  }
+}
+
+const formatPigJob = ({
+  queryFileUri,
+  queryList = {},
+  continueOnFailure = false,
+  scriptVariables = {},
+  properties = {},
+  jarFileUris = [],
+  loggingConfig = {},
+}: google.cloud.dataproc.v1.IPigJob): GcpDataprocPigJob => {
+  return {
+    queryFileUri,
+    queryList: queryList?.queries || [],
+    continueOnFailure,
+    scriptVariables: formatKeyValueMap(scriptVariables || {}),
+    properties: formatKeyValueMap(properties || {}),
+    jarFileUris,
+    loggingConfig: formatLoggingConfig(loggingConfig?.driverLogLevels || {}),
+  }
+}
+
+const formatSparkRJob = ({
+  mainRFileUri,
+  args = [],
+  fileUris = [],
+  archiveUris = [],
+  properties = {},
+  loggingConfig = {},
+}: google.cloud.dataproc.v1.ISparkRJob): GcpDataprocspArkRJob => {
+  return {
+    mainRFileUri,
+    args,
+    fileUris,
+    archiveUris,
+    properties: formatKeyValueMap(properties || {}),
+    loggingConfig: formatLoggingConfig(loggingConfig?.driverLogLevels || {}),
+  }
+}
+
+const formatSparkSqlJob = ({
+  queryFileUri,
+  queryList = {},
+  scriptVariables = {},
+  properties = {},
+  jarFileUris = [],
+  loggingConfig = {},
+}: google.cloud.dataproc.v1.ISparkSqlJob): GcpDataprocSparkSqlJob => {
+  return {
+    queryFileUri,
+    queryList: queryList?.queries || [],
+    scriptVariables: formatKeyValueMap(scriptVariables || {}),
+    properties: formatKeyValueMap(properties || {}),
+    jarFileUris: jarFileUris || [],
+    loggingConfig: formatLoggingConfig(loggingConfig?.driverLogLevels || {}),
+  }
+}
+
+const formatPrestoJob = ({
+  queryFileUri,
+  queryList = {},
+  continueOnFailure = false,
+  outputFormat,
+  clientTags = [],
+  properties = {},
+  loggingConfig = {},
+}: google.cloud.dataproc.v1.IPrestoJob): GcpDataprocPrestoJob => {
+  return {
+    queryFileUri,
+    queryList: queryList?.queries || [],
+    continueOnFailure: continueOnFailure || false,
+    outputFormat,
+    clientTags,
+    properties: formatKeyValueMap(properties || {}),
+    loggingConfig: formatLoggingConfig(loggingConfig?.driverLogLevels || {}),
+  }
+}
+
+export default ({
+  service,
+  region,
+}: {
+  service: RawGcpDataprocJob
+  region: string
+}): GcpDataprocJob => {
+  const {
+    id,
+    projectId,
+    reference,
+    placement,
+    hadoopJob = {},
+    sparkJob = {},
+    pysparkJob = {},
+    hiveJob = {},
+    pigJob = {},
+    sparkRJob = {},
+    sparkSqlJob = {},
+    prestoJob = {},
+    status = {},
+    statusHistory = [],
+    yarnApplications = [],
+    driverOutputResourceUri,
+    driverControlFilesUri,
+    scheduling = {},
+    done,
+    Labels = {},
+  } = service
+
+  return {
+    id,
+    projectId,
+    region,
+    name: reference?.jobId || '',
+    placementClusterName: placement?.clusterName || '',
+    placementClusterUuid: placement?.clusterUuid || '',
+    placementClusterLabels: formatKeyValueMap(placement?.clusterLabels || {}),
+    hadoopJob: formatHadoopJob(hadoopJob),
+    sparkJob: formatSparkJob(sparkJob),
+    pysparkJob: formatPysparkJob(pysparkJob),
+    hiveJob: formatHiveJob(hiveJob),
+    pigJob: formatPigJob(pigJob),
+    sparkRJob: formatSparkRJob(sparkRJob),
+    sparkSqlJob: formatSparkSqlJob(sparkSqlJob),
+    prestoJob: formatPrestoJob(prestoJob),
+    status: formatStatus(status),
+    statusHistory: statusHistory?.map(formatStatus),
+    yarnApplications: yarnApplications?.map(formatYarnApplication),
+    driverOutputResourceUri,
+    driverControlFilesUri,
+    schedulingMaxFailuresPerHour: scheduling?.maxFailuresPerHour || 0,
+    schedulingMaxFailuresTotal: scheduling?.maxFailuresTotal || 0,
+    done,
+    labels: formatLabelsFromMap(Labels),
+  }
+}
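To make the enum handling concrete, here is a hedged input/output sketch for formatStatus. The numeric enum values come from the v1 protos; the exact stateStartTime rendering assumes toISOString accepts epoch seconds.

// Illustrative raw status as returned by the Dataproc API (values assumed).
const rawStatus = {
  state: 5, // google.cloud.dataproc.v1.JobStatus.State.DONE
  details: 'Job terminated successfully',
  stateStartTime: { seconds: 1650000000 },
  substate: 0, // JobStatus.Substate.UNSPECIFIED
}

// formatStatus(rawStatus) should produce something like:
// {
//   id: 'ckw...',                               // generated by cuid()
//   state: 'DONE',                              // enum key via enumKeyToString
//   details: 'Job terminated successfully',
//   stateStartTime: '2022-04-15T05:20:00.000Z', // assuming epoch-seconds input
//   substate: 'UNSPECIFIED',
// }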

src/services/dataprocJob/index.ts

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+import { Service } from '@cloudgraph/sdk'
+import BaseService from '../base'
+import format from './format'
+import getData from './data'
+import mutation from './mutation'
+
+export default class GcpDataprocJob extends BaseService implements Service {
+  format = format.bind(this)
+
+  getData = getData.bind(this)
+
+  mutation = mutation
+}
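A hedged sketch of the flow this class enables: getData first, then format per raw resource. The orchestration shown is illustrative; the real driver lives in the CloudGraph CLI, and the no-argument constructor is an assumption.

import GcpDataprocJob from './index'

const scan = async (regions: string, config: any): Promise<void> => {
  const service = new GcpDataprocJob() // assumes BaseService needs no ctor args

  // Fetch raw jobs grouped by region...
  const rawByRegion = await service.getData({ regions, config })

  // ...then map each raw job into the generated GcpDataprocJob shape.
  const formatted = (rawByRegion['us-central1'] || []).map(job =>
    service.format({ service: job, region: 'us-central1' })
  )
  console.log(`formatted ${formatted.length} job(s)`)
}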
src/services/dataprocJob/mutation.ts

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+export default `mutation($input: [AddgcpDataprocJobInput!]!) {
+  addgcpDataprocJob(input: $input, upsert: true) {
+    numUids
+  }
+}`;
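Finally, a hedged sketch of the variable shape this mutation expects. Field names follow format.ts above, the values are invented, and `client` stands in for whatever GraphQL client CloudGraph uses against its Dgraph backend.

// Hypothetical mutation payload (illustrative values only).
const variables = {
  input: [
    {
      id: 'b2f6c1de-0000-0000-0000-000000000000', // jobUuid from data.ts
      projectId: 'my-gcp-project',
      region: 'us-central1',
      name: 'spark-daily-aggregation',
      done: true,
      // ...remaining fields produced by format.ts
    },
  ],
}

// await client.request(mutation, variables)
// => { addgcpDataprocJob: { numUids: 1 } }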
