Skip to content

Commit 1adefe5

Browse files
committed
feat(harvesters): add TimingHarvester for CPU jitter
- Measures nanosecond variations in code execution - Uses time.perf_counter_ns() for high precision - Entropy from OS scheduler and cache effects
1 parent 5c260b9 commit 1adefe5

1 file changed

Lines changed: 195 additions & 0 deletions

File tree

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
# =============================================================================
2+
# TrueEntropy - Timing Harvester
3+
# =============================================================================
4+
#
5+
# This harvester collects entropy from CPU timing jitter - the unpredictable
6+
# variations in how long code takes to execute.
7+
#
8+
# Why Timing is Random:
9+
# - The OS scheduler interrupts processes unpredictably
10+
# - Cache hits/misses vary based on system state
11+
# - CPU frequency may fluctuate (power management)
12+
# - Other processes compete for resources
13+
#
14+
# Collection Method:
15+
# 1. Run a simple operation multiple times
16+
# 2. Measure the time for each iteration using high-precision counter
17+
# 3. The nanosecond-level variations are our entropy
18+
#
19+
# Entropy Estimate:
20+
# - Conservative: ~2-4 bits per timing sample
21+
# - We collect many samples and use the least significant bits
22+
#
23+
# =============================================================================
24+
25+
"""
26+
Timing-based entropy harvester.
27+
28+
Collects entropy from CPU timing jitter by measuring the execution
29+
time of simple operations at nanosecond precision.
30+
"""
31+
32+
from __future__ import annotations
33+
34+
import struct
35+
import time
36+
from typing import List
37+
38+
from trueentropy.harvesters.base import BaseHarvester, HarvestResult
39+
40+
41+
class TimingHarvester(BaseHarvester):
42+
"""
43+
Harvests entropy from CPU timing jitter.
44+
45+
This harvester measures the execution time of simple operations
46+
using a high-precision timer. The nanosecond-level variations
47+
come from:
48+
49+
- OS scheduler interrupts
50+
- Cache effects
51+
- CPU frequency scaling
52+
- Other system activity
53+
54+
Attributes:
55+
num_samples: Number of timing samples to collect (default: 64)
56+
operation_size: Size of list operation for each sample (default: 10)
57+
58+
Example:
59+
>>> harvester = TimingHarvester()
60+
>>> result = harvester.collect()
61+
>>> print(f"Collected {len(result.data)} bytes, "
62+
... f"estimated {result.entropy_bits} bits")
63+
"""
64+
65+
# -------------------------------------------------------------------------
66+
# Configuration
67+
# -------------------------------------------------------------------------
68+
69+
def __init__(
70+
self,
71+
num_samples: int = 64,
72+
operation_size: int = 10
73+
) -> None:
74+
"""
75+
Initialize the timing harvester.
76+
77+
Args:
78+
num_samples: Number of timing measurements to collect.
79+
More samples = more entropy but slower.
80+
operation_size: Size of the list created in each timing operation.
81+
Affects timing variability.
82+
"""
83+
self._num_samples = num_samples
84+
self._operation_size = operation_size
85+
86+
# -------------------------------------------------------------------------
87+
# BaseHarvester Implementation
88+
# -------------------------------------------------------------------------
89+
90+
@property
91+
def name(self) -> str:
92+
"""Return harvester name."""
93+
return "timing"
94+
95+
def collect(self) -> HarvestResult:
96+
"""
97+
Collect entropy from timing jitter.
98+
99+
Process:
100+
1. Run self._num_samples timing measurements
101+
2. Each measurement times a simple list creation operation
102+
3. Collect the nanosecond-precision timestamps
103+
4. Pack the timing deltas into bytes
104+
5. Extract the most variable bits for entropy
105+
106+
Returns:
107+
HarvestResult containing timing entropy
108+
"""
109+
# Collect timing samples
110+
timing_samples = self._collect_timing_samples()
111+
112+
# Convert samples to bytes
113+
# We use the full nanosecond values for maximum entropy
114+
data = self._samples_to_bytes(timing_samples)
115+
116+
# Estimate entropy
117+
# Conservative: 2 bits per sample (the jitter is in least sig bits)
118+
entropy_bits = self._num_samples * 2
119+
120+
return HarvestResult(
121+
data=data,
122+
entropy_bits=entropy_bits,
123+
source=self.name,
124+
success=True
125+
)
126+
127+
# -------------------------------------------------------------------------
128+
# Private Methods
129+
# -------------------------------------------------------------------------
130+
131+
def _collect_timing_samples(self) -> List[int]:
132+
"""
133+
Collect timing samples using high-precision counter.
134+
135+
We measure the time to perform a simple operation (creating a list)
136+
at nanosecond precision. The variations come from:
137+
- CPU scheduling
138+
- Memory allocation
139+
- Cache behavior
140+
- System load
141+
142+
Returns:
143+
List of timing deltas in nanoseconds
144+
"""
145+
samples: List[int] = []
146+
147+
for _ in range(self._num_samples):
148+
# Record start time with nanosecond precision
149+
# time.perf_counter_ns() is the highest resolution timer available
150+
start = time.perf_counter_ns()
151+
152+
# Perform a simple operation
153+
# Creating a list involves memory allocation which varies
154+
_ = [None] * self._operation_size
155+
156+
# Record end time
157+
end = time.perf_counter_ns()
158+
159+
# Store the delta (execution time)
160+
samples.append(end - start)
161+
162+
return samples
163+
164+
def _samples_to_bytes(self, samples: List[int]) -> bytes:
165+
"""
166+
Convert timing samples to a bytes object.
167+
168+
We pack each timing value as an unsigned 64-bit integer.
169+
This preserves all the information in the samples.
170+
171+
Args:
172+
samples: List of timing values in nanoseconds
173+
174+
Returns:
175+
Bytes representation of the samples
176+
"""
177+
# Pack as unsigned 64-bit integers (big-endian)
178+
# Each sample is 8 bytes, so total is num_samples * 8 bytes
179+
return struct.pack(f"!{len(samples)}Q", *samples)
180+
181+
# -------------------------------------------------------------------------
182+
# Configuration Properties
183+
# -------------------------------------------------------------------------
184+
185+
@property
186+
def num_samples(self) -> int:
187+
"""Get the number of samples collected per harvest."""
188+
return self._num_samples
189+
190+
@num_samples.setter
191+
def num_samples(self, value: int) -> None:
192+
"""Set the number of samples."""
193+
if value < 1:
194+
raise ValueError("num_samples must be at least 1")
195+
self._num_samples = value

0 commit comments

Comments
 (0)