Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions src/Http/Response.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@

namespace Phenix\Http;

use Amp\ByteStream\ReadableIterableStream;
use Amp\ByteStream\ReadableStream;
use Amp\Http\Server\Response as ServerResponse;
use Amp\Http\Server\Trailers;
use Closure;
use InvalidArgumentException;
use Phenix\Contracts\Arrayable;
use Phenix\Facades\View;
use Phenix\Http\Constants\HttpStatus;
Expand Down Expand Up @@ -75,6 +78,27 @@ public function redirect(string $location, HttpStatus $status = HttpStatus::FOUN
return $this;
}

/**
* @param Closure(): iterable<int, ServerSentEvent|string>|iterable<int, ServerSentEvent|string> $events
*/
public function eventStream(
Closure|iterable $events,
HttpStatus $status = HttpStatus::OK,
array $headers = []
): self {
$this->body = new ReadableIterableStream($this->formatEventStream($this->resolveEventStream($events)));
$this->status = $status;
$this->headers = [
...[
'content-type' => 'text/event-stream; charset=utf-8',
'cache-control' => 'no-cache',
],
...$headers,
];

return $this;
}

public function send(): ServerResponse
{
return new ServerResponse(
Expand All @@ -84,4 +108,45 @@ public function send(): ServerResponse
$this->trailers
);
}

/**
* @param Closure(): iterable<int, ServerSentEvent|string>|iterable<int, ServerSentEvent|string> $events
* @return iterable<int, ServerSentEvent|string>
*/
protected function resolveEventStream(Closure|iterable $events): iterable
{
if (! $events instanceof Closure) {
return $events;
}

$events = $events();

if (! is_iterable($events)) {
throw new InvalidArgumentException('The event stream closure must return an iterable.');
}

return $events;
}

/**
* @param iterable<int, ServerSentEvent|string> $events
* @return iterable<int, string>
*/
protected function formatEventStream(iterable $events): iterable
{
foreach ($events as $event) {
yield $event instanceof ServerSentEvent
? (string) $event
: $this->normalizeEventFrame($event);
}
}

protected function normalizeEventFrame(string $event): string
{
if (str_ends_with($event, "\n\n") || str_ends_with($event, "\r\n\r\n")) {
return $event;
}

return rtrim($event, "\r\n") . "\n\n";
}
}
61 changes: 61 additions & 0 deletions src/Http/ServerSentEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?php

declare(strict_types=1);

namespace Phenix\Http;

use Stringable;

class ServerSentEvent implements Stringable
{
public function __construct(
public readonly string $data,
public readonly string|null $event = null,
public readonly string|null $id = null,
public readonly int|null $retry = null,
public readonly string|null $comment = null
) {
}

public function __toString(): string
{
return $this->toString();
}

public function toString(): string
{
$lines = [];

if ($this->comment !== null) {
foreach ($this->lines($this->comment) as $line) {
$lines[] = ": {$line}";
}
}

if ($this->event !== null) {
$lines[] = "event: {$this->event}";
}

if ($this->id !== null) {
$lines[] = "id: {$this->id}";
}

if ($this->retry !== null) {
$lines[] = "retry: {$this->retry}";
}

foreach ($this->lines($this->data) as $line) {
$lines[] = "data: {$line}";
}

return implode("\n", $lines) . "\n\n";
}

/**
* @return array<int, string>
*/
private function lines(string $value): array
{
return explode("\n", str_replace(["\r\n", "\r"], "\n", $value));
}
}
14 changes: 14 additions & 0 deletions src/Testing/Concerns/InteractWithHeaders.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,18 @@ public function assertIsPlainText(): self

return $this;
}

public function assertIsEventStream(): self
{
$contentType = $this->response->getHeader('content-type');

Assert::assertNotNull($contentType, $this->missingHeaderMessage);
Assert::assertStringContainsString(
'text/event-stream',
$contentType,
'Response does not have an event stream content type.'
);

return $this;
}
}
60 changes: 60 additions & 0 deletions tests/Feature/RequestTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Phenix\Http\Constants\HttpStatus;
use Phenix\Http\Request;
use Phenix\Http\Response;
use Phenix\Http\ServerSentEvent;
use Phenix\Testing\TestResponse;
use Tests\Feature\Requests\LimitedBodyRequest;
use Tests\Feature\Requests\LimitedStreamedRequest;
Expand Down Expand Up @@ -219,6 +220,65 @@
->assertBodyContains('plain text');
});

