Skip to content

Commit 00a62e8

Browse files
committed
fix: harden telemetry security + address code-review-squad findings
Security (Critical): - buildTelemetryUrl now rejects protocol-relative //prefix, zero-width and BOM codepoints, CR/LF/tab, userinfo, path/query/fragment, and loopback/RFC1918/IMDS/localhost/GCP+Azure-metadata hosts. Defeats the SSRF-shaped Bearer-token exfil vector an attacker-influenced host (env var, tampered config) could trigger. - redactSensitive now covers real Databricks secret shapes: dapi/ dkea/dskea/dsapi/dose PATs, JWT triplets, JSON-quoted access_token/ client_secret/refresh_token/id_token/password/api_key, Basic auth, URL-embedded credentials. Re-applies after truncation. - Unauthenticated export now omits system_configuration entirely, strips userAgentEntry from User-Agent, and blanks stack_trace, so on-path observers cannot re-identify clients on the unauth path. - sanitizeProcessName drops argv tail (handles node --db-password=X app.js shape). Correctness (High): - MetricsAggregator gained a closed flag; close() no longer races with batch-triggered flushes that would resurrect the interval. - evictExpiredStatements now runs from the periodic flush timer so idle clients actually reclaim orphan statement entries. - Evicted statements emit their buffered error events as standalone metrics before being dropped — first-failure signal survives. - Batch-size and terminal-error flush paths pass resetTimer=false so sustained overflow cant starve the periodic tail drain. - TelemetryTerminalError introduced for host-validation refusals, separating that taxonomy from AuthenticationError. - authMissingWarned re-arms after a successful export so operators see a fresh signal the next time auth breaks. - Retry log denominator uses totalAttempts (not maxRetries); negative maxRetries falls back to default; retry log includes the redacted failing error so ops can see whats being retried. API / hygiene: - CircuitBreakerOpenError, CIRCUIT_BREAKER_OPEN_CODE, and TelemetryTerminalError re-exported from lib/index.ts so consumers can instanceof-check. - DBSQLClient.getAuthProvider marked @internal. - DEFAULT_TELEMETRY_CONFIG / DEFAULT_CIRCUIT_BREAKER_CONFIG frozen. - pushBoundedError uses if instead of while. - CIRCUIT_BREAKER_OPEN_CODE typed as const. - Default export on buildTelemetryUrl removed (no callers). - Dropped wasted new Error allocation in processErrorEvent. Tests: - New telemetryUtils.test.ts (53 tests): URL-rejection table covering every known bypass, redactor shapes, sanitize process name. - DatabricksTelemetryExporter: 13 new tests covering Authorization on-the-wire, authMissingWarned idempotency + re-arm, unauth correlation/system_configuration/userAgentEntry/stack_trace stripping, malformed-host drop, loopback drop, dispose idempotency, errorStack to redacted stack_trace flow. - MetricsAggregator: 2 new tests for async close() awaiting the exporter promise (prevents process.exit truncation) and no timer resurrection after close. 700 unit tests pass, ESLint clean. Co-authored-by: Isaac Signed-off-by: samikshya-chand_data <samikshya.chand@databricks.com>
1 parent 1205fff commit 00a62e8

16 files changed

+1327
-456
lines changed

lib/DBSQLClient.ts

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import thrift from 'thrift';
22
import Int64 from 'node-int64';
33

44
import { EventEmitter } from 'events';
5-
import { HeadersInit } from 'node-fetch';
65
import TCLIService from '../thrift/TCLIService';
76
import { TProtocolVersion } from '../thrift/TCLIService_types';
87
import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient';
@@ -239,6 +238,12 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
239238
this.config.enableMetricViewMetadata = options.enableMetricViewMetadata;
240239
}
241240

241+
// Persist userAgentEntry so telemetry and feature-flag call sites reuse
242+
// the same value as the primary Thrift connection's User-Agent.
243+
if (options.userAgentEntry !== undefined) {
244+
this.config.userAgentEntry = options.userAgentEntry;
245+
}
246+
242247
this.authProvider = this.createAuthProvider(options, authProvider);
243248

244249
this.connectionProvider = this.createConnectionProvider(options);
@@ -354,15 +359,14 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
354359
return this.driver;
355360
}
356361

