Skip to content

Commit 5495a76

Browse files
committed
feat(core): implement EntropyPool with SHA-256 mixing
- Thread-safe entropy accumulator with 4096-bit buffer
- Cryptographic mixing using SHA-256 hash function
- Forward secrecy: past states cannot be recovered
- Automatic entropy estimation tracking
1 parent 9be2366 commit 5495a76

1 file changed

Lines changed: 335 additions & 0 deletions

File tree

src/trueentropy/pool.py

Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
# =============================================================================
2+
# TrueEntropy - Entropy Pool Module
3+
# =============================================================================
4+
#
5+
# This module implements the core Entropy Pool - the heart of the TrueEntropy
6+
# library. The pool is a cryptographically secure buffer that accumulates
7+
# entropy from various sources and provides it for random number generation.
8+
#
9+
# How it works:
10+
# 1. The pool starts with an initial seed from os.urandom()
11+
# 2. New entropy is fed into the pool via the feed() method
12+
# 3. Each feed operation mixes the new data with existing pool state
13+
# using SHA-256 hashing (whitening)
14+
# 4. The extract() method pulls entropy out of the pool
15+
# 5. After extraction, the pool state is updated to prevent reuse
16+
#
17+
# Security Properties:
18+
# - Forward secrecy: Old pool states cannot be recovered
19+
# - Avalanche effect: Small input changes completely change output
20+
# - Thread safety: All operations are protected by locks
21+
#
22+
# =============================================================================
23+
24+
"""
25+
Entropy Pool implementation for TrueEntropy.
26+
27+
The pool maintains a buffer of accumulated entropy that is mixed using
28+
SHA-256 hashing. This ensures uniform distribution and makes it impossible
29+
to predict future outputs or recover past states.
30+
"""
31+
32+
from __future__ import annotations
33+
34+
import hashlib
35+
import os
36+
import struct
37+
import threading
38+
import time
39+
from typing import Optional
40+
41+
42+
class EntropyPool:
    """
    Thread-safe entropy accumulator mixed with SHA-256.

    The pool keeps a POOL_SIZE-byte state. Every feed() hashes the current
    state together with the new data and a timestamp, then re-expands the
    digest to POOL_SIZE bytes. Every extract() derives output blocks from
    the state and then replaces the state with a hash of (state || output),
    so the state that produced an output is not kept.

    NOTE(review): each feed()/extract() routes the state through a single
    32-byte SHA-256 digest, so the state after those calls is a function of
    at most 256 bits even though the buffer is 512 bytes and the entropy
    estimate is capped at POOL_SIZE * 8. Confirm whether that is intended.

    Attributes:
        POOL_SIZE: Size of the pool state in bytes (512 bytes = 4096 bits).
        MIN_ENTROPY_THRESHOLD: Reference threshold in bits (256). Nothing in
            this class reads it; it is exposed for callers/monitoring code.
        HASH_SIZE: SHA-256 digest size in bytes (32).

    Example:
        >>> pool = EntropyPool()
        >>> pool.feed(b"some random data")
        >>> entropy = pool.extract(32)
        >>> print(len(entropy))
        32
    """

    # -------------------------------------------------------------------------
    # Class Constants
    # -------------------------------------------------------------------------

    # Pool size in bytes (512 bytes = 4096 bits)
    POOL_SIZE: int = 512

    # Reference "low entropy" level in bits. This class does not emit any
    # warning itself; callers may compare entropy_bits against this value.
    MIN_ENTROPY_THRESHOLD: int = 256

    # Hash output size (SHA-256 = 32 bytes)
    HASH_SIZE: int = 32

    # -------------------------------------------------------------------------
    # Initialization
    # -------------------------------------------------------------------------

    def __init__(self, seed: Optional[bytes] = None) -> None:
        """
        Initialize a new entropy pool.

        Args:
            seed: Optional initial seed bytes. When given, the pool state is
                a deterministic function of the seed (useful for tests).
                When omitted, POOL_SIZE bytes are read from os.urandom().
        """
        # `is not None` (rather than truthiness) so an explicit empty seed
        # is still honoured as a deterministic seed.
        initial = seed if seed is not None else os.urandom(self.POOL_SIZE)

        # Expand (or compress) the seed to exactly POOL_SIZE bytes
        self._pool: bytes = self._expand_to_pool_size(initial)

        # Bookkeeping estimate of entropy bits held by the pool.
        # NOTE(review): this starts at the maximum even for a short
        # caller-supplied seed; it does not measure the seed.
        self._entropy_bits: int = self.POOL_SIZE * 8

        # Guards every read/write of the fields below. This is a
        # non-reentrant Lock: never call a locked public member while
        # already holding it.
        self._lock: threading.Lock = threading.Lock()

        # Timestamp of last feed operation (for health monitoring)
        self._last_feed_time: float = time.time()

        # Running totals of bytes fed in / extracted out
        self._total_fed: int = 0
        self._total_extracted: int = 0

    # -------------------------------------------------------------------------
    # Public Methods
    # -------------------------------------------------------------------------

    def feed(self, data: bytes, entropy_estimate: int = 0) -> None:
        """
        Mix new data into the pool.

        The mixing formula is:
            new_state = expand(SHA256(old_state || data || timestamp))

        Args:
            data: Raw bytes to feed into the pool. Can be any length; empty
                input is ignored entirely (no state or statistics change).
            entropy_estimate: Estimated bits of entropy in the data. Used
                for bookkeeping only; it does not affect the mixing. The
                resulting estimate is clamped to [0, POOL_SIZE * 8], so a
                negative value can never push it below zero.

        Example:
            >>> pool = EntropyPool()
            >>> pool.feed(b"network latency: 45ms", entropy_estimate=8)
        """
        if not data:
            return  # Nothing to feed

        with self._lock:
            # Wall-clock time packed as a big-endian double, so identical
            # data fed at different times yields different states.
            timestamp = struct.pack("!d", time.time())

            # SHA256(old_state || data || timestamp), fed incrementally to
            # avoid building one large concatenated buffer.
            mixer = hashlib.sha256(self._pool)
            mixer.update(data)
            mixer.update(timestamp)

            # Re-expand the 32-byte digest to the full pool size
            self._pool = self._expand_to_pool_size(mixer.digest())

            # Add the caller's estimate, keeping the total within
            # [0, POOL_SIZE * 8] (extract() clamps at zero the same way).
            ceiling = self.POOL_SIZE * 8
            self._entropy_bits = max(
                0, min(self._entropy_bits + entropy_estimate, ceiling)
            )

            # Update statistics
            self._last_feed_time = time.time()
            self._total_fed += len(data)

    def extract(self, num_bytes: int) -> bytes:
        """
        Extract bytes derived from the pool and advance the pool state.

        Output block i is SHA256(state || counter_i || b"extract"), with the
        counter packed as a big-endian unsigned 64-bit integer. Afterwards
        the state becomes expand(SHA256(state || output || b"update")), so
        the same state is never used for two extractions.

        Args:
            num_bytes: Number of bytes to extract (must be positive)

        Returns:
            Exactly num_bytes bytes.

        Raises:
            ValueError: If num_bytes is not positive

        Example:
            >>> pool = EntropyPool()
            >>> random_bytes = pool.extract(32)
            >>> print(len(random_bytes))
            32
        """
        if num_bytes <= 0:
            raise ValueError("num_bytes must be positive")

        with self._lock:
            result = self._counter_mode_hash(self._pool, b"extract", num_bytes)

            # Replace the state so the one that produced `result` is gone
            updater = hashlib.sha256(self._pool)
            updater.update(result)
            updater.update(b"update")
            self._pool = self._expand_to_pool_size(updater.digest())

            # Bookkeeping: assume each extracted bit removes one bit of
            # estimated entropy, never going below zero.
            self._entropy_bits = max(0, self._entropy_bits - (num_bytes * 8))

            # Update statistics
            self._total_extracted += num_bytes

            return result

    def reseed(self) -> None:
        """
        Mix POOL_SIZE bytes of fresh os.urandom() output into the pool.

        This goes through feed(), so the fresh bytes are combined with the
        existing state rather than replacing it, and the entropy estimate
        is raised to its maximum.

        Example:
            >>> pool = EntropyPool()
            >>> pool.reseed()
        """
        fresh_entropy = os.urandom(self.POOL_SIZE)
        self.feed(fresh_entropy, entropy_estimate=self.POOL_SIZE * 8)

    # -------------------------------------------------------------------------
    # Properties for Monitoring
    # -------------------------------------------------------------------------

    @property
    def entropy_bits(self) -> int:
        """
        Get the estimated number of entropy bits in the pool.

        This is pure bookkeeping: caller-supplied estimates added by feed()
        minus 8 bits per extracted byte, clamped to [0, POOL_SIZE * 8]. It
        is not a measurement of the pool contents.
        """
        with self._lock:
            return self._entropy_bits

    @property
    def last_feed_time(self) -> float:
        """Get the time.time() timestamp of the last non-empty feed."""
        with self._lock:
            return self._last_feed_time

    @property
    def total_fed(self) -> int:
        """Get the total number of bytes fed into the pool."""
        with self._lock:
            return self._total_fed

    @property
    def total_extracted(self) -> int:
        """Get the total number of bytes extracted from the pool."""
        with self._lock:
            return self._total_extracted

    # -------------------------------------------------------------------------
    # Private Methods
    # -------------------------------------------------------------------------

    @staticmethod
    def _counter_mode_hash(prefix: bytes, suffix: bytes, length: int) -> bytes:
        """
        Produce `length` bytes as SHA256(prefix || counter || suffix) blocks.

        The counter starts at 0 and is packed as a big-endian unsigned
        64-bit integer. The concatenated blocks are trimmed to `length`.

        Args:
            prefix: Bytes hashed before the counter.
            suffix: Bytes hashed after the counter (may be empty).
            length: Number of output bytes; non-positive yields b"".

        Returns:
            Bytes of exactly `length` length (empty if length <= 0).
        """
        # Hash the (possibly large) prefix once; each block continues from
        # a copy of that state instead of re-hashing the prefix.
        base = hashlib.sha256(prefix)

        blocks = []
        produced = 0
        counter = 0
        while produced < length:
            block_hash = base.copy()
            block_hash.update(struct.pack("!Q", counter))
            block_hash.update(suffix)
            block = block_hash.digest()
            blocks.append(block)
            produced += len(block)
            counter += 1

        return b"".join(blocks)[:length]

    def _expand_to_pool_size(self, data: bytes) -> bytes:
        """
        Stretch (or compress) data to exactly POOL_SIZE bytes.

        Uses counter-mode expansion: SHA256(data || counter) for each block
        needed, concatenated and trimmed.

        Args:
            data: The seed data to expand

        Returns:
            Bytes of exactly POOL_SIZE length
        """
        return self._counter_mode_hash(data, b"", self.POOL_SIZE)

    # -------------------------------------------------------------------------
    # String Representation
    # -------------------------------------------------------------------------

    def __repr__(self) -> str:
        """Return a string representation of the pool."""
        # Read all counters under one lock acquisition so they describe the
        # same moment. The private fields are read directly because the
        # public properties take the same non-reentrant lock.
        with self._lock:
            bits = self._entropy_bits
            fed = self._total_fed
            extracted = self._total_extracted
        return (
            f"EntropyPool("
            f"entropy_bits={bits}, "
            f"total_fed={fed}, "
            f"total_extracted={extracted})"
        )

0 commit comments

Comments
 (0)