Skip to content

Commit 46e2d6e

Browse files
samikshya-dbclaude
andauthored
[2/7] Telemetry Infrastructure: CircuitBreaker and FeatureFlagCache (#325)
* Add telemetry infrastructure: CircuitBreaker and FeatureFlagCache This is part 2 of 7 in the telemetry implementation stack. Components: - CircuitBreaker: Per-host endpoint protection with state management - FeatureFlagCache: Per-host feature flag caching with reference counting - CircuitBreakerRegistry: Manages circuit breakers per host Circuit Breaker: - States: CLOSED (normal), OPEN (failing), HALF_OPEN (testing recovery) - Default: 5 failures trigger OPEN, 60s timeout, 2 successes to CLOSE - Per-host isolation prevents cascade failures - All state transitions logged at debug level Feature Flag Cache: - Per-host caching with 15-minute TTL - Reference counting for connection lifecycle management - Automatic cache expiration and refetch - Context removed when refCount reaches zero Testing: - 32 comprehensive unit tests for CircuitBreaker - 29 comprehensive unit tests for FeatureFlagCache - 100% function coverage, >80% line/branch coverage - CircuitBreakerStub for testing other components Dependencies: - Builds on [1/7] Types and Exception Classifier * Add authentication support for REST API calls Implements getAuthHeaders() method for authenticated REST API requests: - Added getAuthHeaders() to IClientContext interface - Implemented in DBSQLClient using authProvider.authenticate() - Updated FeatureFlagCache to fetch from connector-service API with auth - Added driver version support for version-specific feature flags - Replaced placeholder implementation with actual REST API calls Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com> * Fix feature flag and telemetry export endpoints - Change feature flag endpoint to use NODEJS client type - Fix telemetry endpoints to /telemetry-ext and /telemetry-unauth - Update payload to match proto with system_configuration - Add shared buildUrl utility for protocol handling * Match JDBC telemetry payload format - Change payload structure to match JDBC: uploadTime, items, protoLogs - protoLogs contains JSON-stringified TelemetryFrontendLog objects - Remove workspace_id (JDBC doesn't populate it) - Remove debug logs added during testing * Fix lint errors - Fix import order in FeatureFlagCache - Replace require() with import for driverVersion - Fix variable shadowing - Disable prefer-default-export for urlUtils * Add missing getAuthHeaders method to ClientContextStub Fix TypeScript compilation error by implementing getAuthHeaders method required by IClientContext interface. * Fix prettier formatting * Add DRIVER_NAME constant for nodejs-sql-driver * Add missing telemetry fields to match JDBC Added osArch, runtimeVendor, localeName, charSetEncoding, and processName fields to DriverConfiguration to match JDBC implementation. * Fix TypeScript compilation: add missing fields to system_configuration interface * Fix telemetry PR review comments from #325 Three fixes addressing review feedback: 1. Fix documentation typo (sreekanth-db comment) - DatabricksTelemetryExporter.ts:94 - Changed "TelemetryFrontendLog" to "DatabricksTelemetryLog" 2. Add proxy support (jadewang-db comment) - DatabricksTelemetryExporter.ts:exportInternal() - Get HTTP agent from connection provider - Pass agent to fetch for proxy support - Follows same pattern as CloudFetchResultHandler and DBSQLSession - Supports http/https/socks proxies with authentication 3. Fix flush timer to prevent rate limiting (sreekanth-db comment) - MetricsAggregator.ts:flush() - Reset timer after manual flushes (batch size, terminal errors) - Ensures consistent 30s spacing between exports - Prevents rapid successive flushes (e.g., batch at 25s, timer at 30s) All changes follow existing driver patterns and maintain backward compatibility. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com> * Add proxy support to feature flag fetching Feature flag fetching was also missing proxy support like telemetry exporter was. Applied the same fix: - Get HTTP agent from connection provider - Pass agent to fetch call for proxy support - Follows same pattern as CloudFetchResultHandler and DBSQLSession - Supports http/https/socks proxies with authentication This completes proxy support for all HTTP operations in the telemetry system (both telemetry export and feature flag fetching). Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com> * Address PR #325 review feedback - Fix CircuitBreaker HALF_OPEN: any failure now immediately reopens the circuit instead of accumulating to failureThreshold, aligning with Resilience4j behavior used in the JDBC driver - Add maxPendingMetrics bound to MetricsAggregator (default 500) to prevent unbounded buffer growth when exports keep failing - Inline buildUrl into DatabricksTelemetryExporter and FeatureFlagCache; remove urlUtils.ts (single-use utility) - Add missing unit tests for DatabricksTelemetryExporter, MetricsAggregator, and TelemetryEventEmitter (HTTP calls, retry logic, batching, flush timers, event routing) Co-authored-by: Isaac * refactor: replace node-fetch injection with sendRequest via connection provider Address PR #325 review feedback: instead of accepting a `fetchFunction` constructor parameter, `DatabricksTelemetryExporter` now delegates HTTP calls to a private `sendRequest()` method that retrieves the agent from `IConnectionProvider.getAgent()` — the same pattern used by `CloudFetchResultHandler` and `DBSQLSession`. This keeps proxy support intact while removing the direct `node-fetch` coupling from the public API. Tests updated to stub `sendRequest` on the instance via sinon instead of injecting a fetch function through the constructor. Co-authored-by: Isaac * style: fix prettier formatting in telemetry files Co-authored-by: Isaac * fix: address telemetry code review issues - ExceptionClassifier: classify ECONNREFUSED, ENOTFOUND, EHOSTUNREACH, ECONNRESET as retryable — these are transient network failures that were previously falling through as non-retryable and silently dropped - DatabricksTelemetryExporter: read driver version from lib/version.ts instead of hardcoded '1.0.0'; use uuid.v4() instead of hand-rolled UUID generation which had incorrect version/variant bits Co-authored-by: Isaac * fix: address code review findings for telemetry infrastructure - Add workspace_id to telemetry log serialization (was silently dropped) - Fix socket leak: consume HTTP response body on success and error paths - Add 10s timeout to all telemetry/feature-flag fetch calls - Fix thundering herd: deduplicate concurrent feature flag fetches - Add Promise.resolve().catch() to flush() to prevent unhandled rejections - Add HALF_OPEN inflight guard to CircuitBreaker (limit to 1 probe) - Rename TelemetryEventType.ERROR to 'telemetry.error' (avoid EventEmitter collision) - Extract shared buildTelemetryUrl utility enforcing HTTPS - Clamp server-provided TTL to [60, 3600] seconds - Skip export silently when auth headers are missing - Log circuit breaker OPEN transitions at warn level - Fix CircuitBreakerStub HALF_OPEN behavior to match real implementation - Snapshot Map keys before iteration in close() - Remove unnecessary 'as any' cast and '| string' type widening Co-authored-by: Isaac * style: fix prettier formatting in CircuitBreaker.ts Co-authored-by: Isaac * fix: resolve ESLint errors in telemetry modules Use dot notation for Authorization header access and convert buildTelemetryUrl to default export to satisfy lint rules. Co-authored-by: Isaac * fix: harden telemetry security + address code-review-squad findings Security (Critical): - buildTelemetryUrl now rejects protocol-relative //prefix, zero-width and BOM codepoints, CR/LF/tab, userinfo, path/query/fragment, and loopback/RFC1918/IMDS/localhost/GCP+Azure-metadata hosts. Defeats the SSRF-shaped Bearer-token exfil vector an attacker-influenced host (env var, tampered config) could trigger. - redactSensitive now covers real Databricks secret shapes: dapi/ dkea/dskea/dsapi/dose PATs, JWT triplets, JSON-quoted access_token/ client_secret/refresh_token/id_token/password/api_key, Basic auth, URL-embedded credentials. Re-applies after truncation. - Unauthenticated export now omits system_configuration entirely, strips userAgentEntry from User-Agent, and blanks stack_trace, so on-path observers cannot re-identify clients on the unauth path. - sanitizeProcessName drops argv tail (handles node --db-password=X app.js shape). Correctness (High): - MetricsAggregator gained a closed flag; close() no longer races with batch-triggered flushes that would resurrect the interval. - evictExpiredStatements now runs from the periodic flush timer so idle clients actually reclaim orphan statement entries. - Evicted statements emit their buffered error events as standalone metrics before being dropped — first-failure signal survives. - Batch-size and terminal-error flush paths pass resetTimer=false so sustained overflow cant starve the periodic tail drain. - TelemetryTerminalError introduced for host-validation refusals, separating that taxonomy from AuthenticationError. - authMissingWarned re-arms after a successful export so operators see a fresh signal the next time auth breaks. - Retry log denominator uses totalAttempts (not maxRetries); negative maxRetries falls back to default; retry log includes the redacted failing error so ops can see whats being retried. API / hygiene: - CircuitBreakerOpenError, CIRCUIT_BREAKER_OPEN_CODE, and TelemetryTerminalError re-exported from lib/index.ts so consumers can instanceof-check. - DBSQLClient.getAuthProvider marked @internal. - DEFAULT_TELEMETRY_CONFIG / DEFAULT_CIRCUIT_BREAKER_CONFIG frozen. - pushBoundedError uses if instead of while. - CIRCUIT_BREAKER_OPEN_CODE typed as const. - Default export on buildTelemetryUrl removed (no callers). - Dropped wasted new Error allocation in processErrorEvent. Tests: - New telemetryUtils.test.ts (53 tests): URL-rejection table covering every known bypass, redactor shapes, sanitize process name. - DatabricksTelemetryExporter: 13 new tests covering Authorization on-the-wire, authMissingWarned idempotency + re-arm, unauth correlation/system_configuration/userAgentEntry/stack_trace stripping, malformed-host drop, loopback drop, dispose idempotency, errorStack to redacted stack_trace flow. - MetricsAggregator: 2 new tests for async close() awaiting the exporter promise (prevents process.exit truncation) and no timer resurrection after close. 700 unit tests pass, ESLint clean. Co-authored-by: Isaac Signed-off-by: samikshya-chand_data <samikshya.chand@databricks.com> * test: document synthetic-JWT source pattern in redactSensitive test Clarify that the JWT string in the redactor test is intentionally fake and is built from parts so the assembled token never appears as a source literal (to satisfy pre-commit secret scanners). Co-authored-by: Isaac Signed-off-by: samikshya-chand_data <samikshya.chand@databricks.com> * fix: stringify sinon.Call.args[1] before regex test CI's TypeScript was stricter than the local version and rejected the untyped `c.args[1]` passed to `RegExp.test()`. Wrap in `String(...)` so the tests compile on Node 14/16/18/20 runners. Co-authored-by: Isaac Signed-off-by: samikshya-chand_data <samikshya.chand@databricks.com> * fix: address Jade's review comments - ExceptionClassifier: remove ENOTFOUND from retryable (DNS "not found" is deterministic — retrying just pushes load at the resolver without any expectation of success). Add ETIMEDOUT and EAI_AGAIN per Jade's follow-up list. - Extract shared `normalizeHeaders` + `hasAuthorization` helpers into telemetryUtils; DatabricksTelemetryExporter and FeatureFlagCache both use them — eliminates the ~40-line duplication Jade flagged. - normalizeHeaders now guards `typeof raw === 'object'` before Object.entries, preventing Object.entries('some-string') index entries from leaking in as headers (Jade: "should we do type check here?"). - FeatureFlagCache.fetchFeatureFlag: add single-retry on transient errors (classified via ExceptionClassifier). Without a retry, one DNS hiccup would disable telemetry for the full 15-minute cache TTL; one retry gives an ephemeral failure a second chance without pushing sustained load at a broken endpoint. - Drop the private hasAuthorization/normalizeHeaders on the exporter; drop the inlined branching in FFC getAuthHeaders. - Update ExceptionClassifier tests: invert ENOTFOUND expectation with a comment explaining why; add cases for ETIMEDOUT and EAI_AGAIN. 702 unit tests pass, ESLint clean. Co-authored-by: Isaac Signed-off-by: samikshya-chand_data <samikshya.chand@databricks.com> --------- Signed-off-by: samikshya-chand_data <samikshya.chand@databricks.com> Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 69f901c commit 46e2d6e

19 files changed

+4348
-7
lines changed

lib/DBSQLClient.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,12 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
238238
this.config.enableMetricViewMetadata = options.enableMetricViewMetadata;
239239
}
240240

241+
// Persist userAgentEntry so telemetry and feature-flag call sites reuse
242+
// the same value as the primary Thrift connection's User-Agent.
243+
if (options.userAgentEntry !== undefined) {
244+
this.config.userAgentEntry = options.userAgentEntry;
245+
}
246+
241247
this.authProvider = this.createAuthProvider(options, authProvider);
242248

243249
this.connectionProvider = this.createConnectionProvider(options);
@@ -352,4 +358,15 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
352358
public async getDriver(): Promise<IDriver> {
353359
return this.driver;
354360
}
361+
362+
/**
363+
* Returns the authentication provider associated with this client, if any.
364+
* Intended for internal telemetry/feature-flag call sites that need to
365+
* obtain auth headers directly without routing through `IClientContext`.
366+
*
367+
* @internal Not part of the public API. May change without notice.
368+
*/
369+
public getAuthProvider(): IAuthentication | undefined {
370+
return this.authProvider;
371+
}
355372
}

