Skip to content

Commit d80f13e

Browse files
committed
fix: route telemetry HTTP through shared connection stack; fix close() flush race
- DatabricksTelemetryExporter.sendRequest and FeatureFlagCache.fetchWithRetry now use connectionProvider.getRetryPolicy().invokeWithRetry(), matching the CloudFetchResultHandler pattern instead of bespoke fetch/retry logic - MetricsAggregator: add closing flag so batch-triggered fire-and-forget flushes are suppressed during close(), ensuring a single awaited flushForClose() drains all remaining metrics without racing past process.exit() Co-authored-by: samikshya-chand_data
1 parent 46e2d6e commit d80f13e

File tree

3 files changed

+22
-19
lines changed

3 files changed

+22
-19
lines changed

lib/telemetry/DatabricksTelemetryExporter.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616

1717
import { v4 as uuidv4 } from 'uuid';
18-
import fetch, { RequestInit, Response } from 'node-fetch';
18+
import fetch, { RequestInit, Response, Request } from 'node-fetch';
1919
import IClientContext from '../contracts/IClientContext';
2020
import { LogLevel } from '../contracts/IDBSQLLogger';
2121
import IAuthentication from '../connection/contracts/IAuthentication';
@@ -304,7 +304,13 @@ export default class DatabricksTelemetryExporter {
304304
private async sendRequest(url: string, init: RequestInit): Promise<Response> {
305305
const connectionProvider = await this.context.getConnectionProvider();
306306
const agent = await connectionProvider.getAgent();
307-
return fetch(url, { ...init, agent });
307+
const retryPolicy = await connectionProvider.getRetryPolicy();
308+
const requestConfig: RequestInit = { agent, ...init };
309+
const result = await retryPolicy.invokeWithRetry(() => {
310+
const request = new Request(url, requestConfig);
311+
return fetch(request).then((response) => ({ request, response }));
312+
});
313+
return result.response;
308314
}
309315

310316
private toTelemetryLog(

lib/telemetry/FeatureFlagCache.ts

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,11 @@
1414
* limitations under the License.
1515
*/
1616

17-
import fetch, { RequestInit, Response } from 'node-fetch';
17+
import fetch, { RequestInit, Response, Request } from 'node-fetch';
1818
import IClientContext from '../contracts/IClientContext';
1919
import { LogLevel } from '../contracts/IDBSQLLogger';
2020
import IAuthentication from '../connection/contracts/IAuthentication';
2121
import { buildTelemetryUrl, normalizeHeaders } from './telemetryUtils';
22-
import ExceptionClassifier from './ExceptionClassifier';
2322
import buildUserAgentString from '../utils/buildUserAgentString';
2423
import driverVersion from '../version';
2524

@@ -193,20 +192,13 @@ export default class FeatureFlagCache {
193192
private async fetchWithRetry(url: string, init: RequestInit): Promise<Response> {
194193
const connectionProvider = await this.context.getConnectionProvider();
195194
const agent = await connectionProvider.getAgent();
196-
const logger = this.context.getLogger();
197-
198-
try {
199-
return await fetch(url, { ...init, agent });
200-
} catch (err: any) {
201-
if (!ExceptionClassifier.isRetryable(err)) {
202-
throw err;
203-
}
204-
logger.log(LogLevel.debug, `Feature flag fetch retry after transient: ${err?.code ?? err?.message ?? err}`);
205-
await new Promise((resolve) => {
206-
setTimeout(resolve, 100 + Math.random() * 100);
207-
});
208-
return fetch(url, { ...init, agent });
209-
}
195+
const retryPolicy = await connectionProvider.getRetryPolicy();
196+
const requestConfig: RequestInit = { agent, ...init };
197+
const result = await retryPolicy.invokeWithRetry(() => {
198+
const request = new Request(url, requestConfig);
199+
return fetch(request).then((response) => ({ request, response }));
200+
});
201+
return result.response;
210202
}
211203

212204
private async getAuthHeaders(): Promise<Record<string, string>> {

lib/telemetry/MetricsAggregator.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ export default class MetricsAggregator {
5151

5252
private closed = false;
5353

54+
private closing = false;
55+
5456
private batchSize: number;
5557

5658
private flushIntervalMs: number;
@@ -317,9 +319,11 @@ export default class MetricsAggregator {
317319
);
318320
}
319321

320-
if (this.pendingMetrics.length >= this.batchSize) {
322+
if (this.pendingMetrics.length >= this.batchSize && !this.closing) {
321323
// resetTimer=false so the periodic tail-drain keeps its cadence even
322324
// under sustained batch-size bursts.
325+
// Suppressed during close() so fire-and-forget promises don't race past
326+
// the single awaited flushForClose().
323327
const logger = this.context.getLogger();
324328
Promise.resolve(this.flush(false)).catch((err: any) => {
325329
logger.log(LogLevel.debug, `Batch-trigger flush failed: ${err?.message ?? err}`);
@@ -406,6 +410,7 @@ export default class MetricsAggregator {
406410

407411
async close(): Promise<void> {
408412
const logger = this.context.getLogger();
413+
this.closing = true;
409414
this.closed = true;
410415

411416
try {

0 commit comments

Comments
 (0)