it('can send server sent events', function (): void {
Route::get('/events', function (): Response {
return response()->eventStream(function (): iterable {
for ($index = 0; $index < 3; $index++) {
yield new ServerSentEvent(
data: "Event {$index}",
event: 'notification'
);
}
});
});

$this->app->run();

$this->get('/events')
->assertOk()
->assertIsEventStream()
->assertBodyContains([
"event: notification\ndata: Event 0\n\n",
"event: notification\ndata: Event 2\n\n",
]);
});

it('can resume server sent events using last event id', function (): void {
Route::get('/resumable-events', function (Request $request): Response {
$lastEventId = $request->getHeader('Last-Event-ID');
$start = $lastEventId === null ? 0 : ((int) str_replace('event-', '', $lastEventId)) + 1;

return response()->eventStream(function () use ($start): iterable {
for ($index = $start; $index < 4; $index++) {
yield new ServerSentEvent(
data: "Event {$index}",
event: 'notification',
id: "event-{$index}"
);
}
});
});

$this->app->run();

$response = $this->get('/resumable-events', [
'Last-Event-ID' => 'event-1',
]);

$response
->assertOk()
->assertIsEventStream()
->assertBodyContains([
"id: event-2\n",
"data: Event 2\n\n",
"id: event-3\n",
"data: Event 3\n\n",
]);

expect($response->getBody())->not->toContain('id: event-0')
->and($response->getBody())->not->toContain('id: event-1');
});

it('can assert json contains', function (): void {
Route::get('/api/user', function (): Response {
return response()->json([
Expand Down
66 changes: 66 additions & 0 deletions tests/Unit/Http/ResponseTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Amp\Http\Server\Response as ServerResponse;
use Phenix\Data\Collection;
use Phenix\Http\Response;
use Phenix\Http\ServerSentEvent;

it('responds plain text', function () {
$response = new Response();
Expand Down Expand Up @@ -41,3 +42,68 @@
expect($serverResponse)->toBeInstanceOf(ServerResponse::class);
expect($serverResponse->getBody()->read())->toContain(json_encode($data));
});

it('responds event streams from raw frames', function () {
$response = new Response();

$serverResponse = $response->eventStream([
"event: notification\ndata: Event 0",
"event: notification\ndata: Event 1\n\n",
])->send();

expect($serverResponse)->toBeInstanceOf(ServerResponse::class);
expect($serverResponse->getHeader('Content-Type'))->toBe('text/event-stream; charset=utf-8');
expect($serverResponse->getHeader('Cache-Control'))->toBe('no-cache');
expect($serverResponse->getBody()->read())->toBe("event: notification\ndata: Event 0\n\n");
expect($serverResponse->getBody()->read())->toBe("event: notification\ndata: Event 1\n\n");
});

it('responds event streams from server sent events', function () {
$response = new Response();

$serverResponse = $response->eventStream([
new ServerSentEvent(
data: "First line\nSecond line",
event: 'notification',
id: 'event-1',
retry: 500,
comment: 'initial event'
),
])->send();

expect($serverResponse)->toBeInstanceOf(ServerResponse::class);
expect($serverResponse->getBody()->read())->toBe(
": initial event\n"
. "event: notification\n"
. "id: event-1\n"
. "retry: 500\n"
. "data: First line\n"
. "data: Second line\n\n"
);
});

it('responds event streams from closures', function () {
$response = new Response();

$serverResponse = $response->eventStream(function (): iterable {
yield new ServerSentEvent(
data: 'Event 0',
event: 'notification',
id: 'event-0'
);
})->send();

expect($serverResponse)->toBeInstanceOf(ServerResponse::class);
expect($serverResponse->getBody()->read())->toBe(
"event: notification\n"
. "id: event-0\n"
. "data: Event 0\n\n"
);
});

it('rejects event stream closures that do not return iterables', function () {
$response = new Response();

expect(fn (): Response => $response->eventStream(fn (): string => 'invalid'))
->toThrow(InvalidArgumentException::class, 'The event stream closure must return an iterable.');
});
Loading