Skip to content

Commit 11860bd

Browse files
committed
add thrift protocol version handling
1 parent 056ed0b commit 11860bd

7 files changed

Lines changed: 415 additions & 21 deletions

File tree

lib/DBSQLClient.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
227227
const session = new DBSQLSession({
228228
handle: definedOrError(response.sessionHandle),
229229
context: this,
230+
serverProtocolVersion: response.serverProtocolVersion,
230231
});
231232
this.sessions.add(session);
232233
return session;

lib/DBSQLSession.ts

Lines changed: 78 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import {
1212
TSparkDirectResults,
1313
TSparkArrowTypes,
1414
TSparkParameter,
15+
TProtocolVersion,
16+
TExecuteStatementReq,
1517
} from '../thrift/TCLIService_types';
1618
import IDBSQLSession, {
1719
ExecuteStatementOptions,
@@ -29,7 +31,7 @@ import IOperation from './contracts/IOperation';
2931
import DBSQLOperation from './DBSQLOperation';
3032
import Status from './dto/Status';
3133
import InfoValue from './dto/InfoValue';
32-
import { definedOrError, LZ4 } from './utils';
34+
import { definedOrError, LZ4, ProtocolVersion } from './utils';
3335
import CloseableCollection from './utils/CloseableCollection';
3436
import { LogLevel } from './contracts/IDBSQLLogger';
3537
import HiveDriverError from './errors/HiveDriverError';
@@ -74,13 +76,13 @@ function getDirectResultsOptions(maxRows: number | bigint | Int64 | null | undef
7476
};
7577
}
7678