lib/contracts/IClientContext.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,16 @@ export interface ClientConfig {
2828
telemetryBatchSize?: number;
2929
telemetryFlushIntervalMs?: number;
3030
telemetryMaxRetries?: number;
31+
telemetryBackoffBaseMs?: number;
32+
telemetryBackoffMaxMs?: number;
33+
telemetryBackoffJitterMs?: number;
3134
telemetryAuthenticatedExport?: boolean;
3235
telemetryCircuitBreakerThreshold?: number;
3336
telemetryCircuitBreakerTimeout?: number;
37+
telemetryMaxPendingMetrics?: number;
38+
telemetryMaxErrorsPerStatement?: number;
39+
telemetryStatementTtlMs?: number;
40+
userAgentEntry?: string;
3441
}
3542

3643
export default interface IClientContext {

lib/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ import { LogLevel } from './contracts/IDBSQLLogger';
2323
// Re-export types for TypeScript users
2424
export type { default as ITokenProvider } from './connection/auth/tokenProvider/ITokenProvider';
2525

26+
// Re-export telemetry error classes so consumers can instanceof-check rather
27+
// than string-matching error messages.
28+
export { CircuitBreakerOpenError, CIRCUIT_BREAKER_OPEN_CODE } from './telemetry/CircuitBreaker';
29+
export { TelemetryTerminalError } from './telemetry/DatabricksTelemetryExporter';
30+
2631
export const auth = {
2732
PlainHttpAuthentication,
2833
// Token provider classes for custom authentication

lib/telemetry/CircuitBreaker.ts

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
/**
2+
* Copyright (c) 2025 Databricks Contributors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import IClientContext from '../contracts/IClientContext';
18+
import { LogLevel } from '../contracts/IDBSQLLogger';
19+
20+
export enum CircuitBreakerState {
21+
CLOSED = 'CLOSED',
22+
OPEN = 'OPEN',
23+
HALF_OPEN = 'HALF_OPEN',
24+
}
25+
26+
export interface CircuitBreakerConfig {
27+
failureThreshold: number;
28+
timeout: number;
29+
successThreshold: number;
30+
}
31+
32+
export const DEFAULT_CIRCUIT_BREAKER_CONFIG: Readonly<CircuitBreakerConfig> = Object.freeze({
33+
failureThreshold: 5,
34+
timeout: 60000,
35+
successThreshold: 2,
36+
});
37+
38+
export const CIRCUIT_BREAKER_OPEN_CODE = 'CIRCUIT_BREAKER_OPEN' as const;
39+
40+
/**
41+
* Thrown when execute() is called while the breaker is OPEN or a HALF_OPEN
42+
* probe is already in flight. Callers identify the condition via
43+
* `instanceof CircuitBreakerOpenError` or `err.code === CIRCUIT_BREAKER_OPEN_CODE`
44+
* rather than string-matching the message.
45+
*/
46+
export class CircuitBreakerOpenError extends Error {
47+
readonly code = CIRCUIT_BREAKER_OPEN_CODE;
48+
49+
constructor(message = 'Circuit breaker OPEN') {
50+
super(message);
51+
this.name = 'CircuitBreakerOpenError';
52+
}
53+
}
54+
55+
export class CircuitBreaker {
56+
private state: CircuitBreakerState = CircuitBreakerState.CLOSED;
57+
58+
private failureCount = 0;
59+
60+
private successCount = 0;
61+
62+
private nextAttempt?: Date;
63+
64+
private halfOpenInflight = 0;
65+
66+
private readonly config: CircuitBreakerConfig;
67+
68+
constructor(private context: IClientContext, config?: Partial<CircuitBreakerConfig>) {
69+
this.config = {
70+
...DEFAULT_CIRCUIT_BREAKER_CONFIG,
71+
...config,
72+
};
73+
}
74+
75+
async execute<T>(operation: () => Promise<T>): Promise<T> {
76+
const admitted = this.tryAdmit();
77+
if (!admitted) {
78+
throw new CircuitBreakerOpenError();
79+
}
80+
81+
const { wasHalfOpenProbe } = admitted;
82+
83+
try {
84+
const result = await operation();
85+
this.onSuccess();
86+
return result;
87+
} catch (error) {
88+
this.onFailure();
89+
throw error;
90+
} finally {
91+
if (wasHalfOpenProbe && this.halfOpenInflight > 0) {
92+
this.halfOpenInflight -= 1;
93+
}
94+
}
95+
}
96+
97+
/**
98+
* Synchronous admission check. Returning `null` means "reject". Returning
99+
* an object means the caller is admitted; `wasHalfOpenProbe` indicates
100+
* whether this admission consumed the single HALF_OPEN probe slot so the
101+
* caller can decrement it in `finally`.
102+
*
103+
* Running this as a single synchronous block is what prevents the
104+
* concurrent-probe race that existed in the previous implementation.
105+
*/
106+
private tryAdmit(): { wasHalfOpenProbe: boolean } | null {
107+
const logger = this.context.getLogger();
108+
109+
if (this.state === CircuitBreakerState.OPEN) {
110+
if (this.nextAttempt && Date.now() < this.nextAttempt.getTime()) {
111+
return null;
112+
}
113+
this.state = CircuitBreakerState.HALF_OPEN;
114+
this.successCount = 0;
115+
this.halfOpenInflight = 0;
116+
logger.log(LogLevel.debug, 'Circuit breaker transitioned to HALF_OPEN');
117+
}
118+
119+
if (this.state === CircuitBreakerState.HALF_OPEN) {
120+
if (this.halfOpenInflight > 0) {
121+
return null;
122+
}
123+
this.halfOpenInflight += 1;
124+
return { wasHalfOpenProbe: true };
125+
}
126+
127+
return { wasHalfOpenProbe: false };
128+
}
129+
130+
getState(): CircuitBreakerState {
131+
return this.state;
132+
}
133+
134+
getFailureCount(): number {
135+
return this.failureCount;
136+
}
137+
138+
getSuccessCount(): number {
139+
return this.successCount;
140+
}
141+
142+
private onSuccess(): void {
143+
const logger = this.context.getLogger();
144+
145+
this.failureCount = 0;
146+
147+
if (this.state === CircuitBreakerState.HALF_OPEN) {
148+
this.successCount += 1;
149+
logger.log(
150+
LogLevel.debug,
151+
`Circuit breaker success in HALF_OPEN (${this.successCount}/${this.config.successThreshold})`,
152+
);
153+
154+
if (this.successCount >= this.config.successThreshold) {
155+
this.state = CircuitBreakerState.CLOSED;
156+
this.successCount = 0;
157+
this.nextAttempt = undefined;
158+
logger.log(LogLevel.debug, 'Circuit breaker transitioned to CLOSED');
159+
}
160+
}
161+
}
162+
163+
private onFailure(): void {
164+
const logger = this.context.getLogger();
165+
166+
this.failureCount += 1;
167+
this.successCount = 0;
168+
169+
logger.log(LogLevel.debug, `Circuit breaker failure (${this.failureCount}/${this.config.failureThreshold})`);
170+
171+
if (this.state === CircuitBreakerState.HALF_OPEN || this.failureCount >= this.config.failureThreshold) {
172+
this.state = CircuitBreakerState.OPEN;
173+
this.nextAttempt = new Date(Date.now() + this.config.timeout);
174+
logger.log(
175+
LogLevel.warn,
176+
`Telemetry circuit breaker OPEN after ${this.failureCount} failures (will retry after ${this.config.timeout}ms)`,
177+
);
178+
}
179+
}
180+
}
181+
182+
export class CircuitBreakerRegistry {
183+
private breakers: Map<string, CircuitBreaker>;
184+
185+
constructor(private context: IClientContext) {
186+
this.breakers = new Map();
187+
}
188+
189+
getCircuitBreaker(host: string, config?: Partial<CircuitBreakerConfig>): CircuitBreaker {
190+
let breaker = this.breakers.get(host);
191+
if (!breaker) {
192+
breaker = new CircuitBreaker(this.context, config);
193+
this.breakers.set(host, breaker);
194+
const logger = this.context.getLogger();
195+
logger.log(LogLevel.debug, `Created circuit breaker for host: ${host}`);
196+
} else if (config) {
197+
const logger = this.context.getLogger();
198+
logger.log(LogLevel.debug, `Circuit breaker for host ${host} already exists; provided config will be ignored`);
199+
}
200+
return breaker;
201+
}
202+
203+
getAllBreakers(): Map<string, CircuitBreaker> {
204+
return new Map(this.breakers);
205+
}
206+
207+
removeCircuitBreaker(host: string): void {
208+
this.breakers.delete(host);
209+
const logger = this.context.getLogger();
210+
logger.log(LogLevel.debug, `Removed circuit breaker for host: ${host}`);
211+
}
212+
213+
clear(): void {
214+
this.breakers.clear();
215+
}
216+
}

0 commit comments

Comments
 (0)