Extract Session, finish async migration#11

Merged
henrikbjorn merged 15 commits into master from extract-session on Jun 17, 2026
@henrikbjorn

Summary

Completes the EventMachine → async (fiber-based I/O) migration and extracts a dedicated Session abstraction. One linear stack of 15 commits on top of master, grouped into three logical phases.

1. Promise-based core

Replaces the Async::Queue/Semaphore plumbing with plain Async::Promise arrays for reply routing, refreshes the test suite, and updates the README to match.

2. Lifecycle & correlation fixes

  • Correlate sendmsg with CHANNEL_EXECUTE_COMPLETE via Event-UUID instead of FIFO ordering, and create the app promise before sending (closes a latent ordering race).
  • Move endpoint creation and default ports into Client/Server.start.
  • Suppress expected ConnectionError on disconnect.
  • Fix CommandSocket after the protocol rename (header merge order, Server option forwarding) with tests.

3. Session extraction

  • Extract Session, moving disconnect policy into Outbound.
  • Serialize sends behind a write-lock so wire order matches @reply_promises FIFO order, with concurrency tests.
  • Add a real-socket integration test covering framing and reply routing.

Testing

rake test

The previous implementation used Async::Queue for reply/app-complete
channels and Async::Semaphore for command serialization. This was
heavier than needed — each command only ever has one pending response.

Replace with arrays of Async::Promise. Each send_message/execute_app
creates its own promise and pushes it onto an array. The reader fiber
resolves them in FIFO order. No mutex needed — cooperative fiber
scheduling guarantees the array push happens before the yield point.
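
The FIFO routing described above can be sketched without the async gem. This is a stdlib-only illustration, not the Librevox code: `TinyPromise`, `FifoRouter`, and the `wire` array are hypothetical stand-ins for `Async::Promise`, the listener, and the socket.

```ruby
# Hypothetical stand-in for Async::Promise: holds one value, set once.
class TinyPromise
  attr_reader :value

  def initialize
    @resolved = false
  end

  def resolve(value)
    @value = value
    @resolved = true
  end

  def resolved?
    @resolved
  end
end

# Hypothetical listener core: one promise per command, resolved oldest-first.
class FifoRouter
  def initialize(wire)
    @wire = wire              # stands in for the socket
    @reply_promises = []
  end

  def send_message(command)
    promise = TinyPromise.new
    @reply_promises << promise  # push first ...
    @wire << command            # ... then write
    promise
  end

  # Called by the reader for each reply that arrives.
  def receive_reply(reply)
    @reply_promises.shift&.resolve(reply)
  end
end

wire = []
router = FifoRouter.new(wire)
first  = router.send_message("api status")
second = router.send_message("api uptime")

# Replies arrive in the order the commands were written.
router.receive_reply("+OK status")
router.receive_reply("+OK uptime")

puts first.value
puts second.value
```

Because each reply resolves the oldest pending promise, the scheme only holds while wire order and array order agree.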

Also:
- Rename send_message/read_message to send_data/receive_data on
  Protocol::Connection for consistent EM-style naming
- Move execute_app to Base so both Inbound and Outbound can execute
  applications (Outbound's application method passes session UUID)
- Extract CommandDelegate to its own file
- Use promise.reject in connection_closed instead of nil sentinel
- Remove update_session (session is updated automatically)
- Remove silent rescue in Outbound#run_session — errors propagate
  to Client/Server which already handle them
- Add ResponseError to Client reconnect loop
- Extract setup_outbound/teardown_outbound helpers to eliminate
  duplicated 7-line setup across 8+ test classes
- Rename test methods to read as prose (e.g. test_dispatches_matching_event_hooks)
- Rename files/classes to reflect current behavior:
  outbound_non_nested_test → outbound_data_passing_test
  outbound_session_test → outbound_handshake_test
  OutboundListenerWithNestedApps → OutboundListenerWithSequentialApps
- Remove dead code (attr_reader :queue, update_session tests)
- Fix stale comments referencing queues instead of promises

The "Two fibers per connection" section still referenced Async::Queue
and Async::Semaphore. Updated to describe the Async::Promise array
pattern and promise rejection on disconnect. Also updated the sendmsg
example to show the UUID parameter.

ConnectionError is expected when either side hangs up: the read loop
closes, connection_closed rejects pending promises, and the error
surfaces in run_session or event hook fibers. A blanket rescue in
Outbound#run_session used to swallow it silently. The promise refactor
removed that rescue, so the error now propagates to Server#accept's
catch-all, which logs noisy "Session error" messages.

Fix by rescuing ConnectionError in three targeted places:
- Server#handle_session and Client#handle_session — suppresses the
  error from run_session so it doesn't reach the generic error handler
- Base#connection_closed — suppresses the error from event_barrier.wait
  when event hooks were mid-command at disconnect

Consumers no longer need to rescue ConnectionError in session_initiated.

Listeners no longer need to know about IO::Endpoint or default ports.
Client.start and Server.start handle endpoint creation, and listeners
just delegate: Client.start(self, **options). Runner uses barrier.async
to wrap the blocking start call.

execute_app now stores promises in a hash keyed by UUID instead of an
array. CHANNEL_EXECUTE_COMPLETE events resolve the promise matching
their unique_id. This is more correct and enables concurrent app
execution on different channels (e.g. inbound mode).

execute_app now generates a SecureRandom.uuid and sends it as the
Event-UUID header on sendmsg. FreeSWITCH echoes this back as
Application-UUID in CHANNEL_EXECUTE_COMPLETE, which is used to
resolve the correct promise. This replaces matching by channel UUID
and properly correlates each individual app execution.
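
A minimal sketch of the UUID-keyed correlation, under stated assumptions: `AppTracker` and its plain-hash "slots" are hypothetical stand-ins for the listener and its promises, and reading the result from an `Application-Response` header is an assumption made for illustration. Only `Event-UUID` and `Application-UUID` come from the description above.

```ruby
require "securerandom"

# Hypothetical tracker: one slot per app execution, keyed by a fresh UUID.
class AppTracker
  def initialize
    @app_promises = {}
  end

  # Returns the UUID that would go out as the Event-UUID header,
  # plus the slot that the completion event will fill in.
  def execute_app(app)
    uuid = SecureRandom.uuid
    slot = { app: app, response: nil }
    @app_promises[uuid] = slot
    [uuid, slot]
  end

  # headers: parsed CHANNEL_EXECUTE_COMPLETE headers (assumed shape).
  def execute_complete(headers)
    slot = @app_promises.delete(headers["Application-UUID"])
    slot[:response] = headers["Application-Response"] if slot
    slot
  end
end

tracker = AppTracker.new
playback_uuid, playback = tracker.execute_app("playback")
bridge_uuid, bridge     = tracker.execute_app("bridge")

# Completions arrive in reverse order and still land on the right slot.
tracker.execute_complete("Application-UUID" => bridge_uuid,
                         "Application-Response" => "SUCCESS")
tracker.execute_complete("Application-UUID" => playback_uuid,
                         "Application-Response" => "FILE PLAYED")

puts playback[:response]
puts bridge[:response]
```

Keying by a per-execution UUID rather than by array position is what lets completions arrive out of order.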

Also:
- Defaults go first in header merge so params can override (e.g.
  event_lock: false)
- Use IO::Stream flush: true instead of separate flush call
- Remove redundant closed? guard in Connection#close (IO::Stream
  already guards)

CommandSocket called the old send_message/read_message methods on
Protocol::Connection which were renamed to send_data/receive_data.
Also reorder application(app, uuid, args) to match execute_app.

Add functional tests that verify auth handshake, API commands, and
sendmsg application execution against a real TCP server.

CommandSocket#application had defaults overwriting caller params
(params.merge(defaults)). Reversed to defaults.merge(params) so
overrides like event_lock: false work, matching Base#execute_app.
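
The merge-order difference is easy to see in isolation. The keys other than `event_lock` are made up for the example.

```ruby
# Hypothetical header defaults and caller overrides.
defaults = { event_lock: true, call_command: "execute" }
params   = { event_lock: false }

# Hash#merge lets the argument win on duplicate keys.
old_order = params.merge(defaults)   # defaults clobber the caller's value
new_order = defaults.merge(params)   # caller's value overrides the default

puts old_order[:event_lock]
puts new_order[:event_lock]
```

With the old order the caller's `event_lock: false` is silently replaced by the default; with the new order it survives.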

Server.start passed **options to the endpoint instead of to new,
so listener options were lost. Now symmetric with Client.start.

The promise must be in @app_promises before send_message sends data,
because the read loop could process CHANNEL_EXECUTE_COMPLETE before
this fiber resumes. If FreeSWITCH sends the ack and completion
back-to-back and both are buffered, read_until returns without
yielding and the completion event would be lost.
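
The ordering hazard can be reproduced with a connection that answers inside the write call, which mimics an ack and completion that are already buffered. `EagerConnection` and `Executor` are hypothetical names used only for this sketch.

```ruby
# Hypothetical connection whose peer "replies" before send_data returns.
class EagerConnection
  def initialize(&on_complete)
    @on_complete = on_complete
  end

  def send_data(uuid)
    @on_complete.call(uuid)
  end
end

class Executor
  attr_reader :completed, :lost

  def initialize
    @app_promises = {}
    @completed = []
    @lost = []
    @connection = EagerConnection.new { |uuid| execute_complete(uuid) }
  end

  # Stands in for the reader handling CHANNEL_EXECUTE_COMPLETE.
  def execute_complete(uuid)
    if @app_promises.delete(uuid)
      @completed << uuid
    else
      @lost << uuid   # nobody was registered to receive it
    end
  end

  # Old order: the completion can arrive before the promise exists.
  def send_then_register(uuid)
    @connection.send_data(uuid)
    @app_promises[uuid] = true
  end

  # New order: the promise is in place before any data goes out.
  def register_then_send(uuid)
    @app_promises[uuid] = true
    @connection.send_data(uuid)
  end
end

executor = Executor.new
executor.send_then_register("a")
executor.register_then_send("b")

p executor.lost
p executor.completed
```
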

Client#handle_session and Server#handle_session were identical and
the Server's read loop knew about disconnect-notice and
CHANNEL_HANGUP_COMPLETE — outbound-listener policy leaking into the
transport supervisor.

Extract the per-connection choreography (reader fiber + session fiber
+ teardown) into Librevox::Session. Both Client and Server now just
manage their endpoint and hand each new socket to Session.new(...).run.

Move the end-of-session detection into Listener::Outbound via a
#session_complete? predicate that Session checks after each dispatched
message. The reader exits cleanly without close_write, so in-flight
event hooks drain via the barrier before the socket is actually closed
— matching the previous behaviour.
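
A rough sketch of the predicate check, assuming a listener that flips a flag on CHANNEL_HANGUP_COMPLETE. `OutboundSketch` and `run_reader` are hypothetical; the real Session also manages the session fiber, the barrier, and teardown, none of which is modelled here.

```ruby
# Hypothetical listener: it alone decides when the session is over.
class OutboundSketch
  attr_reader :seen

  def initialize
    @seen = []
    @complete = false
  end

  def receive_message(message)
    @seen << message
    @complete = true if message == "CHANNEL_HANGUP_COMPLETE"
  end

  # Cheap predicate, checked after every dispatched message.
  def session_complete?
    @complete
  end
end

# Hypothetical reader loop: knows nothing about hangup events.
def run_reader(messages, listener)
  messages.each do |message|
    listener.receive_message(message)
    break if listener.session_complete?
  end
end

listener = OutboundSketch.new
run_reader(%w[CHANNEL_ANSWER CHANNEL_HANGUP_COMPLETE HEARTBEAT], listener)

p listener.seen
```

The reader stops after the hangup event without ever naming it, which keeps the outbound policy out of the transport code.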

Also drops the dead `task.children.each(&:wait)` in Server#run: verified
against io-endpoint 0.17.2 that Wrapper#accept is an infinite loop
which spawns a fiber per connection via Fiber.schedule and never
returns, so the trailing line was unreachable.

- Protocol::Connection#read_loop → each_message (returns Enumerator
  when called without a block).
- Listener::Base#receive_data → receive_message (it takes a parsed
  Response, not bytes; old name was EventMachine baggage).
- Listener::Inbound#initialize / Outbound#initialize use kwargs instead
  of positional args-hashes. Client/Server splat @options through.
- CommandSocket#initialize takes kwargs.
- Normalize Protocol::Connection#close and #close_write rescue lists
  to the same set (IOError + the common Errno family).
- Drop Librevox.reopen_log (no trap ever wired it up) and the SIGHUP
  paragraph from the README.
- Drop redundant response.event? guards in Outbound — response.event
  already returns nil when not an event, so the && is dead weight.
- attr :endpoint → attr_reader :endpoint on Server.
- Drop ConnectionError from Client#run's rescue list — Session#run
  already catches it internally, so it never escapes.
- Normalize error logging to #message on both Client and Server
  (full_message was noisy on routine reconnects).
- Drop the enum_for fallback on Connection#each_message — no caller
  uses the block-less form and the stream is not re-iterable anyway.
- Document the #session_complete? contract on Base so future listeners
  don't put expensive work in the hot path.

The reader&.stop safe-nav in Session is kept: ensure can fire before
reader is assigned when a test hard-kills the thread mid-run.

The FIFO @reply_promises invariant relied on push-then-send being atomic
up to the stream's flush yield. Make it explicit with an Async::Semaphore
around the enqueue+send critical section. The lock deliberately excludes
promise.wait, so a slow reply does not queue the next sender behind it.
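
The shape of the critical section can be illustrated with stdlib threads. This is an analogy only: `Mutex` stands in for `Async::Semaphore`, threads for fibers, and `Queue` for a promise. `LockedSender` is a hypothetical name, and the `sleep` simulates a connection that yields mid-send.

```ruby
# Stdlib-only sketch of "lock the enqueue+send, not the wait".
class LockedSender
  attr_reader :wire, :enqueued

  def initialize
    @write_lock = Mutex.new
    @reply_promises = []
    @enqueued = []   # order in which promises were pushed
    @wire = []       # order in which commands hit the "socket"
  end

  def send_message(command)
    promise = Queue.new
    @write_lock.synchronize do
      @reply_promises << promise
      @enqueued << command
      sleep 0.01       # simulated yield in the middle of the write
      @wire << command
    end
    promise.pop        # waiting happens outside the lock
  end

  # Called by the reader for each reply, oldest promise first.
  def receive_reply(reply)
    promise = @write_lock.synchronize { @reply_promises.shift }
    promise&.push(reply)
  end
end

sender = LockedSender.new
threads = %w[a b c].map do |command|
  Thread.new { [command, sender.send_message(command)] }
end

# Wait until every command is on the wire, then reply in wire order.
sleep 0.01 until sender.wire.size == 3
sender.wire.dup.each { |command| sender.receive_reply("reply:#{command}") }

results = threads.map(&:value).to_h
p results
```

Since the wait sits outside the lock, all three senders are blocked on their own promise at the same time, so none is queued behind another's reply.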

Adds unit tests with a connection that yields mid-send (the real stream
does not, but this guards the lock's logic) covering: no mid-send
interleaving, FIFO reply routing under concurrency, UUID-keyed app
completion under reversed order, and disconnect unblocking pending
senders.

Drives Base#send_message -> Protocol::Connection -> Socket.pair ->
real receive_data framing on the peer, with replies routed back through
the listener's reader. Closes the gap where nothing exercised Connection
over a live fd.
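
For readers unfamiliar with the approach, here is a small sketch of exercising framing over a real socket pair. The two helpers are hypothetical and deliberately minimal (header lines, a blank line, an optional body sized by Content-Length); they are not Protocol::Connection, and the actual test goes through the listener rather than calling helpers directly.

```ruby
require "socket"

# Hypothetical writer: "Key: Value" lines, blank line, optional body.
def write_message(io, headers, body = nil)
  headers = headers.merge("Content-Length" => body.bytesize) if body
  io.write(headers.map { |key, value| "#{key}: #{value}\n" }.join + "\n" + body.to_s)
end

# Hypothetical reader: parses one framed message from the stream.
def read_message(io)
  headers = {}
  while (line = io.gets) && line != "\n"
    key, value = line.chomp.split(": ", 2)
    headers[key] = value
  end
  length = headers["Content-Length"].to_i
  body = length.positive? ? io.read(length) : nil
  [headers, body]
end

client, server = Socket.pair(:UNIX, :STREAM)

# Two messages written back-to-back must come out as two messages.
write_message(client, { "Content-Type" => "api/response" }, "+OK accepted")
write_message(client, { "Content-Type" => "command/reply", "Reply-Text" => "+OK" })

first  = read_message(server)
second = read_message(server)

client.close
server.close

p first
p second
```

Going through a real file descriptor catches buffering and framing mistakes that an in-memory fake connection would hide.
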

@henrikbjorn henrikbjorn self-assigned this Jun 17, 2026

@c960657 c960657 left a comment


Seems legit

@henrikbjorn henrikbjorn merged commit bead4f5 into master Jun 17, 2026
3 checks passed
@henrikbjorn henrikbjorn deleted the extract-session branch June 17, 2026 09:56