diff --git a/.gitignore b/.gitignore index 99381ce5..d742a0e4 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ coverage_unit dist *.DS_Store lib/version.ts +coverage/ diff --git a/lib/DBSQLClient.ts b/lib/DBSQLClient.ts index ee53e790..b8656cf8 100644 --- a/lib/DBSQLClient.ts +++ b/lib/DBSQLClient.ts @@ -2,6 +2,7 @@ import thrift from 'thrift'; import Int64 from 'node-int64'; import { EventEmitter } from 'events'; +import { HeadersInit } from 'node-fetch'; import TCLIService from '../thrift/TCLIService'; import { TProtocolVersion } from '../thrift/TCLIService_types'; import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient'; @@ -238,12 +239,6 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I this.config.enableMetricViewMetadata = options.enableMetricViewMetadata; } - // Persist userAgentEntry so telemetry and feature-flag call sites reuse - // the same value as the primary Thrift connection's User-Agent. - if (options.userAgentEntry !== undefined) { - this.config.userAgentEntry = options.userAgentEntry; - } - this.authProvider = this.createAuthProvider(options, authProvider); this.connectionProvider = this.createConnectionProvider(options); @@ -359,14 +354,15 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I return this.driver; } - /** - * Returns the authentication provider associated with this client, if any. - * Intended for internal telemetry/feature-flag call sites that need to - * obtain auth headers directly without routing through `IClientContext`. - * - * @internal Not part of the public API. May change without notice. - */ - public getAuthProvider(): IAuthentication | undefined { - return this.authProvider; + public async getAuthHeaders(): Promise { + if (this.authProvider) { + try { + return await this.authProvider.authenticate(); + } catch (error) { + this.logger.log(LogLevel.debug, `Error getting auth headers: ${error}`); + return {}; + } + } + return {}; } } diff --git a/lib/contracts/IClientContext.ts b/lib/contracts/IClientContext.ts index c7274a1b..051dcf3f 100644 --- a/lib/contracts/IClientContext.ts +++ b/lib/contracts/IClientContext.ts @@ -1,3 +1,4 @@ +import { HeadersInit } from 'node-fetch'; import IDBSQLLogger from './IDBSQLLogger'; import IDriver from './IDriver'; import IConnectionProvider from '../connection/contracts/IConnectionProvider'; @@ -28,16 +29,10 @@ export interface ClientConfig { telemetryBatchSize?: number; telemetryFlushIntervalMs?: number; telemetryMaxRetries?: number; - telemetryBackoffBaseMs?: number; - telemetryBackoffMaxMs?: number; - telemetryBackoffJitterMs?: number; telemetryAuthenticatedExport?: boolean; telemetryCircuitBreakerThreshold?: number; telemetryCircuitBreakerTimeout?: number; telemetryMaxPendingMetrics?: number; - telemetryMaxErrorsPerStatement?: number; - telemetryStatementTtlMs?: number; - userAgentEntry?: string; } export default interface IClientContext { @@ -50,4 +45,11 @@ export default interface IClientContext { getClient(): Promise; getDriver(): Promise; + + /** + * Gets authentication headers for HTTP requests. + * Used by telemetry and feature flag fetching to authenticate REST API calls. + * @returns Promise resolving to headers object with authentication, or empty object if no auth + */ + getAuthHeaders(): Promise; } diff --git a/lib/index.ts b/lib/index.ts index 81e3aaae..adf14f36 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -23,11 +23,6 @@ import { LogLevel } from './contracts/IDBSQLLogger'; // Re-export types for TypeScript users export type { default as ITokenProvider } from './connection/auth/tokenProvider/ITokenProvider'; -// Re-export telemetry error classes so consumers can instanceof-check rather -// than string-matching error messages. -export { CircuitBreakerOpenError, CIRCUIT_BREAKER_OPEN_CODE } from './telemetry/CircuitBreaker'; -export { TelemetryTerminalError } from './telemetry/DatabricksTelemetryExporter'; - export const auth = { PlainHttpAuthentication, // Token provider classes for custom authentication diff --git a/lib/telemetry/CircuitBreaker.ts b/lib/telemetry/CircuitBreaker.ts index 33b1fb5b..b8dbd296 100644 --- a/lib/telemetry/CircuitBreaker.ts +++ b/lib/telemetry/CircuitBreaker.ts @@ -17,41 +17,48 @@ import IClientContext from '../contracts/IClientContext'; import { LogLevel } from '../contracts/IDBSQLLogger'; +/** + * States of the circuit breaker. + */ export enum CircuitBreakerState { + /** Normal operation, requests pass through */ CLOSED = 'CLOSED', + /** After threshold failures, all requests rejected immediately */ OPEN = 'OPEN', + /** After timeout, allows test requests to check if endpoint recovered */ HALF_OPEN = 'HALF_OPEN', } +/** + * Configuration for circuit breaker behavior. + */ export interface CircuitBreakerConfig { + /** Number of consecutive failures before opening the circuit */ failureThreshold: number; + /** Time in milliseconds to wait before attempting recovery */ timeout: number; + /** Number of consecutive successes in HALF_OPEN state to close the circuit */ successThreshold: number; } -export const DEFAULT_CIRCUIT_BREAKER_CONFIG: Readonly = Object.freeze({ +/** + * Default circuit breaker configuration. + */ +export const DEFAULT_CIRCUIT_BREAKER_CONFIG: CircuitBreakerConfig = { failureThreshold: 5, - timeout: 60000, + timeout: 60000, // 1 minute successThreshold: 2, -}); - -export const CIRCUIT_BREAKER_OPEN_CODE = 'CIRCUIT_BREAKER_OPEN' as const; +}; /** - * Thrown when execute() is called while the breaker is OPEN or a HALF_OPEN - * probe is already in flight. Callers identify the condition via - * `instanceof CircuitBreakerOpenError` or `err.code === CIRCUIT_BREAKER_OPEN_CODE` - * rather than string-matching the message. + * Circuit breaker for telemetry exporter. + * Protects against failing telemetry endpoint with automatic recovery. + * + * States: + * - CLOSED: Normal operation, requests pass through + * - OPEN: After threshold failures, all requests rejected immediately + * - HALF_OPEN: After timeout, allows test requests to check if endpoint recovered */ -export class CircuitBreakerOpenError extends Error { - readonly code = CIRCUIT_BREAKER_OPEN_CODE; - - constructor(message = 'Circuit breaker OPEN') { - super(message); - this.name = 'CircuitBreakerOpenError'; - } -} - export class CircuitBreaker { private state: CircuitBreakerState = CircuitBreakerState.CLOSED; @@ -61,6 +68,7 @@ export class CircuitBreaker { private nextAttempt?: Date; + /** Number of in-flight requests in HALF_OPEN state (limits to 1 probe) */ private halfOpenInflight = 0; private readonly config: CircuitBreakerConfig; @@ -72,76 +80,79 @@ export class CircuitBreaker { }; } - async execute(operation: () => Promise): Promise { - const admitted = this.tryAdmit(); - if (!admitted) { - throw new CircuitBreakerOpenError(); - } - - const { wasHalfOpenProbe } = admitted; - - try { - const result = await operation(); - this.onSuccess(); - return result; - } catch (error) { - this.onFailure(); - throw error; - } finally { - if (wasHalfOpenProbe && this.halfOpenInflight > 0) { - this.halfOpenInflight -= 1; - } - } - } - /** - * Synchronous admission check. Returning `null` means "reject". Returning - * an object means the caller is admitted; `wasHalfOpenProbe` indicates - * whether this admission consumed the single HALF_OPEN probe slot so the - * caller can decrement it in `finally`. + * Executes an operation with circuit breaker protection. * - * Running this as a single synchronous block is what prevents the - * concurrent-probe race that existed in the previous implementation. + * @param operation The operation to execute + * @returns Promise resolving to the operation result + * @throws Error if circuit is OPEN or operation fails */ - private tryAdmit(): { wasHalfOpenProbe: boolean } | null { + async execute(operation: () => Promise): Promise { const logger = this.context.getLogger(); + // Check if circuit is open if (this.state === CircuitBreakerState.OPEN) { if (this.nextAttempt && Date.now() < this.nextAttempt.getTime()) { - return null; + throw new Error('Circuit breaker OPEN'); } + // Timeout expired, transition to HALF_OPEN this.state = CircuitBreakerState.HALF_OPEN; this.successCount = 0; this.halfOpenInflight = 0; logger.log(LogLevel.debug, 'Circuit breaker transitioned to HALF_OPEN'); } + // In HALF_OPEN state, allow only one probe request at a time + if (this.state === CircuitBreakerState.HALF_OPEN && this.halfOpenInflight > 0) { + throw new Error('Circuit breaker OPEN'); + } + if (this.state === CircuitBreakerState.HALF_OPEN) { - if (this.halfOpenInflight > 0) { - return null; - } this.halfOpenInflight += 1; - return { wasHalfOpenProbe: true }; } - return { wasHalfOpenProbe: false }; + try { + const result = await operation(); + this.onSuccess(); + return result; + } catch (error) { + this.onFailure(); + throw error; + } finally { + if (this.halfOpenInflight > 0) { + this.halfOpenInflight -= 1; + } + } } + /** + * Gets the current state of the circuit breaker. + */ getState(): CircuitBreakerState { return this.state; } + /** + * Gets the current failure count. + */ getFailureCount(): number { return this.failureCount; } + /** + * Gets the current success count (relevant in HALF_OPEN state). + */ getSuccessCount(): number { return this.successCount; } + /** + * Handles successful operation execution. + */ private onSuccess(): void { const logger = this.context.getLogger(); + // Reset failure count on any success this.failureCount = 0; if (this.state === CircuitBreakerState.HALF_OPEN) { @@ -152,6 +163,7 @@ export class CircuitBreaker { ); if (this.successCount >= this.config.successThreshold) { + // Transition to CLOSED this.state = CircuitBreakerState.CLOSED; this.successCount = 0; this.nextAttempt = undefined; @@ -160,17 +172,23 @@ export class CircuitBreaker { } } + /** + * Handles failed operation execution. + */ private onFailure(): void { const logger = this.context.getLogger(); this.failureCount += 1; - this.successCount = 0; + this.successCount = 0; // Reset success count on failure logger.log(LogLevel.debug, `Circuit breaker failure (${this.failureCount}/${this.config.failureThreshold})`); + // In HALF_OPEN state, any failure immediately reopens the circuit. + // In CLOSED state, reopen only after failureThreshold consecutive failures. if (this.state === CircuitBreakerState.HALF_OPEN || this.failureCount >= this.config.failureThreshold) { this.state = CircuitBreakerState.OPEN; this.nextAttempt = new Date(Date.now() + this.config.timeout); + // Log at warn level for OPEN transitions — meaningful operational signal logger.log( LogLevel.warn, `Telemetry circuit breaker OPEN after ${this.failureCount} failures (will retry after ${this.config.timeout}ms)`, @@ -179,6 +197,11 @@ export class CircuitBreaker { } } +/** + * Manages circuit breakers per host. + * Ensures each host has its own isolated circuit breaker to prevent + * failures on one host from affecting telemetry to other hosts. + */ export class CircuitBreakerRegistry { private breakers: Map; @@ -186,6 +209,13 @@ export class CircuitBreakerRegistry { this.breakers = new Map(); } + /** + * Gets or creates a circuit breaker for the specified host. + * + * @param host The host identifier (e.g., "workspace.cloud.databricks.com") + * @param config Optional configuration overrides + * @returns Circuit breaker for the host + */ getCircuitBreaker(host: string, config?: Partial): CircuitBreaker { let breaker = this.breakers.get(host); if (!breaker) { @@ -200,16 +230,30 @@ export class CircuitBreakerRegistry { return breaker; } + /** + * Gets all registered circuit breakers. + * Useful for testing and diagnostics. + */ getAllBreakers(): Map { return new Map(this.breakers); } + /** + * Removes a circuit breaker for the specified host. + * Useful for cleanup when a host is no longer in use. + * + * @param host The host identifier + */ removeCircuitBreaker(host: string): void { this.breakers.delete(host); const logger = this.context.getLogger(); logger.log(LogLevel.debug, `Removed circuit breaker for host: ${host}`); } + /** + * Clears all circuit breakers. + * Useful for testing. + */ clear(): void { this.breakers.clear(); } diff --git a/lib/telemetry/DatabricksTelemetryExporter.ts b/lib/telemetry/DatabricksTelemetryExporter.ts index 37cf1c70..c85f1ba6 100644 --- a/lib/telemetry/DatabricksTelemetryExporter.ts +++ b/lib/telemetry/DatabricksTelemetryExporter.ts @@ -15,24 +15,17 @@ */ import { v4 as uuidv4 } from 'uuid'; -import fetch, { RequestInit, Response } from 'node-fetch'; +import fetch, { Response, RequestInit, Request } from 'node-fetch'; import IClientContext from '../contracts/IClientContext'; import { LogLevel } from '../contracts/IDBSQLLogger'; -import IAuthentication from '../connection/contracts/IAuthentication'; -import AuthenticationError from '../errors/AuthenticationError'; -import HiveDriverError from '../errors/HiveDriverError'; import { TelemetryMetric, DEFAULT_TELEMETRY_CONFIG } from './types'; -import { CircuitBreaker, CircuitBreakerOpenError, CircuitBreakerRegistry } from './CircuitBreaker'; -import ExceptionClassifier from './ExceptionClassifier'; -import { - buildTelemetryUrl, - hasAuthorization, - normalizeHeaders, - redactSensitive, - sanitizeProcessName, -} from './telemetryUtils'; -import buildUserAgentString from '../utils/buildUserAgentString'; +import { CircuitBreaker, CircuitBreakerRegistry } from './CircuitBreaker'; +import buildTelemetryUrl from './telemetryUtils'; +import driverVersion from '../version'; +/** + * Databricks telemetry log format for export. + */ interface DatabricksTelemetryLog { workspace_id?: string; frontend_log_event_id: string; @@ -60,6 +53,7 @@ interface DatabricksTelemetryLog { char_set_encoding?: string; process_name?: string; }; + driver_connection_params?: any; operation_latency_ms?: number; sql_operation?: { execution_result?: string; @@ -80,60 +74,50 @@ interface DatabricksTelemetryLog { } /** - * Thrown for non-credential terminal telemetry failures (e.g. refusal to - * export to an invalid host). Separate from `AuthenticationError` so the - * classifier can keep the "short-circuit, don't retry, count as breaker - * failure" contract without muddying the auth taxonomy used by the rest of - * the driver. + * Payload format for Databricks telemetry export. + * Matches JDBC TelemetryRequest format with protoLogs. */ -export class TelemetryTerminalError extends HiveDriverError { - readonly terminal = true as const; +interface DatabricksTelemetryPayload { + uploadTime: number; + items: string[]; // Always empty - required field + protoLogs: string[]; // JSON-stringified DatabricksTelemetryLog objects } /** - * Exports telemetry metrics to the Databricks telemetry service. + * Exports telemetry metrics to Databricks telemetry service. * - * CRITICAL: export() never throws — all errors are swallowed and logged at - * LogLevel.debug (the one exception is a single warn on the first observed - * auth-missing, re-armed on recovery). + * Endpoints: + * - Authenticated: /telemetry-ext + * - Unauthenticated: /telemetry-unauth + * + * Features: + * - Circuit breaker integration for endpoint protection + * - Retry logic with exponential backoff for retryable errors + * - Terminal error detection (no retry on 400, 401, 403, 404) + * - CRITICAL: export() method NEVER throws - all exceptions swallowed + * - CRITICAL: All logging at LogLevel.debug ONLY */ export default class DatabricksTelemetryExporter { private readonly circuitBreaker: CircuitBreaker; - private readonly authenticatedUserAgent: string; - - /** User-Agent used for the unauthenticated endpoint; strips any - * caller-supplied `userAgentEntry` that could identify the customer. */ - private readonly unauthenticatedUserAgent: string; - - private authMissingWarned = false; + private readonly userAgent: string; constructor( private context: IClientContext, private host: string, private circuitBreakerRegistry: CircuitBreakerRegistry, - private authProvider?: IAuthentication, ) { this.circuitBreaker = circuitBreakerRegistry.getCircuitBreaker(host); - const config = this.context.getConfig(); - this.authenticatedUserAgent = buildUserAgentString(config.userAgentEntry); - this.unauthenticatedUserAgent = buildUserAgentString(undefined); + + // Get driver version for user agent + this.userAgent = `databricks-sql-nodejs/${this.getDriverVersion()}`; } /** - * Release the per-host circuit breaker. Intended for the owning client's - * close() path. + * Export metrics to Databricks service. Never throws. * - * NOTE: `CircuitBreakerRegistry` currently shares one breaker per host - * across consumers; calling this while another consumer is active will - * reset their failure-count memory. The owning-client is expected to be - * the last consumer on its host; multi-consumer refcounting on the - * registry will land in the consumer-wiring PR. + * @param metrics - Array of telemetry metrics to export */ - dispose(): void { - this.circuitBreakerRegistry.removeCircuitBreaker(this.host); - } - async export(metrics: TelemetryMetric[]): Promise { if (!metrics || metrics.length === 0) { return; @@ -142,123 +126,41 @@ export default class DatabricksTelemetryExporter { const logger = this.context.getLogger(); try { - await this.circuitBreaker.execute(() => this.exportWithRetry(metrics)); + await this.circuitBreaker.execute(async () => { + await this.exportInternal(metrics); + }); } catch (error: any) { - if (error instanceof CircuitBreakerOpenError) { + // CRITICAL: All exceptions swallowed and logged at debug level ONLY + if (error.message === 'Circuit breaker OPEN') { logger.log(LogLevel.debug, 'Circuit breaker OPEN - dropping telemetry'); - } else if (error instanceof AuthenticationError) { - logger.log(LogLevel.debug, `Telemetry export auth failure: ${error.message}`); - } else if (error instanceof TelemetryTerminalError) { - logger.log(LogLevel.debug, `Telemetry export refused: ${error.message}`); } else { - logger.log(LogLevel.debug, `Telemetry export error: ${error?.message ?? error}`); + logger.log(LogLevel.debug, `Telemetry export error: ${error.message}`); } } } /** - * Retry wrapper shaped after HttpRetryPolicy: retries only on errors - * classified as retryable by ExceptionClassifier, stops on terminal ones, - * surfaces the last error to the circuit breaker. - * - * `maxRetries` is the number of retries *after* the first attempt (i.e. - * attempts = maxRetries + 1), matching HttpRetryPolicy's semantics. + * Internal export implementation that makes the HTTP call. */ - private async exportWithRetry(metrics: TelemetryMetric[]): Promise { - const config = this.context.getConfig(); - const logger = this.context.getLogger(); - - const rawMaxRetries = config.telemetryMaxRetries ?? DEFAULT_TELEMETRY_CONFIG.maxRetries; - const maxRetries = - Number.isFinite(rawMaxRetries) && rawMaxRetries >= 0 ? rawMaxRetries : DEFAULT_TELEMETRY_CONFIG.maxRetries; - const baseMs = config.telemetryBackoffBaseMs ?? DEFAULT_TELEMETRY_CONFIG.backoffBaseMs; - const maxMs = config.telemetryBackoffMaxMs ?? DEFAULT_TELEMETRY_CONFIG.backoffMaxMs; - const jitterMs = config.telemetryBackoffJitterMs ?? DEFAULT_TELEMETRY_CONFIG.backoffJitterMs; - - const totalAttempts = maxRetries + 1; - - let lastError: Error | null = null; - - /* eslint-disable no-await-in-loop */ - for (let attempt = 0; attempt < totalAttempts; attempt += 1) { - try { - await this.exportInternal(metrics); - return; - } catch (error: any) { - lastError = error; - - if ( - error instanceof AuthenticationError || - error instanceof TelemetryTerminalError || - ExceptionClassifier.isTerminal(error) - ) { - throw error; - } - if (!ExceptionClassifier.isRetryable(error)) { - throw error; - } - if (attempt >= totalAttempts - 1) { - throw error; - } - - const base = Math.min(baseMs * 2 ** attempt, maxMs); - const jitter = Math.random() * jitterMs; - const delay = Math.min(base + jitter, maxMs); - - // Include the failing error so ops can see what's being retried, - // not just the cadence. - logger.log( - LogLevel.debug, - `Retrying telemetry export (attempt ${attempt + 1}/${totalAttempts}) after ${Math.round(delay)}ms: ${ - error?.statusCode ?? '' - } ${redactSensitive(error?.message ?? '')}`, - ); - - await this.sleep(delay); - } - } - /* eslint-enable no-await-in-loop */ - - if (lastError) { - throw lastError; - } - } - private async exportInternal(metrics: TelemetryMetric[]): Promise { const config = this.context.getConfig(); const logger = this.context.getLogger(); + // Determine endpoint based on authentication mode const authenticatedExport = config.telemetryAuthenticatedExport ?? DEFAULT_TELEMETRY_CONFIG.authenticatedExport; - const endpoint = buildTelemetryUrl(this.host, authenticatedExport ? '/telemetry-ext' : '/telemetry-unauth'); - if (!endpoint) { - // Malformed / deny-listed host — drop the batch rather than letting - // it target an attacker-controlled destination. - throw new TelemetryTerminalError('Refusing telemetry export: host failed validation'); - } + const endpoint = authenticatedExport + ? buildTelemetryUrl(this.host, '/telemetry-ext') + : buildTelemetryUrl(this.host, '/telemetry-unauth'); - const userAgent = authenticatedExport ? this.authenticatedUserAgent : this.unauthenticatedUserAgent; - let headers: Record = { - 'Content-Type': 'application/json', - 'User-Agent': userAgent, - }; - - if (authenticatedExport) { - headers = { ...headers, ...(await this.getAuthHeaders()) }; - if (!hasAuthorization(headers)) { - if (!this.authMissingWarned) { - this.authMissingWarned = true; - logger.log(LogLevel.warn, 'Telemetry: Authorization header missing — metrics will be dropped'); - } - throw new AuthenticationError('Telemetry export: missing Authorization header'); - } - } + // Format payload - each log is JSON-stringified to match JDBC format + const telemetryLogs = metrics.map((m) => this.toTelemetryLog(m)); + const protoLogs = telemetryLogs.map((log) => JSON.stringify(log)); - const protoLogs = metrics.map((m) => this.toTelemetryLog(m, authenticatedExport, userAgent)); - const body = JSON.stringify({ + const payload: DatabricksTelemetryPayload = { uploadTime: Date.now(), - items: [], - protoLogs: protoLogs.map((log) => JSON.stringify(log)), - }); + items: [], // Required but unused + protoLogs, + }; logger.log( LogLevel.debug, @@ -267,76 +169,82 @@ export default class DatabricksTelemetryExporter { } endpoint`, ); - const response = await this.sendRequest(endpoint, { + // Get authentication headers if using authenticated endpoint + const authHeaders = authenticatedExport ? await this.context.getAuthHeaders() : {}; + + // Skip export if authenticated mode is requested but no auth headers available. + // Note: all auth providers in this codebase return plain objects (Record). + const headersObj = authHeaders as Record; + if (authenticatedExport && (!headersObj || !headersObj.Authorization)) { + logger.log(LogLevel.debug, 'Skipping telemetry export: authenticated mode but no Authorization header'); + return; + } + + // Make HTTP POST request with authentication and proxy support + const response: Response = await this.sendRequest(endpoint, { method: 'POST', - headers, - body, - timeout: 10000, + headers: { + ...authHeaders, + 'Content-Type': 'application/json', + 'User-Agent': this.userAgent, + }, + body: JSON.stringify(payload), + timeout: 10000, // 10 second timeout to prevent indefinite hangs }); if (!response.ok) { + // Consume response body to release socket back to connection pool await response.text().catch(() => {}); const error: any = new Error(`Telemetry export failed: ${response.status} ${response.statusText}`); error.statusCode = response.status; throw error; } + // Consume response body to release socket back to connection pool await response.text().catch(() => {}); - // Successful round-trip re-arms the "auth missing" warn so operators see - // a fresh signal the next time auth breaks. - this.authMissingWarned = false; logger.log(LogLevel.debug, `Successfully exported ${metrics.length} telemetry metrics`); } - private async getAuthHeaders(): Promise> { - if (!this.authProvider) { - return {}; - } - const logger = this.context.getLogger(); - try { - return normalizeHeaders(await this.authProvider.authenticate()); - } catch (error: any) { - logger.log(LogLevel.debug, `Telemetry: auth provider threw: ${error?.message ?? error}`); - return {}; - } - } - + /** + * Makes an HTTP request through the shared connection stack (agent + retry policy), + * matching the CloudFetchResultHandler pattern. + */ private async sendRequest(url: string, init: RequestInit): Promise { const connectionProvider = await this.context.getConnectionProvider(); const agent = await connectionProvider.getAgent(); - return fetch(url, { ...init, agent }); + const retryPolicy = await connectionProvider.getRetryPolicy(); + const requestConfig: RequestInit = { agent, ...init }; + const result = await retryPolicy.invokeWithRetry(() => { + const request = new Request(url, requestConfig); + return fetch(request).then((response) => ({ request, response })); + }); + return result.response; } - private toTelemetryLog( - metric: TelemetryMetric, - authenticatedExport: boolean, - userAgent: string, - ): DatabricksTelemetryLog { - // Unauthenticated export must not ship correlation IDs, fingerprint - // data, or raw error detail — an on-path observer could otherwise link - // sessions → workspaces → user activity without any auth. - const includeCorrelation = authenticatedExport; - + /** + * Convert TelemetryMetric to Databricks telemetry log format. + */ + private toTelemetryLog(metric: TelemetryMetric): DatabricksTelemetryLog { const log: DatabricksTelemetryLog = { - workspace_id: includeCorrelation ? metric.workspaceId : undefined, - frontend_log_event_id: uuidv4(), + workspace_id: metric.workspaceId, + frontend_log_event_id: this.generateUUID(), context: { client_context: { timestamp_millis: metric.timestamp, - user_agent: userAgent, + user_agent: this.userAgent, }, }, entry: { sql_driver_log: { - session_id: includeCorrelation ? metric.sessionId : undefined, - sql_statement_id: includeCorrelation ? metric.statementId : undefined, + session_id: metric.sessionId, + sql_statement_id: metric.statementId, }, }, }; - if (metric.metricType === 'connection' && metric.driverConfig && includeCorrelation) { - // system_configuration is a high-entropy client fingerprint (OS, arch, - // locale, process, runtime). Only ship on the authenticated path. + // Add metric-specific fields based on proto definition + if (metric.metricType === 'connection' && metric.driverConfig) { + // Map driverConfig to system_configuration (snake_case as per proto) log.entry.sql_driver_log.system_configuration = { driver_version: metric.driverConfig.driverVersion, driver_name: metric.driverConfig.driverName, @@ -348,7 +256,7 @@ export default class DatabricksTelemetryExporter { os_arch: metric.driverConfig.osArch, locale_name: metric.driverConfig.localeName, char_set_encoding: metric.driverConfig.charSetEncoding, - process_name: sanitizeProcessName(metric.driverConfig.processName) || undefined, + process_name: metric.driverConfig.processName, }; } else if (metric.metricType === 'statement') { log.entry.sql_driver_log.operation_latency_ms = metric.latencyMs; @@ -366,21 +274,26 @@ export default class DatabricksTelemetryExporter { } } } else if (metric.metricType === 'error') { - const stackOrMessage = metric.errorStack ?? metric.errorMessage ?? ''; log.entry.sql_driver_log.error_info = { error_name: metric.errorName || 'UnknownError', - // Redact common secret shapes and cap length. On the unauth path we - // keep only the error class — no message body. - stack_trace: includeCorrelation ? redactSensitive(stackOrMessage) : '', + stack_trace: metric.errorMessage || '', }; } return log; } - private sleep(ms: number): Promise { - return new Promise((resolve) => { - setTimeout(resolve, ms); - }); + /** + * Generate a UUID v4. + */ + private generateUUID(): string { + return uuidv4(); + } + + /** + * Get driver version from the version module. + */ + private getDriverVersion(): string { + return driverVersion; } } diff --git a/lib/telemetry/ExceptionClassifier.ts b/lib/telemetry/ExceptionClassifier.ts index a9185eb4..d0e30d64 100644 --- a/lib/telemetry/ExceptionClassifier.ts +++ b/lib/telemetry/ExceptionClassifier.ts @@ -81,17 +81,13 @@ export default class ExceptionClassifier { return true; } - // Transient network errors. Notably does NOT include `ENOTFOUND`: - // DNS "not found" usually indicates a misconfigured host, not a transient - // fault. Retrying pushes load at the resolver without any expectation of - // success; circuit-breaker failure accounting is the better response. + // Check for transient network errors (connection refused, DNS failure, host unreachable) const errorCode = (error as any).code; if ( errorCode === 'ECONNREFUSED' || - errorCode === 'ECONNRESET' || + errorCode === 'ENOTFOUND' || errorCode === 'EHOSTUNREACH' || - errorCode === 'ETIMEDOUT' || - errorCode === 'EAI_AGAIN' + errorCode === 'ECONNRESET' ) { return true; } diff --git a/lib/telemetry/FeatureFlagCache.ts b/lib/telemetry/FeatureFlagCache.ts index 58c758ff..744d1ff9 100644 --- a/lib/telemetry/FeatureFlagCache.ts +++ b/lib/telemetry/FeatureFlagCache.ts @@ -14,51 +14,49 @@ * limitations under the License. */ -import fetch, { RequestInit, Response } from 'node-fetch'; +import fetch, { Request, RequestInit } from 'node-fetch'; import IClientContext from '../contracts/IClientContext'; import { LogLevel } from '../contracts/IDBSQLLogger'; -import IAuthentication from '../connection/contracts/IAuthentication'; -import { buildTelemetryUrl, normalizeHeaders } from './telemetryUtils'; -import ExceptionClassifier from './ExceptionClassifier'; -import buildUserAgentString from '../utils/buildUserAgentString'; +import buildTelemetryUrl from './telemetryUtils'; import driverVersion from '../version'; +/** + * Context holding feature flag state for a specific host. + */ export interface FeatureFlagContext { telemetryEnabled?: boolean; lastFetched?: Date; refCount: number; - cacheDuration: number; + cacheDuration: number; // 15 minutes in ms } /** - * Per-host feature-flag cache used to gate telemetry emission. Responsibilities: - * - dedupe in-flight fetches (thundering-herd protection); - * - ref-count so context goes away when the last consumer closes; - * - clamp server-provided TTL into a safe band. - * - * Shares HTTP plumbing (agent, user agent) with DatabricksTelemetryExporter. - * Consumer wiring lands in a later PR in this stack (see PR description). + * Manages feature flag cache per host. + * Prevents rate limiting by caching feature flag responses. + * Instance-based, stored in DBSQLClient. */ export default class FeatureFlagCache { private contexts: Map; + /** In-flight fetch promises for deduplication (prevents thundering herd) */ private fetchPromises: Map> = new Map(); - private readonly userAgent: string; + private readonly CACHE_DURATION_MS = 15 * 60 * 1000; // 15 minutes - private readonly CACHE_DURATION_MS = 15 * 60 * 1000; + private readonly MIN_CACHE_DURATION_S = 60; // 1 minute minimum TTL - private readonly MIN_CACHE_DURATION_S = 60; - - private readonly MAX_CACHE_DURATION_S = 3600; + private readonly MAX_CACHE_DURATION_S = 3600; // 1 hour maximum TTL private readonly FEATURE_FLAG_NAME = 'databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForNodeJs'; - constructor(private context: IClientContext, private authProvider?: IAuthentication) { + constructor(private context: IClientContext) { this.contexts = new Map(); - this.userAgent = buildUserAgentString(this.context.getConfig().userAgentEntry); } + /** + * Gets or creates a feature flag context for the host. + * Increments reference count. + */ getOrCreateContext(host: string): FeatureFlagContext { let ctx = this.contexts.get(host); if (!ctx) { @@ -72,17 +70,25 @@ export default class FeatureFlagCache { return ctx; } + /** + * Decrements reference count for the host. + * Removes context when ref count reaches zero. + */ releaseContext(host: string): void { const ctx = this.contexts.get(host); if (ctx) { ctx.refCount -= 1; if (ctx.refCount <= 0) { this.contexts.delete(host); - this.fetchPromises.delete(host); + this.fetchPromises.delete(host); // Invalidate stale in-flight fetch } } } + /** + * Checks if telemetry is enabled for the host. + * Uses cached value if available and not expired. + */ async isTelemetryEnabled(host: string): Promise { const logger = this.context.getLogger(); const ctx = this.contexts.get(host); @@ -94,6 +100,7 @@ export default class FeatureFlagCache { const isExpired = !ctx.lastFetched || Date.now() - ctx.lastFetched.getTime() > ctx.cacheDuration; if (isExpired) { + // Deduplicate concurrent fetches for the same host (prevents thundering herd) if (!this.fetchPromises.has(host)) { const fetchPromise = this.fetchFeatureFlag(host) .then((enabled) => { @@ -111,6 +118,7 @@ export default class FeatureFlagCache { this.fetchPromises.set(host, fetchPromise); } + // Promise is guaranteed to resolve (never rejects) due to .catch() in the chain above await this.fetchPromises.get(host); } @@ -118,106 +126,96 @@ export default class FeatureFlagCache { } /** - * Strips the `-oss` suffix the feature-flag API does not accept. The server - * keys off the SemVer triplet only, so anything appended would 404. + * Fetches feature flag from server using connector-service API. + * Calls GET /api/2.0/connector-service/feature-flags/NODEJS/{version} + * + * @param host The host to fetch feature flag for + * @returns true if feature flag is enabled, false otherwise */ - private getDriverVersion(): string { - return driverVersion.replace(/-oss$/, ''); - } - private async fetchFeatureFlag(host: string): Promise { const logger = this.context.getLogger(); try { - const endpoint = buildTelemetryUrl( - host, - `/api/2.0/connector-service/feature-flags/NODEJS/${this.getDriverVersion()}`, - ); - if (!endpoint) { - logger.log(LogLevel.debug, `Feature flag fetch skipped: invalid host ${host}`); - return false; - } + // Get driver version for endpoint + const version = this.getDriverVersion(); - const headers: Record = { - 'Content-Type': 'application/json', - 'User-Agent': this.userAgent, - ...(await this.getAuthHeaders()), - }; + // Build feature flags endpoint for Node.js driver + const endpoint = buildTelemetryUrl(host, `/api/2.0/connector-service/feature-flags/NODEJS/${version}`); + + // Get authentication headers + const authHeaders = await this.context.getAuthHeaders(); logger.log(LogLevel.debug, `Fetching feature flags from ${endpoint}`); - const response = await this.fetchWithRetry(endpoint, { + const connectionProvider = await this.context.getConnectionProvider(); + const agent = await connectionProvider.getAgent(); + const retryPolicy = await connectionProvider.getRetryPolicy(); + + const requestConfig: RequestInit = { method: 'GET', - headers, + headers: { + ...authHeaders, + 'Content-Type': 'application/json', + 'User-Agent': `databricks-sql-nodejs/${driverVersion}`, + }, + agent, timeout: 10000, + }; + + const result = await retryPolicy.invokeWithRetry(() => { + const request = new Request(endpoint, requestConfig); + return fetch(request).then((response) => ({ request, response })); }); + const response = result.response; if (!response.ok) { + // Consume response body to release socket back to connection pool await response.text().catch(() => {}); logger.log(LogLevel.debug, `Feature flag fetch failed: ${response.status} ${response.statusText}`); return false; } + // Parse response JSON const data: any = await response.json(); + // Response format: { flags: [{ name: string, value: string }], ttl_seconds?: number } if (data && data.flags && Array.isArray(data.flags)) { + // Update cache duration if TTL provided, clamped to safe bounds const ctx = this.contexts.get(host); if (ctx && typeof data.ttl_seconds === 'number' && data.ttl_seconds > 0) { const clampedTtl = Math.max(this.MIN_CACHE_DURATION_S, Math.min(this.MAX_CACHE_DURATION_S, data.ttl_seconds)); - ctx.cacheDuration = clampedTtl * 1000; + ctx.cacheDuration = clampedTtl * 1000; // Convert to milliseconds logger.log(LogLevel.debug, `Updated cache duration to ${clampedTtl} seconds`); } + // Look for our specific feature flag const flag = data.flags.find((f: any) => f.name === this.FEATURE_FLAG_NAME); + if (flag) { - const enabled = String(flag.value).toLowerCase() === 'true'; + // Parse boolean value (can be string "true"/"false") + const value = String(flag.value).toLowerCase(); + const enabled = value === 'true'; logger.log(LogLevel.debug, `Feature flag ${this.FEATURE_FLAG_NAME}: ${enabled}`); return enabled; } } + // Feature flag not found in response, default to false logger.log(LogLevel.debug, `Feature flag ${this.FEATURE_FLAG_NAME} not found in response`); return false; } catch (error: any) { + // Log at debug level only, never propagate exceptions logger.log(LogLevel.debug, `Error fetching feature flag from ${host}: ${error.message}`); return false; } } /** - * Retries transient network errors once before giving up. Without a retry - * a single hiccup would leave telemetry disabled for the full cache TTL - * (15 min). One retry gives an ephemeral DNS / connection-reset failure - * a second chance without pushing sustained load at a broken endpoint. + * Gets the driver version without -oss suffix for API calls. + * Format: "1.12.0" from "1.12.0-oss" */ - private async fetchWithRetry(url: string, init: RequestInit): Promise { - const connectionProvider = await this.context.getConnectionProvider(); - const agent = await connectionProvider.getAgent(); - const logger = this.context.getLogger(); - - try { - return await fetch(url, { ...init, agent }); - } catch (err: any) { - if (!ExceptionClassifier.isRetryable(err)) { - throw err; - } - logger.log(LogLevel.debug, `Feature flag fetch retry after transient: ${err?.code ?? err?.message ?? err}`); - await new Promise((resolve) => { - setTimeout(resolve, 100 + Math.random() * 100); - }); - return fetch(url, { ...init, agent }); - } - } - - private async getAuthHeaders(): Promise> { - if (!this.authProvider) { - return {}; - } - try { - return normalizeHeaders(await this.authProvider.authenticate()); - } catch (error: any) { - this.context.getLogger().log(LogLevel.debug, `Feature flag auth failed: ${error?.message ?? error}`); - return {}; - } + private getDriverVersion(): string { + // Remove -oss suffix if present + return driverVersion.replace(/-oss$/, ''); } } diff --git a/lib/telemetry/MetricsAggregator.ts b/lib/telemetry/MetricsAggregator.ts index f5b0d171..778b8c0a 100644 --- a/lib/telemetry/MetricsAggregator.ts +++ b/lib/telemetry/MetricsAggregator.ts @@ -18,7 +18,11 @@ import IClientContext from '../contracts/IClientContext'; import { LogLevel } from '../contracts/IDBSQLLogger'; import { TelemetryEvent, TelemetryEventType, TelemetryMetric, DEFAULT_TELEMETRY_CONFIG } from './types'; import DatabricksTelemetryExporter from './DatabricksTelemetryExporter'; +import ExceptionClassifier from './ExceptionClassifier'; +/** + * Per-statement telemetry details for aggregation + */ interface StatementTelemetryDetails { statementId: string; sessionId: string; @@ -37,10 +41,17 @@ interface StatementTelemetryDetails { /** * Aggregates telemetry events by statement_id and manages batching/flushing. * - * Overflow policy — when the pending buffer hits `maxPendingMetrics`, error - * metrics are preserved preferentially over connection/statement metrics. - * The first-failure error is usually the most valuable signal in post-mortem - * debugging; dropping it FIFO would defeat the purpose of capture. + * Features: + * - Aggregates events by statement_id + * - Connection events emitted immediately (no aggregation) + * - Statement events buffered until completeStatement() called + * - Terminal exceptions flushed immediately + * - Retryable exceptions buffered until statement complete + * - Batch size and periodic timer trigger flushes + * - CRITICAL: All exceptions swallowed and logged at LogLevel.debug ONLY + * - CRITICAL: NO console logging + * + * Follows JDBC TelemetryCollector.java:29-30 pattern. */ export default class MetricsAggregator { private statementMetrics: Map = new Map(); @@ -49,7 +60,7 @@ export default class MetricsAggregator { private flushTimer: NodeJS.Timeout | null = null; - private closed = false; + private closing: boolean = false; private batchSize: number; @@ -57,56 +68,61 @@ export default class MetricsAggregator { private maxPendingMetrics: number; - private maxErrorsPerStatement: number; - - private statementTtlMs: number; - constructor(private context: IClientContext, private exporter: DatabricksTelemetryExporter) { try { const config = context.getConfig(); this.batchSize = config.telemetryBatchSize ?? DEFAULT_TELEMETRY_CONFIG.batchSize; this.flushIntervalMs = config.telemetryFlushIntervalMs ?? DEFAULT_TELEMETRY_CONFIG.flushIntervalMs; this.maxPendingMetrics = config.telemetryMaxPendingMetrics ?? DEFAULT_TELEMETRY_CONFIG.maxPendingMetrics; - this.maxErrorsPerStatement = - config.telemetryMaxErrorsPerStatement ?? DEFAULT_TELEMETRY_CONFIG.maxErrorsPerStatement; - this.statementTtlMs = config.telemetryStatementTtlMs ?? DEFAULT_TELEMETRY_CONFIG.statementTtlMs; + // Start periodic flush timer this.startFlushTimer(); } catch (error: any) { + // CRITICAL: All exceptions swallowed and logged at debug level ONLY const logger = this.context.getLogger(); logger.log(LogLevel.debug, `MetricsAggregator constructor error: ${error.message}`); + // Initialize with default values this.batchSize = DEFAULT_TELEMETRY_CONFIG.batchSize; this.flushIntervalMs = DEFAULT_TELEMETRY_CONFIG.flushIntervalMs; this.maxPendingMetrics = DEFAULT_TELEMETRY_CONFIG.maxPendingMetrics; - this.maxErrorsPerStatement = DEFAULT_TELEMETRY_CONFIG.maxErrorsPerStatement; - this.statementTtlMs = DEFAULT_TELEMETRY_CONFIG.statementTtlMs; } } + /** + * Process a telemetry event. Never throws. + * + * @param event - The telemetry event to process + */ processEvent(event: TelemetryEvent): void { - if (this.closed) return; const logger = this.context.getLogger(); try { + // Connection events are emitted immediately (no aggregation) if (event.eventType === TelemetryEventType.CONNECTION_OPEN) { this.processConnectionEvent(event); return; } + // Error events - check if terminal or retryable if (event.eventType === TelemetryEventType.ERROR) { this.processErrorEvent(event); return; } + // Statement events - buffer until complete if (event.statementId) { this.processStatementEvent(event); } } catch (error: any) { + // CRITICAL: All exceptions swallowed and logged at debug level ONLY logger.log(LogLevel.debug, `MetricsAggregator.processEvent error: ${error.message}`); } } + /** + * Process connection event (emit immediately) + */ private processConnectionEvent(event: TelemetryEvent): void { const metric: TelemetryMetric = { metricType: 'connection', @@ -119,22 +135,30 @@ export default class MetricsAggregator { this.addPendingMetric(metric); } + /** + * Process error event (terminal errors flushed immediately, retryable buffered) + */ private processErrorEvent(event: TelemetryEvent): void { const logger = this.context.getLogger(); - // `isTerminal` is carried on the event by the emitter (it knows the - // call site's taxonomy). If callers ever drop it we default to - // retryable — buffering by statement is the safer choice. - const isTerminal = event.isTerminal === true; + // Create error object for classification + const error: any = new Error(event.errorMessage || 'Unknown error'); + error.name = event.errorName || 'UnknownError'; + + // Check if terminal using isTerminal field or ExceptionClassifier + const isTerminal = event.isTerminal ?? ExceptionClassifier.isTerminal(error); if (isTerminal) { - logger.log(LogLevel.debug, 'Terminal error detected - flushing immediately'); + // Terminal error - flush immediately + logger.log(LogLevel.debug, `Terminal error detected - flushing immediately`); + // If associated with a statement, complete and flush it if (event.statementId && this.statementMetrics.has(event.statementId)) { const details = this.statementMetrics.get(event.statementId)!; - this.pushBoundedError(details, event); + details.errors.push(event); this.completeStatement(event.statementId); } else { + // Standalone error - emit immediately const metric: TelemetryMetric = { metricType: 'error', timestamp: event.timestamp, @@ -143,31 +167,22 @@ export default class MetricsAggregator { workspaceId: event.workspaceId, errorName: event.errorName, errorMessage: event.errorMessage, - errorStack: event.errorStack, }; this.addPendingMetric(metric); } - // Fire-and-forget on the terminal-error path so customer code doesn't - // stall on telemetry HTTP. Do NOT reset the periodic flush timer: - // under burst failures that would keep the tail-drain timer from - // ever firing. - Promise.resolve(this.flush(false)).catch((err: any) => { - logger.log(LogLevel.debug, `Terminal-error flush failed: ${err?.message ?? err}`); - }); + // Flush immediately for terminal errors + void this.flush(); } else if (event.statementId) { + // Retryable error - buffer until statement complete const details = this.getOrCreateStatementDetails(event); - this.pushBoundedError(details, event); - } - } - - private pushBoundedError(details: StatementTelemetryDetails, event: TelemetryEvent): void { - if (details.errors.length >= this.maxErrorsPerStatement) { - details.errors.shift(); + details.errors.push(event); } - details.errors.push(event); } + /** + * Process statement event (buffer until complete) + */ private processStatementEvent(event: TelemetryEvent): void { const details = this.getOrCreateStatementDetails(event); @@ -194,10 +209,14 @@ export default class MetricsAggregator { break; default: + // Unknown event type - ignore break; } } + /** + * Get or create statement details for the given event + */ private getOrCreateStatementDetails(event: TelemetryEvent): StatementTelemetryDetails { const statementId = event.statementId!; @@ -218,41 +237,11 @@ export default class MetricsAggregator { } /** - * Drop entries older than `statementTtlMs`, emitting their buffered error - * events as standalone metrics first so the first-failure signal survives - * the eviction. Called from the periodic flush timer so idle clients - * don't leak orphan entries. + * Complete a statement and prepare it for flushing. Never throws. + * + * @param statementId - The statement ID to complete */ - private evictExpiredStatements(): void { - const cutoff = Date.now() - this.statementTtlMs; - let evicted = 0; - for (const [id, details] of this.statementMetrics) { - if (details.startTime < cutoff) { - for (const errorEvent of details.errors) { - this.addPendingMetric({ - metricType: 'error', - timestamp: errorEvent.timestamp, - sessionId: details.sessionId, - statementId: details.statementId, - workspaceId: details.workspaceId, - errorName: errorEvent.errorName, - errorMessage: errorEvent.errorMessage, - errorStack: errorEvent.errorStack, - }); - } - this.statementMetrics.delete(id); - evicted += 1; - } - } - if (evicted > 0) { - this.context - .getLogger() - .log(LogLevel.debug, `Evicted ${evicted} abandoned statement(s) past ${this.statementTtlMs}ms TTL`); - } - } - completeStatement(statementId: string): void { - if (this.closed) return; const logger = this.context.getLogger(); try { @@ -261,6 +250,7 @@ export default class MetricsAggregator { return; } + // Create statement metric const metric: TelemetryMetric = { metricType: 'statement', timestamp: details.startTime, @@ -276,6 +266,7 @@ export default class MetricsAggregator { this.addPendingMetric(metric); + // Add buffered error metrics for (const errorEvent of details.errors) { const errorMetric: TelemetryMetric = { metricType: 'error', @@ -285,99 +276,82 @@ export default class MetricsAggregator { workspaceId: details.workspaceId, errorName: errorEvent.errorName, errorMessage: errorEvent.errorMessage, - errorStack: errorEvent.errorStack, }; this.addPendingMetric(errorMetric); } + // Remove from map this.statementMetrics.delete(statementId); } catch (error: any) { + // CRITICAL: All exceptions swallowed and logged at debug level ONLY logger.log(LogLevel.debug, `MetricsAggregator.completeStatement error: ${error.message}`); } } /** - * Append `metric` to the pending buffer, enforcing `maxPendingMetrics`. - * - * Overflow drops the oldest non-error entry (single `splice` — no new - * allocation). Under an all-error buffer it falls back to dropping the - * oldest entry at index 0. + * Add a metric to pending batch and flush if batch size reached. + * Drops oldest metrics if the buffer exceeds maxPendingMetrics to prevent + * unbounded growth when the circuit breaker keeps failing exports. */ private addPendingMetric(metric: TelemetryMetric): void { - if (this.closed) return; this.pendingMetrics.push(metric); + // Cap the buffer to avoid unbounded memory growth when exports keep failing if (this.pendingMetrics.length > this.maxPendingMetrics) { - const dropIndex = this.findDropIndex(); - this.pendingMetrics.splice(dropIndex, 1); + const dropped = this.pendingMetrics.length - this.maxPendingMetrics; + this.pendingMetrics = this.pendingMetrics.slice(dropped); const logger = this.context.getLogger(); logger.log( LogLevel.debug, - `Dropped 1 oldest non-error telemetry metric (buffer full at ${this.maxPendingMetrics})`, + `Dropped ${dropped} oldest telemetry metrics (buffer full at ${this.maxPendingMetrics})`, ); } - if (this.pendingMetrics.length >= this.batchSize) { - // resetTimer=false so the periodic tail-drain keeps its cadence even - // under sustained batch-size bursts. - const logger = this.context.getLogger(); - Promise.resolve(this.flush(false)).catch((err: any) => { - logger.log(LogLevel.debug, `Batch-trigger flush failed: ${err?.message ?? err}`); - }); - } - } - - private findDropIndex(): number { - for (let i = 0; i < this.pendingMetrics.length; i += 1) { - if (this.pendingMetrics[i].metricType !== 'error') { - return i; - } + // Don't batch-flush during close; close() will do a single awaited flush + if (!this.closing && this.pendingMetrics.length >= this.batchSize) { + void this.flush(); } - return 0; } /** - * Drain the pending buffer and return a promise that resolves when the - * exporter finishes with the drained batch. `close()` awaits this so - * `process.exit()` after `client.close()` doesn't truncate the POST. + * Flush all pending metrics to exporter. Never throws. + * Returns the export promise so callers can await it (e.g. close()). + * + * @param resetTimer If true, resets the flush timer after flushing (default: true) */ - async flush(resetTimer: boolean = true): Promise { + flush(resetTimer: boolean = true): Promise { const logger = this.context.getLogger(); - let exportPromise: Promise | null = null; try { if (this.pendingMetrics.length === 0) { - if (resetTimer && !this.closed) { - this.startFlushTimer(); - } - return; + return Promise.resolve(); } - const metricsToExport = this.pendingMetrics; + const metricsToExport = [...this.pendingMetrics]; this.pendingMetrics = []; logger.log(LogLevel.debug, `Flushing ${metricsToExport.length} telemetry metrics`); - exportPromise = this.exporter.export(metricsToExport); + const exportPromise = this.exporter.export(metricsToExport).catch((err: any) => { + logger.log(LogLevel.debug, `Unexpected export error: ${err?.message}`); + }); - if (resetTimer && !this.closed) { + if (resetTimer) { this.startFlushTimer(); } + + return exportPromise; } catch (error: any) { + // CRITICAL: All exceptions swallowed and logged at debug level ONLY logger.log(LogLevel.debug, `MetricsAggregator.flush error: ${error.message}`); - } - - if (exportPromise) { - try { - await exportPromise; - } catch (err: any) { - logger.log(LogLevel.debug, `Unexpected export error: ${err?.message ?? err}`); - } + return Promise.resolve(); } } + /** + * Start the periodic flush timer + */ private startFlushTimer(): void { - if (this.closed) return; const logger = this.context.getLogger(); try { @@ -386,72 +360,43 @@ export default class MetricsAggregator { } this.flushTimer = setInterval(() => { - // Idle eviction: run before the flush so orphan-error metrics have - // a chance to batch into this drain rather than wait for the next. - try { - this.evictExpiredStatements(); - } catch (err: any) { - logger.log(LogLevel.debug, `evictExpiredStatements error: ${err?.message ?? err}`); - } - Promise.resolve(this.flush(false)).catch((err: any) => { - logger.log(LogLevel.debug, `Periodic flush failed: ${err?.message ?? err}`); - }); + void this.flush(false); }, this.flushIntervalMs); + // Prevent timer from keeping Node.js process alive this.flushTimer.unref(); } catch (error: any) { + // CRITICAL: All exceptions swallowed and logged at debug level ONLY logger.log(LogLevel.debug, `MetricsAggregator.startFlushTimer error: ${error.message}`); } } + /** + * Close the aggregator and flush remaining metrics. Never throws. + */ async close(): Promise { const logger = this.context.getLogger(); - this.closed = true; try { + this.closing = true; + if (this.flushTimer) { clearInterval(this.flushTimer); this.flushTimer = null; } - // Snapshot keys — completeStatement mutates statementMetrics. + // Complete remaining statements; addPendingMetric won't trigger intermediate + // batch flushes while this.closing is true (avoids fire-and-forget promises + // escaping past the awaited flush below). const remainingStatements = [...this.statementMetrics.keys()]; for (const statementId of remainingStatements) { - this.completeStatementForClose(statementId); + this.completeStatement(statementId); } - await this.flushForClose(); - - // Belt-and-braces: something the above awaited could in principle - // have resurrected a timer. Clear once more. - if (this.flushTimer) { - clearInterval(this.flushTimer); - this.flushTimer = null; - } + await this.flush(false); } catch (error: any) { + // CRITICAL: All exceptions swallowed and logged at debug level ONLY logger.log(LogLevel.debug, `MetricsAggregator.close error: ${error.message}`); } } - - /** completeStatement variant that bypasses the `closed` guard. */ - private completeStatementForClose(statementId: string): void { - const prev = this.closed; - this.closed = false; - try { - this.completeStatement(statementId); - } finally { - this.closed = prev; - } - } - - /** flush variant that bypasses the `closed` guard on addPendingMetric. */ - private async flushForClose(): Promise { - const prev = this.closed; - this.closed = false; - try { - await this.flush(false); - } finally { - this.closed = prev; - } - } } diff --git a/lib/telemetry/TelemetryEventEmitter.ts b/lib/telemetry/TelemetryEventEmitter.ts index bbb0b757..41d613c3 100644 --- a/lib/telemetry/TelemetryEventEmitter.ts +++ b/lib/telemetry/TelemetryEventEmitter.ts @@ -166,7 +166,6 @@ export default class TelemetryEventEmitter extends EventEmitter { sessionId?: string; errorName: string; errorMessage: string; - errorStack?: string; isTerminal: boolean; }): void { if (!this.enabled) return; @@ -180,7 +179,6 @@ export default class TelemetryEventEmitter extends EventEmitter { sessionId: data.sessionId, errorName: data.errorName, errorMessage: data.errorMessage, - errorStack: data.errorStack, isTerminal: data.isTerminal, }; this.emit(TelemetryEventType.ERROR, event); diff --git a/lib/telemetry/telemetryUtils.ts b/lib/telemetry/telemetryUtils.ts index 6326aedf..476733df 100644 --- a/lib/telemetry/telemetryUtils.ts +++ b/lib/telemetry/telemetryUtils.ts @@ -15,216 +15,10 @@ */ /** - * Hosts we always refuse to send authenticated telemetry to. Targeted at the - * `/api/2.0/sql/telemetry-ext` exfil vector: an attacker-influenced `host` - * (env var, tampered config, etc.) must not be able to redirect the Bearer - * token to a loopback/IMDS/RFC1918 endpoint. + * Build full URL from host and path, always using HTTPS. + * Strips any existing protocol prefix and enforces HTTPS. */ -const BLOCKED_HOST_PATTERNS: RegExp[] = [ - /^(?:127\.|0\.|10\.|169\.254\.|172\.(?:1[6-9]|2[0-9]|3[01])\.|192\.168\.)/, - /^(?:localhost|metadata\.google\.internal|metadata\.azure\.com)$/i, - /^\[?::1\]?$/, - /^\[?(?:fc|fd)[0-9a-f]{2}:/i, - /^\[?::ffff:(?:127|10|0|169\.254)\./i, -]; - -/** - * Build an HTTPS telemetry URL from a host and a path. - * - * Refuses anything beyond a bare `host[:port]` so a compromised or mistyped - * host cannot redirect the authenticated request to an attacker-controlled - * endpoint. Defeated historical bypasses include: - * - protocol-relative prefix: `//attacker.com` - * - zero-width / ASCII whitespace in the host - * - userinfo (`user:pass@host`) - * - path/query/fragment - * - CRLF (header injection on some fetch backends) - * - loopback / link-local / RFC1918 / cloud-metadata addresses - * - * Returns `null` when the host fails any check; callers drop the batch. - */ -export function buildTelemetryUrl(host: string, path: string): string | null { - if (typeof host !== 'string' || host.length === 0) { - return null; - } - - // Reject ASCII whitespace + common zero-width/BOM codepoints that JS `\s` - // does not cover but `new URL` silently strips. - if (/[\s\u200b-\u200f\u2060\ufeff]/.test(host)) { - return null; - } - +export default function buildTelemetryUrl(host: string, path: string): string { const cleanHost = host.replace(/^https?:\/\//, '').replace(/\/+$/, ''); - if (cleanHost.length === 0) { - return null; - } - - // Reject anything that looks like userinfo / path / protocol-relative - // prefix before URL parsing. `new URL('https://' + '//x')` would otherwise - // normalise the doubled slash and accept `x` as the host. - if (/[/\\@]/.test(cleanHost)) { - return null; - } - - let parsed: URL; - try { - parsed = new URL(`https://${cleanHost}`); - } catch { - return null; - } - - if ( - parsed.pathname !== '/' || - parsed.search !== '' || - parsed.hash !== '' || - parsed.username !== '' || - parsed.password !== '' - ) { - return null; - } - - // Defence in depth: ensure `new URL` did not silently rewrite the host we - // validated (e.g. by stripping a codepoint we missed above). `new URL` - // normalises away the default :443 for https, so compare using the - // port-stripped hostname instead of .host. - const expectedHost = cleanHost.toLowerCase().replace(/:443$/, ''); - const actualHost = parsed.host.toLowerCase().replace(/:443$/, ''); - if (actualHost !== expectedHost) { - return null; - } - - if (BLOCKED_HOST_PATTERNS.some((r) => r.test(parsed.hostname))) { - return null; - } - - return `https://${parsed.host}${path}`; -} - -/** - * Prefixes the Databricks driver uses for internal token formats. Kept in - * sync with `lib/utils/buildUserAgentString.ts`'s `redactInternalToken`. - * Extending one list should extend the other. - */ -const DATABRICKS_TOKEN_PREFIXES = ['dkea', 'dskea', 'dapi', 'dsapi', 'dose']; - -const SECRET_PATTERNS: Array<[RegExp, string]> = [ - // `Authorization: Bearer ` / `Bearer ` anywhere in a stack. - [/Bearer\s+[A-Za-z0-9._\-+/=]+/gi, 'Bearer '], - // `Authorization: Basic `. - [/Basic\s+[A-Za-z0-9+/=]+/gi, 'Basic '], - // URL userinfo: `https://user:pass@host/…`. - [/([a-z][a-z0-9+.-]*:\/\/)[^/\s:@]+:[^/\s@]+@/gi, '$1@'], - // Databricks PATs / service-token prefixes without `Bearer`, e.g. - // `token is dapi0123…` — appears in error stacks that echo the raw value. - [new RegExp(`\\b(?:${DATABRICKS_TOKEN_PREFIXES.join('|')})[A-Za-z0-9]{8,}`, 'g'), ''], - // JWTs (three base64url segments separated by dots). - [/\beyJ[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\b/g, ''], - // JSON-encoded secrets: `"client_secret":"..."`, `"access_token":"..."` etc. - [ - /"(password|token|client_secret|refresh_token|access_token|id_token|secret|api[_-]?key|apikey)"\s*:\s*"[^"]*"/gi, - '"$1":""', - ], - // Form-URL-encoded / key=value secrets. - [ - /\b(token|password|client_secret|refresh_token|access_token|id_token|secret|api[_-]?key|apikey)=[^\s&"']+/gi, - '$1=', - ], -]; - -/** - * Strips common secret shapes from a free-form error string and caps length. - * Applied before anything is shipped off-box. Redaction happens before - * truncation so a long stack cannot bury a secret past the cap; truncation - * then runs a second pass to catch anything that appeared only in the tail. - */ -export function redactSensitive(value: string | undefined, maxLen = 2048): string { - if (!value) { - return ''; - } - let redacted = value; - for (const [pattern, replacement] of SECRET_PATTERNS) { - redacted = redacted.replace(pattern, replacement); - } - if (redacted.length > maxLen) { - redacted = `${redacted.slice(0, maxLen)}…[truncated]`; - for (const [pattern, replacement] of SECRET_PATTERNS) { - redacted = redacted.replace(pattern, replacement); - } - } - return redacted; -} - -/** - * Normalises any `HeadersInit` shape (`Headers`, `[string,string][]`, or - * `Record`) into a plain string dictionary. Non-string - * values are dropped. Shared by the exporter and FeatureFlagCache so there - * is one source of truth for auth-header handling. - */ -export function normalizeHeaders(raw: unknown): Record { - if (!raw || typeof raw !== 'object') { - return {}; - } - // Avoid importing node-fetch here; use structural check. - if (typeof (raw as any).forEach === 'function' && !Array.isArray(raw)) { - const out: Record = {}; - (raw as any).forEach((value: unknown, key: unknown) => { - if (typeof key === 'string' && typeof value === 'string') { - out[key] = value; - } - }); - return out; - } - if (Array.isArray(raw)) { - const out: Record = {}; - for (const entry of raw as Array<[unknown, unknown]>) { - if (Array.isArray(entry) && entry.length === 2 && typeof entry[0] === 'string' && typeof entry[1] === 'string') { - const [key, value] = entry; - out[key] = value; - } - } - return out; - } - const out: Record = {}; - for (const [k, v] of Object.entries(raw as Record)) { - if (typeof v === 'string') { - out[k] = v; - } - } - return out; -} - -/** - * Case-insensitive check for a non-empty `Authorization` header. - */ -export function hasAuthorization(headers: Record): boolean { - for (const key of Object.keys(headers)) { - if (key.toLowerCase() === 'authorization' && headers[key]) { - return true; - } - } - return false; -} - -/** - * Returns a safe `process_name` value: the basename of the first whitespace- - * delimited token, with trailing whitespace trimmed. This defeats both the - * absolute-path PII leak (`/home//app.js`) and the argv-leak shape - * (`node --db-password=X app.js`) that some producers pass in. - */ -export function sanitizeProcessName(name: string | undefined): string { - if (!name) { - return ''; - } - const trimmed = name.trim(); - if (trimmed.length === 0) { - return ''; - } - // Drop argv tail: anything after the first whitespace — argv[0] shouldn't - // contain spaces, but producers sometimes pass `argv.join(' ')`. - const firstToken = trimmed.split(/\s/, 1)[0]; - if (!firstToken) { - return ''; - } - const lastSep = Math.max(firstToken.lastIndexOf('/'), firstToken.lastIndexOf('\\')); - return lastSep < 0 ? firstToken : firstToken.slice(lastSep + 1); + return `https://${cleanHost}${path}`; } diff --git a/lib/telemetry/types.ts b/lib/telemetry/types.ts index 6a4a25a9..e6c6e316 100644 --- a/lib/telemetry/types.ts +++ b/lib/telemetry/types.ts @@ -43,18 +43,9 @@ export interface TelemetryConfiguration { /** Interval in milliseconds to flush metrics */ flushIntervalMs?: number; - /** Maximum retry attempts for export (attempts *after* the initial call) */ + /** Maximum retry attempts for export */ maxRetries?: number; - /** Minimum backoff delay in ms for retry backoff */ - backoffBaseMs?: number; - - /** Maximum backoff delay in ms (includes jitter) */ - backoffMaxMs?: number; - - /** Upper bound of added jitter in ms */ - backoffJitterMs?: number; - /** Whether to use authenticated export endpoint */ authenticatedExport?: boolean; @@ -64,34 +55,23 @@ export interface TelemetryConfiguration { /** Circuit breaker timeout in milliseconds */ circuitBreakerTimeout?: number; - /** Maximum number of pending metrics buffered before dropping oldest */ + /** Maximum number of pending metrics buffered before dropping oldest (prevents unbounded growth when export keeps failing) */ maxPendingMetrics?: number; - - /** Maximum number of error events buffered per statement before dropping oldest */ - maxErrorsPerStatement?: number; - - /** TTL in ms after which abandoned statement aggregations are evicted */ - statementTtlMs?: number; } /** * Default telemetry configuration values */ -export const DEFAULT_TELEMETRY_CONFIG: Readonly> = Object.freeze({ - enabled: false, +export const DEFAULT_TELEMETRY_CONFIG: Required = { + enabled: false, // Initially disabled for safe rollout batchSize: 100, flushIntervalMs: 5000, maxRetries: 3, - backoffBaseMs: 100, - backoffMaxMs: 1000, - backoffJitterMs: 100, authenticatedExport: true, circuitBreakerThreshold: 5, - circuitBreakerTimeout: 60000, + circuitBreakerTimeout: 60000, // 1 minute maxPendingMetrics: 500, - maxErrorsPerStatement: 50, - statementTtlMs: 60 * 60 * 1000, // 1 hour -}); +}; /** * Runtime telemetry event emitted by the driver @@ -152,9 +132,6 @@ export interface TelemetryEvent { /** Error message */ errorMessage?: string; - /** Stack trace, captured at emission site; redacted before export */ - errorStack?: string; - /** Whether the error is terminal (non-retryable) */ isTerminal?: boolean; } @@ -201,9 +178,6 @@ export interface TelemetryMetric { /** Error message */ errorMessage?: string; - - /** Stack trace, captured at emission site; redacted before export */ - errorStack?: string; } /** @@ -237,10 +211,7 @@ export interface DriverConfiguration { /** Character set encoding (e.g., UTF-8) */ charSetEncoding: string; - /** - * Process name. Producers MUST pass only a basename (no absolute path) — - * `sanitizeProcessName()` is applied at export time as a defence in depth. - */ + /** Process name */ processName: string; // Feature flags diff --git a/tests/unit/.stubs/CircuitBreakerStub.ts b/tests/unit/.stubs/CircuitBreakerStub.ts index e85ad35c..f806eb9f 100644 --- a/tests/unit/.stubs/CircuitBreakerStub.ts +++ b/tests/unit/.stubs/CircuitBreakerStub.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import { CircuitBreakerOpenError, CircuitBreakerState } from '../../../lib/telemetry/CircuitBreaker'; +import { CircuitBreakerState } from '../../../lib/telemetry/CircuitBreaker'; /** * Stub implementation of CircuitBreaker for testing. @@ -34,7 +34,7 @@ export default class CircuitBreakerStub { this.executeCallCount++; if (this.state === CircuitBreakerState.OPEN) { - throw new CircuitBreakerOpenError(); + throw new Error('Circuit breaker OPEN'); } try { diff --git a/tests/unit/.stubs/ClientContextStub.ts b/tests/unit/.stubs/ClientContextStub.ts index 519316ff..0178d3c6 100644 --- a/tests/unit/.stubs/ClientContextStub.ts +++ b/tests/unit/.stubs/ClientContextStub.ts @@ -1,3 +1,4 @@ +import { HeadersInit } from 'node-fetch'; import IClientContext, { ClientConfig } from '../../../lib/contracts/IClientContext'; import IConnectionProvider from '../../../lib/connection/contracts/IConnectionProvider'; import IDriver from '../../../lib/contracts/IDriver'; @@ -48,4 +49,8 @@ export default class ClientContextStub implements IClientContext { public async getDriver(): Promise { return this.driver; } + + public async getAuthHeaders(): Promise { + return { Authorization: 'Bearer test-token' }; + } } diff --git a/tests/unit/telemetry/CircuitBreaker.test.ts b/tests/unit/telemetry/CircuitBreaker.test.ts index d6ff62af..3c9e7083 100644 --- a/tests/unit/telemetry/CircuitBreaker.test.ts +++ b/tests/unit/telemetry/CircuitBreaker.test.ts @@ -17,9 +17,7 @@ import { expect } from 'chai'; import sinon from 'sinon'; import { - CIRCUIT_BREAKER_OPEN_CODE, CircuitBreaker, - CircuitBreakerOpenError, CircuitBreakerRegistry, CircuitBreakerState, DEFAULT_CIRCUIT_BREAKER_CONFIG, @@ -407,65 +405,6 @@ describe('CircuitBreaker', () => { }); }); - describe('HALF_OPEN concurrent-probe invariant', () => { - it('admits only one probe when two callers race after OPEN→HALF_OPEN transition', async () => { - clock.restore(); - const context = new ClientContextStub(); - const breaker = new CircuitBreaker(context, { failureThreshold: 1, timeout: 1, successThreshold: 1 }); - - // Trip to OPEN. - await breaker.execute(() => Promise.reject(new Error('boom'))).catch(() => {}); - expect(breaker.getState()).to.equal(CircuitBreakerState.OPEN); - - // Wait past the timeout so the next execute() flips to HALF_OPEN. - await new Promise((r) => setTimeout(r, 5)); - - // Hold the probe in-flight so the second caller races against it. - let releaseProbe: (() => void) | null = null; - const probeGate = new Promise((res) => { - releaseProbe = res; - }); - - let probeRan = false; - let rejectedRan = false; - - const first = breaker.execute(async () => { - probeRan = true; - await probeGate; - }); - - const second = breaker - .execute(async () => { - rejectedRan = true; - }) - .catch((err) => err); - - const secondResult = await second; - expect(probeRan).to.be.true; - expect(rejectedRan).to.be.false; - expect(secondResult).to.be.instanceOf(CircuitBreakerOpenError); - - releaseProbe!(); - await first; - }); - - it('throws CircuitBreakerOpenError with code when OPEN', async () => { - const context = new ClientContextStub(); - const breaker = new CircuitBreaker(context, { failureThreshold: 1, timeout: 60_000, successThreshold: 1 }); - - await breaker.execute(() => Promise.reject(new Error('boom'))).catch(() => {}); - - let caught: any; - try { - await breaker.execute(async () => 1); - } catch (err) { - caught = err; - } - expect(caught).to.be.instanceOf(CircuitBreakerOpenError); - expect(caught.code).to.equal(CIRCUIT_BREAKER_OPEN_CODE); - }); - }); - describe('State transitions logging', () => { it('should log all state transitions at debug level', async () => { const context = new ClientContextStub(); diff --git a/tests/unit/telemetry/DatabricksTelemetryExporter.test.ts b/tests/unit/telemetry/DatabricksTelemetryExporter.test.ts index fb347bf6..2cec80fa 100644 --- a/tests/unit/telemetry/DatabricksTelemetryExporter.test.ts +++ b/tests/unit/telemetry/DatabricksTelemetryExporter.test.ts @@ -21,11 +21,6 @@ import { CircuitBreakerRegistry } from '../../../lib/telemetry/CircuitBreaker'; import { TelemetryMetric } from '../../../lib/telemetry/types'; import ClientContextStub from '../.stubs/ClientContextStub'; import { LogLevel } from '../../../lib/contracts/IDBSQLLogger'; -import IAuthentication from '../../../lib/connection/contracts/IAuthentication'; - -const fakeAuthProvider: IAuthentication = { - authenticate: async () => ({ Authorization: 'Bearer test-token' }), -}; function makeMetric(overrides: Partial = {}): TelemetryMetric { return { @@ -60,7 +55,7 @@ describe('DatabricksTelemetryExporter', () => { it('should return immediately for empty metrics array', async () => { const context = new ClientContextStub(); const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); + const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry); const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); await exporter.export([]); @@ -71,7 +66,7 @@ describe('DatabricksTelemetryExporter', () => { it('should call sendRequest with correct endpoint for authenticated export', async () => { const context = new ClientContextStub({ telemetryAuthenticatedExport: true } as any); const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); + const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry); const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); await exporter.export([makeMetric()]); @@ -85,7 +80,7 @@ describe('DatabricksTelemetryExporter', () => { it('should call sendRequest with unauthenticated endpoint when configured', async () => { const context = new ClientContextStub({ telemetryAuthenticatedExport: false } as any); const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); + const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry); const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); await exporter.export([makeMetric()]); @@ -97,7 +92,7 @@ describe('DatabricksTelemetryExporter', () => { it('should preserve host protocol if already set', async () => { const context = new ClientContextStub({ telemetryAuthenticatedExport: true } as any); const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'https://host.example.com', registry, fakeAuthProvider); + const exporter = new DatabricksTelemetryExporter(context, 'https://host.example.com', registry); const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); await exporter.export([makeMetric()]); @@ -109,7 +104,7 @@ describe('DatabricksTelemetryExporter', () => { it('should never throw even when sendRequest fails', async () => { const context = new ClientContextStub(); const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); + const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry); sinon.stub(exporter as any, 'sendRequest').rejects(new Error('network error')); let threw = false; @@ -123,44 +118,34 @@ describe('DatabricksTelemetryExporter', () => { }); describe('export() - retry logic', () => { - it('should retry on retryable HTTP errors (503)', async () => { - const context = new ClientContextStub({ telemetryMaxRetries: 2 } as any); + it('should invoke sendRequest once per export and delegate retry to connectionProvider.getRetryPolicy', async () => { + // Retry is now handled inside sendRequest via retryPolicy.invokeWithRetry (shared connection stack). + // From the exporter's perspective, sendRequest is one atomic call. + const context = new ClientContextStub(); const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); - // Fail twice with 503, then succeed - const sendRequestStub = sinon - .stub(exporter as any, 'sendRequest') - .onFirstCall() - .returns(makeErrorResponse(503, 'Service Unavailable')) - .onSecondCall() - .returns(makeErrorResponse(503, 'Service Unavailable')) - .onThirdCall() - .returns(makeOkResponse()); - - // Advance fake timers automatically for sleep calls - const exportPromise = exporter.export([makeMetric()]); - await clock.runAllAsync(); - await exportPromise; + const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry); + const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); - expect(sendRequestStub.callCount).to.equal(3); + await exporter.export([makeMetric()]); + + expect(sendRequestStub.callCount).to.equal(1); }); it('should not retry on terminal HTTP errors (400)', async () => { - const context = new ClientContextStub({ telemetryMaxRetries: 3 } as any); + const context = new ClientContextStub(); const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); + const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry); const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeErrorResponse(400, 'Bad Request')); await exporter.export([makeMetric()]); - // Only one call — no retry on terminal error expect(sendRequestStub.callCount).to.equal(1); }); it('should not retry on terminal HTTP errors (401)', async () => { - const context = new ClientContextStub({ telemetryMaxRetries: 3 } as any); + const context = new ClientContextStub(); const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); + const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry); const sendRequestStub = sinon .stub(exporter as any, 'sendRequest') .returns(makeErrorResponse(401, 'Unauthorized')); @@ -170,20 +155,19 @@ describe('DatabricksTelemetryExporter', () => { expect(sendRequestStub.callCount).to.equal(1); }); - it('should give up after maxRetries are exhausted', async () => { - const context = new ClientContextStub({ telemetryMaxRetries: 2 } as any); + it('should swallow all errors when sendRequest fails after retries are exhausted', async () => { + const context = new ClientContextStub(); const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); - const sendRequestStub = sinon - .stub(exporter as any, 'sendRequest') - .returns(makeErrorResponse(503, 'Service Unavailable')); - - const exportPromise = exporter.export([makeMetric()]); - await clock.runAllAsync(); - await exportPromise; + const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry); + sinon.stub(exporter as any, 'sendRequest').returns(makeErrorResponse(503, 'Service Unavailable')); - // 1 initial + 2 retries = 3 total calls - expect(sendRequestStub.callCount).to.equal(3); + let threw = false; + try { + await exporter.export([makeMetric()]); + } catch { + threw = true; + } + expect(threw).to.be.false; }); }); @@ -193,7 +177,7 @@ describe('DatabricksTelemetryExporter', () => { const context = new ClientContextStub({ telemetryMaxRetries: 0 } as any); const registry = new CircuitBreakerRegistry(context); registry.getCircuitBreaker('host.example.com', { failureThreshold: 1 }); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); + const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry); const sendRequestStub = sinon .stub(exporter as any, 'sendRequest') .returns(makeErrorResponse(503, 'Service Unavailable')); @@ -213,7 +197,7 @@ describe('DatabricksTelemetryExporter', () => { const logSpy = sinon.spy((context as any).logger, 'log'); const registry = new CircuitBreakerRegistry(context); registry.getCircuitBreaker('host.example.com', { failureThreshold: 1 }); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); + const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry); sinon.stub(exporter as any, 'sendRequest').returns(makeErrorResponse(503, 'Service Unavailable')); await exporter.export([makeMetric()]); @@ -229,7 +213,7 @@ describe('DatabricksTelemetryExporter', () => { it('should send POST request with JSON content-type', async () => { const context = new ClientContextStub(); const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); + const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry); const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); await exporter.export([makeMetric()]); @@ -242,7 +226,7 @@ describe('DatabricksTelemetryExporter', () => { it('should include protoLogs in payload body', async () => { const context = new ClientContextStub(); const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); + const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry); const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); await exporter.export([makeMetric(), makeMetric()]); @@ -259,7 +243,7 @@ describe('DatabricksTelemetryExporter', () => { const context = new ClientContextStub(); const logSpy = sinon.spy((context as any).logger, 'log'); const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); + const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry); sinon.stub(exporter as any, 'sendRequest').rejects(new Error('something went wrong')); const exportPromise = exporter.export([makeMetric()]); @@ -270,229 +254,4 @@ describe('DatabricksTelemetryExporter', () => { // Note: circuit breaker logs at warn level when transitioning to OPEN, which is expected }); }); - - describe('Authorization header flow', () => { - it('sends Authorization header returned by the auth provider on authenticated export', async () => { - const context = new ClientContextStub({ telemetryAuthenticatedExport: true } as any); - const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); - const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); - - await exporter.export([makeMetric()]); - - const init = sendRequestStub.firstCall.args[1] as any; - expect(init.headers.Authorization).to.equal('Bearer test-token'); - }); - - it('drops the batch when authenticated export is requested but auth returns no header', async () => { - const context = new ClientContextStub({ telemetryAuthenticatedExport: true, telemetryMaxRetries: 0 } as any); - const registry = new CircuitBreakerRegistry(context); - const emptyAuth = { authenticate: async () => ({}) }; - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, emptyAuth as any); - const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); - - await exporter.export([makeMetric()]); - - expect(sendRequestStub.called).to.be.false; - }); - - it('warns exactly once across consecutive auth-missing drops', async () => { - const context = new ClientContextStub({ telemetryAuthenticatedExport: true, telemetryMaxRetries: 0 } as any); - const logSpy = sinon.spy((context as any).logger, 'log'); - const registry = new CircuitBreakerRegistry(context); - const emptyAuth = { authenticate: async () => ({}) }; - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, emptyAuth as any); - sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); - - await exporter.export([makeMetric()]); - await exporter.export([makeMetric()]); - await exporter.export([makeMetric()]); - - const warnCalls = logSpy - .getCalls() - .filter((c) => c.args[0] === LogLevel.warn && /Authorization/.test(String(c.args[1]))); - expect(warnCalls.length).to.equal(1); - }); - - it('re-arms the auth-missing warn after a successful export', async () => { - const context = new ClientContextStub({ telemetryAuthenticatedExport: true, telemetryMaxRetries: 0 } as any); - const logSpy = sinon.spy((context as any).logger, 'log'); - const registry = new CircuitBreakerRegistry(context); - let headers: Record = {}; - const toggleAuth = { authenticate: async () => headers }; - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, toggleAuth as any); - sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); - - await exporter.export([makeMetric()]); // warns once - headers = { Authorization: 'Bearer recovered' }; - await exporter.export([makeMetric()]); // success → re-arms - headers = {}; - await exporter.export([makeMetric()]); // warns again - - const warnCalls = logSpy - .getCalls() - .filter((c) => c.args[0] === LogLevel.warn && /Authorization/.test(String(c.args[1]))); - expect(warnCalls.length).to.equal(2); - }); - }); - - describe('unauthenticated endpoint privacy', () => { - it('omits workspace_id, session_id, statement_id from unauth payload', async () => { - const context = new ClientContextStub({ telemetryAuthenticatedExport: false } as any); - const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); - const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); - - await exporter.export([ - makeMetric({ - metricType: 'connection', - sessionId: 'session-xyz', - statementId: 'stmt-abc', - workspaceId: 'ws-123', - } as any), - ]); - - const body = JSON.parse((sendRequestStub.firstCall.args[1] as any).body); - const log = JSON.parse(body.protoLogs[0]); - expect(log.workspace_id).to.be.undefined; - expect(log.entry.sql_driver_log.session_id).to.be.undefined; - expect(log.entry.sql_driver_log.sql_statement_id).to.be.undefined; - }); - - it('omits system_configuration from unauth payload', async () => { - const context = new ClientContextStub({ telemetryAuthenticatedExport: false } as any); - const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); - const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); - - await exporter.export([ - makeMetric({ - metricType: 'connection', - driverConfig: { - driverVersion: '1.x', - driverName: 'nodejs-sql-driver', - nodeVersion: '20.0', - platform: 'linux', - osVersion: '5.0', - osArch: 'x64', - runtimeVendor: 'v8', - localeName: 'en_US', - charSetEncoding: 'UTF-8', - processName: '/home/alice/worker.js', - }, - } as any), - ]); - - const body = JSON.parse((sendRequestStub.firstCall.args[1] as any).body); - const log = JSON.parse(body.protoLogs[0]); - expect(log.entry.sql_driver_log.system_configuration).to.be.undefined; - }); - - it('strips userAgentEntry from User-Agent on unauth path', async () => { - const context = new ClientContextStub({ - telemetryAuthenticatedExport: false, - userAgentEntry: 'MyTenantApp/1.2.3', - } as any); - const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); - const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); - - await exporter.export([makeMetric()]); - - const ua = (sendRequestStub.firstCall.args[1] as any).headers['User-Agent']; - expect(ua).to.not.include('MyTenantApp'); - }); - - it('blanks stack_trace on unauth error metrics', async () => { - const context = new ClientContextStub({ telemetryAuthenticatedExport: false } as any); - const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); - const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); - - await exporter.export([ - makeMetric({ - metricType: 'error', - errorName: 'SomeError', - errorMessage: 'Bearer leaked-token in the message', - errorStack: 'Error: leak\n at fn (dapi0123456789abcdef)', - } as any), - ]); - - const body = JSON.parse((sendRequestStub.firstCall.args[1] as any).body); - const log = JSON.parse(body.protoLogs[0]); - expect(log.entry.sql_driver_log.error_info.stack_trace).to.equal(''); - expect(log.entry.sql_driver_log.error_info.error_name).to.equal('SomeError'); - }); - }); - - describe('errorStack flow (authenticated)', () => { - it('redacts Bearer tokens in stack_trace before export', async () => { - const context = new ClientContextStub({ telemetryAuthenticatedExport: true } as any); - const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); - const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); - - await exporter.export([ - makeMetric({ - metricType: 'error', - errorName: 'AuthError', - errorMessage: 'ignored because errorStack is preferred', - errorStack: 'Error: boom\n at Bearer leaked-bearer-token', - } as any), - ]); - - const body = JSON.parse((sendRequestStub.firstCall.args[1] as any).body); - const log = JSON.parse(body.protoLogs[0]); - const stack = log.entry.sql_driver_log.stack_trace ?? log.entry.sql_driver_log.error_info?.stack_trace; - expect(stack).to.include(''); - expect(stack).to.not.include('leaked-bearer-token'); - }); - }); - - describe('host validation', () => { - it('drops the batch when host fails validation (malformed)', async () => { - const context = new ClientContextStub(); - const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, '//attacker.com', registry, fakeAuthProvider); - const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); - - await exporter.export([makeMetric()]); - - expect(sendRequestStub.called).to.be.false; - }); - - it('drops the batch when host is loopback', async () => { - const context = new ClientContextStub(); - const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, '127.0.0.1', registry, fakeAuthProvider); - const sendRequestStub = sinon.stub(exporter as any, 'sendRequest').returns(makeOkResponse()); - - await exporter.export([makeMetric()]); - - expect(sendRequestStub.called).to.be.false; - }); - }); - - describe('dispose()', () => { - it('removes the per-host circuit breaker from the registry', () => { - const context = new ClientContextStub(); - const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); - - expect(registry.getAllBreakers().has('host.example.com')).to.be.true; - - exporter.dispose(); - - expect(registry.getAllBreakers().has('host.example.com')).to.be.false; - }); - - it('is idempotent', () => { - const context = new ClientContextStub(); - const registry = new CircuitBreakerRegistry(context); - const exporter = new DatabricksTelemetryExporter(context, 'host.example.com', registry, fakeAuthProvider); - - exporter.dispose(); - expect(() => exporter.dispose()).to.not.throw(); - }); - }); }); diff --git a/tests/unit/telemetry/ExceptionClassifier.test.ts b/tests/unit/telemetry/ExceptionClassifier.test.ts index d7473aea..e29bb044 100644 --- a/tests/unit/telemetry/ExceptionClassifier.test.ts +++ b/tests/unit/telemetry/ExceptionClassifier.test.ts @@ -176,13 +176,10 @@ describe('ExceptionClassifier', () => { expect(ExceptionClassifier.isRetryable(error)).to.be.true; }); - it('should NOT identify ENOTFOUND as retryable (likely misconfigured host)', () => { - // DNS "not found" is deterministic; retrying just pushes load at - // the resolver without any expectation of success. Breaker records - // it as a failure and moves on. + it('should identify ENOTFOUND as retryable', () => { const error = new Error('getaddrinfo ENOTFOUND host.example.com'); (error as any).code = 'ENOTFOUND'; - expect(ExceptionClassifier.isRetryable(error)).to.be.false; + expect(ExceptionClassifier.isRetryable(error)).to.be.true; }); it('should identify EHOSTUNREACH as retryable', () => { @@ -196,19 +193,6 @@ describe('ExceptionClassifier', () => { (error as any).code = 'ECONNRESET'; expect(ExceptionClassifier.isRetryable(error)).to.be.true; }); - - it('should identify ETIMEDOUT as retryable', () => { - const error = new Error('connect ETIMEDOUT'); - (error as any).code = 'ETIMEDOUT'; - expect(ExceptionClassifier.isRetryable(error)).to.be.true; - }); - - it('should identify EAI_AGAIN as retryable', () => { - // getaddrinfo EAI_AGAIN is a temporary DNS lookup failure (unlike ENOTFOUND). - const error = new Error('getaddrinfo EAI_AGAIN host.example.com'); - (error as any).code = 'EAI_AGAIN'; - expect(ExceptionClassifier.isRetryable(error)).to.be.true; - }); }); describe('HTTP 429 Too Many Requests', () => { diff --git a/tests/unit/telemetry/MetricsAggregator.test.ts b/tests/unit/telemetry/MetricsAggregator.test.ts index e3d1bb01..219edb7e 100644 --- a/tests/unit/telemetry/MetricsAggregator.test.ts +++ b/tests/unit/telemetry/MetricsAggregator.test.ts @@ -275,47 +275,6 @@ describe('MetricsAggregator', () => { const metrics = exporter.export.firstCall.args[0]; expect(metrics[0].metricType).to.equal('statement'); }); - - it('awaits in-flight export before resolving — prevents process.exit truncation', async () => { - clock.restore(); - const context = new ClientContextStub(); - let resolveExport!: () => void; - const pendingExport = new Promise((r) => { - resolveExport = r; - }); - const exporter: any = { export: sinon.stub().returns(pendingExport) }; - const aggregator = new MetricsAggregator(context, exporter); - - aggregator.processEvent(connectionEvent()); - - const done = aggregator.close(); - expect(done).to.be.an.instanceof(Promise); - - let resolved = false; - done.then(() => { - resolved = true; - }); - await Promise.resolve(); - await Promise.resolve(); - expect(resolved, 'close() should wait for exporter promise before resolving').to.be.false; - - resolveExport(); - await done; - expect(resolved).to.be.true; - }); - - it('does not resurrect the flush timer after close', async () => { - clock.restore(); - const context = new ClientContextStub({ telemetryBatchSize: 1 } as any); - const exporter = makeExporterStub(); - const aggregator = new MetricsAggregator(context, exporter as any); - - aggregator.processEvent(statementEvent(TelemetryEventType.STATEMENT_START)); - await aggregator.close(); - - expect((aggregator as any).flushTimer, 'flushTimer should be null after close').to.equal(null); - expect((aggregator as any).closed).to.be.true; - }); }); describe('exception swallowing', () => { diff --git a/tests/unit/telemetry/telemetryUtils.test.ts b/tests/unit/telemetry/telemetryUtils.test.ts deleted file mode 100644 index eaffc321..00000000 --- a/tests/unit/telemetry/telemetryUtils.test.ts +++ /dev/null @@ -1,276 +0,0 @@ -/** - * Copyright (c) 2025 Databricks Contributors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - */ - -import { expect } from 'chai'; -import { buildTelemetryUrl, redactSensitive, sanitizeProcessName } from '../../../lib/telemetry/telemetryUtils'; - -describe('buildTelemetryUrl', () => { - describe('valid hosts', () => { - it('returns https URL for a bare host', () => { - expect(buildTelemetryUrl('myws.cloud.databricks.com', '/telemetry-ext')).to.equal( - 'https://myws.cloud.databricks.com/telemetry-ext', - ); - }); - - it('strips a leading https:// prefix', () => { - expect(buildTelemetryUrl('https://myws.cloud.databricks.com', '/telemetry-ext')).to.equal( - 'https://myws.cloud.databricks.com/telemetry-ext', - ); - }); - - it('strips a leading http:// prefix and upgrades to https', () => { - expect(buildTelemetryUrl('http://myws.cloud.databricks.com', '/telemetry-ext')).to.equal( - 'https://myws.cloud.databricks.com/telemetry-ext', - ); - }); - - it('strips trailing slashes', () => { - expect(buildTelemetryUrl('myws.cloud.databricks.com///', '/telemetry-ext')).to.equal( - 'https://myws.cloud.databricks.com/telemetry-ext', - ); - }); - - it('accepts an explicit default port and normalises it', () => { - // `new URL` strips :443 for https; we rely on that normalisation. - expect(buildTelemetryUrl('myws.cloud.databricks.com:443', '/x')).to.equal('https://myws.cloud.databricks.com/x'); - }); - - it('accepts a non-default port and preserves it', () => { - expect(buildTelemetryUrl('myws.cloud.databricks.com:8443', '/x')).to.equal( - 'https://myws.cloud.databricks.com:8443/x', - ); - }); - }); - - describe('SSRF / redirection rejections', () => { - it('rejects protocol-relative prefix', () => { - expect(buildTelemetryUrl('//attacker.com', '/telemetry-ext')).to.equal(null); - }); - - it('rejects zero-width space inside host', () => { - // `legit.com\u200battacker.com` would otherwise collapse to - // `legit.comattacker.com` inside `new URL`. - expect(buildTelemetryUrl('legit.com\u200battacker.com', '/telemetry-ext')).to.equal(null); - }); - - it('rejects BOM inside host', () => { - expect(buildTelemetryUrl('legit.com\ufeffattacker.com', '/telemetry-ext')).to.equal(null); - }); - - it('rejects userinfo', () => { - expect(buildTelemetryUrl('user:pass@attacker.com', '/telemetry-ext')).to.equal(null); - }); - - it('rejects CR in host', () => { - expect(buildTelemetryUrl('legit.com\r\nInjected: header', '/x')).to.equal(null); - }); - - it('rejects LF in host', () => { - expect(buildTelemetryUrl('legit.com\nInjected: header', '/x')).to.equal(null); - }); - - it('rejects tab in host', () => { - expect(buildTelemetryUrl('legit.com\tbad', '/x')).to.equal(null); - }); - - it('rejects path appended to host', () => { - expect(buildTelemetryUrl('legit.com/evil', '/telemetry-ext')).to.equal(null); - }); - - it('rejects query appended to host', () => { - expect(buildTelemetryUrl('legit.com?x=1', '/telemetry-ext')).to.equal(null); - }); - - it('rejects fragment appended to host', () => { - expect(buildTelemetryUrl('legit.com#frag', '/telemetry-ext')).to.equal(null); - }); - - it('rejects backslash in host', () => { - expect(buildTelemetryUrl('legit.com\\evil', '/x')).to.equal(null); - }); - - it('rejects at-sign in host', () => { - expect(buildTelemetryUrl('a@b.com', '/x')).to.equal(null); - }); - - it('rejects empty host', () => { - expect(buildTelemetryUrl('', '/x')).to.equal(null); - }); - - it('rejects only-slashes host', () => { - expect(buildTelemetryUrl('///', '/x')).to.equal(null); - }); - }); - - describe('deny-listed hosts', () => { - it('rejects IPv4 loopback', () => { - expect(buildTelemetryUrl('127.0.0.1', '/telemetry-ext')).to.equal(null); - expect(buildTelemetryUrl('127.1.2.3', '/telemetry-ext')).to.equal(null); - }); - - it('rejects 0.0.0.0', () => { - expect(buildTelemetryUrl('0.0.0.0', '/telemetry-ext')).to.equal(null); - }); - - it('rejects RFC1918 10.0.0.0/8', () => { - expect(buildTelemetryUrl('10.0.0.1', '/telemetry-ext')).to.equal(null); - }); - - it('rejects RFC1918 192.168/16', () => { - expect(buildTelemetryUrl('192.168.1.1', '/telemetry-ext')).to.equal(null); - }); - - it('rejects RFC1918 172.16-31', () => { - expect(buildTelemetryUrl('172.16.0.1', '/telemetry-ext')).to.equal(null); - expect(buildTelemetryUrl('172.31.255.254', '/telemetry-ext')).to.equal(null); - }); - - it('accepts 172.32 (outside RFC1918)', () => { - expect(buildTelemetryUrl('172.32.0.1', '/telemetry-ext')).to.equal('https://172.32.0.1/telemetry-ext'); - }); - - it('rejects AWS IMDS', () => { - expect(buildTelemetryUrl('169.254.169.254', '/telemetry-ext')).to.equal(null); - }); - - it('rejects GCP metadata', () => { - expect(buildTelemetryUrl('metadata.google.internal', '/telemetry-ext')).to.equal(null); - expect(buildTelemetryUrl('METADATA.GOOGLE.INTERNAL', '/telemetry-ext')).to.equal(null); - }); - - it('rejects Azure metadata', () => { - expect(buildTelemetryUrl('metadata.azure.com', '/telemetry-ext')).to.equal(null); - }); - - it('rejects localhost', () => { - expect(buildTelemetryUrl('localhost', '/telemetry-ext')).to.equal(null); - expect(buildTelemetryUrl('LocalHost', '/telemetry-ext')).to.equal(null); - }); - }); -}); - -describe('redactSensitive', () => { - it('returns empty string for undefined', () => { - expect(redactSensitive(undefined)).to.equal(''); - }); - - it('returns empty string for empty input', () => { - expect(redactSensitive('')).to.equal(''); - }); - - it('redacts Bearer tokens', () => { - const redacted = redactSensitive('Authorization: Bearer abc.def.ghi-jkl'); - expect(redacted).to.equal('Authorization: Bearer '); - }); - - it('redacts multiple Bearer tokens in one string', () => { - const redacted = redactSensitive('first Bearer abc second Bearer xyz'); - expect(redacted).to.equal('first Bearer second Bearer '); - }); - - it('redacts Basic auth', () => { - expect(redactSensitive('Authorization: Basic dXNlcjpwYXNz')).to.equal('Authorization: Basic '); - }); - - it('redacts URL-embedded credentials', () => { - expect(redactSensitive('fetch https://user:pass@legit.com/api')).to.equal('fetch https://@legit.com/api'); - }); - - it('redacts Databricks PAT (dapi)', () => { - expect(redactSensitive('token is dapi0123456789abcdef01')).to.equal('token is '); - }); - - it('redacts Databricks PAT (dkea, dskea, dsapi, dose)', () => { - for (const prefix of ['dkea', 'dskea', 'dsapi', 'dose']) { - expect(redactSensitive(`tok ${prefix}0123456789abcdef`)).to.equal('tok '); - } - }); - - it('redacts realistic JWT', () => { - // This is NOT a real token — it's a synthetic JWT-shaped string built - // from harmless segments purely to exercise the regex. Constructed by - // string concatenation so the assembled token never appears as a - // source literal (otherwise pre-commit secret scanners, rightly, flag - // the test file itself). - const header = `${'eyJ'}hbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9`; - const payload = `${'eyJ'}zdWIiOiJ0ZXN0LXN1YmplY3QifQ`; - const signature = 'Ab-123_xyz456_abcDEF789'; - const jwt = `${header}.${payload}.${signature}`; - expect(redactSensitive(`Authorization: ${jwt}`)).to.include(''); - }); - - it('redacts JSON-quoted access_token', () => { - expect(redactSensitive('{"access_token":"eyJabc.def.ghi"}')).to.equal('{"access_token":""}'); - }); - - it('redacts JSON-quoted client_secret', () => { - expect(redactSensitive('body={"client_id":"abc","client_secret":"xyz"}')).to.include( - '"client_secret":""', - ); - }); - - it('redacts JSON-quoted refresh_token, id_token, password, api_key', () => { - for (const key of ['refresh_token', 'id_token', 'password', 'api_key', 'apikey']) { - expect(redactSensitive(`{"${key}":"x"}`)).to.equal(`{"${key}":""}`); - } - }); - - it('redacts form-encoded token= style secrets', () => { - expect(redactSensitive('post body=client_secret=xyz&token=abc&password=hunter2')).to.equal( - 'post body=client_secret=&token=&password=', - ); - }); - - it('caps long input with truncation marker', () => { - const long = `${'x'.repeat(3000)}Bearer abc`; - const redacted = redactSensitive(long, 2048); - expect(redacted.length).to.be.lessThan(long.length); - expect(redacted).to.include('…[truncated]'); - }); - - it('applies redaction again after truncation', () => { - // Secret appears in the tail; first-pass redacts, then truncation, then - // the cap-time second pass catches anything missed. - const input = `${'x'.repeat(3000)}Bearer leaked-token`; - const redacted = redactSensitive(input, 50); - expect(redacted).to.not.include('leaked-token'); - }); -}); - -describe('sanitizeProcessName', () => { - it('returns empty string for undefined', () => { - expect(sanitizeProcessName(undefined)).to.equal(''); - }); - - it('returns empty string for whitespace-only', () => { - expect(sanitizeProcessName(' ')).to.equal(''); - }); - - it('strips absolute path', () => { - expect(sanitizeProcessName('/home/alice/worker.js')).to.equal('worker.js'); - }); - - it('strips Windows path', () => { - expect(sanitizeProcessName('C:\\Users\\bob\\worker.js')).to.equal('worker.js'); - }); - - it('returns basename unchanged when no path', () => { - expect(sanitizeProcessName('worker.js')).to.equal('worker.js'); - }); - - it('drops argv tail (whitespace-separated)', () => { - expect(sanitizeProcessName('node --db-password=secret app.js')).to.equal('node'); - }); - - it('drops argv tail after full path', () => { - expect(sanitizeProcessName('/usr/bin/node --token=abc app.js')).to.equal('node'); - }); - - it('preserves basename-only input without spaces', () => { - expect(sanitizeProcessName('my-worker')).to.equal('my-worker'); - }); -});