Skip to content

Commit 5caae2b

Browse files
committed
fix: address code review findings for telemetry infrastructure
- Add workspace_id to telemetry log serialization (was silently dropped) - Fix socket leak: consume HTTP response body on success and error paths - Add 10s timeout to all telemetry/feature-flag fetch calls - Fix thundering herd: deduplicate concurrent feature flag fetches - Add Promise.resolve().catch() to flush() to prevent unhandled rejections - Add HALF_OPEN inflight guard to CircuitBreaker (limit to 1 probe) - Rename TelemetryEventType.ERROR to 'telemetry.error' (avoid EventEmitter collision) - Extract shared buildTelemetryUrl utility enforcing HTTPS - Clamp server-provided TTL to [60, 3600] seconds - Skip export silently when auth headers are missing - Log circuit breaker OPEN transitions at warn level - Fix CircuitBreakerStub HALF_OPEN behavior to match real implementation - Snapshot Map keys before iteration in close() - Remove unnecessary 'as any' cast and '| string' type widening Co-authored-by: Isaac
1 parent 5dbc5a0 commit 5caae2b

12 files changed

+135
-58
lines changed

lib/telemetry/CircuitBreaker.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ export class CircuitBreaker {
6868

6969
private nextAttempt?: Date;
7070

71+
/** Number of in-flight requests in HALF_OPEN state (limits to 1 probe) */
72+
private halfOpenInflight = 0;
73+
7174
private readonly config: CircuitBreakerConfig;
7275

7376
constructor(private context: IClientContext, config?: Partial<CircuitBreakerConfig>) {
@@ -95,16 +98,30 @@ export class CircuitBreaker {
9598
// Timeout expired, transition to HALF_OPEN
9699
this.state = CircuitBreakerState.HALF_OPEN;
97100
this.successCount = 0;
101+
this.halfOpenInflight = 0;
98102
logger.log(LogLevel.debug, 'Circuit breaker transitioned to HALF_OPEN');
99103
}
100104

105+
// In HALF_OPEN state, allow only one probe request at a time
106+
if (this.state === CircuitBreakerState.HALF_OPEN && this.halfOpenInflight > 0) {
107+
throw new Error('Circuit breaker OPEN');
108+
}
109+
110+
if (this.state === CircuitBreakerState.HALF_OPEN) {
111+
this.halfOpenInflight += 1;
112+
}
113+
101114
try {
102115
const result = await operation();
103116
this.onSuccess();
104117
return result;
105118
} catch (error) {
106119
this.onFailure();
107120
throw error;
121+
} finally {
122+
if (this.halfOpenInflight > 0) {
123+
this.halfOpenInflight -= 1;
124+
}
108125
}
109126
}
110127

@@ -171,7 +188,11 @@ export class CircuitBreaker {
171188
if (this.state === CircuitBreakerState.HALF_OPEN || this.failureCount >= this.config.failureThreshold) {
172189
this.state = CircuitBreakerState.OPEN;
173190
this.nextAttempt = new Date(Date.now() + this.config.timeout);
174-
logger.log(LogLevel.debug, `Circuit breaker transitioned to OPEN (will retry after ${this.config.timeout}ms)`);
191+
// Log at warn level for OPEN transitions — meaningful operational signal
192+
logger.log(
193+
LogLevel.warn,
194+
`Telemetry circuit breaker OPEN after ${this.failureCount} failures (will retry after ${this.config.timeout}ms)`,
195+
);
175196
}
176197
}
177198
}
@@ -202,6 +223,12 @@ export class CircuitBreakerRegistry {
202223
this.breakers.set(host, breaker);
203224
const logger = this.context.getLogger();
204225
logger.log(LogLevel.debug, `Created circuit breaker for host: ${host}`);
226+
} else if (config) {
227+
const logger = this.context.getLogger();
228+
logger.log(
229+
LogLevel.debug,
230+
`Circuit breaker for host ${host} already exists; provided config will be ignored`,
231+
);
205232
}
206233
return breaker;
207234
}

lib/telemetry/DatabricksTelemetryExporter.ts

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ import fetch, { Response, RequestInit } from 'node-fetch';
1919
import IClientContext from '../contracts/IClientContext';
2020
import { LogLevel } from '../contracts/IDBSQLLogger';
2121
import { TelemetryMetric, DEFAULT_TELEMETRY_CONFIG } from './types';
22-
import { CircuitBreakerRegistry } from './CircuitBreaker';
22+
import { CircuitBreaker, CircuitBreakerRegistry } from './CircuitBreaker';
2323
import ExceptionClassifier from './ExceptionClassifier';
24+
import { buildTelemetryUrl } from './telemetryUtils';
2425
import driverVersion from '../version';
2526

