Skip to content

Commit 81a56c9

Browse files
james-zhou-inspire11 authored and tyler-dunkel committed
feat(dataproc): add dataproc cluster
1 parent 83efe7c commit 81a56c9

14 files changed

Lines changed: 714 additions & 23 deletions

File tree

src/enums/schemasMap.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,6 @@ export default {
4040
[services.serviceAccount]: 'gcpServiceAccount',
4141
[services.apiKey]: 'gcpApiKey',
4242
[services.computeProject]: 'gcpComputeProject',
43+
[services.dataprocCluster]: 'gcpDataprocCluster',
4344
tag: 'gcpTag',
4445
}

src/enums/serviceMap.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import GcpSslPolicies from '../services/sslPolicy'
3636
import GcpServiceAccount from '../services/serviceAccount'
3737
import GcpApiKey from '../services/apiKey'
3838
import GcpComputeProject from '../services/computeProject'
39+
import GcpDataprocCluster from '../services/dataprocCluster'
3940

4041
/**
4142
* serviceMap is an object that contains all currently supported services
@@ -78,5 +79,6 @@ export default {
7879
[services.serviceAccount]: GcpServiceAccount,
7980
[services.apiKey]: GcpApiKey,
8081
[services.computeProject]: GcpComputeProject,
82+
[services.dataprocCluster]: GcpDataprocCluster,
8183
tag: GcpTag,
8284
}

src/enums/services.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ export default {
2828
// composer: 'composer',
2929
// dataCatalog: 'data-catalog',
3030
// dataflow: 'dataflow',
31-
// dataproc: 'dataproc',
31+
dataprocCluster: 'dataprocCluster',
3232
// metastore: 'metastore',
3333
// pubsub: 'pubsub',
3434
// bigtable: 'bigtable',

src/services/base/schema.graphql

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,15 @@ interface gcpBaseResource
1111
kind: String @search(by: [hash, regexp])
1212
labels: [gcpRawLabel]
1313
}
14+
15+
# Generic key/value entry. Queries (get/query/aggregate) and the add mutation
# are generated; delete mutations and subscriptions are disabled.
type gcpKeyValue
  @generate(
    query: { get: true, query: true, aggregate: true }
    mutation: { add: true, delete: false }
    subscription: false
  )
  @key(fields: "id") {
  # Required identifier of the entry; also the @key field of the type.
  id: String! @id @search(by: [hash])
  # Required key, searchable by exact match and by regular expression.
  key: String! @search(by: [hash, regexp])
  # Optional value (nullable), searchable like the key.
  value: String @search(by: [hash, regexp])
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import cuid from 'cuid'
2+
import { ClusterControllerClient } from '@google-cloud/dataproc'
3+
import CloudGraph from '@cloudgraph/sdk'
4+
import groupBy from 'lodash/groupBy'
5+
import { google } from '@google-cloud/dataproc/build/protos/protos'
6+
import gcpLoggerText from '../../properties/logger'
7+
import { GcpServiceInput } from '../../types'
8+
import { initTestEndpoint, generateGcpErrorLog } from '../../utils'
9+
10+
// Shallow copy of the shared GCP logger text helpers.
const lt = { ...gcpLoggerText }
const { logger } = CloudGraph
// Human-readable service name: used in log output and passed to initTestEndpoint.
const serviceName = 'Dataproc Cluster'
// NOTE(review): presumably only set when running against a test/mock endpoint — confirm in utils.
const apiEndpoint = initTestEndpoint(serviceName)
14+
15+
/**
 * Raw Dataproc cluster as collected by this service's data fetcher: the API
 * cluster object with `projectId` and `labels` replaced by the fields below.
 */
export interface RawGcpDataprocCluster extends Omit<google.cloud.dataproc.v1.ICluster, 'projectId' | 'labels'> {
  // Locally generated identifier (a cuid), not an identifier returned by the API.
  id: string
  // Region the cluster was listed from.
  region: string
  // Project id taken from the service configuration.
  projectId: string
  // The API `labels` map, carried under a capitalised name.
  Labels: { [key: string]: string }
}
21+
22+
/**
 * Lists the Dataproc clusters of the configured project in every requested
 * region and returns them grouped by region.
 *
 * A failure while listing one region is logged through `generateGcpErrorLog`
 * and does not prevent the remaining regions from being scanned.
 *
 * @param regions comma-separated list of region names to scan
 * @param config  client options; must provide `projectId`
 * @returns the raw clusters keyed by the region they were found in
 */
