Skip to content

Commit e81ad15

Browse files
committed
Merge main — take main's DatabricksTelemetryExporter, FeatureFlagCache, MetricsAggregator
Picks up #362 (shared connection stack + close() flush race fix) merged to main. Resolves add/add conflicts by preferring main's versions of the three infrastructure files, consistent with the prior merge strategy. Co-authored-by: samikshya-chand_data
2 parents 38e9d03 + 3a5e659 commit e81ad15

File tree

3 files changed

+27
-52
lines changed

3 files changed

+27
-52
lines changed

lib/telemetry/DatabricksTelemetryExporter.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616

1717
import { v4 as uuidv4 } from 'uuid';
18-
import fetch, { RequestInit, Response } from 'node-fetch';
18+
import fetch, { RequestInit, Response, Request } from 'node-fetch';
1919
import IClientContext from '../contracts/IClientContext';
2020
import { LogLevel } from '../contracts/IDBSQLLogger';
2121
import IAuthentication from '../connection/contracts/IAuthentication';
@@ -304,7 +304,13 @@ export default class DatabricksTelemetryExporter {
304304
private async sendRequest(url: string, init: RequestInit): Promise<Response> {
305305
const connectionProvider = await this.context.getConnectionProvider();
306306
const agent = await connectionProvider.getAgent();
307-
return fetch(url, { ...init, agent });
307+
const retryPolicy = await connectionProvider.getRetryPolicy();
308+
const requestConfig: RequestInit = { agent, ...init };
309+
const result = await retryPolicy.invokeWithRetry(() => {
310+
const request = new Request(url, requestConfig);
311+
return fetch(request).then((response) => ({ request, response }));
312+
});
313+
return result.response;
308314
}
309315

310316
private toTelemetryLog(

lib/telemetry/FeatureFlagCache.ts

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,11 @@
1414
* limitations under the License.
1515
*/
1616

17-
import fetch, { RequestInit, Response } from 'node-fetch';
17+
import fetch, { RequestInit, Response, Request } from 'node-fetch';
1818
import IClientContext from '../contracts/IClientContext';
1919
import { LogLevel } from '../contracts/IDBSQLLogger';
2020
import IAuthentication from '../connection/contracts/IAuthentication';
2121
import { buildTelemetryUrl, normalizeHeaders } from './telemetryUtils';
22-
import ExceptionClassifier from './ExceptionClassifier';
2322
import buildUserAgentString from '../utils/buildUserAgentString';
2423
import driverVersion from '../version';
2524

@@ -193,20 +192,13 @@ export default class FeatureFlagCache {
193192
private async fetchWithRetry(url: string, init: RequestInit): Promise<Response> {
194193
const connectionProvider = await this.context.getConnectionProvider();
195194
const agent = await connectionProvider.getAgent();
196-
const logger = this.context.getLogger();
197-
198-
try {
199-
return await fetch(url, { ...init, agent });
200-
} catch (err: any) {
201-
if (!ExceptionClassifier.isRetryable(err)) {
202-
throw err;
203-
}
204-
logger.log(LogLevel.debug, `Feature flag fetch retry after transient: ${err?.code ?? err?.message ?? err}`);
205-
await new Promise((resolve) => {
206-
setTimeout(resolve, 100 + Math.random() * 100);
207-
});
208-
return fetch(url, { ...init, agent });
209-
}
195+
const retryPolicy = await connectionProvider.getRetryPolicy();
196+
const requestConfig: RequestInit = { agent, ...init };
197+
const result = await retryPolicy.invokeWithRetry(() => {
198+
const request = new Request(url, requestConfig);
199+
return fetch(request).then((response) => ({ request, response }));
200+
});
201+
return result.response;
210202
}
211203

212204
private async getAuthHeaders(): Promise<Record<string, string>> {

lib/telemetry/MetricsAggregator.ts

Lines changed: 11 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ export default class MetricsAggregator {
5151

5252
private closed = false;
5353

54+
private closing = false;
55+
5456
private batchSize: number;
5557

5658
private flushIntervalMs: number;
@@ -317,7 +319,7 @@ export default class MetricsAggregator {
317319
);
318320
}
319321

320-
if (this.pendingMetrics.length >= this.batchSize) {
322+
if (this.pendingMetrics.length >= this.batchSize && !this.closing) {
321323
// resetTimer=false so the periodic tail-drain keeps its cadence even
322324
// under sustained batch-size bursts.
323325
const logger = this.context.getLogger();
@@ -406,52 +408,27 @@ export default class MetricsAggregator {
406408

407409
async close(): Promise<void> {
408410
const logger = this.context.getLogger();
409-
this.closed = true;
410411

411412
try {
413+
// Suppress batch-triggered fire-and-forget flushes from addPendingMetric
414+
// so no promises escape past the single awaited flush below.
415+
this.closing = true;
416+
412417
if (this.flushTimer) {
413418
clearInterval(this.flushTimer);
414419
this.flushTimer = null;
415420
}
416421

417-
// Snapshot keys — completeStatement mutates statementMetrics.
422+
// closed is still false here so completeStatement → addPendingMetric works normally.
418423
const remainingStatements = [...this.statementMetrics.keys()];
419424
for (const statementId of remainingStatements) {
420-
this.completeStatementForClose(statementId);
425+
this.completeStatement(statementId);
421426
}
422427

423-
await this.flushForClose();
424-
425-
// Belt-and-braces: something the above awaited could in principle
426-
// have resurrected a timer. Clear once more.
427-
if (this.flushTimer) {
428-
clearInterval(this.flushTimer);
429-
this.flushTimer = null;
430-
}
428+
this.closed = true;
429+
await this.flush(false);
431430
} catch (error: any) {
432431
logger.log(LogLevel.debug, `MetricsAggregator.close error: ${error.message}`);
433432
}
434433
}
435-
436-
/** completeStatement variant that bypasses the `closed` guard. */
437-
private completeStatementForClose(statementId: string): void {
438-
const prev = this.closed;
439-
this.closed = false;
440-
try {
441-
this.completeStatement(statementId);
442-
} finally {
443-
this.closed = prev;
444-
}
445-
}
446-
447-
/** flush variant that bypasses the `closed` guard on addPendingMetric. */
448-
private async flushForClose(): Promise<void> {
449-
const prev = this.closed;
450-
this.closed = false;
451-
try {
452-
await this.flush(false);
453-
} finally {
454-
this.closed = prev;
455-
}
456-
}
457434
}

0 commit comments

Comments
 (0)