Skip to content

Commit 79c1d31

Browse files
committed
Address PR #325 review feedback
- Fix CircuitBreaker HALF_OPEN: any failure now immediately reopens the circuit instead of accumulating to failureThreshold, aligning with Resilience4j behavior used in the JDBC driver
- Add maxPendingMetrics bound to MetricsAggregator (default 500) to prevent unbounded buffer growth when exports keep failing
- Inline buildUrl into DatabricksTelemetryExporter and FeatureFlagCache; remove urlUtils.ts (single-use utility)
- Add missing unit tests for DatabricksTelemetryExporter, MetricsAggregator, and TelemetryEventEmitter (HTTP calls, retry logic, batching, flush timers, event routing)

Co-authored-by: Isaac
1 parent f531391 commit 79c1d31

File tree

11 files changed

+806
-51
lines changed

11 files changed

+806
-51
lines changed

lib/contracts/IClientContext.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ export interface ClientConfig {
3232
telemetryAuthenticatedExport?: boolean;
3333
telemetryCircuitBreakerThreshold?: number;
3434
telemetryCircuitBreakerTimeout?: number;
35+
telemetryMaxPendingMetrics?: number;
3536
}
3637

3738
export default interface IClientContext {

lib/telemetry/CircuitBreaker.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,9 @@ export class CircuitBreaker {
166166

167167
logger.log(LogLevel.debug, `Circuit breaker failure (${this.failureCount}/${this.config.failureThreshold})`);
168168

169-
if (this.failureCount >= this.config.failureThreshold) {
170-
// Transition to OPEN
169+
// In HALF_OPEN state, any failure immediately reopens the circuit.
170+
// In CLOSED state, reopen only after failureThreshold consecutive failures.
171+
if (this.state === CircuitBreakerState.HALF_OPEN || this.failureCount >= this.config.failureThreshold) {
171172
this.state = CircuitBreakerState.OPEN;
172173
this.nextAttempt = new Date(Date.now() + this.config.timeout);
173174
logger.log(LogLevel.debug, `Circuit breaker transitioned to OPEN (will retry after ${this.config.timeout}ms)`);

lib/telemetry/DatabricksTelemetryExporter.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import { LogLevel } from '../contracts/IDBSQLLogger';
2020
import { TelemetryMetric, DEFAULT_TELEMETRY_CONFIG } from './types';
2121
import { CircuitBreakerRegistry } from './CircuitBreaker';
2222
import ExceptionClassifier from './ExceptionClassifier';
23-
import { buildUrl } from './urlUtils';
2423

2524
/**
2625
* Databricks telemetry log format for export.
@@ -210,8 +209,8 @@ export default class DatabricksTelemetryExporter {
210209
// Determine endpoint based on authentication mode
211210
const authenticatedExport = config.telemetryAuthenticatedExport ?? DEFAULT_TELEMETRY_CONFIG.authenticatedExport;
212211
const endpoint = authenticatedExport
213-
? buildUrl(this.host, '/telemetry-ext')
214-
: buildUrl(this.host, '/telemetry-unauth');
212+
? this.buildUrl(this.host, '/telemetry-ext')
213+
: this.buildUrl(this.host, '/telemetry-unauth');
215214

216215
// Format payload - each log is JSON-stringified to match JDBC format
217216
const telemetryLogs = metrics.map((m) => this.toTelemetryLog(m));
@@ -319,6 +318,16 @@ export default class DatabricksTelemetryExporter {
319318
return log;
320319
}
321320

321+
/**
322+
* Build full URL from host and path, handling protocol correctly.
323+
*/
324+
private buildUrl(host: string, path: string): string {
325+
if (host.startsWith('http://') || host.startsWith('https://')) {
326+
return `${host}${path}`;
327+
}
328+
return `https://${host}${path}`;
329+
}
330+
322331
/**
323332
* Generate a UUID v4.
324333
*/

lib/telemetry/FeatureFlagCache.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import fetch from 'node-fetch';
1818
import IClientContext from '../contracts/IClientContext';
1919
import { LogLevel } from '../contracts/IDBSQLLogger';
2020
import driverVersion from '../version';
21-
import { buildUrl } from './urlUtils';
2221

2322
/**
2423
* Context holding feature flag state for a specific host.
@@ -120,7 +119,7 @@ export default class FeatureFlagCache {
120119
const version = this.getDriverVersion();
121120

122121
// Build feature flags endpoint for Node.js driver
123-
const endpoint = buildUrl(host, `/api/2.0/connector-service/feature-flags/NODEJS/${version}`);
122+
const endpoint = this.buildUrl(host, `/api/2.0/connector-service/feature-flags/NODEJS/${version}`);
124123

125124
// Get authentication headers
126125
const authHeaders = await this.context.getAuthHeaders();
@@ -181,6 +180,16 @@ export default class FeatureFlagCache {
181180
}
182181
}
183182

183+
/**
184+
* Build full URL from host and path, handling protocol correctly.
185+
*/
186+
private buildUrl(host: string, path: string): string {
187+
if (host.startsWith('http://') || host.startsWith('https://')) {
188+
return `${host}${path}`;
189+
}
190+
return `https://${host}${path}`;
191+
}
192+
184193
/**
185194
* Gets the driver version without -oss suffix for API calls.
186195
* Format: "1.12.0" from "1.12.0-oss"

lib/telemetry/MetricsAggregator.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,14 @@ export default class MetricsAggregator {
6464

6565
private flushIntervalMs: number;
6666

67+
private maxPendingMetrics: number;
68+
6769
constructor(private context: IClientContext, private exporter: DatabricksTelemetryExporter) {
6870
try {
6971
const config = context.getConfig();
7072
this.batchSize = config.telemetryBatchSize ?? DEFAULT_TELEMETRY_CONFIG.batchSize;
7173
this.flushIntervalMs = config.telemetryFlushIntervalMs ?? DEFAULT_TELEMETRY_CONFIG.flushIntervalMs;
74+
this.maxPendingMetrics = config.telemetryMaxPendingMetrics ?? DEFAULT_TELEMETRY_CONFIG.maxPendingMetrics;
7275

7376
// Start periodic flush timer
7477
this.startFlushTimer();
@@ -80,6 +83,7 @@ export default class MetricsAggregator {
8083
// Initialize with default values
8184
this.batchSize = DEFAULT_TELEMETRY_CONFIG.batchSize;
8285
this.flushIntervalMs = DEFAULT_TELEMETRY_CONFIG.flushIntervalMs;
86+
this.maxPendingMetrics = DEFAULT_TELEMETRY_CONFIG.maxPendingMetrics;
8387
}
8488
}
8589

@@ -283,11 +287,21 @@ export default class MetricsAggregator {
283287
}
284288

285289
/**
286-
* Add a metric to pending batch and flush if batch size reached
290+
* Add a metric to pending batch and flush if batch size reached.
291+
* Drops oldest metrics if the buffer exceeds maxPendingMetrics to prevent
292+
* unbounded growth when the circuit breaker keeps failing exports.
287293
*/
288294
private addPendingMetric(metric: TelemetryMetric): void {
289295
this.pendingMetrics.push(metric);
290296

297+
// Cap the buffer to avoid unbounded memory growth when exports keep failing
298+
if (this.pendingMetrics.length > this.maxPendingMetrics) {
299+
const dropped = this.pendingMetrics.length - this.maxPendingMetrics;
300+
this.pendingMetrics = this.pendingMetrics.slice(dropped);
301+
const logger = this.context.getLogger();
302+
logger.log(LogLevel.debug, `Dropped ${dropped} oldest telemetry metrics (buffer full at ${this.maxPendingMetrics})`);
303+
}
304+
291305
// Check if batch size reached
292306
if (this.pendingMetrics.length >= this.batchSize) {
293307
this.flush();

lib/telemetry/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ export interface TelemetryConfiguration {
5454

5555
/** Circuit breaker timeout in milliseconds */
5656
circuitBreakerTimeout?: number;
57+
58+
/** Maximum number of pending metrics buffered before dropping oldest (prevents unbounded growth when export keeps failing) */
59+
maxPendingMetrics?: number;
5760
}
5861

5962
/**
@@ -67,6 +70,7 @@ export const DEFAULT_TELEMETRY_CONFIG: Required<TelemetryConfiguration> = {
6770
authenticatedExport: true,
6871
circuitBreakerThreshold: 5,
6972
circuitBreakerTimeout: 60000, // 1 minute
73+
maxPendingMetrics: 500,
7074
};
7175

7276
/**

lib/telemetry/urlUtils.ts

Lines changed: 0 additions & 31 deletions
This file was deleted.

tests/unit/telemetry/CircuitBreaker.test.ts

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -367,42 +367,39 @@ describe('CircuitBreaker', () => {
367367
expect(breaker.getState()).to.equal(CircuitBreakerState.CLOSED);
368368
});
369369

370-
it('should reopen if operation fails in HALF_OPEN state', async () => {
370+
it('should reopen immediately if operation fails in HALF_OPEN state', async () => {
371371
const context = new ClientContextStub();
372372
const breaker = new CircuitBreaker(context);
373373

374374
await openAndWaitForHalfOpen(breaker);
375375

376-
// First success
376+
// First success moves to HALF_OPEN
377377
const successOp = sinon.stub().resolves('success');
378378
await breaker.execute(successOp);
379379
expect(breaker.getState()).to.equal(CircuitBreakerState.HALF_OPEN);
380380
expect(breaker.getSuccessCount()).to.equal(1);
381381

382-
// Failure should reset success count but not immediately open
382+
// Any failure in HALF_OPEN immediately reopens the circuit
383383
const failOp = sinon.stub().rejects(new Error('Failed'));
384384
try {
385385
await breaker.execute(failOp);
386386
} catch {}
387387

388388
expect(breaker.getSuccessCount()).to.equal(0); // Reset
389-
expect(breaker.getFailureCount()).to.equal(1);
390-
expect(breaker.getState()).to.equal(CircuitBreakerState.HALF_OPEN);
389+
expect(breaker.getState()).to.equal(CircuitBreakerState.OPEN);
391390
});
392391

393-
it('should track failures and eventually reopen circuit', async () => {
392+
it('should reopen immediately on first failure in HALF_OPEN state', async () => {
394393
const context = new ClientContextStub();
395394
const breaker = new CircuitBreaker(context);
396395

397396
await openAndWaitForHalfOpen(breaker);
398397

399-
// Now in HALF_OPEN, fail 5 times to reopen
398+
// A single failure in HALF_OPEN reopens immediately (not after threshold)
400399
const failOp = sinon.stub().rejects(new Error('Failed'));
401-
for (let i = 0; i < 5; i++) {
402-
try {
403-
await breaker.execute(failOp);
404-
} catch {}
405-
}
400+
try {
401+
await breaker.execute(failOp);
402+
} catch {}
406403

407404
expect(breaker.getState()).to.equal(CircuitBreakerState.OPEN);
408405
});

0 commit comments

Comments (0)