Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions lib/contracts/IClientContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ export interface ClientConfig {

useLZ4Compression: boolean;
enableMetricViewMetadata?: boolean;

// Telemetry configuration
telemetryEnabled?: boolean;
telemetryBatchSize?: number;
telemetryFlushIntervalMs?: number;
telemetryMaxRetries?: number;
telemetryAuthenticatedExport?: boolean;
telemetryCircuitBreakerThreshold?: number;
telemetryCircuitBreakerTimeout?: number;
}

export default interface IClientContext {
Expand Down
108 changes: 108 additions & 0 deletions lib/telemetry/ExceptionClassifier.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Copyright (c) 2025 Databricks Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import AuthenticationError from '../errors/AuthenticationError';
import RetryError from '../errors/RetryError';

/**
 * Classifies exceptions as terminal (unrecoverable) vs retryable.
 *
 * Terminal exceptions should be flushed immediately to telemetry,
 * while retryable exceptions are buffered until statement completion.
 *
 * This follows the JDBC driver pattern of smart exception flushing
 * to optimize telemetry export efficiency while ensuring critical
 * errors are reported immediately.
 *
 * Both classifiers are defensive: they return `false` instead of throwing
 * when handed `null`, `undefined`, or an error-like value without a string
 * `message`, so classification itself cannot raise. The two checks are
 * independent of each other; an error may match neither of them.
 */
export default class ExceptionClassifier {
  /**
   * HTTP status codes treated as terminal:
   * 400 Bad Request, 401 Unauthorized, 403 Forbidden, 404 Not Found.
   */
  private static readonly TERMINAL_STATUS_CODES: ReadonlySet<number> = new Set([400, 401, 403, 404]);

  /**
   * HTTP status codes treated as retryable:
   * 429 Too Many Requests, 500 Internal Server Error, 502 Bad Gateway,
   * 503 Service Unavailable, 504 Gateway Timeout.
   */
  private static readonly RETRYABLE_STATUS_CODES: ReadonlySet<number> = new Set([429, 500, 502, 503, 504]);

  /**
   * Determines if an exception is terminal (non-retryable).
   *
   * An error is terminal when it is an `AuthenticationError` or carries one
   * of the terminal HTTP status codes (400, 401, 403, 404) in its
   * `statusCode` / `status` property.
   *
   * @param error - The error to classify
   * @returns true if the error is terminal, false otherwise (including for
   *          `null`/`undefined` and unknown error types)
   */
  static isTerminal(error: Error): boolean {
    // Callers often forward values from `catch`, which may be anything.
    if (error === null || error === undefined) {
      return false;
    }

    if (error instanceof AuthenticationError) {
      return true;
    }

    const statusCode = ExceptionClassifier.readStatusCode(error);
    return statusCode !== undefined && ExceptionClassifier.TERMINAL_STATUS_CODES.has(statusCode);
  }

  /**
   * Determines if an exception is retryable.
   *
   * An error is retryable when it is a `RetryError`, is named
   * `TimeoutError`, has a message containing the (case-sensitive) substring
   * `timeout`, or carries one of the retryable HTTP status codes
   * (429, 500, 502, 503, 504) in its `statusCode` / `status` property.
   *
   * @param error - The error to classify
   * @returns true if the error is retryable, false otherwise (including for
   *          `null`/`undefined` and unknown error types)
   */
  static isRetryable(error: Error): boolean {
    if (error === null || error === undefined) {
      return false;
    }

    if (error instanceof RetryError) {
      return true;
    }

    // Read name/message as unknown: error-like objects coming from `catch`
    // are not guaranteed to have a string message, and calling `.includes`
    // on a missing message would throw.
    const { name, message } = error as { name?: unknown; message?: unknown };
    if (name === 'TimeoutError' || (typeof message === 'string' && message.includes('timeout'))) {
      return true;
    }

    const statusCode = ExceptionClassifier.readStatusCode(error);
    return statusCode !== undefined && ExceptionClassifier.RETRYABLE_STATUS_CODES.has(statusCode);
  }

