-
Notifications
You must be signed in to change notification settings - Fork 380
Expand file tree
/
Copy pathlineage.ts
More file actions
129 lines (112 loc) · 3.42 KB
/
lineage.ts
File metadata and controls
129 lines (112 loc) · 3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import { isFalse, isNil } from '@/utils/index'
import { type Lineage } from '@/domain/lineage'
import type { ModelEncodedFQN } from '@/domain/models'
import {
toID,
type NodeId,
type LineageWorkerMessage,
type LineageWorkerRequestMessage,
type LineageWorkerResponseMessage,
type LineageWorkerErrorMessage,
type ConnectedNode,
} from '@/components/graph/types'
import type { Direction } from '../types'
interface WorkerScope {
onmessage: ((e: MessageEvent<LineageWorkerMessage>) => void) | null
postMessage: (message: LineageWorkerMessage) => void
}
const scope = self as unknown as WorkerScope
scope.onmessage = async (e: MessageEvent<LineageWorkerMessage>) => {
if (e.data.topic === 'lineage') {
try {
const message = e.data as LineageWorkerRequestMessage
const { currentLineage, newLineage, mainNode } = message.payload
const lineage = await mergeLineageWithModels(currentLineage, newLineage)
const nodesConnections = await getNodesConnections(mainNode, lineage)
const responseMessage: LineageWorkerResponseMessage = {
topic: 'lineage',
payload: {
lineage,
nodesConnections,
},
}
scope.postMessage(responseMessage)
} catch (error) {
const errorMessage: LineageWorkerErrorMessage = {
topic: 'error',
error: error as Error,
}
scope.postMessage(errorMessage)
}
}
}
async function mergeLineageWithModels(
currentLineage: Record<string, Lineage> = {},
data: Record<string, string[]> = {},
): Promise<Record<string, Lineage>> {
return Object.entries(data).reduce(
(acc: Record<string, Lineage>, [key, models = []]) => {
key = encodeURI(key)
acc[key] = {
models: models.map(encodeURI) as ModelEncodedFQN[],
columns: currentLineage?.[key]?.columns ?? undefined,
}
return acc
},
{},
)
}
async function getNodesConnections(
mainNode: string,
lineage: Record<string, Lineage> = {},
): Promise<Record<string, ConnectedNode>> {
return new Promise((resolve, reject) => {
if (isNil(lineage) || isNil(mainNode)) return {}
const distances: Record<string, ConnectedNode> = {}
try {
getConnectedNodes('upstream', mainNode, lineage, distances)
getConnectedNodes('downstream', mainNode, lineage, distances)
} catch (error) {
reject(error)
}
resolve(distances)
})
}
function getConnectedNodes(
direction: Direction = 'downstream',
node: string,
lineage: Record<string, Lineage> = {},
result: Record<string, ConnectedNode> = {},
): void {
const isDownstream = direction === 'downstream'
let models: string[] = []
if (isDownstream) {
models = Object.keys(lineage).filter(key =>
lineage[key]!.models.includes(node as ModelEncodedFQN),
)
} else {
models = lineage[node]?.models ?? []
}
if (isFalse(node in result)) {
result[node] = { edges: [] }
}
for (const model of models) {
const connectedNode = isDownstream
? createConnectedNode(node, model, [result[node]!])
: createConnectedNode(model, node, [result[node]!])
if (model in result) {
result[model]!.edges.push(connectedNode)
} else {
result[model] = connectedNode
getConnectedNodes(direction, model, lineage, result)
}
}
}
function createConnectedNode(
source: NodeId,
target: NodeId,
edges: ConnectedNode[] = [],
): ConnectedNode {
const id = toID(source, target)
return { id, edges }
}