Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@
"ext-json": "*",
"psr/http-factory": "^1.0.1",
"psr/http-message": "^1.0.1 || ^2.0",
"spiral/roadrunner": "^2023.1",
"spiral/roadrunner-worker": "^3.0"
"spiral/roadrunner": "^2023.3",
"spiral/roadrunner-worker": "^3.1.0"
},
"require-dev": {
"buggregator/trap": "^1.0",
"jetbrains/phpstorm-attributes": "^1.0",
"nyholm/psr7": "^1.3",
"phpunit/phpunit": "^10.0",
"symfony/process": "^6.2",
"symfony/var-dumper": "^6.3",
"vimeo/psalm": "^5.9"
},
"autoload": {
Expand Down
44 changes: 36 additions & 8 deletions src/HttpWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Spiral\RoadRunner\Http\Exception\StreamStoppedException;
use Spiral\RoadRunner\Message\Command\StreamStop;
use Spiral\RoadRunner\Payload;
use Spiral\RoadRunner\StreamWorkerInterface;
use Spiral\RoadRunner\WorkerInterface;

/**
Expand Down Expand Up @@ -64,10 +65,14 @@ public function waitRequest(): ?Request
/**
* @throws \JsonException
*/
public function respond(int $status, string|Generator $body, array $headers = []): void
public function respond(int $status, string|Generator $body = '', array $headers = [], bool $endOfStream = true): void
{
if ($status < 200 && $status >= 100 && $body !== '') {
throw new \InvalidArgumentException('Unable to send a body with informational status code.');
}

if ($body instanceof Generator) {
$this->respondStream($status, $body, $headers);
$this->respondStream($status, $body, $headers, $endOfStream);
return;
}

Expand All @@ -76,30 +81,53 @@ public function respond(int $status, string|Generator $body, array $headers = []
'headers' => $headers ?: (object)[],
], \JSON_THROW_ON_ERROR);

$this->worker->respond(new Payload($body, $head));
$this->worker->respond(new Payload($body, $head, $endOfStream));
}

private function respondStream(int $status, Generator $body, array $headers = []): void
private function respondStream(int $status, Generator $body, array $headers = [], bool $endOfStream = true): void
{
$head = \json_encode([
'status' => $status,
'headers' => $headers ?: (object)[],
], \JSON_THROW_ON_ERROR);

$worker = $this->worker instanceof StreamWorkerInterface
? $this->worker->withStreamMode()
: $this->worker;

do {
if (!$body->valid()) {
// End of generator
$content = (string)$body->getReturn();
$this->worker->respond(new Payload($content, $head, true));
if ($endOfStream === false && $content === '') {
// We don't need to send an empty frame if the stream is not ended
return;
}
$worker->respond(new Payload($content, $head, $endOfStream));
break;
}

$content = (string)$body->current();
if ($this->worker->getPayload(StreamStop::class) !== null) {
if ($worker->getPayload(StreamStop::class) !== null) {
$body->throw(new StreamStoppedException());

// RoadRunner is waiting for a Stream Stop Frame to confirm that the stream is closed
// and the worker doesn't hang
$worker->respond(new Payload(''));
return;
}
$this->worker->respond(new Payload($content, $head, false));
$body->next();

// Send a chunk of data
$worker->respond(new Payload($content, $head, false));
$head = null;

try {
$body->next();
} catch (\Throwable) {
// Stop the stream if an exception is thrown from the generator
$worker->respond(new Payload(''));
return;
}
} while (true);
}

Expand Down
3 changes: 1 addition & 2 deletions src/HttpWorkerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ public function waitRequest(): ?Request;
* @param Generator<mixed, scalar|Stringable, mixed, Stringable|scalar|null>|string $body Body of response.
* If the body is a generator, then each yielded value will be sent as a separated stream chunk.
* Returned value will be sent as a last stream package.
* Note: Stream response is experimental feature and isn't supported by RoadRunner yet.
* But you can try to use RoadRunner 2.9-alpha to test it.
* Note: Stream response is supported by RoadRunner since version 2023.3
* @param HeadersList|array $headers An associative array of the message's headers. Each key MUST be a header name,
* and each value MUST be an array of strings for that header.
*/
Expand Down
47 changes: 44 additions & 3 deletions tests/Feature/StreamResponseTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

namespace Spiral\RoadRunner\Tests\Http\Feature;

use Exception;
use PHPUnit\Framework\TestCase;
use Spiral\Goridge\SocketRelay;
use Spiral\RoadRunner\Http\Exception\StreamStoppedException;
use Spiral\RoadRunner\Http\HttpWorker;
use Spiral\RoadRunner\Message\Command\GetProcessId;
use Spiral\RoadRunner\Payload;
use Spiral\RoadRunner\Tests\Http\Server\Command\BaseCommand;
use Spiral\RoadRunner\Tests\Http\Server\Command\StreamStop;
Expand Down Expand Up @@ -94,10 +96,46 @@ public function testStopStreamResponse(): void
self::assertSame(\implode("\n", ['Hel', 'lo,']), \trim(ServerRunner::getBuffer()));
}

public function testSend1xxWithBody(): void
{
$httpWorker = $this->makeHttpWorker();

$this->expectExceptionMessage('Unable to send a body with informational status code');

$httpWorker->respond(
103,
(function () {
yield 'Hel';
yield 'lo,';
})(),
);
}

public function testExceptionInGenerator(): void
{
$httpWorker = $this->makeHttpWorker();

// Flush buffer
ServerRunner::getBuffer();

$httpWorker->respond(
200,
(function () {
yield 'Hel';
yield 'lo,';
throw new Exception('test');
})(),
);


\usleep(100_000);
self::assertSame(\implode("\n", ['Hel', 'lo,']), \trim(ServerRunner::getBuffer()));
}

/**
* StopStream should be ignored if stream is already ended.
* Commented because doesn't pass in CI
* todo: check after RoadRunner Stream Response release
*/
public function testStopStreamAfterStreamEnd(): void
{
$httpWorker = $this->makeHttpWorker();
Expand All @@ -116,11 +154,14 @@ public function testStopStreamAfterStreamEnd(): void
$this->assertFalse($this->getWorker()->hasPayload(\Spiral\RoadRunner\Message\Command\StreamStop::class));
$this->sendCommand(new StreamStop());
\usleep(200_000);
self::assertSame(\implode("\n", ['Hello', 'World!']), \trim(ServerRunner::getBuffer()));
$this->assertSame(\implode("\n", ['Hello', 'World!']), \trim(ServerRunner::getBuffer()));
$this->assertTrue($this->getWorker()->hasPayload(\Spiral\RoadRunner\Message\Command\StreamStop::class));

$this->getWorker()->getPayload(\Spiral\RoadRunner\Message\Command\StreamStop::class);
$this->getWorker()->getPayload(GetProcessId::class);

$this->assertFalse($this->getWorker()->hasPayload());
}
*/

private function getRelay(): SocketRelay
{
Expand Down