Extract Session, finish async migration#11
Merged
Conversation
The previous implementation used Async::Queue for reply/app-complete channels and Async::Semaphore for command serialization. This was heavier than needed — each command only ever has one pending response. Replace with arrays of Async::Promise. Each send_message/execute_app creates its own promise and pushes it onto an array. The reader fiber resolves them in FIFO order. No mutex needed — cooperative fiber scheduling guarantees the array push happens before the yield point. Also: - Rename send_message/read_message to send_data/receive_data on Protocol::Connection for consistent EM-style naming - Move execute_app to Base so both Inbound and Outbound can execute applications (Outbound's application method passes session UUID) - Extract CommandDelegate to its own file - Use promise.reject in connection_closed instead of nil sentinel - Remove update_session (session is updated automatically) - Remove silent rescue in Outbound#run_session — errors propagate to Client/Server which already handle them - Add ResponseError to Client reconnect loop
- Extract setup_outbound/teardown_outbound helpers to eliminate duplicated 7-line setup across 8+ test classes - Rename test methods to read as prose (e.g. test_dispatches_matching_event_hooks) - Rename files/classes to reflect current behavior: outbound_non_nested_test → outbound_data_passing_test outbound_session_test → outbound_handshake_test OutboundListenerWithNestedApps → OutboundListenerWithSequentialApps - Remove dead code (attr_reader :queue, update_session tests) - Fix stale comments referencing queues instead of promises
The "Two fibers per connection" section still referenced Async::Queue and Async::Semaphore. Updated to describe the Async::Promise array pattern and promise rejection on disconnect. Also updated the sendmsg example to show the UUID parameter.
ConnectionError is expected when either side hangs up — the read loop closes, connection_closed rejects pending promises, and the error surfaces in run_session or event hook fibers. Previously this was silently swallowed by a blanket rescue in Outbound#run_session (removed in the promise refactor), causing it to propagate to Server#accept's catch-all and log noisy "Session error" messages. Fix by rescuing ConnectionError in three targeted places: - Server#handle_session and Client#handle_session — suppresses the error from run_session so it doesn't reach the generic error handler - Base#connection_closed — suppresses the error from event_barrier.wait when event hooks were mid-command at disconnect Consumers no longer need to rescue ConnectionError in session_initiated.
Listeners no longer need to know about IO::Endpoint or default ports. Client.start and Server.start handle endpoint creation, and listeners just delegate: Client.start(self, **options). Runner uses barrier.async to wrap the blocking start call.
execute_app now stores promises in a hash keyed by UUID instead of an array. CHANNEL_EXECUTE_COMPLETE events resolve the promise matching their unique_id. This is more correct and enables concurrent app execution on different channels (e.g. inbound mode).
execute_app now generates a SecureRandom.uuid and sends it as the Event-UUID header on sendmsg. FreeSWITCH echoes this back as Application-UUID in CHANNEL_EXECUTE_COMPLETE, which is used to resolve the correct promise. This replaces matching by channel UUID and properly correlates each individual app execution. Also: - Defaults go first in header merge so params can override (e.g. event_lock: false) - Use IO::Stream flush: true instead of separate flush call - Remove redundant closed? guard in Connection#close (IO::Stream already guards)
CommandSocket called the old send_message/read_message methods on Protocol::Connection which were renamed to send_data/receive_data. Also reorder application(app, uuid, args) to match execute_app. Add functional tests that verify auth handshake, API commands, and sendmsg application execution against a real TCP server.
CommandSocket#application had defaults overwriting caller params (params.merge(defaults)). Reversed to defaults.merge(params) so overrides like event_lock: false work, matching Base#execute_app. Server.start passed **options to the endpoint instead of to new, so listener options were lost. Now symmetric with Client.start.
The promise must be in @app_promises before send_message sends data, because the read loop could process CHANNEL_EXECUTE_COMPLETE before this fiber resumes. If FreeSWITCH sends the ack and completion back-to-back and both are buffered, read_until returns without yielding and the completion event would be lost.
Client#handle_session and Server#handle_session were identical and the Server's read loop knew about disconnect-notice and CHANNEL_HANGUP_COMPLETE — outbound-listener policy leaking into the transport supervisor. Extract the per-connection choreography (reader fiber + session fiber + teardown) into Librevox::Session. Both Client and Server now just manage their endpoint and hand each new socket to Session.new(...).run. Move the end-of-session detection into Listener::Outbound via a #session_complete? predicate that Session checks after each dispatched message. The reader exits cleanly without close_write, so in-flight event hooks drain via the barrier before the socket is actually closed — matching the previous behaviour. Also drops the dead `task.children.each(&:wait)` in Server#run: verified against io-endpoint 0.17.2 that Wrapper#accept is an infinite loop which spawns a fiber per connection via Fiber.schedule and never returns, so the trailing line was unreachable.
- Protocol::Connection#read_loop → each_message (returns Enumerator when called without a block). - Listener::Base#receive_data → receive_message (it takes a parsed Response, not bytes; old name was EventMachine baggage). - Listener::Inbound#initialize / Outbound#initialize use kwargs instead of positional args-hashes. Client/Server splat @options through. - CommandSocket#initialize takes kwargs. - Normalize Protocol::Connection#close and #close_write rescue lists to the same set (IOError + the common Errno family). - Drop Librevox.reopen_log (no trap ever wired it up) and the SIGHUP paragraph from the README.
- Drop redundant response.event? guards in Outbound — response.event already returns nil when not an event, so the && is dead weight. - attr :endpoint → attr_reader :endpoint on Server. - Drop ConnectionError from Client#run's rescue list — Session#run already catches it internally, so it never escapes. - Normalize error logging to #message on both Client and Server (full_message was noisy on routine reconnects). - Drop the enum_for fallback on Connection#each_message — no caller uses the block-less form and the stream is not re-iterable anyway. - Document the #session_complete? contract on Base so future listeners don't put expensive work in the hot path. The reader&.stop safe-nav in Session is kept: ensure can fire before reader is assigned when a test hard-kills the thread mid-run.
The FIFO @reply_promises invariant relied on push-then-send being atomic up to the stream's flush yield. Make it explicit with an Async::Semaphore around the enqueue+send critical section. The lock deliberately excludes promise.wait, so a slow reply does not queue the next sender behind it. Adds unit tests with a connection that yields mid-send (the real stream does not, but this guards the lock's logic) covering: no mid-send interleaving, FIFO reply routing under concurrency, UUID-keyed app completion under reversed order, and disconnect unblocking pending senders.
Drives Base#send_message -> Protocol::Connection -> Socket.pair -> real receive_data framing on the peer, with replies routed back through the listener's reader. Closes the gap where nothing exercised Connection over a live fd.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Completes the EventMachine → async (fiber-based I/O) migration and extracts a dedicated
Sessionabstraction. One linear stack of 15 commits on top ofmaster, grouped into three logical phases.1. Promise-based core
Replaces the
Async::Queue/Semaphoreplumbing with plainAsync::Promisearrays for reply routing, refreshes the test suite, and updates the README to match.2. Lifecycle & correlation fixes
sendmsgwithCHANNEL_EXECUTE_COMPLETEviaEvent-UUIDinstead of FIFO ordering, and create the app promise before sending (closes a latent ordering race).Client/Server.start.ConnectionErroron disconnect.CommandSocketafter the protocol rename (header merge order,Serveroption forwarding) with tests.3. Session extraction
Session, moving disconnect policy intoOutbound.@reply_promisesFIFO order, with concurrency tests.Testing
rake test