|
| 1 | +#!/usr/bin/env python3 |
| 2 | +"""A minimalistic sampler with an even more minimalistic GUI.""" |
| 3 | + |
| 4 | +import collections |
| 5 | +import math |
| 6 | +try: |
| 7 | + import tkinter as tk |
| 8 | +except ImportError: |
| 9 | + # Python 2.x |
| 10 | + import Tkinter as tk |
| 11 | + |
| 12 | +import rtmixer |
| 13 | +from tkhelper import TkKeyEventDebouncer |
| 14 | + |
| 15 | + |
| 16 | +HELPTEXT = 'Hold uppercase key for recording,\nlowercase for playback' |
| 17 | +REC_OFF = '#600' |
| 18 | +REC_ON = '#e00' |
| 19 | +BUFFER_DURATION = 0.1 # seconds |
| 20 | + |
| 21 | + |
| 22 | +class Sample(object): |
| 23 | + |
| 24 | + def __init__(self): |
| 25 | + elementsize = stream.samplesize[0] |
| 26 | + size = BUFFER_DURATION * stream.samplerate |
| 27 | + # Python 2.x doesn't have math.log2(), and it needs int(): |
| 28 | + size = 2**int(math.ceil(math.log(size, 2))) |
| 29 | + self.ringbuffer = rtmixer.RingBuffer(elementsize, size) |
| 30 | + self.buffer = bytearray() |
| 31 | + self.action = None |
| 32 | + |
| 33 | + |
| 34 | +class MiniSampler(tk.Tk, TkKeyEventDebouncer): |
| 35 | + |
| 36 | + def __init__(self, *args, **kwargs): |
| 37 | + tk.Tk.__init__(self, *args, **kwargs) |
| 38 | + TkKeyEventDebouncer.__init__(self) |
| 39 | + self.title('MiniSampler') |
| 40 | + tk.Label(self, text=HELPTEXT).pack(ipadx=20, ipady=20) |
| 41 | + self.rec_display = tk.Label(self, text='recording') |
| 42 | + self.rec_counter = tk.IntVar() |
| 43 | + self.rec_counter.trace(mode='w', callback=self.update_rec_display) |
| 44 | + self.rec_counter.set(0) |
| 45 | + self.rec_display.pack(padx=10, pady=10, ipadx=5, ipady=5) |
| 46 | + self.samples = collections.defaultdict(Sample) |
| 47 | + |
| 48 | + def update_rec_display(self, *args): |
| 49 | + if self.rec_counter.get() == 0: |
| 50 | + self.rec_display['bg'] = REC_OFF |
| 51 | + else: |
| 52 | + self.rec_display['bg'] = REC_ON |
| 53 | + |
| 54 | + def on_key_press(self, event): |
| 55 | + ch = event.char |
| 56 | + if ch.isupper(): |
| 57 | + sample = self.samples[ch.lower()] |
| 58 | + # TODO: check if we are already recording? check action? |
| 59 | + sample.ringbuffer.flush() |
| 60 | + sample.action = stream.record_ringbuffer(sample.ringbuffer) |
| 61 | + del sample.buffer[:] |
| 62 | + self.rec_counter.set(self.rec_counter.get() + 1) |
| 63 | + self.poll_ringbuffer(sample) |
| 64 | + elif ch in self.samples: |
| 65 | + sample = self.samples[ch] |
| 66 | + # TODO: can it be still recording or already playing? |
| 67 | + if sample.action in stream.actions: |
| 68 | + # TODO: the CANCEL action might still be active, which |
| 69 | + # shouldn't be a problem, right? |
| 70 | + print(ch, 'action still running:', sample.action.type) |
| 71 | + if ((sample.action is not None and |
| 72 | + sample.action.type != rtmixer._lib.CANCEL) or |
| 73 | + not sample.buffer): |
| 74 | + print(sample.action.type, len(sample.buffer)) |
| 75 | + raise RuntimeError('Unable to play') |
| 76 | + sample.action = stream.play_buffer(sample.buffer, channels=1) |
| 77 | + else: |
| 78 | + # TODO: handle special keys? |
| 79 | + pass |
| 80 | + |
| 81 | + def on_key_release(self, event): |
| 82 | + # NB: State of "shift" button may change between key press and release! |
| 83 | + ch = event.char.lower() |
| 84 | + if ch not in self.samples: |
| 85 | + return |
| 86 | + sample = self.samples[ch] |
| 87 | + # TODO: fade out (both recording and playback)? |
| 88 | + # TODO: is it possible that there is no action? |
| 89 | + assert sample.action |
| 90 | + # TODO: create a public API for that? |
| 91 | + if sample.action.type == rtmixer._lib.RECORD_RINGBUFFER: |
| 92 | + # TODO: check for errors/xruns? check for rinbuffer overflow? |
| 93 | + # Stop recording |
| 94 | + sample.action = stream.cancel(sample.action) |
| 95 | + # A CANCEL action is returned which is checked by poll_ringbuffer() |
| 96 | + elif sample.action.type == rtmixer._lib.PLAY_BUFFER: |
| 97 | + # TODO: check for errors/xruns? |
| 98 | + # Stop playback |
| 99 | + sample.action = stream.cancel(sample.action) |
| 100 | + # TODO: do something with sample.action? |
| 101 | + elif sample.action.type == rtmixer._lib.CANCEL: |
| 102 | + print('key {!r} released during CANCEL'.format(event.char)) |
| 103 | + else: |
| 104 | + print(event.char, sample.action) |
| 105 | + assert False |
| 106 | + |
| 107 | + def poll_ringbuffer(self, sample): |
| 108 | + # TODO: check for errors? is everything still working OK? |
| 109 | + assert sample.action, sample.action |
| 110 | + assert sample.action.type in (rtmixer._lib.RECORD_RINGBUFFER, |
| 111 | + rtmixer._lib.CANCEL), sample.action.type |
| 112 | + chunk = sample.ringbuffer.read() |
| 113 | + if chunk: |
| 114 | + sample.buffer.extend(chunk) |
| 115 | + |
| 116 | + if (sample.action.type == rtmixer._lib.CANCEL and |
| 117 | + sample.action not in stream.actions): |
| 118 | + # TODO: check for errors in CANCEL action? |
| 119 | + sample.action = None |
| 120 | + self.rec_counter.set(self.rec_counter.get() - 1) |
| 121 | + else: |
| 122 | + # Set polling rate based on input latency (which may change!): |
| 123 | + self.after(int(stream.latency[0] * 1000), |
| 124 | + self.poll_ringbuffer, sample) |
| 125 | + |
| 126 | + |
| 127 | +app = MiniSampler() |
| 128 | +with rtmixer.MixerAndRecorder(channels=1) as stream: |
| 129 | + app.mainloop() |
0 commit comments