Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ coverage_unit
dist
*.DS_Store
lib/version.ts
coverage/
26 changes: 11 additions & 15 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import thrift from 'thrift';
import Int64 from 'node-int64';

import { EventEmitter } from 'events';
import { HeadersInit } from 'node-fetch';
import TCLIService from '../thrift/TCLIService';
import { TProtocolVersion } from '../thrift/TCLIService_types';
import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient';
Expand Down Expand Up @@ -238,12 +239,6 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
this.config.enableMetricViewMetadata = options.enableMetricViewMetadata;
}

// Persist userAgentEntry so telemetry and feature-flag call sites reuse
// the same value as the primary Thrift connection's User-Agent.
if (options.userAgentEntry !== undefined) {
this.config.userAgentEntry = options.userAgentEntry;
}

this.authProvider = this.createAuthProvider(options, authProvider);

this.connectionProvider = this.createConnectionProvider(options);
Expand Down Expand Up @@ -359,14 +354,15 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
return this.driver;
}

/**
* Returns the authentication provider associated with this client, if any.
* Intended for internal telemetry/feature-flag call sites that need to
* obtain auth headers directly without routing through `IClientContext`.
*
* @internal Not part of the public API. May change without notice.
*/
public getAuthProvider(): IAuthentication | undefined {
return this.authProvider;
public async getAuthHeaders(): Promise<HeadersInit> {
if (this.authProvider) {
try {
return await this.authProvider.authenticate();
} catch (error) {
this.logger.log(LogLevel.debug, `Error getting auth headers: ${error}`);
return {};
}
}
return {};
}
}
14 changes: 8 additions & 6 deletions lib/contracts/IClientContext.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { HeadersInit } from 'node-fetch';
import IDBSQLLogger from './IDBSQLLogger';
import IDriver from './IDriver';
import IConnectionProvider from '../connection/contracts/IConnectionProvider';
Expand Down Expand Up @@ -28,16 +29,10 @@ export interface ClientConfig {
telemetryBatchSize?: number;
telemetryFlushIntervalMs?: number;
telemetryMaxRetries?: number;
telemetryBackoffBaseMs?: number;
telemetryBackoffMaxMs?: number;
telemetryBackoffJitterMs?: number;
telemetryAuthenticatedExport?: boolean;
telemetryCircuitBreakerThreshold?: number;
telemetryCircuitBreakerTimeout?: number;
telemetryMaxPendingMetrics?: number;
telemetryMaxErrorsPerStatement?: number;
telemetryStatementTtlMs?: number;
userAgentEntry?: string;
}