export default async ({
  regions,
  config,
}: GcpServiceInput): Promise<{
  [region: string]: RawGcpDataprocCluster[]
}> => {
  const clusterList: RawGcpDataprocCluster[] = []
  const { projectId } = config

  for (const region of regions.split(',')) {
    try {
      // Dataproc only accepts requests for a non-global region on that
      // region's own endpoint; against the default (global) endpoint the API
      // rejects the call. An endpoint supplied by initTestEndpoint still wins
      // so that test setups keep working.
      const regionalEndpoint =
        region && region !== 'global'
          ? `${region}-dataproc.googleapis.com`
          : undefined
      const dataprocClient = new ClusterControllerClient({
        ...config,
        apiEndpoint: apiEndpoint || regionalEndpoint,
      })

      const iterable = dataprocClient.listClustersAsync({ projectId, region })
      // `labels` is split off so it can be stored under `Labels`; the rest
      // object is always defined, so no additional guard is needed.
      for await (const { labels, ...cluster } of iterable) {
        clusterList.push({
          ...cluster,
          id: cuid(),
          projectId,
          region,
          // The API may omit labels; keep the declared map shape.
          Labels: labels ?? {},
        })
      }
    } catch (error) {
      generateGcpErrorLog(serviceName, 'dataprocCluster:listClustersAsync', error)
    }
  }

  logger.debug(lt.foundResources(serviceName, clusterList.length))
  return groupBy(clusterList, 'region')
}
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
import cuid from 'cuid'
2+
import { google } from '@google-cloud/dataproc/build/protos/protos'
3+
import {
4+
GcpDataprocCluster,
5+
GcpDataprocClusterConfig,
6+
GcpDataprocClusterConfigEndpoint,
7+
GcpDataprocClusterConfigGceCluster,
8+
GcpDataprocClusterConfigGke,
9+
GcpDataprocClusterConfigInstanceGroup,
10+
GcpDataprocClusterConfigLifecycleConfig,
11+
GcpDataprocClusterConfigNodeInitializationAction,
12+
GcpDataprocClusterConfigSecurity,
13+
GcpDataprocClusterConfigSoftware,
14+
GcpDataprocClusterStatus,
15+
} from '../../types/generated'
16+
import { RawGcpDataprocCluster } from './data'
17+
import { toISOString } from '../../utils/dateutils'
18+
import { enumKeyToString, formatKeyValueMap, formatLabelsFromMap } from '../../utils/format'
19+
20+
/**
 * Flattens the Compute Engine settings of a cluster into the GraphQL shape:
 * nested affinity / shielded / confidential objects become prefixed scalar
 * fields, enum values are converted through `enumKeyToString`, and the
 * metadata map goes through `formatKeyValueMap`.
 */
const formatClusterConfigGceCluster = (
  gceCluster: google.cloud.dataproc.v1.IGceClusterConfig
): GcpDataprocClusterConfigGceCluster => {
  const {
    metadata = {},
    reservationAffinity: reservation,
    nodeGroupAffinity: nodeGroup,
    shieldedInstanceConfig: shielded,
    confidentialInstanceConfig: confidential,
  } = gceCluster
  const { GceClusterConfig, ReservationAffinity } = google.cloud.dataproc.v1

  return {
    zoneUri: gceCluster.zoneUri,
    networkUri: gceCluster.networkUri,
    subnetworkUri: gceCluster.subnetworkUri,
    internalIpOnly: gceCluster.internalIpOnly,
    privateIpv6GoogleAccess: enumKeyToString(
      GceClusterConfig.PrivateIpv6GoogleAccess,
      gceCluster.privateIpv6GoogleAccess
    ),
    serviceAccount: gceCluster.serviceAccount,
    serviceAccountScopes: gceCluster.serviceAccountScopes,
    tags: gceCluster.tags,
    metadata: formatKeyValueMap(metadata),
    consumeReservationType: enumKeyToString(
      ReservationAffinity.Type,
      reservation?.consumeReservationType
    ),
    reservationAffinityKey: reservation?.key || '',
    reservationAffinityValues: reservation?.values || [],
    nodeGroupAffinityNodeGroupUri: nodeGroup?.nodeGroupUri || '',
    shieldedInstanceConfigEnableSecureBoot: shielded?.enableSecureBoot || false,
    shieldedInstanceConfigEnableVtpm: shielded?.enableVtpm || false,
    shieldedInstanceConfigEnableIntegrityMonitoring:
      shielded?.enableIntegrityMonitoring || false,
    confidentialInstanceConfigEnableConfidentialCompute:
      confidential?.enableConfidentialCompute || false,
  }
}
55+
56+
/**
 * Converts one instance group (master, worker or secondary worker) into the
 * GraphQL shape. Disk and managed-group settings are flattened into prefixed
 * fields with '' / 0 fallbacks, and every accelerator gets a fresh cuid.
 */
