Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 142 additions & 81 deletions src/WebSocket/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

use Swoole\Coroutine\Http\Client as SwooleClient;
use Swoole\WebSocket\Frame;
use Swoole\Coroutine;

use function Swoole\Coroutine\run as Co;

class Client
{
Expand Down Expand Up @@ -50,62 +53,95 @@ public function __construct(string $url, array $options = [])
$this->timeout = $options['timeout'] ?? 30;
}

public function connect(): void
/**
* Ensures code runs in a coroutine context
* @param callable $callback Function to run in coroutine
* @return mixed Result of the callback
* @throws \Throwable If the callback throws an exception
*/
private function ensureCoroutine(callable $callback): mixed
{
$this->client = new SwooleClient($this->host, $this->port, $this->port === 443);
$this->client->set([
'timeout' => $this->timeout,
'websocket_compression' => true,
'max_frame_size' => 32 * 1024 * 1024, // 32MB max frame size
]);

if (!empty($this->headers)) {
$this->client->setHeaders($this->headers);
}
if (Coroutine::getCid() === -1) {
$result = null;
$exception = null;

Co(function () use ($callback, &$result, &$exception) {
try {
$result = $callback();
} catch (\Throwable $e) {
$exception = $e;
}
});

$success = $this->client->upgrade($this->path);
if ($exception !== null) {
throw $exception;
}

if (!$success) {
$error = new \RuntimeException(
"WebSocket connection failed: {$this->client->errCode} - {$this->client->errMsg}"
);
$this->emit('error', $error);
throw $error;
return $result;
}
return $callback();
}

$this->connected = true;
$this->emit('open');
public function connect(): void
{
$this->ensureCoroutine(function () {
$this->client = new SwooleClient($this->host, $this->port, $this->port === 443);
$this->client->set([
'timeout' => $this->timeout,
'websocket_compression' => true,
'max_frame_size' => 32 * 1024 * 1024, // 32MB max frame size
]);

if (!empty($this->headers)) {
$this->client->setHeaders($this->headers);
}

$success = $this->client->upgrade($this->path);

if (!$success) {
$error = new \RuntimeException(
"WebSocket connection failed: {$this->client->errCode} - {$this->client->errMsg}"
);
$this->emit('error', $error);
throw $error;
}

$this->connected = true;
$this->emit('open');
});
}

public function listen(): void
{
while ($this->connected) {
try {
$frame = $this->client->recv($this->timeout);

if ($frame === false) {
if ($this->client->errCode === SWOOLE_ERROR_CLIENT_NO_CONNECTION) {
$this->handleClose();
break;
$this->ensureCoroutine(function () {
while ($this->connected) {
try {
$frame = $this->client->recv($this->timeout);

if ($frame === false) {
if ($this->client->errCode === SWOOLE_ERROR_CLIENT_NO_CONNECTION) {
$this->handleClose();
break;
}
throw new \RuntimeException(
"Failed to receive data: {$this->client->errCode} - {$this->client->errMsg}"
);
}
throw new \RuntimeException(
"Failed to receive data: {$this->client->errCode} - {$this->client->errMsg}"
);
}

if ($frame === "") {
continue;
}
if ($frame === "") {
continue;
}

if ($frame instanceof Frame) {
$this->handleFrame($frame);
if ($frame instanceof Frame) {
$this->handleFrame($frame);
}
} catch (\Throwable $e) {
$this->emit('error', $e);
$this->handleClose();
break;
}
} catch (\Throwable $e) {
$this->emit('error', $e);
$this->handleClose();
break;
}
}
});
}

private function handleFrame(Frame $frame): void
Expand Down Expand Up @@ -138,18 +174,25 @@ private function handleClose(): void

public function send(string $data): void
{
if (!$this->connected) {
throw new \RuntimeException('Not connected to WebSocket server');
}
try {
$this->ensureCoroutine(function () use ($data) {
if (!$this->connected) {
throw new \RuntimeException('Not connected to WebSocket server');
}

$success = $this->client->push($data);
$success = $this->client->push($data);

if ($success === false) {
$error = new \RuntimeException(
"Failed to send data: {$this->client->errCode} - {$this->client->errMsg}"
);
$this->emit('error', $error);
throw $error;
if ($success === false) {
$error = new \RuntimeException(
"Failed to send data: {$this->client->errCode} - {$this->client->errMsg}"
);
$this->emit('error', $error);
throw $error;
}
});
} catch (\Throwable $e) {
$this->emit('error', $e);
throw $e;
}
}

Expand Down Expand Up @@ -223,43 +266,61 @@ private function emit(string $event, mixed $data = null): void

public function receive(): ?string
{
if (!$this->connected) {
throw new \RuntimeException('Not connected to WebSocket server');
}
/** @var string|null */
return $this->ensureCoroutine(function (): ?string {
if (!$this->connected) {
throw new \RuntimeException('Not connected to WebSocket server');
}

$frame = $this->client->recv($this->timeout);
$frame = $this->client->recv($this->timeout);

if ($frame === false) {
if ($this->client->errCode === SWOOLE_ERROR_CLIENT_NO_CONNECTION) {
$this->handleClose();
if ($frame === false) {
if ($this->client->errCode === SWOOLE_ERROR_CLIENT_NO_CONNECTION) {
$this->handleClose();
return null;
}
throw new \RuntimeException(
"Failed to receive data: {$this->client->errCode} - {$this->client->errMsg}"
);
}

if ($frame === "") {
return null;
}
throw new \RuntimeException(
"Failed to receive data: {$this->client->errCode} - {$this->client->errMsg}"
);
}

if ($frame === "") {
if ($frame instanceof Frame) {
switch ($frame->opcode) {
case WEBSOCKET_OPCODE_TEXT:
return $frame->data;
case WEBSOCKET_OPCODE_CLOSE:
$this->handleClose();
return null;
case WEBSOCKET_OPCODE_PING:
$this->emit('ping', $frame->data);
$this->client->push('', WEBSOCKET_OPCODE_PONG);
return null;
case WEBSOCKET_OPCODE_PONG:
$this->emit('pong', $frame->data);
return null;
}
}

return null;
}
});
}

if ($frame instanceof Frame) {
switch ($frame->opcode) {
case WEBSOCKET_OPCODE_TEXT:
return $frame->data;
case WEBSOCKET_OPCODE_CLOSE:
$this->handleClose();
return null;
case WEBSOCKET_OPCODE_PING:
$this->emit('ping', $frame->data);
$this->client->push('', WEBSOCKET_OPCODE_PONG);
return null;
case WEBSOCKET_OPCODE_PONG:
$this->emit('pong', $frame->data);
return null;
/**
* Check if there is an incoming message available without consuming it
*/
public function hasIncomingMessage(): bool
{
return (bool) $this->ensureCoroutine(function (): bool {
if (!$this->connected) {
return false;
}
}

return null;
// Small timeout to check if there is an incoming message
return $this->client->recv(0.001) !== false;
});
}
}
82 changes: 39 additions & 43 deletions tests/e2e/AdapterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
use PHPUnit\Framework\TestCase;
use Utopia\WebSocket\Client;

use function Swoole\Coroutine\run;

class AdapterTest extends TestCase
{
private function getWebsocket(string $host, int $port): Client
Expand All @@ -32,46 +30,44 @@ public function testWorkerman(): void

private function testServer(string $host, int $port): void
{
run(function () use ($host, $port) {
$client = $this->getWebsocket($host, $port);
$client->connect();

$client->send('ping');
$this->assertEquals('pong', $client->receive());
$this->assertEquals(true, $client->isConnected());

$clientA = $this->getWebsocket($host, $port);
$clientA->connect();
$clientB = $this->getWebsocket($host, $port);
$clientB->connect();

$clientA->send('ping');
$this->assertEquals('pong', $clientA->receive());
$clientB->send('pong');
$this->assertEquals('ping', $clientB->receive());

$clientA->send('broadcast');
$this->assertEquals('broadcast', $client->receive());
$this->assertEquals('broadcast', $clientA->receive());
$this->assertEquals('broadcast', $clientB->receive());

$clientB->send('broadcast');
$this->assertEquals('broadcast', $client->receive());
$this->assertEquals('broadcast', $clientA->receive());
$this->assertEquals('broadcast', $clientB->receive());

$clientA->close();
$clientB->close();

$client->send('disconnect');
$this->assertEquals('disconnect', $client->receive());

try {
$client->receive();
$this->fail('Expected RuntimeException was not thrown');
} catch (\RuntimeException $e) {
$this->assertStringContainsString('Failed to receive data:', $e->getMessage());
}
});
$client = $this->getWebsocket($host, $port);
$client->connect();

$client->send('ping');
$this->assertEquals('pong', $client->receive());
$this->assertEquals(true, $client->isConnected());

$clientA = $this->getWebsocket($host, $port);
$clientA->connect();
$clientB = $this->getWebsocket($host, $port);
$clientB->connect();

$clientA->send('ping');
$this->assertEquals('pong', $clientA->receive());
$clientB->send('pong');
$this->assertEquals('ping', $clientB->receive());

$clientA->send('broadcast');
$this->assertEquals('broadcast', $client->receive());
$this->assertEquals('broadcast', $clientA->receive());
$this->assertEquals('broadcast', $clientB->receive());

$clientB->send('broadcast');
$this->assertEquals('broadcast', $client->receive());
$this->assertEquals('broadcast', $clientA->receive());
$this->assertEquals('broadcast', $clientB->receive());

$clientA->close();
$clientB->close();

$client->send('disconnect');
$this->assertEquals('disconnect', $client->receive());

try {
$client->receive();
$this->fail('Expected RuntimeException was not thrown');
} catch (\RuntimeException $e) {
$this->assertStringContainsString('Failed to receive data:', $e->getMessage());
}
}
}
24 changes: 24 additions & 0 deletions tests/unit/ClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,28 @@ public function testListenWithError(): void
$this->assertTrue($errorReceived);
$this->assertFalse($this->client->isConnected());
}

public function testHasIncomingMessage(): void
{
try {
$swooleClient = $this->createMock(\Swoole\Coroutine\Http\Client::class);
$swooleClient->method('recv')
->willReturnOnConsecutiveCalls(
new \Swoole\WebSocket\Frame(),
false
);

$reflectionClass = new \ReflectionClass(Client::class);
$reflectionClass->getProperty('connected')->setValue($this->client, true);
$reflectionClass->getProperty('client')->setValue($this->client, $swooleClient);

$this->assertTrue($this->client->hasIncomingMessage());
$this->assertFalse($this->client->hasIncomingMessage());
} catch (\Error $e) {
if (strpos($e->getMessage(), 'enum_exists') !== false) {
$this->markTestSkipped('Test skipped due to enum_exists compatibility issue');
}
throw $e;
}
}
}