export default interface IClientContext {
Expand All @@ -50,4 +45,11 @@ export default interface IClientContext {
getClient(): Promise<IThriftClient>;

getDriver(): Promise<IDriver>;

/**
* Gets authentication headers for HTTP requests.
* Used by telemetry and feature flag fetching to authenticate REST API calls.
* @returns Promise resolving to headers object with authentication, or empty object if no auth
*/
getAuthHeaders(): Promise<HeadersInit>;
}
5 changes: 0 additions & 5 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ import { LogLevel } from './contracts/IDBSQLLogger';
// Re-export types for TypeScript users
export type { default as ITokenProvider } from './connection/auth/tokenProvider/ITokenProvider';

// Re-export telemetry error classes so consumers can instanceof-check rather
// than string-matching error messages.
export { CircuitBreakerOpenError, CIRCUIT_BREAKER_OPEN_CODE } from './telemetry/CircuitBreaker';
export { TelemetryTerminalError } from './telemetry/DatabricksTelemetryExporter';

export const auth = {
PlainHttpAuthentication,
// Token provider classes for custom authentication
Expand Down
152 changes: 98 additions & 54 deletions lib/telemetry/CircuitBreaker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,48 @@
import IClientContext from '../contracts/IClientContext';
import { LogLevel } from '../contracts/IDBSQLLogger';

/**
* States of the circuit breaker.
*/
export enum CircuitBreakerState {
/** Normal operation, requests pass through */
CLOSED = 'CLOSED',
/** After threshold failures, all requests rejected immediately */
OPEN = 'OPEN',
/** After timeout, allows test requests to check if endpoint recovered */
HALF_OPEN = 'HALF_OPEN',
}

/**
* Configuration for circuit breaker behavior.
*/
export interface CircuitBreakerConfig {
/** Number of consecutive failures before opening the circuit */
failureThreshold: number;
/** Time in milliseconds to wait before attempting recovery */
timeout: number;
/** Number of consecutive successes in HALF_OPEN state to close the circuit */
successThreshold: number;
}

export const DEFAULT_CIRCUIT_BREAKER_CONFIG: Readonly<CircuitBreakerConfig> = Object.freeze({
/**
* Default circuit breaker configuration.
*/
export const DEFAULT_CIRCUIT_BREAKER_CONFIG: CircuitBreakerConfig = {
failureThreshold: 5,
timeout: 60000,
timeout: 60000, // 1 minute
successThreshold: 2,
});

export const CIRCUIT_BREAKER_OPEN_CODE = 'CIRCUIT_BREAKER_OPEN' as const;
};

/**
* Thrown when execute() is called while the breaker is OPEN or a HALF_OPEN
* probe is already in flight. Callers identify the condition via
* `instanceof CircuitBreakerOpenError` or `err.code === CIRCUIT_BREAKER_OPEN_CODE`
* rather than string-matching the message.
* Circuit breaker for telemetry exporter.
* Protects against failing telemetry endpoint with automatic recovery.
*
* States:
* - CLOSED: Normal operation, requests pass through
* - OPEN: After threshold failures, all requests rejected immediately
* - HALF_OPEN: After timeout, allows test requests to check if endpoint recovered
*/
export class CircuitBreakerOpenError extends Error {
readonly code = CIRCUIT_BREAKER_OPEN_CODE;

constructor(message = 'Circuit breaker OPEN') {
super(message);
this.name = 'CircuitBreakerOpenError';
}
}

export class CircuitBreaker {
private state: CircuitBreakerState = CircuitBreakerState.CLOSED;

Expand All @@ -61,6 +68,7 @@ export class CircuitBreaker {

private nextAttempt?: Date;

/** Number of in-flight requests in HALF_OPEN state (limits to 1 probe) */
private halfOpenInflight = 0;

private readonly config: CircuitBreakerConfig;
Expand All @@ -72,76 +80,79 @@ export class CircuitBreaker {
};
}

async execute<T>(operation: () => Promise<T>): Promise<T> {
const admitted = this.tryAdmit();
if (!admitted) {
throw new CircuitBreakerOpenError();
}

const { wasHalfOpenProbe } = admitted;

try {
const result = await operation();
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
} finally {
if (wasHalfOpenProbe && this.halfOpenInflight > 0) {
this.halfOpenInflight -= 1;
}
}
}

/**
* Synchronous admission check. Returning `null` means "reject". Returning
* an object means the caller is admitted; `wasHalfOpenProbe` indicates
* whether this admission consumed the single HALF_OPEN probe slot so the
* caller can decrement it in `finally`.
* Executes an operation with circuit breaker protection.
*
* Running this as a single synchronous block is what prevents the
* concurrent-probe race that existed in the previous implementation.
* @param operation The operation to execute
* @returns Promise resolving to the operation result
* @throws Error if circuit is OPEN or operation fails
*/
private tryAdmit(): { wasHalfOpenProbe: boolean } | null {
async execute<T>(operation: () => Promise<T>): Promise<T> {
const logger = this.context.getLogger();

// Check if circuit is open
if (this.state === CircuitBreakerState.OPEN) {
if (this.nextAttempt && Date.now() < this.nextAttempt.getTime()) {
return null;
throw new Error('Circuit breaker OPEN');
}
// Timeout expired, transition to HALF_OPEN
this.state = CircuitBreakerState.HALF_OPEN;
this.successCount = 0;
this.halfOpenInflight = 0;
logger.log(LogLevel.debug, 'Circuit breaker transitioned to HALF_OPEN');
}

// In HALF_OPEN state, allow only one probe request at a time
if (this.state === CircuitBreakerState.HALF_OPEN && this.halfOpenInflight > 0) {
throw new Error('Circuit breaker OPEN');
}

if (this.state === CircuitBreakerState.HALF_OPEN) {
if (this.halfOpenInflight > 0) {
return null;
}
this.halfOpenInflight += 1;
return { wasHalfOpenProbe: true };
}

return { wasHalfOpenProbe: false };
try {
const result = await operation();
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
} finally {
if (this.halfOpenInflight > 0) {
this.halfOpenInflight -= 1;
}
}
}

/**
* Gets the current state of the circuit breaker.
*/
getState(): CircuitBreakerState {
return this.state;
}

/**
* Gets the current failure count.
*/
getFailureCount(): number {
return this.failureCount;
}

/**
* Gets the current success count (relevant in HALF_OPEN state).
*/
getSuccessCount(): number {
return this.successCount;
}

/**
* Handles successful operation execution.
*/
private onSuccess(): void {
const logger = this.context.getLogger();

// Reset failure count on any success
this.failureCount = 0;

if (this.state === CircuitBreakerState.HALF_OPEN) {
Expand All @@ -152,6 +163,7 @@ export class CircuitBreaker {
);

if (this.successCount >= this.config.successThreshold) {
// Transition to CLOSED
this.state = CircuitBreakerState.CLOSED;
this.successCount = 0;
this.nextAttempt = undefined;
Expand All @@ -160,17 +172,23 @@ export class CircuitBreaker {
}
}

/**
* Handles failed operation execution.
*/
private onFailure(): void {
const logger = this.context.getLogger();

this.failureCount += 1;
this.successCount = 0;
this.successCount = 0; // Reset success count on failure

logger.log(LogLevel.debug, `Circuit breaker failure (${this.failureCount}/${this.config.failureThreshold})`);

// In HALF_OPEN state, any failure immediately reopens the circuit.
// In CLOSED state, reopen only after failureThreshold consecutive failures.
if (this.state === CircuitBreakerState.HALF_OPEN || this.failureCount >= this.config.failureThreshold) {
this.state = CircuitBreakerState.OPEN;
this.nextAttempt = new Date(Date.now() + this.config.timeout);
// Log at warn level for OPEN transitions — meaningful operational signal
logger.log(
LogLevel.warn,
`Telemetry circuit breaker OPEN after ${this.failureCount} failures (will retry after ${this.config.timeout}ms)`,
Expand All @@ -179,13 +197,25 @@ export class CircuitBreaker {
}
}

/**
* Manages circuit breakers per host.
* Ensures each host has its own isolated circuit breaker to prevent
* failures on one host from affecting telemetry to other hosts.
*/
export class CircuitBreakerRegistry {
private breakers: Map<string, CircuitBreaker>;

constructor(private context: IClientContext) {
this.breakers = new Map();
}

/**
* Gets or creates a circuit breaker for the specified host.
*
* @param host The host identifier (e.g., "workspace.cloud.databricks.com")
* @param config Optional configuration overrides
* @returns Circuit breaker for the host
*/
getCircuitBreaker(host: string, config?: Partial<CircuitBreakerConfig>): CircuitBreaker {
let breaker = this.breakers.get(host);
if (!breaker) {
Expand All @@ -200,16 +230,30 @@ export class CircuitBreakerRegistry {
return breaker;
}

/**
* Gets all registered circuit breakers.
* Useful for testing and diagnostics.
*/
getAllBreakers(): Map<string, CircuitBreaker> {
return new Map(this.breakers);
}

/**
* Removes a circuit breaker for the specified host.
* Useful for cleanup when a host is no longer in use.
*
* @param host The host identifier
*/
removeCircuitBreaker(host: string): void {
this.breakers.delete(host);
const logger = this.context.getLogger();
logger.log(LogLevel.debug, `Removed circuit breaker for host: ${host}`);
}

/**
* Clears all circuit breakers.
* Useful for testing.
*/
clear(): void {
this.breakers.clear();
}
Expand Down
Loading
Loading