const formatClusterConfigInstanceGroup = (
  group: google.cloud.dataproc.v1.IInstanceGroupConfig
): GcpDataprocClusterConfigInstanceGroup => {
  const {
    diskConfig: disk,
    managedGroupConfig: managedGroup,
    // Only an absent list defaults to []; an explicit null stays null.
    accelerators = [],
  } = group

  return {
    numInstances: group.numInstances,
    instanceNames: group.instanceNames,
    imageUri: group.imageUri,
    machineTypeUri: group.machineTypeUri,
    diskConfigBootDiskType: disk?.bootDiskType || '',
    diskConfigBootDiskSizeGb: disk?.bootDiskSizeGb || 0,
    diskConfigNumLocalSsds: disk?.numLocalSsds || 0,
    isPreemptible: group.isPreemptible,
    preemptibility: enumKeyToString(
      google.cloud.dataproc.v1.InstanceGroupConfig.Preemptibility,
      group.preemptibility
    ),
    managedGroupConfigInstanceTemplateName: managedGroup?.instanceTemplateName || '',
    managedGroupConfigInstanceGroupManagerName:
      managedGroup?.instanceGroupManagerName || '',
    accelerators: accelerators?.map(accelerator => ({
      id: cuid(),
      acceleratorTypeUri: accelerator.acceleratorTypeUri,
      acceleratorCount: accelerator.acceleratorCount,
    })),
    minCpuPlatform: group.minCpuPlatform,
  }
}
92+
93+
/**
 * Converts the software settings of a cluster: the properties map goes
 * through `formatKeyValueMap` and each optional component enum value is
 * converted with `enumKeyToString`.
 */
const formatClusterConfigSoftware = (
  software: google.cloud.dataproc.v1.ISoftwareConfig
): GcpDataprocClusterConfigSoftware => {
  // Only an absent component list defaults to []; an explicit null stays null.
  const { optionalComponents = [] } = software
  const properties = formatKeyValueMap(software.properties)
  const componentNames = optionalComponents?.map(component =>
    enumKeyToString(google.cloud.dataproc.v1.Component, component)
  )

  return {
    imageVersion: software.imageVersion,
    properties,
    optionalComponents: componentNames,
  }
}
106+
107+
/**
 * Converts one node initialization action, assigning it a fresh cuid and
 * rendering the timeout's `seconds` as a string ('' when it is not set).
 */
const formatClusterConfigNodeInitializationAction = (
  action: google.cloud.dataproc.v1.INodeInitializationAction
): GcpDataprocClusterConfigNodeInitializationAction => ({
  id: cuid(),
  executableFile: action.executableFile,
  executionTimeout: action.executionTimeout?.seconds?.toString() || '',
})
117+
118+
/**
 * Flattens the security settings of a cluster: every Kerberos option becomes
 * a `kerberosConfig`-prefixed field with a '' / 0 / false fallback, and the
 * identity mapping goes through `formatKeyValueMap`.
 */
