Skip to content

Commit f265368

Browse files
[PECO-238] Initial CloudFetch implementation (#158)
* [PECO-238] Initial CloudFetch implementation
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Fix tests
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Handle expired links
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Improve cloud fetch result handler: better performance and fixed memory consumption issues
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Extract some cloudfetch settings to global config; add e2e tests
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Add unit tests
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Make e2e tests more stable
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Increase Node.js heap size for E2E tests
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
---------
Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent 6de01cb commit f265368

18 files changed

Lines changed: 712 additions & 47 deletions

.github/workflows/main.yml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -59,7 +59,7 @@ jobs:
5959
E2E_TABLE_SUFFIX: ${{github.sha}}
6060
run: |
6161
npm ci
62-
npm run e2e
62+
NODE_OPTIONS="--max-old-space-size=4096" npm run e2e
6363
- name: Coverage
6464
uses: codecov/codecov-action@v3
6565
with:

lib/DBSQLOperation/SchemaHelper.ts

Lines changed: 24 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@ import Status from '../dto/Status';
44
import IOperationResult from '../result/IOperationResult';
55
import JsonResult from '../result/JsonResult';
66
import ArrowResult from '../result/ArrowResult';
7+
import CloudFetchResult from '../result/CloudFetchResult';
78
import HiveDriverError from '../errors/HiveDriverError';
89
import { definedOrError } from '../utils';
910

@@ -14,6 +15,8 @@ export default class SchemaHelper {
1415

1516
private metadata?: TGetResultSetMetadataResp;
1617

18+
private resultHandler?: IOperationResult;
19+
1720
constructor(driver: HiveDriver, operationHandle: TOperationHandle, metadata?: TGetResultSetMetadataResp) {
1821
this.driver = driver;
1922
this.operationHandle = operationHandle;
@@ -41,13 +44,27 @@ export default class SchemaHelper {
4144
const metadata = await this.fetchMetadata();
4245
const resultFormat = definedOrError(metadata.resultFormat);
4346

44-
switch (resultFormat) {
45-
case TSparkRowSetType.COLUMN_BASED_SET:
46-
return new JsonResult(metadata.schema);
47-
case TSparkRowSetType.ARROW_BASED_SET:
48-
return new ArrowResult(metadata.schema, metadata.arrowSchema);
49-
default:
50-
throw new HiveDriverError(`Unsupported result format: ${TSparkRowSetType[resultFormat]}`);
47+
if (!this.resultHandler) {
48+
switch (resultFormat) {
49+
case TSparkRowSetType.COLUMN_BASED_SET:
50+
this.resultHandler = new JsonResult(metadata.schema);
51+
break;
52+
case TSparkRowSetType.ARROW_BASED_SET:
53+
this.resultHandler = new ArrowResult(metadata.schema, metadata.arrowSchema);
54+
break;
55+
case TSparkRowSetType.URL_BASED_SET:
56+
this.resultHandler = new CloudFetchResult(metadata.schema);
57+
break;
58+
default:
59+
this.resultHandler = undefined;
60+
break;
61+
}
62+
}
63+
64+
if (!this.resultHandler) {
65+
throw new HiveDriverError(`Unsupported result format: ${TSparkRowSetType[resultFormat]}`);
5166
}
67+
68+
return this.resultHandler;
5269
}
5370
}

lib/DBSQLOperation/index.ts

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -106,7 +106,7 @@ export default class DBSQLOperation implements IOperation {
106106
this._data.fetch(options?.maxRows || defaultMaxRows),
107107
]);
108108

109-
const result = resultHandler.getValue(data ? [data] : []);
109+
const result = await resultHandler.getValue(data ? [data] : []);
110110
this.logger?.log(
111111
LogLevel.debug,
112112
`Fetched chunk of size: ${options?.maxRows || defaultMaxRows} from operation with id: ${this.getId()}`,
@@ -147,10 +147,19 @@ export default class DBSQLOperation implements IOperation {
147147
}
148148

149149
public async hasMoreRows(): Promise<boolean> {
150+
// If operation is closed or cancelled - we should not try to get data from it
150151
if (this._completeOperation.closed || this._completeOperation.cancelled) {
151152
return false;
152153
}
153-
return this._data.hasMoreRows;
154+
155+
// Return early if there is still data available for fetching
156+
if (this._data.hasMoreRows) {
157+
return true;
158+
}
159+
160+
// If we fetched all the data from server - check if there's anything buffered in result handler
161+
const resultHandler = await this._schema.getResultHandler();
162+
return resultHandler.hasPendingData();
154163
}
155164

156165
public async getSchema(options?: GetSchemaOptions): Promise<TTableSchema | null> {

lib/DBSQLSession.ts

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -125,6 +125,7 @@ export default class DBSQLSession implements IDBSQLSession {
125125
runAsync: options.runAsync || false,
126126
...getDirectResultsOptions(options.maxRows),
127127
...getArrowOptions(),
128+
canDownloadResult: options.useCloudFetch ?? globalConfig.useCloudFetch,
128129
});
129130

130131
return this.createOperation(response);

lib/contracts/IDBSQLSession.ts

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@ export type ExecuteStatementOptions = {
77
queryTimeout?: Int64;
88
runAsync?: boolean;
99
maxRows?: number | null;
10+
useCloudFetch?: boolean;
1011
};
1112

1213
export type TypeInfoRequest = {

lib/globalConfig.ts

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,9 @@ interface GlobalConfig {
77
retriesTimeout: number; // in milliseconds
88
retryDelayMin: number; // in milliseconds
99
retryDelayMax: number; // in milliseconds
10+
11+
useCloudFetch: boolean;
12+
cloudFetchConcurrentDownloads: number;
1013
}
1114

1215
export default {
@@ -18,4 +21,7 @@ export default {
1821
retriesTimeout: 900 * 1000,
1922
retryDelayMin: 1 * 1000,
2023
retryDelayMax: 60 * 1000,
24+
25+
useCloudFetch: false,
26+
cloudFetchConcurrentDownloads: 10,
2127
} satisfies GlobalConfig;

lib/result/ArrowResult.ts

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -30,12 +30,16 @@ export default class ArrowResult implements IOperationResult {
3030
this.arrowSchema = arrowSchema;
3131
}
3232

33-
getValue(data?: Array<TRowSet>) {
33+
async hasPendingData() {
34+
return false;
35+
}
36+
37+
async getValue(data?: Array<TRowSet>) {
3438
if (this.schema.length === 0 || !this.arrowSchema || !data) {
3539
return [];
3640
}
3741

38-
const batches = this.getBatches(data);
42+
const batches = await this.getBatches(data);
3943
if (batches.length === 0) {
4044
return [];
4145
}
@@ -44,7 +48,7 @@ export default class ArrowResult implements IOperationResult {
4448
return this.getRows(table.schema, table.toArray());
4549
}
4650

47-
private getBatches(data: Array<TRowSet>): Array<Buffer> {
51+
protected async getBatches(data: Array<TRowSet>): Promise<Array<Buffer>> {
4852
const result: Array<Buffer> = [];
4953

5054
data.forEach((rowSet) => {

lib/result/CloudFetchResult.ts

Lines changed: 56 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,56 @@
1+
import { Buffer } from 'buffer';
2+
import fetch, { RequestInfo, RequestInit } from 'node-fetch';
3+
import { TRowSet, TSparkArrowResultLink, TTableSchema } from '../../thrift/TCLIService_types';
4+
import ArrowResult from './ArrowResult';
5+
import globalConfig from '../globalConfig';
6+
7+
export default class CloudFetchResult extends ArrowResult {
8+
private pendingLinks: Array<TSparkArrowResultLink> = [];
9+
10+
private downloadedBatches: Array<Buffer> = [];
11+
12+
constructor(schema?: TTableSchema) {
13+
// Arrow schema returned in metadata is not needed for CloudFetch results:
14+
// each batch already contains schema and could be decoded as is
15+
super(schema, Buffer.alloc(0));
16+
}
17+
18+
async hasPendingData() {
19+
return this.pendingLinks.length > 0 || this.downloadedBatches.length > 0;
20+
}
21+
22+
protected async getBatches(data: Array<TRowSet>): Promise<Array<Buffer>> {
23+
data.forEach((item) => {
24+
item.resultLinks?.forEach((link) => {
25+
this.pendingLinks.push(link);
26+
});
27+
});
28+
29+
if (this.downloadedBatches.length === 0) {
30+
const links = this.pendingLinks.splice(0, globalConfig.cloudFetchConcurrentDownloads);
31+
const tasks = links.map((link) => this.downloadLink(link));
32+
const batches = await Promise.all(tasks);
33+
this.downloadedBatches.push(...batches);
34+
}
35+
36+
return this.downloadedBatches.splice(0, 1);
37+
}
38+
39+
private async downloadLink(link: TSparkArrowResultLink): Promise<Buffer> {
40+
if (Date.now() >= link.expiryTime.toNumber()) {
41+
throw new Error('CloudFetch link has expired');
42+
}
43+
44+
const response = await this.fetch(link.fileLink);
45+
if (!response.ok) {
46+
throw new Error(`CloudFetch HTTP error ${response.status} ${response.statusText}`);
47+
}
48+
49+
const result = await response.arrayBuffer();
50+
return Buffer.from(result);
51+
}
52+
53+
private async fetch(url: RequestInfo, init?: RequestInit) {
54+
return fetch(url, init);
55+
}
56+
}

lib/result/IOperationResult.ts

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,7 @@
11
import { TRowSet } from '../../thrift/TCLIService_types';
22

33
export default interface IOperationResult {
4-
getValue(data?: Array<TRowSet>): any;
4+
getValue(data?: Array<TRowSet>): Promise<any>;
5+
6+
hasPendingData(): Promise<boolean>;
57
}

lib/result/JsonResult.ts

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,11 @@ export default class JsonResult implements IOperationResult {
1010
this.schema = getSchemaColumns(schema);
1111
}
1212

13-
getValue(data?: Array<TRowSet>): Array<object> {
13+
async hasPendingData() {
14+
return false;
15+
}
16+
17+
async getValue(data?: Array<TRowSet>): Promise<Array<object>> {
1418
if (this.schema.length === 0 || !data) {
1519
return [];
1620
}

0 commit comments

Comments
 (0)