2627
/**
@@ -87,8 +88,8 @@ interface DatabricksTelemetryPayload {
8788
* Exports telemetry metrics to Databricks telemetry service.
8889
*
8990
* Endpoints:
90-
* - Authenticated: /api/2.0/sql/telemetry-ext
91-
* - Unauthenticated: /api/2.0/sql/telemetry-unauth
91+
* - Authenticated: /telemetry-ext
92+
* - Unauthenticated: /telemetry-unauth
9293
*
9394
* Features:
9495
* - Circuit breaker integration for endpoint protection
@@ -98,7 +99,7 @@ interface DatabricksTelemetryPayload {
9899
* - CRITICAL: All logging at LogLevel.debug ONLY
99100
*/
100101
export default class DatabricksTelemetryExporter {
101-
private circuitBreaker;
102+
private readonly circuitBreaker: CircuitBreaker;
102103

103104
private readonly userAgent: string;
104105

@@ -207,8 +208,8 @@ export default class DatabricksTelemetryExporter {
207208
// Determine endpoint based on authentication mode
208209
const authenticatedExport = config.telemetryAuthenticatedExport ?? DEFAULT_TELEMETRY_CONFIG.authenticatedExport;
209210
const endpoint = authenticatedExport
210-
? this.buildUrl(this.host, '/telemetry-ext')
211-
: this.buildUrl(this.host, '/telemetry-unauth');
211+
? buildTelemetryUrl(this.host, '/telemetry-ext')
212+
: buildTelemetryUrl(this.host, '/telemetry-unauth');
212213

213214
// Format payload - each log is JSON-stringified to match JDBC format
214215
const telemetryLogs = metrics.map((m) => this.toTelemetryLog(m));
@@ -230,6 +231,14 @@ export default class DatabricksTelemetryExporter {
230231
// Get authentication headers if using authenticated endpoint
231232
const authHeaders = authenticatedExport ? await this.context.getAuthHeaders() : {};
232233

234+
// Skip export if authenticated mode is requested but no auth headers available.
235+
// Note: all auth providers in this codebase return plain objects (Record<string, string>).
236+
const headersObj = authHeaders as Record<string, string>;
237+
if (authenticatedExport && (!headersObj || !headersObj['Authorization'])) {
238+
logger.log(LogLevel.debug, 'Skipping telemetry export: authenticated mode but no Authorization header');
239+
return;
240+
}
241+
233242
// Make HTTP POST request with authentication and proxy support
234243
const response: Response = await this.sendRequest(endpoint, {
235244
method: 'POST',
@@ -239,14 +248,19 @@ export default class DatabricksTelemetryExporter {
239248
'User-Agent': this.userAgent,
240249
},
241250
body: JSON.stringify(payload),
251+
timeout: 10000, // 10 second timeout to prevent indefinite hangs
242252
});
243253

244254
if (!response.ok) {
255+
// Consume response body to release socket back to connection pool
256+
await response.text().catch(() => {});
245257
const error: any = new Error(`Telemetry export failed: ${response.status} ${response.statusText}`);
246258
error.statusCode = response.status;
247259
throw error;
248260
}
249261

262+
// Consume response body to release socket back to connection pool
263+
await response.text().catch(() => {});
250264
logger.log(LogLevel.debug, `Successfully exported ${metrics.length} telemetry metrics`);
251265
}
252266

@@ -265,6 +279,7 @@ export default class DatabricksTelemetryExporter {
265279
*/
266280
private toTelemetryLog(metric: TelemetryMetric): DatabricksTelemetryLog {
267281
const log: DatabricksTelemetryLog = {
282+
workspace_id: metric.workspaceId,
268283
frontend_log_event_id: this.generateUUID(),
269284
context: {
270285
client_context: {
@@ -321,16 +336,6 @@ export default class DatabricksTelemetryExporter {
321336
return log;
322337
}
323338

324-
/**
325-
* Build full URL from host and path, handling protocol correctly.
326-
*/
327-
private buildUrl(host: string, path: string): string {
328-
if (host.startsWith('http://') || host.startsWith('https://')) {
329-
return `${host}${path}`;
330-
}
331-
return `https://${host}${path}`;
332-
}
333-
334339
/**
335340
* Generate a UUID v4.
336341
*/

lib/telemetry/FeatureFlagCache.ts

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import fetch from 'node-fetch';
1818
import IClientContext from '../contracts/IClientContext';
1919
import { LogLevel } from '../contracts/IDBSQLLogger';
20+
import { buildTelemetryUrl } from './telemetryUtils';
2021
import driverVersion from '../version';
2122

2223
/**
@@ -37,8 +38,15 @@ export interface FeatureFlagContext {
3738
export default class FeatureFlagCache {
3839
private contexts: Map<string, FeatureFlagContext>;
3940

41+
/** In-flight fetch promises for deduplication (prevents thundering herd) */
42+
private fetchPromises: Map<string, Promise<boolean>> = new Map();
43+
4044
private readonly CACHE_DURATION_MS = 15 * 60 * 1000; // 15 minutes
4145

46+
private readonly MIN_CACHE_DURATION_S = 60; // 1 minute minimum TTL
47+
48+
private readonly MAX_CACHE_DURATION_S = 3600; // 1 hour maximum TTL
49+
4250
private readonly FEATURE_FLAG_NAME = 'databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForNodeJs';
4351

4452
constructor(private context: IClientContext) {
@@ -72,6 +80,7 @@ export default class FeatureFlagCache {
7280
ctx.refCount -= 1;
7381
if (ctx.refCount <= 0) {
7482
this.contexts.delete(host);
83+
this.fetchPromises.delete(host); // Invalidate stale in-flight fetch
7584
}
7685
}
7786
}
@@ -91,22 +100,34 @@ export default class FeatureFlagCache {
91100
const isExpired = !ctx.lastFetched || Date.now() - ctx.lastFetched.getTime() > ctx.cacheDuration;
92101

93102
if (isExpired) {
94-
try {
95-
// Fetch feature flag from server
96-
ctx.telemetryEnabled = await this.fetchFeatureFlag(host);
97-
ctx.lastFetched = new Date();
98-
} catch (error: any) {
99-
// Log at debug level only, never propagate exceptions
100-
logger.log(LogLevel.debug, `Error fetching feature flag: ${error.message}`);
103+
// Deduplicate concurrent fetches for the same host (prevents thundering herd)
104+
if (!this.fetchPromises.has(host)) {
105+
const fetchPromise = this.fetchFeatureFlag(host)
106+
.then((enabled) => {
107+
ctx.telemetryEnabled = enabled;
108+
ctx.lastFetched = new Date();
109+
return enabled;
110+
})
111+
.catch((error: any) => {
112+
logger.log(LogLevel.debug, `Error fetching feature flag: ${error.message}`);
113+
return ctx.telemetryEnabled ?? false;
114+
})
115+
.finally(() => {
116+
this.fetchPromises.delete(host);
117+
});
118+
this.fetchPromises.set(host, fetchPromise);
101119
}
120+
121+
// Promise is guaranteed to resolve (never rejects) due to .catch() in the chain above
122+
await this.fetchPromises.get(host);
102123
}
103124

104125
return ctx.telemetryEnabled ?? false;
105126
}
106127

107128
/**
108129
* Fetches feature flag from server using connector-service API.
109-
* Calls GET /api/2.0/connector-service/feature-flags/OSS_NODEJS/{version}
130+
* Calls GET /api/2.0/connector-service/feature-flags/NODEJS/{version}
110131
*
111132
* @param host The host to fetch feature flag for
112133
* @returns true if feature flag is enabled, false otherwise
@@ -119,7 +140,7 @@ export default class FeatureFlagCache {
119140
const version = this.getDriverVersion();
120141

121142
// Build feature flags endpoint for Node.js driver
122-
const endpoint = this.buildUrl(host, `/api/2.0/connector-service/feature-flags/NODEJS/${version}`);
143+
const endpoint = buildTelemetryUrl(host, `/api/2.0/connector-service/feature-flags/NODEJS/${version}`);
123144

124145
// Get authentication headers
125146
const authHeaders = await this.context.getAuthHeaders();
@@ -139,9 +160,12 @@ export default class FeatureFlagCache {
139160
'User-Agent': `databricks-sql-nodejs/${driverVersion}`,
140161
},
141162
agent, // Include agent for proxy support
163+
timeout: 10000, // 10 second timeout to prevent indefinite hangs
142164
});
143165

144166
if (!response.ok) {
167+
// Consume response body to release socket back to connection pool
168+
await response.text().catch(() => {});
145169
logger.log(LogLevel.debug, `Feature flag fetch failed: ${response.status} ${response.statusText}`);
146170
return false;
147171
}
@@ -151,11 +175,12 @@ export default class FeatureFlagCache {
151175

152176
// Response format: { flags: [{ name: string, value: string }], ttl_seconds?: number }
153177
if (data && data.flags && Array.isArray(data.flags)) {
154-
// Update cache duration if TTL provided
178+
// Update cache duration if TTL provided, clamped to safe bounds
155179
const ctx = this.contexts.get(host);
156-
if (ctx && data.ttl_seconds) {
157-
ctx.cacheDuration = data.ttl_seconds * 1000; // Convert to milliseconds
158-
logger.log(LogLevel.debug, `Updated cache duration to ${data.ttl_seconds} seconds`);
180+
if (ctx && typeof data.ttl_seconds === 'number' && data.ttl_seconds > 0) {
181+
const clampedTtl = Math.max(this.MIN_CACHE_DURATION_S, Math.min(this.MAX_CACHE_DURATION_S, data.ttl_seconds));
182+
ctx.cacheDuration = clampedTtl * 1000; // Convert to milliseconds
183+
logger.log(LogLevel.debug, `Updated cache duration to ${clampedTtl} seconds`);
159184
}
160185

161186
// Look for our specific feature flag
@@ -180,16 +205,6 @@ export default class FeatureFlagCache {
180205
}
181206
}
182207

183-
/**
184-
* Build full URL from host and path, handling protocol correctly.
185-
*/
186-
private buildUrl(host: string, path: string): string {
187-
if (host.startsWith('http://') || host.startsWith('https://')) {
188-
return `${host}${path}`;
189-
}
190-
return `https://${host}${path}`;
191-
}
192-
193208
/**
194209
* Gets the driver version without -oss suffix for API calls.
195210
* Format: "1.12.0" from "1.12.0-oss"

lib/telemetry/MetricsAggregator.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -329,8 +329,11 @@ export default class MetricsAggregator {
329329

330330
logger.log(LogLevel.debug, `Flushing ${metricsToExport.length} telemetry metrics`);
331331

332-
// Export metrics (exporter.export never throws)
333-
this.exporter.export(metricsToExport);
332+
// Export metrics - exporter.export is designed to never throw, but add safety catch
333+
// to prevent unhandled promise rejections from crashing the process (Node.js 15+)
334+
Promise.resolve(this.exporter.export(metricsToExport)).catch((err: any) => {
335+
logger.log(LogLevel.debug, `Unexpected export error: ${err?.message}`);
336+
});
334337

335338
// Reset timer to avoid rapid successive flushes (e.g., batch flush at 25s then timer flush at 30s)
336339
// This ensures consistent spacing between exports and helps avoid rate limiting
@@ -380,8 +383,9 @@ export default class MetricsAggregator {
380383
this.flushTimer = null;
381384
}
382385

383-
// Complete any remaining statements
384-
for (const statementId of this.statementMetrics.keys()) {
386+
// Complete any remaining statements (snapshot keys to avoid mutation during iteration)
387+
const remainingStatements = [...this.statementMetrics.keys()];
388+
for (const statementId of remainingStatements) {
385389
this.completeStatement(statementId);
386390
}
387391

lib/telemetry/TelemetryEventEmitter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ export default class TelemetryEventEmitter extends EventEmitter {
3636
super();
3737
// Check if telemetry is enabled from config
3838
// Default to false for safe rollout
39-
const config = context.getConfig() as any;
39+
const config = context.getConfig();
4040
this.enabled = config.telemetryEnabled ?? false;
4141
}
4242

lib/telemetry/telemetryUtils.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/**
2+
* Copyright (c) 2025 Databricks Contributors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/**
18+
* Build full URL from host and path, always using HTTPS.
19+
* Strips any existing protocol prefix and enforces HTTPS.
20+
*/
21+
export function buildTelemetryUrl(host: string, path: string): string {
22+
const cleanHost = host.replace(/^https?:\/\//, '').replace(/\/+$/, '');
23+
return `https://${cleanHost}${path}`;
24+
}

lib/telemetry/types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ export enum TelemetryEventType {
2727
STATEMENT_START = 'statement.start',
2828
STATEMENT_COMPLETE = 'statement.complete',
2929
CLOUDFETCH_CHUNK = 'cloudfetch.chunk',
30-
ERROR = 'error',
30+
ERROR = 'telemetry.error',
3131
}
3232

3333
/**
@@ -78,7 +78,7 @@ export const DEFAULT_TELEMETRY_CONFIG: Required<TelemetryConfiguration> = {
7878
*/
7979
export interface TelemetryEvent {
8080
/** Type of the event */
81-
eventType: TelemetryEventType | string;
81+
eventType: TelemetryEventType;
8282

8383
/** Timestamp when the event occurred (milliseconds since epoch) */
8484
timestamp: number;

tests/unit/.stubs/CircuitBreakerStub.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ export default class CircuitBreakerStub {
112112
private onFailure(): void {
113113
this.failureCount++;
114114
this.successCount = 0;
115-
if (this.failureCount >= 5) {
115+
// In HALF_OPEN state, any single failure immediately reopens (matches real implementation)
116+
if (this.state === CircuitBreakerState.HALF_OPEN || this.failureCount >= 5) {
116117
this.state = CircuitBreakerState.OPEN;
117118
}
118119
}

tests/unit/.stubs/ClientContextStub.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,6 @@ export default class ClientContextStub implements IClientContext {
5151
}
5252

5353
public async getAuthHeaders(): Promise<HeadersInit> {
54-
return {};
54+
return { Authorization: 'Bearer test-token' };
5555
}
5656
}

0 commit comments

Comments
 (0)