const formatClusterConfigSecurity = (
  security: google.cloud.dataproc.v1.ISecurityConfig
): GcpDataprocClusterConfigSecurity => {
  const { kerberosConfig: kerberos, identityConfig: identity } = security

  return {
    kerberosConfigEnableKerberos: kerberos?.enableKerberos || false,
    kerberosConfigRootPrincipalPasswordUri: kerberos?.rootPrincipalPasswordUri || '',
    kerberosConfigKmsKeyUri: kerberos?.kmsKeyUri || '',
    kerberosConfigKeystoreUri: kerberos?.keystoreUri || '',
    kerberosConfigTruststoreUri: kerberos?.truststoreUri || '',
    kerberosConfigKeystorePasswordUri: kerberos?.keystorePasswordUri || '',
    kerberosConfigKeyPasswordUri: kerberos?.keyPasswordUri || '',
    kerberosConfigTruststorePasswordUri: kerberos?.truststorePasswordUri || '',
    kerberosConfigCrossRealmTrustRealm: kerberos?.crossRealmTrustRealm || '',
    kerberosConfigCrossRealmTrustKdc: kerberos?.crossRealmTrustKdc || '',
    kerberosConfigCrossRealmTrustAdminServer: kerberos?.crossRealmTrustAdminServer || '',
    kerberosConfigCrossRealmTrustSharedPasswordUri:
      kerberos?.crossRealmTrustSharedPasswordUri || '',
    kerberosConfigKdcDbKeyUri: kerberos?.kdcDbKeyUri || '',
    kerberosConfigTgtLifetimeHours: kerberos?.tgtLifetimeHours || 0,
    kerberosConfigRealm: kerberos?.realm || '',
    identityConfigUserServiceAccountMapping: formatKeyValueMap(
      identity?.userServiceAccountMapping
    ),
  }
}
141+
142+
/**
 * Converts the lifecycle (scheduled deletion) settings of a cluster.
 *
 * The two TTL durations are rendered as their `seconds` value in string form
 * ('' when unset); the two timestamps are passed to `toISOString` as a
 * seconds string.
 *
 * Every field is optional: a cluster without scheduled deletion yields an
 * empty lifecycle config, so each duration/timestamp is dereferenced with
 * optional chaining instead of assuming it is present.
 */
const formatClusterConfigLifecycle = ({
  idleDeleteTtl,
  autoDeleteTime,
  autoDeleteTtl,
  idleStartTime,
}: google.cloud.dataproc.v1.ILifecycleConfig): GcpDataprocClusterConfigLifecycleConfig => {
  return {
    idleDeleteTtl: idleDeleteTtl?.seconds?.toString() || '',
    autoDeleteTime: toISOString(autoDeleteTime?.seconds?.toString()),
    autoDeleteTtl: autoDeleteTtl?.seconds?.toString() || '',
    idleStartTime: toISOString(idleStartTime?.seconds?.toString()),
  }
}
155+
156+
/**
 * Converts the endpoint settings of a cluster; the HTTP ports map goes
 * through `formatKeyValueMap` and the access flag is copied as is.
 */
const formatClusterConfigEndpoint = (
  endpoint: google.cloud.dataproc.v1.IEndpointConfig
): GcpDataprocClusterConfigEndpoint => ({
  httpPorts: formatKeyValueMap(endpoint.httpPorts),
  enableHttpPortAccess: endpoint.enableHttpPortAccess,
})
165+
166+
/**
 * Flattens the GKE deployment target of a cluster into two prefixed string
 * fields, each falling back to '' when the target or the value is missing.
 */
const formatClusterConfigGke = (
  gkeCluster: google.cloud.dataproc.v1.IGkeClusterConfig
): GcpDataprocClusterConfigGke => {
  const target = gkeCluster.namespacedGkeDeploymentTarget

  return {
    namespacedGkeDeploymentTargetTargetGkeCluster: target?.targetGkeCluster || '',
    namespacedGkeDeploymentTargetClusterNamespace: target?.clusterNamespace || '',
  }
}
174+
175+
/**
 * Converts the full configuration of a cluster by delegating every nested
 * section to its dedicated formatter and flattening the single-value
 * sections (encryption, autoscaling, metastore) into prefixed string fields.
 *
 * Nested sections may be absent or null on the API object. Destructuring
 * defaults only cover `undefined`, so each section is normalised with `??`
 * before it is handed to a formatter that destructures its argument.
 */