357-
public async getAuthHeaders(): Promise<HeadersInit> {
358-
if (this.authProvider) {
359-
try {
360-
return await this.authProvider.authenticate();
361-
} catch (error) {
362-
this.logger.log(LogLevel.debug, `Error getting auth headers: ${error}`);
363-
return {};
364-
}
365-
}
366-
return {};
362+
/**
363+
* Returns the authentication provider associated with this client, if any.
364+
* Intended for internal telemetry/feature-flag call sites that need to
365+
* obtain auth headers directly without routing through `IClientContext`.
366+
*
367+
* @internal Not part of the public API. May change without notice.
368+
*/
369+
public getAuthProvider(): IAuthentication | undefined {
370+
return this.authProvider;
367371
}
368372
}

lib/contracts/IClientContext.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { HeadersInit } from 'node-fetch';
21
import IDBSQLLogger from './IDBSQLLogger';
32
import IDriver from './IDriver';
43
import IConnectionProvider from '../connection/contracts/IConnectionProvider';
@@ -29,10 +28,16 @@ export interface ClientConfig {
2928
telemetryBatchSize?: number;
3029
telemetryFlushIntervalMs?: number;
3130
telemetryMaxRetries?: number;
31+
telemetryBackoffBaseMs?: number;
32+
telemetryBackoffMaxMs?: number;
33+
telemetryBackoffJitterMs?: number;
3234
telemetryAuthenticatedExport?: boolean;
3335
telemetryCircuitBreakerThreshold?: number;
3436
telemetryCircuitBreakerTimeout?: number;
3537
telemetryMaxPendingMetrics?: number;
38+
telemetryMaxErrorsPerStatement?: number;
39+
telemetryStatementTtlMs?: number;
40+
userAgentEntry?: string;
3641
}
3742

