Skip to content

Commit c9f9313

Browse files
committed
feat(core): implement EntropyTap for value extraction
- random(): uniform float [0.0, 1.0) - randint(): unbiased integers with rejection sampling - randbool(), choice(), shuffle(), sample() - gauss(): normal distribution via Box-Muller
1 parent 5495a76 commit c9f9313

1 file changed

Lines changed: 348 additions & 0 deletions

File tree

src/trueentropy/tap.py

Lines changed: 348 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
# =============================================================================
2+
# TrueEntropy - Entropy Tap Module
3+
# =============================================================================
4+
#
5+
# The Tap is the "faucet" that extracts entropy from the pool and converts it
6+
# into usable random values. It provides the interface between the raw pool
7+
# bytes and the friendly random number API.
8+
#
9+
# Key Features:
10+
# - Uniform distribution: All random values are uniformly distributed
11+
# - Bias elimination: Uses rejection sampling for unbiased integer ranges
12+
# - Type conversion: Converts raw bytes to floats, ints, bools, etc.
13+
#
14+
# =============================================================================
15+
16+
"""
17+
Entropy Tap - extracts and formats random values from the pool.
18+
19+
This module provides the EntropyTap class that converts raw pool entropy
20+
into various random value types (float, int, bool, bytes, etc.).
21+
"""
22+
23+
from __future__ import annotations
24+
25+
import struct
26+
from typing import Any, MutableSequence, Sequence, TypeVar
27+
28+
from trueentropy.pool import EntropyPool
29+
30+
31+
# Type variable for generic sequence operations
32+
T = TypeVar("T")
33+
34+
35+
class EntropyTap:
36+
"""
37+
Extracts and formats random values from an entropy pool.
38+
39+
The tap is responsible for converting raw entropy bytes into
40+
various formats like floats, integers, and booleans. It ensures
41+
that all generated values are uniformly distributed.
42+
43+
Example:
44+
>>> pool = EntropyPool()
45+
>>> tap = EntropyTap(pool)
46+
>>> value = tap.random()
47+
>>> print(f"Random: {value}")
48+
"""
49+
50+
# -------------------------------------------------------------------------
51+
# Initialization
52+
# -------------------------------------------------------------------------
53+
54+
def __init__(self, pool: EntropyPool) -> None:
55+
"""
56+
Initialize the tap with an entropy pool.
57+
58+
Args:
59+
pool: The EntropyPool instance to extract entropy from
60+
"""
61+
self._pool = pool
62+
63+
# -------------------------------------------------------------------------
64+
# Random Value Generation
65+
# -------------------------------------------------------------------------
66+
67+
def random(self) -> float:
68+
"""
69+
Generate a random float in the range [0.0, 1.0).
70+
71+
Uses 64 bits of entropy to generate a uniformly distributed
72+
floating-point number. The result is always less than 1.0.
73+
74+
Returns:
75+
A float value where 0.0 <= value < 1.0
76+
77+
How it works:
78+
1. Extract 8 bytes (64 bits) from the pool
79+
2. Interpret as unsigned 64-bit integer
80+
3. Divide by 2^64 to get value in [0, 1)
81+
"""
82+
# Extract 8 bytes of entropy
83+
raw_bytes = self._pool.extract(8)
84+
85+
# Unpack as unsigned 64-bit integer (big-endian)
86+
# We use big-endian for consistency across platforms
87+
value = struct.unpack("!Q", raw_bytes)[0]
88+
89+
# Convert to float in range [0.0, 1.0)
90+
# 2^64 = 18446744073709551616
91+
return value / 18446744073709551616.0
92+
93+
def randint(self, a: int, b: int) -> int:
94+
"""
95+
Generate a random integer N such that a <= N <= b.
96+
97+
Uses rejection sampling to ensure uniform distribution.
98+
This avoids modulo bias that would occur with simple modulo.
99+
100+
Args:
101+
a: Lower bound (inclusive)
102+
b: Upper bound (inclusive)
103+
104+
Returns:
105+
Random integer in [a, b]
106+
107+
Raises:
108+
ValueError: If a > b
109+
110+
How it works:
111+
1. Calculate the range size (b - a + 1)
112+
2. Find the smallest number of bits needed to represent range
113+
3. Generate random bits and check if value < range
114+
4. If not, reject and try again (rejection sampling)
115+
5. This ensures perfectly uniform distribution
116+
"""
117+
if a > b:
118+
raise ValueError(f"randint: a ({a}) must be <= b ({b})")
119+
120+
if a == b:
121+
return a # Only one possible value
122+
123+
# Calculate range size
124+
range_size = b - a + 1
125+
126+
# Find number of bits needed to represent range_size
127+
# We need ceil(log2(range_size)) bits
128+
bits_needed = (range_size - 1).bit_length()
129+
bytes_needed = (bits_needed + 7) // 8 # Round up to bytes
130+
131+
# Mask to extract only the bits we need
132+
# e.g., for range_size=100, bits_needed=7, mask=0x7F (127)
133+
mask = (1 << bits_needed) - 1
134+
135+
# Rejection sampling loop
136+
# We keep generating random values until we get one in range
137+
# Expected number of iterations is < 2 on average
138+
while True:
139+
# Extract random bytes
140+
raw_bytes = self._pool.extract(bytes_needed)
141+
142+
# Pad to 8 bytes for unpacking (big-endian)
143+
padded = raw_bytes.rjust(8, b"\x00")
144+
145+
# Unpack as unsigned 64-bit integer
146+
value = struct.unpack("!Q", padded)[0]
147+
148+
# Apply mask to get only needed bits
149+
value = value & mask
150+
151+
# Check if value is in valid range
152+
if value < range_size:
153+
return a + value
154+
155+
def randbool(self) -> bool:
156+
"""
157+
Generate a random boolean (True or False).
158+
159+
Each value has exactly 50% probability - a fair coin flip.
160+
161+
Returns:
162+
True or False with equal probability
163+
164+
How it works:
165+
1. Extract 1 byte from the pool
166+
2. Check the least significant bit
167+
3. Return True if bit is 1, False if 0
168+
"""
169+
# Extract 1 byte of entropy
170+
raw_byte = self._pool.extract(1)
171+
172+
# Check least significant bit
173+
return (raw_byte[0] & 1) == 1
174+
175+
def randbytes(self, n: int) -> bytes:
176+
"""
177+
Generate n random bytes.
178+
179+
Args:
180+
n: Number of bytes to generate (must be positive)
181+
182+
Returns:
183+
A bytes object of length n
184+
185+
Raises:
186+
ValueError: If n is not positive
187+
"""
188+
if n <= 0:
189+
raise ValueError(f"randbytes: n ({n}) must be positive")
190+
191+
return self._pool.extract(n)
192+
193+
def choice(self, seq: Sequence[T]) -> T:
194+
"""
195+
Return a random element from a non-empty sequence.
196+
197+
Each element has equal probability of being selected.
198+
199+
Args:
200+
seq: A non-empty sequence (list, tuple, string, etc.)
201+
202+
Returns:
203+
A randomly selected element
204+
205+
Raises:
206+
IndexError: If the sequence is empty
207+
"""
208+
if not seq:
209+
raise IndexError("Cannot choose from an empty sequence")
210+
211+
# Generate random index in valid range
212+
index = self.randint(0, len(seq) - 1)
213+
214+
return seq[index]
215+
216+
def shuffle(self, seq: MutableSequence[Any]) -> None:
217+
"""
218+
Shuffle a mutable sequence in-place.
219+
220+
Uses the Fisher-Yates (Knuth) shuffle algorithm, which produces
221+
a uniformly random permutation.
222+
223+
Args:
224+
seq: A mutable sequence to shuffle in-place
225+
226+
How it works:
227+
The Fisher-Yates algorithm:
228+
1. Start from the last element
229+
2. Swap it with a random element from index 0 to current
230+
3. Move to the previous element and repeat
231+
4. This produces every permutation with equal probability
232+
"""
233+
n = len(seq)
234+
235+
# Fisher-Yates shuffle
236+
# We iterate from the end to the beginning
237+
for i in range(n - 1, 0, -1):
238+
# Pick random index from [0, i]
239+
j = self.randint(0, i)
240+
241+
# Swap elements at i and j
242+
seq[i], seq[j] = seq[j], seq[i]
243+
244+
def sample(self, seq: Sequence[T], k: int) -> list[T]:
245+
"""
246+
Return a k-length list of unique elements from the sequence.
247+
248+
This implements random sampling without replacement - each
249+
element can only be selected once.
250+
251+
Args:
252+
seq: The sequence to sample from
253+
k: Number of unique elements to select
254+
255+
Returns:
256+
A list of k unique elements
257+
258+
Raises:
259+
ValueError: If k > len(seq) or k < 0
260+
261+
How it works:
262+
We use a modified Fisher-Yates algorithm that only
263+
shuffles the first k elements, then returns them.
264+
This is more efficient than shuffling the entire sequence.
265+
"""
266+
n = len(seq)
267+
268+
if k < 0:
269+
raise ValueError(f"sample: k ({k}) must be non-negative")
270+
271+
if k > n:
272+
raise ValueError(
273+
f"sample: k ({k}) is larger than sequence length ({n})"
274+
)
275+
276+
if k == 0:
277+
return []
278+
279+
# Create a copy of the sequence as a list
280+
# We only need to work with indices, so we create a pool
281+
pool = list(range(n))
282+
283+
# Partial Fisher-Yates: shuffle only k elements
284+
result: list[T] = []
285+
286+
for i in range(k):
287+
# Pick random index from remaining pool
288+
j = self.randint(i, n - 1)
289+
290+
# Swap to bring selected index to current position
291+
pool[i], pool[j] = pool[j], pool[i]
292+
293+
# Add the selected element to result
294+
result.append(seq[pool[i]])
295+
296+
return result
297+
298+
def uniform(self, a: float, b: float) -> float:
299+
"""
300+
Generate a random float N such that a <= N <= b.
301+
302+
Args:
303+
a: Lower bound
304+
b: Upper bound
305+
306+
Returns:
307+
Random float in [a, b]
308+
"""
309+
return a + self.random() * (b - a)
310+
311+
def gauss(self, mu: float = 0.0, sigma: float = 1.0) -> float:
312+
"""
313+
Generate a random float from the Gaussian (normal) distribution.
314+
315+
Uses the Box-Muller transform to convert uniform random numbers
316+
to normally distributed values.
317+
318+
Args:
319+
mu: Mean of the distribution (default: 0.0)
320+
sigma: Standard deviation (default: 1.0)
321+
322+
Returns:
323+
Random float from N(mu, sigma^2)
324+
"""
325+
import math
326+
327+
# Box-Muller transform
328+
# Generate two uniform random values in (0, 1)
329+
# We need them to be strictly > 0 to avoid log(0)
330+
u1 = self.random()
331+
while u1 == 0:
332+
u1 = self.random()
333+
334+
u2 = self.random()
335+
336+
# Transform to standard normal
337+
z0 = math.sqrt(-2.0 * math.log(u1)) * math.cos(2.0 * math.pi * u2)
338+
339+
# Scale and shift to desired mean and standard deviation
340+
return mu + sigma * z0
341+
342+
# -------------------------------------------------------------------------
343+
# String Representation
344+
# -------------------------------------------------------------------------
345+
346+
def __repr__(self) -> str:
347+
"""Return string representation of the tap."""
348+
return f"EntropyTap(pool={self._pool!r})"

0 commit comments

Comments
 (0)