const formatClusterConfig = ({
  configBucket,
  tempBucket,
  gceClusterConfig,
  masterConfig,
  workerConfig,
  secondaryWorkerConfig,
  softwareConfig,
  initializationActions,
  encryptionConfig,
  autoscalingConfig,
  securityConfig,
  lifecycleConfig,
  endpointConfig,
  metastoreConfig,
  gkeClusterConfig,
}: google.cloud.dataproc.v1.IClusterConfig): GcpDataprocClusterConfig => {
  return {
    configBucket,
    tempBucket,
    gceClusterConfig: formatClusterConfigGceCluster(gceClusterConfig ?? {}),
    masterConfig: formatClusterConfigInstanceGroup(masterConfig ?? {}),
    workerConfig: formatClusterConfigInstanceGroup(workerConfig ?? {}),
    secondaryWorkerConfig: formatClusterConfigInstanceGroup(secondaryWorkerConfig ?? {}),
    softwareConfig: formatClusterConfigSoftware(softwareConfig ?? {}),
    initializationActions: (initializationActions ?? []).map(
      formatClusterConfigNodeInitializationAction
    ),
    encryptionConfigGcePdKmsKeyName: encryptionConfig?.gcePdKmsKeyName || '',
    autoscalingConfigPolicyUri: autoscalingConfig?.policyUri || '',
    securityConfig: formatClusterConfigSecurity(securityConfig ?? {}),
    lifecycleConfig: formatClusterConfigLifecycle(lifecycleConfig ?? {}),
    endpointConfig: formatClusterConfigEndpoint(endpointConfig ?? {}),
    metastoreMetastoreServiceConfig: metastoreConfig?.dataprocMetastoreService || '',
    gkeClusterConfig: formatClusterConfigGke(gkeClusterConfig ?? {}),
  }
}
210+
211+
/**
 * Converts one cluster status entry (current status or a history item):
 * assigns a fresh cuid, converts the state / substate enum values with
 * `enumKeyToString`, and renders the state start time through `toISOString`
 * ('' when that yields nothing).
 */
const formatClusterStatus = (
  clusterStatus: google.cloud.dataproc.v1.IClusterStatus
): GcpDataprocClusterStatus => {
  const { State, Substate } = google.cloud.dataproc.v1.ClusterStatus
  const startedAtSeconds = clusterStatus.stateStartTime?.seconds?.toString()

  return {
    id: cuid(),
    state: enumKeyToString(State, clusterStatus.state),
    detail: clusterStatus.detail,
    stateStartTime: toISOString(startedAtSeconds) || '',
    substate: enumKeyToString(Substate, clusterStatus.substate),
  }
}
225+
226+
/**
 * Converts a raw Dataproc cluster into the GraphQL `GcpDataprocCluster` shape.
 *
 * Optional sections of the raw cluster (config, status, status history,
 * labels, metrics) may be absent or null; each is normalised to an empty
 * value before being passed to its formatter, because destructuring defaults
 * alone would let `null` through.
 *
 * @param service raw cluster produced by this service's data fetcher
 * @param region  region the cluster belongs to; used as the entity's region
 * @returns the formatted cluster entity
 */
export default ({
  service,
  region,
}: {
  service: RawGcpDataprocCluster
  region: string
}): GcpDataprocCluster => {
  const {
    id,
    projectId,
    clusterName,
    config,
    Labels,
    status,
    statusHistory,
    clusterUuid,
    metrics,
  } = service

  return {
    id,
    projectId,
    region,
    name: clusterName,
    config: formatClusterConfig(config ?? {}),
    labels: formatLabelsFromMap(Labels ?? {}),
    status: formatClusterStatus(status ?? {}),
    statusHistory: (statusHistory ?? []).map(history => formatClusterStatus(history)),
    clusterUuid,
    hdfsMetrics: formatKeyValueMap(metrics?.hdfsMetrics || {}),
    yarnMetrics: formatKeyValueMap(metrics?.yarnMetrics || {}),
  }
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import {Service} from '@cloudgraph/sdk'
2+
import BaseService from '../base'
3+
import format from './format'
4+
import getData from './data'
5+
import mutation from './mutation'
6+
7+
/**
 * Service class for GCP Dataproc clusters. It wires this directory's
 * `format`, `getData` and `mutation` modules into the `Service` contract.
 */
export default class GcpDataproc extends BaseService implements Service {
  // Formatter for a single raw cluster, bound to this instance.
  format = format.bind(this)

  // Data fetcher that lists the clusters, bound to this instance.
  getData = getData.bind(this)

  // GraphQL mutation string used to store the formatted clusters.
  mutation = mutation
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// GraphQL mutation that adds gcpDataprocCluster entities with `upsert: true`
// and returns the number of affected uids.
export default `mutation($input: [AddgcpDataprocClusterInput!]!) {
  addgcpDataprocCluster(input: $input, upsert: true) {
    numUids
  }
}`;

0 commit comments

Comments
 (0)