  /**
   * Reads the HTTP status code attached to an error-like value.
   *
   * `statusCode` takes precedence; `status` is consulted only when
   * `statusCode` is `null` or `undefined`. The caller must have ruled out
   * `null`/`undefined` for `error` itself.
   *
   * @param error - Non-nullish value to inspect
   * @returns the status code when the selected property is a number,
   *          otherwise `undefined`
   */
  private static readStatusCode(error: unknown): number | undefined {
    const source = error as { statusCode?: unknown; status?: unknown };
    const candidate = source.statusCode ?? source.status;
    return typeof candidate === 'number' ? candidate : undefined;
  }
}
253 changes: 253 additions & 0 deletions lib/telemetry/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
/**
* Copyright (c) 2025 Databricks Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
 * Event types emitted by the telemetry system.
 *
 * This is a string enum: each member's runtime value is the string literal
 * on its line, so the values are what appears in `TelemetryEvent.eventType`.
 *
 * NOTE(review): the per-member descriptions below are inferred from the
 * member names; the code that emits these events is not part of this file.
 */
export enum TelemetryEventType {
// Presumably emitted when a connection is opened.
CONNECTION_OPEN = 'connection.open',
// Presumably emitted when a statement starts executing.
STATEMENT_START = 'statement.start',
// Presumably emitted when a statement finishes.
STATEMENT_COMPLETE = 'statement.complete',
// Presumably emitted per CloudFetch chunk download.
CLOUDFETCH_CHUNK = 'cloudfetch.chunk',
// Presumably emitted when an error is reported; note the value has no dotted prefix.
ERROR = 'error',
}

/**
 * Configuration for telemetry components.
 *
 * Every field is optional. `DEFAULT_TELEMETRY_CONFIG` in this module provides
 * a value for each one; the defaults quoted below are taken from it.
 */
export interface TelemetryConfiguration {
/** Whether telemetry is enabled. Default: `false`. */
enabled?: boolean;

/** Maximum number of metrics to batch before flushing. Default: `100`. */
batchSize?: number;

/** Interval in milliseconds to flush metrics. Default: `5000`. */
flushIntervalMs?: number;

/** Maximum retry attempts for export. Default: `3`. */
maxRetries?: number;

/** Whether to use authenticated export endpoint. Default: `true`. */
authenticatedExport?: boolean;

/**
 * Circuit breaker failure threshold. Default: `5`.
 * NOTE(review): presumably the number of failures before the breaker opens;
 * confirm against the circuit breaker implementation.
 */
circuitBreakerThreshold?: number;

/** Circuit breaker timeout in milliseconds. Default: `60000` (1 minute). */
circuitBreakerTimeout?: number;
}

/**
 * Baseline telemetry settings.
 *
 * Typed as `Required<TelemetryConfiguration>`, so every option of the
 * configuration interface has a concrete value here.
 */
export const DEFAULT_TELEMETRY_CONFIG: Required<TelemetryConfiguration> = {
  // Telemetry starts out switched off for a safe rollout.
  enabled: false,
  batchSize: 100,
  // 5 seconds, expressed in milliseconds.
  flushIntervalMs: 5 * 1000,
  maxRetries: 3,
  authenticatedExport: true,
  circuitBreakerThreshold: 5,
  // 1 minute, expressed in milliseconds.
  circuitBreakerTimeout: 60 * 1000,
};

/**
 * Runtime telemetry event emitted by the driver.
 *
 * One flat shape is shared by all event types: only `eventType` and
 * `timestamp` are required. Every other field is optional and grouped below
 * by the kind of event it belongs to (connection, statement, CloudFetch,
 * error). The type itself does not enforce which fields accompany which
 * event type.
 */
export interface TelemetryEvent {
/**
 * Type of the event.
 * Because the union includes `string`, any string is accepted by the type
 * checker, not only `TelemetryEventType` values.
 */
eventType: TelemetryEventType | string;

/** Timestamp when the event occurred (milliseconds since epoch) */
timestamp: number;

/** Session ID for correlation */
sessionId?: string;

/** Statement ID for correlation */
statementId?: string;

// Connection-specific fields
/** Workspace ID */
workspaceId?: string;

/** Driver configuration (see `DriverConfiguration`) */
driverConfig?: DriverConfiguration;

// Statement-specific fields
/** Type of operation (SELECT, INSERT, etc.) */
operationType?: string;

/** Execution latency in milliseconds */
latencyMs?: number;

/** Result format (inline, cloudfetch, arrow) */
resultFormat?: string;

/** Number of result chunks */
chunkCount?: number;

/** Total bytes downloaded */
bytesDownloaded?: number;

/** Number of poll operations */
pollCount?: number;

// CloudFetch-specific fields
/** Chunk index in the result set */
chunkIndex?: number;

/** Number of bytes in this chunk */
bytes?: number;

/** Whether compression was used */
compressed?: boolean;

// Error-specific fields
/** Error name/type */
errorName?: string;

/** Error message */
errorMessage?: string;

/** Whether the error is terminal (non-retryable) */
isTerminal?: boolean;
}

/**
 * Aggregated telemetry metric for export to Databricks.
 *
 * Only `metricType` and `timestamp` are required; all remaining fields are
 * optional. Most optional fields share their names and types with
 * `TelemetryEvent`.
 */
export interface TelemetryMetric {
/** Type of metric; restricted to these three string literals */
metricType: 'connection' | 'statement' | 'error';

/** Timestamp when the metric was created (milliseconds since epoch) */
timestamp: number;

/** Session ID for correlation */
sessionId?: string;

/** Statement ID for correlation */
statementId?: string;

/** Workspace ID */
workspaceId?: string;

/** Driver configuration (for connection metrics) */
driverConfig?: DriverConfiguration;

/** Execution latency in milliseconds */
latencyMs?: number;

/** Result format (inline, cloudfetch, arrow) */
resultFormat?: string;

/** Number of result chunks */
chunkCount?: number;

/** Total bytes downloaded */
bytesDownloaded?: number;

/** Number of poll operations */
pollCount?: number;

/** Error name/type */
errorName?: string;

/** Error message */
errorMessage?: string;
}

/**
 * Driver configuration metadata collected once per connection.
 *
 * Unlike the other telemetry shapes in this module, every field here is
 * required. It is referenced by the optional `driverConfig` field of
 * `TelemetryEvent` and `TelemetryMetric`.
 */
export interface DriverConfiguration {
/** Driver version */
driverVersion: string;

/** Driver name */
driverName: string;

/** Node.js version */
nodeVersion: string;

/** Platform (linux, darwin, win32) */
platform: string;

/** OS version */
osVersion: string;

// Feature flags
/** Whether CloudFetch is enabled */
cloudFetchEnabled: boolean;

/** Whether LZ4 compression is enabled */
lz4Enabled: boolean;

/** Whether Arrow format is enabled */
arrowEnabled: boolean;

/** Whether direct results are enabled */
directResultsEnabled: boolean;

// Configuration values
/** Socket timeout in milliseconds */
socketTimeout: number;

/** Maximum retry attempts */
retryMaxAttempts: number;

/** Number of concurrent CloudFetch downloads */
cloudFetchConcurrentDownloads: number;
}

/**
 * Per-statement metrics aggregated from multiple events.
 *
 * The identifiers, `startTime` and the counters (`pollCount`,
 * `pollLatencyMs`, `chunkCount`, `totalBytesDownloaded`) are required.
 * `operationType`, `executionLatencyMs`, `resultFormat` and
 * `compressionEnabled` are optional.
 */
export interface StatementMetrics {
/** Statement ID */
statementId: string;

/** Session ID */
sessionId: string;

/** Type of operation */
operationType?: string;

/** Start timestamp (milliseconds since epoch) */
startTime: number;

/** Total execution latency in milliseconds */
executionLatencyMs?: number;

/** Number of poll operations */
pollCount: number;

/** Total poll latency in milliseconds */
pollLatencyMs: number;

/** Result format (inline, cloudfetch, arrow) */
resultFormat?: string;

/** Number of CloudFetch chunks downloaded */
chunkCount: number;

/** Total bytes downloaded */
totalBytesDownloaded: number;

/** Whether compression was used */
compressionEnabled?: boolean;
}
Loading
Loading