3843
export default interface IClientContext {
@@ -45,11 +50,4 @@ export default interface IClientContext {
4550
getClient(): Promise<IThriftClient>;
4651

4752
getDriver(): Promise<IDriver>;
48-
49-
/**
50-
* Gets authentication headers for HTTP requests.
51-
* Used by telemetry and feature flag fetching to authenticate REST API calls.
52-
* @returns Promise resolving to headers object with authentication, or empty object if no auth
53-
*/
54-
getAuthHeaders(): Promise<HeadersInit>;
5553
}

lib/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ import { LogLevel } from './contracts/IDBSQLLogger';
2323
// Re-export types for TypeScript users
2424
export type { default as ITokenProvider } from './connection/auth/tokenProvider/ITokenProvider';
2525

26+
// Re-export telemetry error classes so consumers can instanceof-check rather
27+
// than string-matching error messages.
28+
export { CircuitBreakerOpenError, CIRCUIT_BREAKER_OPEN_CODE } from './telemetry/CircuitBreaker';
29+
export { TelemetryTerminalError } from './telemetry/DatabricksTelemetryExporter';
30+
2631
export const auth = {
2732
PlainHttpAuthentication,
2833
// Token provider classes for custom authentication

lib/telemetry/CircuitBreaker.ts

Lines changed: 54 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -17,48 +17,41 @@
1717
import IClientContext from '../contracts/IClientContext';
1818
import { LogLevel } from '../contracts/IDBSQLLogger';
1919

20-
/**
21-
* States of the circuit breaker.
22-
*/
2320
export enum CircuitBreakerState {
24-
/** Normal operation, requests pass through */
2521
CLOSED = 'CLOSED',
26-
/** After threshold failures, all requests rejected immediately */
2722
OPEN = 'OPEN',
28-
/** After timeout, allows test requests to check if endpoint recovered */
2923
HALF_OPEN = 'HALF_OPEN',
3024
}
3125

32-
/**
33-
* Configuration for circuit breaker behavior.
34-
*/
3526
export interface CircuitBreakerConfig {
36-
/** Number of consecutive failures before opening the circuit */
3727
failureThreshold: number;
38-
/** Time in milliseconds to wait before attempting recovery */
3928
timeout: number;
40-
/** Number of consecutive successes in HALF_OPEN state to close the circuit */
4129
successThreshold: number;
4230
}
4331

44-
/**
45-
* Default circuit breaker configuration.
46-
*/
47-
export const DEFAULT_CIRCUIT_BREAKER_CONFIG: CircuitBreakerConfig = {
32+
export const DEFAULT_CIRCUIT_BREAKER_CONFIG: Readonly<CircuitBreakerConfig> = Object.freeze({
4833
failureThreshold: 5,
49-
timeout: 60000, // 1 minute
34+
timeout: 60000,
5035
successThreshold: 2,
51-
};
36+
});
37+
38+
export const CIRCUIT_BREAKER_OPEN_CODE = 'CIRCUIT_BREAKER_OPEN' as const;
5239

5340
/**
54-
* Circuit breaker for telemetry exporter.
55-
* Protects against failing telemetry endpoint with automatic recovery.
56-
*
57-
* States:
58-
* - CLOSED: Normal operation, requests pass through
59-
* - OPEN: After threshold failures, all requests rejected immediately
60-
* - HALF_OPEN: After timeout, allows test requests to check if endpoint recovered
41+
* Thrown when execute() is called while the breaker is OPEN or a HALF_OPEN
42+
* probe is already in flight. Callers identify the condition via
43+
* `instanceof CircuitBreakerOpenError` or `err.code === CIRCUIT_BREAKER_OPEN_CODE`
44+
* rather than string-matching the message.
6145
*/
46+
export class CircuitBreakerOpenError extends Error {
47+
readonly code = CIRCUIT_BREAKER_OPEN_CODE;
48+
49+
constructor(message = 'Circuit breaker OPEN') {
50+
super(message);
51+
this.name = 'CircuitBreakerOpenError';
52+
}
53+
}
54+
6255
export class CircuitBreaker {
6356
private state: CircuitBreakerState = CircuitBreakerState.CLOSED;
6457

@@ -68,7 +61,6 @@ export class CircuitBreaker {
6861

6962
private nextAttempt?: Date;
7063

71-
/** Number of in-flight requests in HALF_OPEN state (limits to 1 probe) */
7264
private halfOpenInflight = 0;
7365

7466
private readonly config: CircuitBreakerConfig;
@@ -80,79 +72,76 @@ export class CircuitBreaker {
8072
};
8173
}
8274

75+
async execute<T>(operation: () => Promise<T>): Promise<T> {
76+
const admitted = this.tryAdmit();
77+
if (!admitted) {
78+
throw new CircuitBreakerOpenError();
79+
}
80+
81+
const { wasHalfOpenProbe } = admitted;
82+
83+
try {
84+
const result = await operation();
85+
this.onSuccess();
86+
return result;
87+
} catch (error) {
88+
this.onFailure();
89+
throw error;
90+
} finally {
91+
if (wasHalfOpenProbe && this.halfOpenInflight > 0) {
92+
this.halfOpenInflight -= 1;
93+
}
94+
}
95+
}
96+
8397
/**
84-
* Executes an operation with circuit breaker protection.
98+
* Synchronous admission check. Returning `null` means "reject". Returning
99+
* an object means the caller is admitted; `wasHalfOpenProbe` indicates
100+
* whether this admission consumed the single HALF_OPEN probe slot so the
101+
* caller can decrement it in `finally`.
85102
*
86-
* @param operation The operation to execute
87-
* @returns Promise resolving to the operation result
88-
* @throws Error if circuit is OPEN or operation fails
103+
* Running this as a single synchronous block is what prevents the
104+
* concurrent-probe race that existed in the previous implementation.
89105
*/
90-
async execute<T>(operation: () => Promise<T>): Promise<T> {
106+
private tryAdmit(): { wasHalfOpenProbe: boolean } | null {
91107
const logger = this.context.getLogger();
92108

93-
// Check if circuit is open
94109
if (this.state === CircuitBreakerState.OPEN) {
95110
if (this.nextAttempt && Date.now() < this.nextAttempt.getTime()) {
96-
throw new Error('Circuit breaker OPEN');
111+
return null;
97112
}
98-
// Timeout expired, transition to HALF_OPEN
99113
this.state = CircuitBreakerState.HALF_OPEN;
100114
this.successCount = 0;
101115
this.halfOpenInflight = 0;
102116
logger.log(LogLevel.debug, 'Circuit breaker transitioned to HALF_OPEN');
103117
}
104118

105-
// In HALF_OPEN state, allow only one probe request at a time
106-
if (this.state === CircuitBreakerState.HALF_OPEN && this.halfOpenInflight > 0) {
107-
throw new Error('Circuit breaker OPEN');
108-
}
109-
110119
if (this.state === CircuitBreakerState.HALF_OPEN) {
111-
this.halfOpenInflight += 1;
112-
}
113-
114-
try {
115-
const result = await operation();
116-
this.onSuccess();
117-
return result;
118-
} catch (error) {
119-
this.onFailure();
120-
throw error;
121-
} finally {
122120
if (this.halfOpenInflight > 0) {
123-
this.halfOpenInflight -= 1;
121+
return null;
124122
}
123+
this.halfOpenInflight += 1;
124+
return { wasHalfOpenProbe: true };
125125
}
126+
127+
return { wasHalfOpenProbe: false };
126128
}
127129

128-
/**
129-
* Gets the current state of the circuit breaker.
130-
*/
131130
getState(): CircuitBreakerState {
132131
return this.state;
133132
}
134133

135-
/**
136-
* Gets the current failure count.
137-
*/
138134
getFailureCount(): number {
139135
return this.failureCount;
140136
}
141137

142-
/**
143-
* Gets the current success count (relevant in HALF_OPEN state).
144-
*/
145138
getSuccessCount(): number {
146139
return this.successCount;
147140
}
148141

149-
/**
150-
* Handles successful operation execution.
151-
*/
152142
private onSuccess(): void {
153143
const logger = this.context.getLogger();
154144

155-
// Reset failure count on any success
156145
this.failureCount = 0;
157146

158147
if (this.state === CircuitBreakerState.HALF_OPEN) {
@@ -163,7 +152,6 @@ export class CircuitBreaker {
163152
);
164153

165154
if (this.successCount >= this.config.successThreshold) {
166-
// Transition to CLOSED
167155
this.state = CircuitBreakerState.CLOSED;
168156
this.successCount = 0;
169157
this.nextAttempt = undefined;
@@ -172,23 +160,17 @@ export class CircuitBreaker {
172160
}
173161
}
174162

175-
/**
176-
* Handles failed operation execution.
177-
*/
178163
private onFailure(): void {
179164
const logger = this.context.getLogger();
180165

181166
this.failureCount += 1;
182-
this.successCount = 0; // Reset success count on failure
167+
this.successCount = 0;
183168

184169
logger.log(LogLevel.debug, `Circuit breaker failure (${this.failureCount}/${this.config.failureThreshold})`);
185170

186-
// In HALF_OPEN state, any failure immediately reopens the circuit.
187-
// In CLOSED state, reopen only after failureThreshold consecutive failures.
188171
if (this.state === CircuitBreakerState.HALF_OPEN || this.failureCount >= this.config.failureThreshold) {
189172
this.state = CircuitBreakerState.OPEN;
190173
this.nextAttempt = new Date(Date.now() + this.config.timeout);
191-
// Log at warn level for OPEN transitions — meaningful operational signal
192174
logger.log(
193175
LogLevel.warn,
194176
`Telemetry circuit breaker OPEN after ${this.failureCount} failures (will retry after ${this.config.timeout}ms)`,
@@ -197,25 +179,13 @@ export class CircuitBreaker {
197179
}
198180
}
199181

200-
/**
201-
* Manages circuit breakers per host.
202-
* Ensures each host has its own isolated circuit breaker to prevent
203-
* failures on one host from affecting telemetry to other hosts.
204-
*/
205182
export class CircuitBreakerRegistry {
206183
private breakers: Map<string, CircuitBreaker>;
207184

208185
constructor(private context: IClientContext) {
209186
this.breakers = new Map();
210187
}
211188

212-
/**
213-
* Gets or creates a circuit breaker for the specified host.
214-
*
215-
* @param host The host identifier (e.g., "workspace.cloud.databricks.com")
216-
* @param config Optional configuration overrides
217-
* @returns Circuit breaker for the host
218-
*/
219189
getCircuitBreaker(host: string, config?: Partial<CircuitBreakerConfig>): CircuitBreaker {
220190
let breaker = this.breakers.get(host);
221191
if (!breaker) {
@@ -230,30 +200,16 @@ export class CircuitBreakerRegistry {
230200
return breaker;
231201
}
232202

233-
/**
234-
* Gets all registered circuit breakers.
235-
* Useful for testing and diagnostics.
236-
*/
237203
getAllBreakers(): Map<string, CircuitBreaker> {
238204
return new Map(this.breakers);
239205
}
240206

241-
/**
242-
* Removes a circuit breaker for the specified host.
243-
* Useful for cleanup when a host is no longer in use.
244-
*
245-
* @param host The host identifier
246-
*/
247207
removeCircuitBreaker(host: string): void {
248208
this.breakers.delete(host);
249209
const logger = this.context.getLogger();
250210
logger.log(LogLevel.debug, `Removed circuit breaker for host: ${host}`);
251211
}
252212

253-
/**
254-
* Clears all circuit breakers.
255-
* Useful for testing.
256-
*/
257213
clear(): void {
258214
this.breakers.clear();
259215
}

0 commit comments

Comments
 (0)