77-
function getArrowOptions(config: ClientConfig): {
79+
function getArrowOptions(config: ClientConfig, serverProtocolVersion: TProtocolVersion | undefined | null): {
7880
canReadArrowResult: boolean;
7981
useArrowNativeTypes?: TSparkArrowTypes;
8082
} {
8183
const { arrowEnabled = true, useArrowNativeTypes = true } = config;
8284

83-
if (!arrowEnabled) {
85+
if (!arrowEnabled || !ProtocolVersion.supportsArrowMetadata(serverProtocolVersion)) {
8486
return {
8587
canReadArrowResult: false,
8688
};
@@ -136,6 +138,7 @@ function getQueryParameters(
136138
interface DBSQLSessionConstructorOptions {
137139
handle: TSessionHandle;
138140
context: IClientContext;
141+
serverProtocolVersion?: TProtocolVersion;
139142
}
140143

141144
export default class DBSQLSession implements IDBSQLSession {
@@ -145,14 +148,22 @@ export default class DBSQLSession implements IDBSQLSession {
145148

146149
private isOpen = true;
147150

151+
private serverProtocolVersion?: TProtocolVersion;
152+
148153
public onClose?: () => void;
149154

150155
private operations = new CloseableCollection<DBSQLOperation>();
151156

152-
constructor({ handle, context }: DBSQLSessionConstructorOptions) {
157+
constructor({ handle, context, serverProtocolVersion }: DBSQLSessionConstructorOptions) {
153158
this.sessionHandle = handle;
154159
this.context = context;
160+
// Get the server protocol version from the provided parameter (from TOpenSessionResp)
161+
// rather than from the handle
162+
this.serverProtocolVersion = serverProtocolVersion;
155163
this.context.getLogger().log(LogLevel.debug, `Session created with id: ${this.id}`);
164+
if (this.serverProtocolVersion) {
165+
this.context.getLogger().log(LogLevel.debug, `Server protocol version: ${this.serverProtocolVersion}`);
166+
}
156167
}
157168

158169
public get id() {
@@ -193,17 +204,29 @@ export default class DBSQLSession implements IDBSQLSession {
193204
await this.failIfClosed();
194205
const driver = await this.context.getDriver();
195206
const clientConfig = this.context.getConfig();
196-
const operationPromise = driver.executeStatement({
207+
208+
const request = new TExecuteStatementReq({
197209
sessionHandle: this.sessionHandle,
198210
statement,
199-
queryTimeout: options.queryTimeout ? numberToInt64(options.queryTimeout) : undefined,
211+
queryTimeout: options.queryTimeout?numberToInt64(options.queryTimeout):undefined,
200212
runAsync: true,
201213
...getDirectResultsOptions(options.maxRows, clientConfig),
202-
...getArrowOptions(clientConfig),
203-
canDownloadResult: options.useCloudFetch ?? clientConfig.useCloudFetch,
204-
parameters: getQueryParameters(options.namedParameters, options.ordinalParameters),
205-
canDecompressLZ4Result: (options.useLZ4Compression ?? clientConfig.useLZ4Compression) && Boolean(LZ4),
214+
...getArrowOptions(clientConfig, this.serverProtocolVersion),
206215
});
216+
217+
if(ProtocolVersion.supportsParameterizedQueries(this.serverProtocolVersion)) {
218+
request.parameters = getQueryParameters(options.namedParameters, options.ordinalParameters);
219+
}
220+
221+
if(ProtocolVersion.supportsArrowCompression(this.serverProtocolVersion)) {
222+
request.canDecompressLZ4Result = (options.useLZ4Compression ?? clientConfig.useLZ4Compression) && Boolean(LZ4);
223+
}
224+
225+
if(ProtocolVersion.supportsCloudFetch(this.serverProtocolVersion)) {
226+
request.canDownloadResult = options.useCloudFetch ?? clientConfig.useCloudFetch;
227+
}
228+
229+
const operationPromise = driver.executeStatement(request);
207230
const response = await this.handleResponse(operationPromise);
208231
const operation = this.createOperation(response);
209232

@@ -352,9 +375,13 @@ export default class DBSQLSession implements IDBSQLSession {
352375
await this.failIfClosed();
353376
const driver = await this.context.getDriver();
354377
const clientConfig = this.context.getConfig();
378+
379+
// Set runAsync only if supported by protocol version
380+
const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined;
381+
355382
const operationPromise = driver.getTypeInfo({
356383
sessionHandle: this.sessionHandle,
357-
runAsync: true,
384+
runAsync,
358385
...getDirectResultsOptions(request.maxRows, clientConfig),
359386
});
360387
const response = await this.handleResponse(operationPromise);
@@ -371,9 +398,13 @@ export default class DBSQLSession implements IDBSQLSession {
371398
await this.failIfClosed();
372399
const driver = await this.context.getDriver();
373400
const clientConfig = this.context.getConfig();
401+
402+
// Set runAsync only if supported by protocol version
403+
const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined;
404+
374405
const operationPromise = driver.getCatalogs({
375406
sessionHandle: this.sessionHandle,
376-
runAsync: true,
407+
runAsync,
377408
...getDirectResultsOptions(request.maxRows, clientConfig),
378409
});
379410
const response = await this.handleResponse(operationPromise);
@@ -390,11 +421,15 @@ export default class DBSQLSession implements IDBSQLSession {
390421
await this.failIfClosed();
391422
const driver = await this.context.getDriver();
392423
const clientConfig = this.context.getConfig();
424+
425+
// Set runAsync only if supported by protocol version
426+
const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined;
427+
393428
const operationPromise = driver.getSchemas({
394429
sessionHandle: this.sessionHandle,
395430
catalogName: request.catalogName,
396431
schemaName: request.schemaName,
397-
runAsync: true,
432+
runAsync,
398433
...getDirectResultsOptions(request.maxRows, clientConfig),
399434
});
400435
const response = await this.handleResponse(operationPromise);
@@ -411,13 +446,17 @@ export default class DBSQLSession implements IDBSQLSession {
411446
await this.failIfClosed();
412447
const driver = await this.context.getDriver();
413448
const clientConfig = this.context.getConfig();
449+
450+
// Set runAsync only if supported by protocol version
451+
const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined;
452+
414453
const operationPromise = driver.getTables({
415454
sessionHandle: this.sessionHandle,
416455
catalogName: request.catalogName,
417456
schemaName: request.schemaName,
418457
tableName: request.tableName,
419458
tableTypes: request.tableTypes,
420-
runAsync: true,
459+
runAsync,
421460
...getDirectResultsOptions(request.maxRows, clientConfig),
422461
});
423462
const response = await this.handleResponse(operationPromise);
@@ -434,9 +473,13 @@ export default class DBSQLSession implements IDBSQLSession {
434473
await this.failIfClosed();
435474
const driver = await this.context.getDriver();
436475
const clientConfig = this.context.getConfig();
476+
477+
// Set runAsync only if supported by protocol version
478+
const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined;
479+
437480
const operationPromise = driver.getTableTypes({
438481
sessionHandle: this.sessionHandle,
439-
runAsync: true,
482+
runAsync,
440483
...getDirectResultsOptions(request.maxRows, clientConfig),
441484
});
442485
const response = await this.handleResponse(operationPromise);
@@ -453,13 +496,17 @@ export default class DBSQLSession implements IDBSQLSession {
453496
await this.failIfClosed();
454497
const driver = await this.context.getDriver();
455498
const clientConfig = this.context.getConfig();
499+
500+
// Set runAsync only if supported by protocol version
501+
const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined;
502+
456503
const operationPromise = driver.getColumns({
457504
sessionHandle: this.sessionHandle,
458505
catalogName: request.catalogName,
459506
schemaName: request.schemaName,
460507
tableName: request.tableName,
461508
columnName: request.columnName,
462-
runAsync: true,
509+
runAsync,
463510
...getDirectResultsOptions(request.maxRows, clientConfig),
464511
});
465512
const response = await this.handleResponse(operationPromise);
@@ -476,12 +523,16 @@ export default class DBSQLSession implements IDBSQLSession {
476523
await this.failIfClosed();
477524
const driver = await this.context.getDriver();
478525
const clientConfig = this.context.getConfig();
526+
527+
// Set runAsync only if supported by protocol version
528+
const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined;
529+
479530
const operationPromise = driver.getFunctions({
480531
sessionHandle: this.sessionHandle,
481532
catalogName: request.catalogName,
482533
schemaName: request.schemaName,
483534
functionName: request.functionName,
484-
runAsync: true,
535+
runAsync,
485536
...getDirectResultsOptions(request.maxRows, clientConfig),
486537
});
487538
const response = await this.handleResponse(operationPromise);
@@ -492,12 +543,16 @@ export default class DBSQLSession implements IDBSQLSession {
492543
await this.failIfClosed();
493544
const driver = await this.context.getDriver();
494545
const clientConfig = this.context.getConfig();
546+
547+
// Set runAsync only if supported by protocol version
548+
const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined;
549+
495550
const operationPromise = driver.getPrimaryKeys({
496551
sessionHandle: this.sessionHandle,
497552
catalogName: request.catalogName,
498553
schemaName: request.schemaName,
499554
tableName: request.tableName,
500-
runAsync: true,
555+
runAsync,
501556
...getDirectResultsOptions(request.maxRows, clientConfig),
502557
});
503558
const response = await this.handleResponse(operationPromise);
@@ -514,6 +569,10 @@ export default class DBSQLSession implements IDBSQLSession {
514569
await this.failIfClosed();
515570
const driver = await this.context.getDriver();
516571
const clientConfig = this.context.getConfig();
572+
573+
// Set runAsync only if supported by protocol version
574+
const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined;
575+
517576
const operationPromise = driver.getCrossReference({
518577
sessionHandle: this.sessionHandle,
519578
parentCatalogName: request.parentCatalogName,
@@ -522,7 +581,7 @@ export default class DBSQLSession implements IDBSQLSession {
522581
foreignCatalogName: request.foreignCatalogName,
523582
foreignSchemaName: request.foreignSchemaName,
524583
foreignTableName: request.foreignTableName,
525-
runAsync: true,
584+
runAsync,
526585
...getDirectResultsOptions(request.maxRows, clientConfig),
527586
});
528587
const response = await this.handleResponse(operationPromise);

lib/utils/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ import definedOrError from './definedOrError';
22
import buildUserAgentString from './buildUserAgentString';
33
import formatProgress, { ProgressUpdateTransformer } from './formatProgress';
44
import LZ4 from './lz4';
5+
import * as ProtocolVersion from './protocolVersion';
56

6-
export { definedOrError, buildUserAgentString, formatProgress, ProgressUpdateTransformer, LZ4 };
7+
export { definedOrError, buildUserAgentString, formatProgress, ProgressUpdateTransformer, LZ4, ProtocolVersion };

lib/utils/protocolVersion.ts

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
import { TProtocolVersion } from '../../thrift/TCLIService_types';
2+
3+
/**
4+
* Protocol version information from Thrift TCLIService
5+
* Each version adds certain features to the Spark/Hive API
6+
*
7+
* Databricks only supports SPARK_CLI_SERVICE_PROTOCOL_V1 (0xA501) or higher
8+
*/
9+
10+
/**
11+
* Check if the current protocol version supports a specific feature
12+
* @param serverProtocolVersion The protocol version received from server in TOpenSessionResp
13+
* @param requiredVersion The minimum protocol version required for a feature
14+
* @returns boolean indicating if the feature is supported
15+
*/
16+
export function isFeatureSupported(
17+
serverProtocolVersion: TProtocolVersion | undefined | null,
18+
requiredVersion: TProtocolVersion
19+
): boolean {
20+
if (serverProtocolVersion === undefined || serverProtocolVersion === null) {
21+
return false;
22+
}
23+
24+
return serverProtocolVersion >= requiredVersion;
25+
}
26+
27+
/**
28+
* Check if parameterized queries are supported
29+
* (Requires SPARK_CLI_SERVICE_PROTOCOL_V8 or higher)
30+
* @param serverProtocolVersion The protocol version from server
31+
* @returns boolean indicating if parameterized queries are supported
32+
*/
33+
export function supportsParameterizedQueries(
34+
serverProtocolVersion: TProtocolVersion | undefined | null
35+
): boolean {
36+
return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8);
37+
}
38+
39+
/**
40+
* Check if async metadata operations are supported
41+
* (Requires SPARK_CLI_SERVICE_PROTOCOL_V6 or higher)
42+
* @param serverProtocolVersion The protocol version from server
43+
* @returns boolean indicating if async metadata operations are supported
44+
*/
45+
export function supportsAsyncMetadataOperations(
46+
serverProtocolVersion: TProtocolVersion | undefined | null
47+
): boolean {
48+
return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6);
49+
}
50+
51+
/**
52+
* Check if result persistence mode is supported
53+
* (Requires SPARK_CLI_SERVICE_PROTOCOL_V7 or higher)
54+
* @param serverProtocolVersion The protocol version from server
55+
* @returns boolean indicating if result persistence mode is supported
56+
*/
57+
export function supportsResultPersistenceMode(
58+
serverProtocolVersion: TProtocolVersion | undefined | null
59+
): boolean {
60+
return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7);
61+
}
62+
63+
/**
64+
* Check if Arrow compression is supported
65+
* (Requires SPARK_CLI_SERVICE_PROTOCOL_V6 or higher)
66+
* @param serverProtocolVersion The protocol version from server
67+
* @returns boolean indicating if compressed Arrow batches are supported
68+
*/
69+
export function supportsArrowCompression(
70+
serverProtocolVersion: TProtocolVersion | undefined | null
71+
): boolean {
72+
return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6);
73+
}
74+
75+
/**
76+
* Check if Arrow metadata is supported
77+
* (Requires SPARK_CLI_SERVICE_PROTOCOL_V5 or higher)
78+
* @param serverProtocolVersion The protocol version from server
79+
* @returns boolean indicating if Arrow metadata is supported
80+
*/
81+
export function supportsArrowMetadata(
82+
serverProtocolVersion: TProtocolVersion | undefined | null
83+
): boolean {
84+
return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V5);
85+
}
86+
87+
/**
88+
* Check if multiple catalogs are supported
89+
* (Requires SPARK_CLI_SERVICE_PROTOCOL_V4 or higher)
90+
* @param serverProtocolVersion The protocol version from server
91+
* @returns boolean indicating if multiple catalogs are supported
92+
*/
93+
export function supportsMultipleCatalogs(
94+
serverProtocolVersion: TProtocolVersion | undefined | null
95+
): boolean {
96+
return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V4);
97+
}
98+
99+
/**
100+
* Check if cloud object storage fetching is supported
101+
* (Requires SPARK_CLI_SERVICE_PROTOCOL_V3 or higher)
102+
* @param serverProtocolVersion The protocol version from server
103+
* @returns boolean indicating if cloud fetching is supported
104+
*/
105+
export function supportsCloudFetch(
106+
serverProtocolVersion: TProtocolVersion | undefined | null
107+
): boolean {
108+
return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V3);
109+
}

0 commit comments

Comments
 (0)