From b35b2ed2d93b37f6186ec1807e5f407449ff00c2 Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Sun, 10 Feb 2019 21:32:30 +0100 Subject: [PATCH 1/6] Initial skeleton for lazy client connection Originally filed in https://github.com/clue/reactphp-redis/pull/82 by @WyriHaximus --- README.md | 7 + examples/cli.php | 9 +- examples/incr.php | 14 +- examples/publish.php | 12 +- examples/subscribe.php | 14 +- src/Factory.php | 5 + src/LazyStreamingClient.php | 122 ++++++++++ tests/FactoryLazyStreamingClientTest.php | 148 ++++++++++++ ...est.php => FactoryStreamingClientTest.php} | 2 +- tests/LazyStreamingClientTest.php | 217 ++++++++++++++++++ 10 files changed, 526 insertions(+), 24 deletions(-) create mode 100644 src/LazyStreamingClient.php create mode 100644 tests/FactoryLazyStreamingClientTest.php rename tests/{FactoryTest.php => FactoryStreamingClientTest.php} (99%) create mode 100644 tests/LazyStreamingClientTest.php diff --git a/README.md b/README.md index a9f6559..b5ddf4d 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,7 @@ It enables you to set and query its data or use its PubSub topics to react to in * [Usage](#usage) * [Factory](#factory) * [createClient()](#createclient) + * [createLazyClient()](#createlazyclient) * [Client](#client) * [Commands](#commands) * [Promises](#promises) @@ -195,6 +196,12 @@ authentication. You can explicitly pass a custom timeout value in seconds $factory->createClient('localhost?timeout=0.5'); ``` +#### createLazyClient() + +The `createLazyClient($redisUri)` method can be used to create a new [`Client`](#client) which lazily +creates and connects to the configured redis server on the first command. Internally it will use `createClient()` +when the first command comes in, queues all commands while connecting, and pass on all commands directly when connected. + ### Client The `Client` is responsible for exchanging messages with Redis diff --git a/examples/cli.php b/examples/cli.php index 6da8c25..d11b2e7 100644 --- a/examples/cli.php +++ b/examples/cli.php @@ -11,7 +11,10 @@ echo '# connecting to redis...' . PHP_EOL; -$factory->createClient('localhost')->then(function (Client $client) use ($loop) { +/** @var Client $client */ +$client = $factory->createLazyClient('localhost'); + +try { echo '# connected! Entering interactive mode, hit CTRL-D to quit' . PHP_EOL; $loop->addReadStream(STDIN, function () use ($client, $loop) { @@ -48,10 +51,10 @@ $loop->removeReadStream(STDIN); }); -}, function (Exception $error) { +} catch (Exception $error) { echo 'CONNECTION ERROR: ' . $error->getMessage() . PHP_EOL; exit(1); -}); +}; $loop->run(); diff --git a/examples/incr.php b/examples/incr.php index 35c0684..7667b2c 100644 --- a/examples/incr.php +++ b/examples/incr.php @@ -8,14 +8,14 @@ $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); -$factory->createClient('localhost')->then(function (Client $client) { - $client->incr('test'); +/** @var Client $client */ +$client = $factory->createLazyClient('localhost'); +$client->incr('test'); - $client->get('test')->then(function ($result) { - var_dump($result); - }); - - $client->end(); +$client->get('test')->then(function ($result) { + var_dump($result); }); +$client->end(); + $loop->run(); diff --git a/examples/publish.php b/examples/publish.php index 2d6e8f1..b0eef97 100644 --- a/examples/publish.php +++ b/examples/publish.php @@ -11,12 +11,12 @@ $channel = isset($argv[1]) ? $argv[1] : 'channel'; $message = isset($argv[2]) ? $argv[2] : 'message'; -$factory->createClient('localhost')->then(function (Client $client) use ($channel, $message) { - $client->publish($channel, $message)->then(function ($received) { - echo 'successfully published. Received by ' . $received . PHP_EOL; - }); - - $client->end(); +/** @var Client $client */ +$client = $factory->createLazyClient('localhost'); +$client->publish($channel, $message)->then(function ($received) { + echo 'successfully published. Received by ' . $received . PHP_EOL; }); +$client->end(); + $loop->run(); diff --git a/examples/subscribe.php b/examples/subscribe.php index efae0a4..8ddbeb8 100644 --- a/examples/subscribe.php +++ b/examples/subscribe.php @@ -10,14 +10,14 @@ $channel = isset($argv[1]) ? $argv[1] : 'channel'; -$factory->createClient('localhost')->then(function (Client $client) use ($channel) { - $client->subscribe($channel)->then(function () { - echo 'Now subscribed to channel ' . PHP_EOL; - }); +/** @var Client $client */ +$client = $factory->createLazyClient('localhost'); +$client->subscribe($channel)->then(function () { + echo 'Now subscribed to channel ' . PHP_EOL; +}); - $client->on('message', function ($channel, $message) { - echo 'Message on ' . $channel . ': ' . $message . PHP_EOL; - }); +$client->on('message', function ($channel, $message) { + echo 'Message on ' . $channel . ': ' . $message . PHP_EOL; }); $loop->run(); diff --git a/src/Factory.php b/src/Factory.php index 6686411..1280d6d 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -38,6 +38,11 @@ public function __construct(LoopInterface $loop, ConnectorInterface $connector = $this->protocol = $protocol; } + public function createLazyClient($target) + { + return new LazyStreamingClient($target, $this); + } + /** * create redis client connected to address of given redis instance * diff --git a/src/LazyStreamingClient.php b/src/LazyStreamingClient.php new file mode 100644 index 0000000..03d061d --- /dev/null +++ b/src/LazyStreamingClient.php @@ -0,0 +1,122 @@ +target = $target; + $this->factory = $factory; + + $this->on('close', array($this, 'removeAllListeners')); + } + + private function client() + { + if ($this->promise instanceof PromiseInterface) { + return $this->promise; + } + + if ($this->client instanceof Client) { + return new FulfilledPromise($this->client()); + } + + $self = $this; + return $this->promise = $this->factory->createClient($this->target)->then(function (Client $client) use ($self) { + $self->client = $client; + $self->promise = null; + + Util::forwardEvents( + $self->client, + $self, + array( + 'error', + 'close', + 'message', + 'subscribe', + 'unsubscribe', + 'pmessage', + 'psubscribe', + 'punsubscribe', + ) + ); + + return $client; + }, function (\Exception $e) use ($self) { + // connection failed => emit error if connection is not already closed + if ($self->closed) { + return; + } + $self->emit('error', array($e)); + $self->close(); + + return $e; + }); + } + + public function __call($name, $args) + { + if ($this->client instanceof Client) { + return \call_user_func_array(array($this->client, $name), $args); + } + + return $this->client()->then(function (Client $client) use ($name, $args) { + return \call_user_func_array(array($client, $name), $args); + }); + } + + public function end() + { + if ($this->client instanceof Client) { + return $this->client->end(); + } + + return $this->client()->then(function (Client $client) { + return $client->end(); + }); + } + + public function close() + { + if ($this->client instanceof Client) { + return $this->client->close(); + } + + return $this->client()->then(function (Client $client) { + return $client->close(); + }); + } +} diff --git a/tests/FactoryLazyStreamingClientTest.php b/tests/FactoryLazyStreamingClientTest.php new file mode 100644 index 0000000..bb0b349 --- /dev/null +++ b/tests/FactoryLazyStreamingClientTest.php @@ -0,0 +1,148 @@ +loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $this->connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock(); + $this->factory = new Factory($this->loop, $this->connector); + } + + public function testWillConnectWithDefaultPort() + { + $this->connector->expects($this->never())->method('connect')->with('redis.example.com:6379')->willReturn(Promise\reject(new \RuntimeException())); + $this->factory->createLazyClient('redis.example.com'); + } + + public function testWillConnectToLocalhost() + { + $this->connector->expects($this->never())->method('connect')->with('localhost:1337')->willReturn(Promise\reject(new \RuntimeException())); + $this->factory->createLazyClient('localhost:1337'); + } + + public function testWillResolveIfConnectorResolves() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write'); + + $this->connector->expects($this->never())->method('connect')->willReturn(Promise\resolve($stream)); + $client = $this->factory->createLazyClient('localhost'); + + $this->assertInstanceOf('Clue\React\Redis\Client', $client); + } + + public function testWillWriteSelectCommandIfTargetContainsPath() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$4\r\ndemo\r\n"); + + $this->connector->expects($this->never())->method('connect')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://127.0.0.1/demo'); + } + + public function testWillWriteSelectCommandIfTargetContainsDbQueryParameter() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$1\r\n4\r\n"); + + $this->connector->expects($this->never())->method('connect')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://127.0.0.1?db=4'); + } + + public function testWillWriteAuthCommandIfRedisUriContainsUserInfo() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://hello:world@example.com'); + } + + public function testWillWriteAuthCommandIfRedisUriContainsEncodedUserInfo() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nh@llo\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://:h%40llo@example.com'); + } + + public function testWillWriteAuthCommandIfTargetContainsPasswordQueryParameter() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$6\r\nsecret\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://example.com?password=secret'); + } + + public function testWillWriteAuthCommandIfTargetContainsEncodedPasswordQueryParameter() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nh@llo\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://example.com?password=h%40llo'); + } + + public function testWillWriteAuthCommandIfRedissUriContainsUserInfo() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('tls://example.com:6379')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('rediss://hello:world@example.com'); + } + + public function testWillWriteAuthCommandIfRedisUnixUriContainsPasswordQueryParameter() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis+unix:///tmp/redis.sock?password=world'); + } + + public function testWillWriteAuthCommandIfRedisUnixUriContainsUserInfo() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis+unix://hello:world@/tmp/redis.sock'); + } + + public function testWillWriteSelectCommandIfRedisUnixUriContainsDbQueryParameter() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$4\r\ndemo\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis+unix:///tmp/redis.sock?db=demo'); + } + + public function testWillRejectIfConnectorRejects() + { + $this->connector->expects($this->never())->method('connect')->with('127.0.0.1:2')->willReturn(Promise\reject(new \RuntimeException())); + $client = $this->factory->createLazyClient('redis://127.0.0.1:2'); + + $this->assertInstanceOf('Clue\React\Redis\Client', $client); + } + + public function testWillRejectIfTargetIsInvalid() + { + $client = $this->factory->createLazyClient('http://invalid target'); + + $this->assertInstanceOf('Clue\React\Redis\Client', $client); + } +} diff --git a/tests/FactoryTest.php b/tests/FactoryStreamingClientTest.php similarity index 99% rename from tests/FactoryTest.php rename to tests/FactoryStreamingClientTest.php index 883cbb6..89c5555 100644 --- a/tests/FactoryTest.php +++ b/tests/FactoryStreamingClientTest.php @@ -6,7 +6,7 @@ use React\Promise; use React\Promise\Deferred; -class FactoryTest extends TestCase +class FactoryStreamingClientTest extends TestCase { private $loop; private $connector; diff --git a/tests/LazyStreamingClientTest.php b/tests/LazyStreamingClientTest.php new file mode 100644 index 0000000..682cd3f --- /dev/null +++ b/tests/LazyStreamingClientTest.php @@ -0,0 +1,217 @@ +factory = $this->getMockBuilder('Clue\React\Redis\Factory')->setConstructorArgs(array(Factory::create()))->getMock(); + $this->stream = $this->getMockBuilder('React\Stream\DuplexStreamInterface')->getMock(); + $this->parser = $this->getMockBuilder('Clue\Redis\Protocol\Parser\ParserInterface')->getMock(); + $this->serializer = $this->getMockBuilder('Clue\Redis\Protocol\Serializer\SerializerInterface')->getMock(); + + $this->factory->expects($this->any())->method('createClient')->with('localhost')->will($this->returnValue(new FulfilledPromise(new StreamingClient($this->stream, $this->parser, $this->serializer)))); + + $this->client = new LazyStreamingClient('localhost', $this->factory); + } + + public function testSending() + { + $this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping'))->will($this->returnValue('message')); + $this->stream->expects($this->once())->method('write')->with($this->equalTo('message')); + + $this->client->ping(); + } + + public function testClosingClientEmitsEvent() + { + $this->client->on('close', $this->expectCallableOnce()); + + $this->client->close(); + } + + public function testClosingStreamClosesClient() + { + $this->stream = new ThroughStream(); + $this->client = new StreamingClient($this->stream, $this->parser, $this->serializer); + + $this->client->on('close', $this->expectCallableOnce()); + + $this->stream->emit('close'); + } + + public function testReceiveParseErrorEmitsErrorEvent() + { + $this->stream = new ThroughStream(); + $this->client = new StreamingClient($this->stream, $this->parser, $this->serializer); + + $this->client->on('error', $this->expectCallableOnce()); + $this->client->on('close', $this->expectCallableOnce()); + + $this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->throwException(new ParserException())); + $this->stream->emit('data', array('message')); + } + + public function testReceiveThrowMessageEmitsErrorEvent() + { + $this->stream = new ThroughStream(); + $this->client = new StreamingClient($this->stream, $this->parser, $this->serializer); + + $this->client->on('error', $this->expectCallableOnce()); + + $this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->returnValue(array(new IntegerReply(2)))); + $this->stream->emit('data', array('message')); + } + + public function testPingPong() + { + $this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping')); + + $promise = $this->client->ping(); + + $this->client->handleMessage(new BulkReply('PONG')); + + $this->expectPromiseResolve($promise); + $promise->then($this->expectCallableOnce('PONG')); + } + + public function testMonitorCommandIsNotSupported() + { + $promise = $this->client->monitor(); + + $this->expectPromiseReject($promise); + } + + public function testErrorReply() + { + $promise = $this->client->invalid(); + + $err = new ErrorReply("ERR unknown command 'invalid'"); + $this->client->handleMessage($err); + + $this->expectPromiseReject($promise); + $promise->then(null, $this->expectCallableOnce($err)); + } + + public function testClosingClientRejectsAllRemainingRequests() + { + $promise = $this->client->ping(); + $this->client->close(); + + $this->expectPromiseReject($promise); + } + + public function testClosedClientRejectsAllNewRequests() + { + $this->client->close(); + $promise = $this->client->ping(); + + $this->expectPromiseReject($promise); + } + + public function testEndingNonBusyClosesClient() + { + $this->client->on('close', $this->expectCallableOnce()); + $this->client->end(); + } + + public function testEndingBusyClosesClientWhenNotBusyAnymore() + { + // count how often the "close" method has been called + $closed = 0; + $this->client->on('close', function() use (&$closed) { + ++$closed; + }); + + $promise = $this->client->ping(); + $this->assertEquals(0, $closed); + + $this->client->end(); + $this->assertEquals(0, $closed); + + $this->client->handleMessage(new BulkReply('PONG')); + $promise->then($this->expectCallableOnce('PONG')); + $this->assertEquals(1, $closed); + } + + public function testClosingMultipleTimesEmitsOnce() + { + $this->client->on('close', $this->expectCallableOnce()); + + $this->client->close(); + $this->client->close(); + } + + public function testReceivingUnexpectedMessageThrowsException() + { + $that = $this; + $this->client->handleMessage(new BulkReply('PONG'))->then(function($value) use ($that) { + $that->assertNull($value); + $that->fail('promise resolved'); + }, function($value) use ($that) { + $that->assertInstanceOf('UnderflowException', $value); + })->done(); + } + + public function testPubsubSubscribe() + { + $promise = $this->client->subscribe('test'); + $this->expectPromiseResolve($promise); + + $this->client->on('subscribe', $this->expectCallableOnce()); + $this->client->handleMessage(new MultiBulkReply(array(new BulkReply('subscribe'), new BulkReply('test'), new IntegerReply(1)))); + + return $this->client; + } + + /** + * @depends testPubsubSubscribe + * @param Client $client + */ + public function testPubsubPatternSubscribe(Client $client) + { + $promise = $client->psubscribe('demo_*'); + $this->expectPromiseResolve($promise); + + $client->on('psubscribe', $this->expectCallableOnce()); + $client->handleMessage(new MultiBulkReply(array(new BulkReply('psubscribe'), new BulkReply('demo_*'), new IntegerReply(1)))); + + return $client; + } + + /** + * @depends testPubsubPatternSubscribe + * @param Client $client + */ + public function testPubsubMessage(Client $client) + { + $client->on('message', $this->expectCallableOnce()); + $client->handleMessage(new MultiBulkReply(array(new BulkReply('message'), new BulkReply('test'), new BulkReply('payload')))); + } + + public function testPubsubSubscribeSingleOnly() + { + $this->expectPromiseReject($this->client->subscribe('a', 'b')); + $this->expectPromiseReject($this->client->unsubscribe('a', 'b')); + $this->expectPromiseReject($this->client->unsubscribe()); + } +} From 82dc4c7ce31b395af891abb017b4374a2bfdd356 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Tue, 26 Feb 2019 21:01:27 +0100 Subject: [PATCH 2/6] Make sure lazy client follows exactly Client semantics --- README.md | 131 +++++++++-- examples/cli.php | 9 +- examples/incr.php | 2 - examples/publish.php | 2 - examples/subscribe.php | 2 - src/Factory.php | 20 +- ...LazyStreamingClient.php => LazyClient.php} | 67 +++--- ...ientTest.php => FactoryLazyClientTest.php} | 2 +- tests/FunctionalTest.php | 58 +++-- tests/LazyClientTest.php | 220 ++++++++++++++++++ tests/LazyStreamingClientTest.php | 217 ----------------- 11 files changed, 420 insertions(+), 310 deletions(-) rename src/{LazyStreamingClient.php => LazyClient.php} (57%) rename tests/{FactoryLazyStreamingClientTest.php => FactoryLazyClientTest.php} (99%) create mode 100644 tests/LazyClientTest.php delete mode 100644 tests/LazyStreamingClientTest.php diff --git a/README.md b/README.md index b5ddf4d..0abe1fa 100644 --- a/README.md +++ b/README.md @@ -47,23 +47,22 @@ local Redis server and send some requests: $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); -$factory->createClient('localhost')->then(function (Client $client) use ($loop) { - $client->set('greeting', 'Hello world'); - $client->append('greeting', '!'); - - $client->get('greeting')->then(function ($greeting) { - // Hello world! - echo $greeting . PHP_EOL; - }); - - $client->incr('invocation')->then(function ($n) { - echo 'This is invocation #' . $n . PHP_EOL; - }); - - // end connection once all pending requests have been resolved - $client->end(); +$client = $factory->createLazyClient('localhost'); +$client->set('greeting', 'Hello world'); +$client->append('greeting', '!'); + +$client->get('greeting')->then(function ($greeting) { + // Hello world! + echo $greeting . PHP_EOL; +}); + +$client->incr('invocation')->then(function ($n) { + echo 'This is invocation #' . $n . PHP_EOL; }); +// end connection once all pending requests have been resolved +$client->end(); + $loop->run(); ``` @@ -101,7 +100,7 @@ $factory = new Factory($loop, $connector); #### createClient() -The `createClient($redisUri): PromiseInterface` method can be used to +The `createClient(string $redisUri): PromiseInterface` method can be used to create a new [`Client`](#client). It helps with establishing a plain TCP/IP or secure TLS connection to Redis @@ -198,9 +197,103 @@ $factory->createClient('localhost?timeout=0.5'); #### createLazyClient() -The `createLazyClient($redisUri)` method can be used to create a new [`Client`](#client) which lazily -creates and connects to the configured redis server on the first command. Internally it will use `createClient()` -when the first command comes in, queues all commands while connecting, and pass on all commands directly when connected. +The `createLazyClient(string $redisUri): Client` method can be used to +create a new [`Client`](#client). + +It helps with establishing a plain TCP/IP or secure TLS connection to Redis +and optionally authenticating (AUTH) and selecting the right database (SELECT). + +```php +$client = $factory->createLazyClient('redis://localhost:6379'); + +$client->incr('hello'); +$client->end(); +``` + +This method immediately returns a "virtual" connection implementing the +[`Client`](#client) that can be used to interface with your Redis database. +Internally, it lazily creates the underlying database connection (which may +take some time) only once the first request is invoked on this instance and +will queue all outstanding requests until the underlying connection is ready. + +From a consumer side this means that you can start sending commands to the +database right away while the actual connection may still be outstanding. +It will ensure that all commands will be executed in the order they are +enqueued once the connection is ready. If the database connection fails, +it will emit an `error` event, reject all outstanding commands and `close` +the connection as described in the `Client`. In other words, it behaves just +like a real connection and frees you from having to deal with its async +resolution. + +Note that creating the underlying connection will be deferred until the +first request is invoked. Accordingly, any eventual connection issues +will be detected once this instance is first used. Similarly, calling +`end()` on this instance before invoking any requests will succeed +immediately and will not wait for an actual underlying connection. + +Depending on your particular use case, you may prefer this method or the +underlying `createClient()` which resolves with a promise. For many +simple use cases it may be easier to create a lazy connection. + +The `$redisUri` can be given in the +[standard](https://www.iana.org/assignments/uri-schemes/prov/redis) form +`[redis[s]://][:auth@]host[:port][/db]`. +You can omit the URI scheme and port if you're connecting to the default port 6379: + +```php +// both are equivalent due to defaults being applied +$factory->createLazyClient('localhost'); +$factory->createLazyClient('redis://localhost:6379'); +``` + +Redis supports password-based authentication (`AUTH` command). Note that Redis' +authentication mechanism does not employ a username, so you can pass the +password `h@llo` URL-encoded (percent-encoded) as part of the URI like this: + +```php +// all forms are equivalent +$factory->createLazyClient('redis://:h%40llo@localhost'); +$factory->createLazyClient('redis://ignored:h%40llo@localhost'); +$factory->createLazyClient('redis://localhost?password=h%40llo'); +``` + +You can optionally include a path that will be used to select (SELECT command) the right database: + +```php +// both forms are equivalent +$factory->createLazyClient('redis://localhost/2'); +$factory->createLazyClient('redis://localhost?db=2'); +``` + +You can use the [standard](https://www.iana.org/assignments/uri-schemes/prov/rediss) +`rediss://` URI scheme if you're using a secure TLS proxy in front of Redis: + +```php +$factory->createLazyClient('rediss://redis.example.com:6340'); +``` + +You can use the `redis+unix://` URI scheme if your Redis instance is listening +on a Unix domain socket (UDS) path: + +```php +$factory->createLazyClient('redis+unix:///tmp/redis.sock'); + +// the URI MAY contain `password` and `db` query parameters as seen above +$factory->createLazyClient('redis+unix:///tmp/redis.sock?password=secret&db=2'); + +// the URI MAY contain authentication details as userinfo as seen above +// should be used with care, also note that database can not be passed as path +$factory->createLazyClient('redis+unix://:secret@/tmp/redis.sock'); +``` + +This method respects PHP's `default_socket_timeout` setting (default 60s) +as a timeout for establishing the underlying connection and waiting for +successful authentication. You can explicitly pass a custom timeout value +in seconds (or use a negative number to not apply a timeout) like this: + +```php +$factory->createLazyClient('localhost?timeout=0.5'); +``` ### Client diff --git a/examples/cli.php b/examples/cli.php index d11b2e7..6da8c25 100644 --- a/examples/cli.php +++ b/examples/cli.php @@ -11,10 +11,7 @@ echo '# connecting to redis...' . PHP_EOL; -/** @var Client $client */ -$client = $factory->createLazyClient('localhost'); - -try { +$factory->createClient('localhost')->then(function (Client $client) use ($loop) { echo '# connected! Entering interactive mode, hit CTRL-D to quit' . PHP_EOL; $loop->addReadStream(STDIN, function () use ($client, $loop) { @@ -51,10 +48,10 @@ $loop->removeReadStream(STDIN); }); -} catch (Exception $error) { +}, function (Exception $error) { echo 'CONNECTION ERROR: ' . $error->getMessage() . PHP_EOL; exit(1); -}; +}); $loop->run(); diff --git a/examples/incr.php b/examples/incr.php index 7667b2c..f61a033 100644 --- a/examples/incr.php +++ b/examples/incr.php @@ -1,6 +1,5 @@ createLazyClient('localhost'); $client->incr('test'); diff --git a/examples/publish.php b/examples/publish.php index b0eef97..e4fe494 100644 --- a/examples/publish.php +++ b/examples/publish.php @@ -1,6 +1,5 @@ createLazyClient('localhost'); $client->publish($channel, $message)->then(function ($received) { echo 'successfully published. Received by ' . $received . PHP_EOL; diff --git a/examples/subscribe.php b/examples/subscribe.php index 8ddbeb8..1c17741 100644 --- a/examples/subscribe.php +++ b/examples/subscribe.php @@ -1,6 +1,5 @@ createLazyClient('localhost'); $client->subscribe($channel)->then(function () { echo 'Now subscribed to channel ' . PHP_EOL; diff --git a/src/Factory.php b/src/Factory.php index 1280d6d..c49f33b 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -38,16 +38,11 @@ public function __construct(LoopInterface $loop, ConnectorInterface $connector = $this->protocol = $protocol; } - public function createLazyClient($target) - { - return new LazyStreamingClient($target, $this); - } - /** - * create redis client connected to address of given redis instance + * Create Redis client connected to address of given redis instance * * @param string $target Redis server URI to connect to - * @return \React\Promise\PromiseInterface resolves with Client or rejects with \Exception + * @return \React\Promise\PromiseInterface resolves with Client or rejects with \Exception */ public function createClient($target) { @@ -120,6 +115,17 @@ function ($error) use ($client) { }); } + /** + * Create Redis client connected to address of given redis instance + * + * @param string $target + * @return Client + */ + public function createLazyClient($target) + { + return new LazyClient($target, $this); + } + /** * @param string $target * @return array with keys authority, auth and db diff --git a/src/LazyStreamingClient.php b/src/LazyClient.php similarity index 57% rename from src/LazyStreamingClient.php rename to src/LazyClient.php index 03d061d..a88175a 100644 --- a/src/LazyStreamingClient.php +++ b/src/LazyClient.php @@ -3,35 +3,20 @@ namespace Clue\React\Redis; use Evenement\EventEmitter; -use Clue\Redis\Protocol\Parser\ParserInterface; -use Clue\Redis\Protocol\Parser\ParserException; -use Clue\Redis\Protocol\Serializer\SerializerInterface; -use Clue\Redis\Protocol\Factory as ProtocolFactory; -use React\Promise\FulfilledPromise; -use React\Promise\Promise; use React\Promise\PromiseInterface; use React\Stream\Util; -use UnderflowException; -use RuntimeException; -use InvalidArgumentException; -use React\Promise\Deferred; -use Clue\Redis\Protocol\Model\ErrorReply; -use Clue\Redis\Protocol\Model\ModelInterface; -use Clue\Redis\Protocol\Model\MultiBulkReply; -use React\Stream\DuplexStreamInterface; /** * @internal */ -class LazyStreamingClient extends EventEmitter implements Client +class LazyClient extends EventEmitter implements Client { private $target; /** @var Factory */ private $factory; private $ending = false; private $closed = false; - public $promise = null; - public $client = null; + private $promise; /** * @param $target @@ -50,21 +35,13 @@ private function client() return $this->promise; } - if ($this->client instanceof Client) { - return new FulfilledPromise($this->client()); - } - $self = $this; return $this->promise = $this->factory->createClient($this->target)->then(function (Client $client) use ($self) { - $self->client = $client; - $self->promise = null; - Util::forwardEvents( - $self->client, + $client, $self, array( 'error', - 'close', 'message', 'subscribe', 'unsubscribe', @@ -74,6 +51,8 @@ private function client() ) ); + $client->on('close', array($self, 'close')); + return $client; }, function (\Exception $e) use ($self) { // connection failed => emit error if connection is not already closed @@ -83,14 +62,14 @@ private function client() $self->emit('error', array($e)); $self->close(); - return $e; + throw $e; }); } public function __call($name, $args) { - if ($this->client instanceof Client) { - return \call_user_func_array(array($this->client, $name), $args); + if ($this->closed) { + return \React\Promise\reject(new \RuntimeException('Connection closed')); } return $this->client()->then(function (Client $client) use ($name, $args) { @@ -100,23 +79,37 @@ public function __call($name, $args) public function end() { - if ($this->client instanceof Client) { - return $this->client->end(); + if ($this->promise === null) { + $this->close(); + } + + if ($this->closed) { + return; } return $this->client()->then(function (Client $client) { - return $client->end(); + $client->end(); }); } public function close() { - if ($this->client instanceof Client) { - return $this->client->close(); + if ($this->closed) { + return; } - return $this->client()->then(function (Client $client) { - return $client->close(); - }); + $this->closed = true; + + // either close active connection or cancel pending connection attempt + if ($this->promise !== null) { + $this->promise->then(function (Client $client) { + $client->close(); + }); + $this->promise->cancel(); + $this->promise = null; + } + + $this->emit('close'); + $this->removeAllListeners(); } } diff --git a/tests/FactoryLazyStreamingClientTest.php b/tests/FactoryLazyClientTest.php similarity index 99% rename from tests/FactoryLazyStreamingClientTest.php rename to tests/FactoryLazyClientTest.php index bb0b349..bd63c68 100644 --- a/tests/FactoryLazyStreamingClientTest.php +++ b/tests/FactoryLazyClientTest.php @@ -5,7 +5,7 @@ use Clue\React\Redis\Factory; use React\Promise; -class FactoryLazyStreamingClientTest extends TestCase +class FactoryLazyClientTest extends TestCase { private $loop; private $connector; diff --git a/tests/FunctionalTest.php b/tests/FunctionalTest.php index d3a81f9..56f1867 100644 --- a/tests/FunctionalTest.php +++ b/tests/FunctionalTest.php @@ -14,23 +14,22 @@ class FunctionalTest extends TestCase { private $loop; private $factory; - private $client; + private $uri; public function setUp() { - $uri = getenv('REDIS_URI'); - if ($uri === false) { + $this->uri = getenv('REDIS_URI'); + if ($this->uri === false) { $this->markTestSkipped('No REDIS_URI environment variable given'); } $this->loop = new StreamSelectLoop(); $this->factory = new Factory($this->loop); - $this->client = $this->createClient($uri); } public function testPing() { - $client = $this->client; + $client = $this->createClient($this->uri); $promise = $client->ping(); $this->assertInstanceOf('React\Promise\PromiseInterface', $promise); @@ -38,13 +37,23 @@ public function testPing() $ret = Block\await($promise, $this->loop); $this->assertEquals('PONG', $ret); + } + + public function testPingLazy() + { + $client = $this->factory->createLazyClient($this->uri); - return $client; + $promise = $client->ping(); + $this->assertInstanceOf('React\Promise\PromiseInterface', $promise); + + $ret = Block\await($promise, $this->loop); + + $this->assertEquals('PONG', $ret); } public function testMgetIsNotInterpretedAsSubMessage() { - $client = $this->client; + $client = $this->createClient($this->uri); $client->mset('message', 'message', 'channel', 'channel', 'payload', 'payload'); @@ -56,7 +65,7 @@ public function testMgetIsNotInterpretedAsSubMessage() public function testPipeline() { - $client = $this->client; + $client = $this->createClient($this->uri); $client->set('a', 1)->then($this->expectCallableOnceWith('OK')); $client->incr('a')->then($this->expectCallableOnceWith(2)); @@ -68,7 +77,8 @@ public function testPipeline() public function testInvalidCommand() { - $promise = $this->client->doesnotexist(1, 2, 3); + $client = $this->createClient($this->uri); + $promise = $client->doesnotexist(1, 2, 3); if (method_exists($this, 'expectException')) { $this->expectException('Exception'); @@ -80,15 +90,16 @@ public function testInvalidCommand() public function testMultiExecEmpty() { - $this->client->multi()->then($this->expectCallableOnceWith('OK')); - $promise = $this->client->exec()->then($this->expectCallableOnceWith(array())); + $client = $this->createClient($this->uri); + $client->multi()->then($this->expectCallableOnceWith('OK')); + $promise = $client->exec()->then($this->expectCallableOnceWith(array())); Block\await($promise, $this->loop); } public function testMultiExecQueuedExecHasValues() { - $client = $this->client; + $client = $this->createClient($this->uri); $client->multi()->then($this->expectCallableOnceWith('OK')); $client->set('b', 10)->then($this->expectCallableOnceWith('QUEUED')); @@ -102,8 +113,8 @@ public function testMultiExecQueuedExecHasValues() public function testPubSub() { - $consumer = $this->client; - $producer = $this->createClient(getenv('REDIS_URI')); + $consumer = $this->createClient($this->uri); + $producer = $this->createClient($this->uri); $channel = 'channel:test:' . mt_rand(); @@ -122,11 +133,24 @@ public function testPubSub() public function testClose() { - $this->client->get('willBeCanceledAnyway')->then(null, $this->expectCallableOnce()); + $client = $this->createClient($this->uri); + + $client->get('willBeCanceledAnyway')->then(null, $this->expectCallableOnce()); + + $client->close(); + + $client->get('willBeRejectedRightAway')->then(null, $this->expectCallableOnce()); + } + + public function testCloseLazy() + { + $client = $this->factory->createLazyClient($this->uri); + + $client->get('willBeCanceledAnyway')->then(null, $this->expectCallableOnce()); - $this->client->close(); + $client->close(); - $this->client->get('willBeRejectedRightAway')->then(null, $this->expectCallableOnce()); + $client->get('willBeRejectedRightAway')->then(null, $this->expectCallableOnce()); } public function testInvalidProtocol() diff --git a/tests/LazyClientTest.php b/tests/LazyClientTest.php new file mode 100644 index 0000000..6ea2680 --- /dev/null +++ b/tests/LazyClientTest.php @@ -0,0 +1,220 @@ +factory = $this->getMockBuilder('Clue\React\Redis\Factory')->disableOriginalConstructor()->getMock(); + + $this->client = new LazyClient('localhost', $this->factory); + } + + public function testPingWillCreateUnderlyingClientAndReturnPendingPromise() + { + $promise = new Promise(function () { }); + $this->factory->expects($this->once())->method('createClient')->willReturn($promise); + + $promise = $this->client->ping(); + + $promise->then($this->expectCallableNever()); + } + + public function testPingTwiceWillCreateOnceUnderlyingClient() + { + $promise = new Promise(function () { }); + $this->factory->expects($this->once())->method('createClient')->willReturn($promise); + + $this->client->ping(); + $this->client->ping(); + } + + public function testPingWillResolveWhenUnderlyingClientResolvesPing() + { + $client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock(); + $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $promise = $this->client->ping(); + $deferred->resolve($client); + + $promise->then($this->expectCallableOnceWith('PONG')); + } + + public function testPingWillRejectWhenUnderlyingClientRejectsPing() + { + $error = new \RuntimeException(); + $client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock(); + $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\reject($error)); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $promise = $this->client->ping(); + $deferred->resolve($client); + + $promise->then(null, $this->expectCallableOnceWith($error)); + } + + public function testPingWillRejectAndEmitErrorAndCloseWhenFactoryRejectsUnderlyingClient() + { + $error = new \RuntimeException(); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->client->on('error', $this->expectCallableOnceWith($error)); + $this->client->on('close', $this->expectCallableOnce()); + + $promise = $this->client->ping(); + $deferred->reject($error); + + $promise->then(null, $this->expectCallableOnceWith($error)); + } + + public function testPingAfterCloseWillRejectWithoutCreatingUnderlyingConnection() + { + $this->factory->expects($this->never())->method('createClient'); + + $this->client->close(); + $promise = $this->client->ping(); + + $promise->then(null, $this->expectCallableOnce()); + } + + public function testCloseWillEmitCloseEventWithoutCreatingUnderlyingClient() + { + $this->factory->expects($this->never())->method('createClient'); + + $this->client->on('close', $this->expectCallableOnce()); + + $this->client->close(); + } + + public function testCloseTwiceWillEmitCloseEventOnce() + { + $this->client->on('close', $this->expectCallableOnce()); + + $this->client->close(); + $this->client->close(); + } + + public function testCloseAfterPingWillCancelUnderlyingClientConnectionWhenStillPending() + { + $promise = new Promise(function () { }, $this->expectCallableOnce()); + $this->factory->expects($this->once())->method('createClient')->willReturn($promise); + + $this->client->ping(); + $this->client->close(); + } + + public function testCloseAfterPingWillEmitCloseWithoutErrorWhenUnderlyingClientConnectionThrowsDueToCancellation() + { + $promise = new Promise(function () { }, function () { + throw new \RuntimeException('Discarded'); + }); + $this->factory->expects($this->once())->method('createClient')->willReturn($promise); + + $this->client->on('error', $this->expectCallableNever()); + $this->client->on('close', $this->expectCallableOnce()); + + $this->client->ping(); + $this->client->close(); + } + + public function testCloseAfterPingWillCloseUnderlyingClientConnectionWhenAlreadyResolved() + { + $client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock(); + $client->expects($this->once())->method('close'); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->client->ping(); + $deferred->resolve($client); + $this->client->close(); + } + + public function testEndWillCloseClientIfUnderlyingConnectionIsNotPending() + { + $this->client->on('close', $this->expectCallableOnce()); + $this->client->end(); + } + + public function testEndAfterPingWillEndUnderlyingClient() + { + $client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock(); + $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + $client->expects($this->once())->method('end'); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->client->ping(); + $deferred->resolve($client); + $this->client->end(); + } + + public function testEmitsErrorEventWhenUnderlyingClientEmitsError() + { + $error = new \RuntimeException(); + + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('close'))->getMock(); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->client->ping(); + $deferred->resolve($client); + + $this->client->on('error', $this->expectCallableOnceWith($error)); + $client->emit('error', array($error)); + } + + public function testEmitsCloseEventWhenUnderlyingClientEmitsClose() + { + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('close'))->getMock(); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->client->ping(); + $deferred->resolve($client); + + $this->client->on('close', $this->expectCallableOnce()); + $client->emit('close'); + } + + public function testEmitsMessageEventWhenUnderlyingClientEmitsMessageForPubSubChannel() + { + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('close'))->getMock(); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->client->subscribe('foo'); + $deferred->resolve($client); + + $this->client->on('message', $this->expectCallableOnce()); + $client->emit('message', array('foo', 'bar')); + } +} diff --git a/tests/LazyStreamingClientTest.php b/tests/LazyStreamingClientTest.php deleted file mode 100644 index 682cd3f..0000000 --- a/tests/LazyStreamingClientTest.php +++ /dev/null @@ -1,217 +0,0 @@ -factory = $this->getMockBuilder('Clue\React\Redis\Factory')->setConstructorArgs(array(Factory::create()))->getMock(); - $this->stream = $this->getMockBuilder('React\Stream\DuplexStreamInterface')->getMock(); - $this->parser = $this->getMockBuilder('Clue\Redis\Protocol\Parser\ParserInterface')->getMock(); - $this->serializer = $this->getMockBuilder('Clue\Redis\Protocol\Serializer\SerializerInterface')->getMock(); - - $this->factory->expects($this->any())->method('createClient')->with('localhost')->will($this->returnValue(new FulfilledPromise(new StreamingClient($this->stream, $this->parser, $this->serializer)))); - - $this->client = new LazyStreamingClient('localhost', $this->factory); - } - - public function testSending() - { - $this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping'))->will($this->returnValue('message')); - $this->stream->expects($this->once())->method('write')->with($this->equalTo('message')); - - $this->client->ping(); - } - - public function testClosingClientEmitsEvent() - { - $this->client->on('close', $this->expectCallableOnce()); - - $this->client->close(); - } - - public function testClosingStreamClosesClient() - { - $this->stream = new ThroughStream(); - $this->client = new StreamingClient($this->stream, $this->parser, $this->serializer); - - $this->client->on('close', $this->expectCallableOnce()); - - $this->stream->emit('close'); - } - - public function testReceiveParseErrorEmitsErrorEvent() - { - $this->stream = new ThroughStream(); - $this->client = new StreamingClient($this->stream, $this->parser, $this->serializer); - - $this->client->on('error', $this->expectCallableOnce()); - $this->client->on('close', $this->expectCallableOnce()); - - $this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->throwException(new ParserException())); - $this->stream->emit('data', array('message')); - } - - public function testReceiveThrowMessageEmitsErrorEvent() - { - $this->stream = new ThroughStream(); - $this->client = new StreamingClient($this->stream, $this->parser, $this->serializer); - - $this->client->on('error', $this->expectCallableOnce()); - - $this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->returnValue(array(new IntegerReply(2)))); - $this->stream->emit('data', array('message')); - } - - public function testPingPong() - { - $this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping')); - - $promise = $this->client->ping(); - - $this->client->handleMessage(new BulkReply('PONG')); - - $this->expectPromiseResolve($promise); - $promise->then($this->expectCallableOnce('PONG')); - } - - public function testMonitorCommandIsNotSupported() - { - $promise = $this->client->monitor(); - - $this->expectPromiseReject($promise); - } - - public function testErrorReply() - { - $promise = $this->client->invalid(); - - $err = new ErrorReply("ERR unknown command 'invalid'"); - $this->client->handleMessage($err); - - $this->expectPromiseReject($promise); - $promise->then(null, $this->expectCallableOnce($err)); - } - - public function testClosingClientRejectsAllRemainingRequests() - { - $promise = $this->client->ping(); - $this->client->close(); - - $this->expectPromiseReject($promise); - } - - public function testClosedClientRejectsAllNewRequests() - { - $this->client->close(); - $promise = $this->client->ping(); - - $this->expectPromiseReject($promise); - } - - public function testEndingNonBusyClosesClient() - { - $this->client->on('close', $this->expectCallableOnce()); - $this->client->end(); - } - - public function testEndingBusyClosesClientWhenNotBusyAnymore() - { - // count how often the "close" method has been called - $closed = 0; - $this->client->on('close', function() use (&$closed) { - ++$closed; - }); - - $promise = $this->client->ping(); - $this->assertEquals(0, $closed); - - $this->client->end(); - $this->assertEquals(0, $closed); - - $this->client->handleMessage(new BulkReply('PONG')); - $promise->then($this->expectCallableOnce('PONG')); - $this->assertEquals(1, $closed); - } - - public function testClosingMultipleTimesEmitsOnce() - { - $this->client->on('close', $this->expectCallableOnce()); - - $this->client->close(); - $this->client->close(); - } - - public function testReceivingUnexpectedMessageThrowsException() - { - $that = $this; - $this->client->handleMessage(new BulkReply('PONG'))->then(function($value) use ($that) { - $that->assertNull($value); - $that->fail('promise resolved'); - }, function($value) use ($that) { - $that->assertInstanceOf('UnderflowException', $value); - })->done(); - } - - public function testPubsubSubscribe() - { - $promise = $this->client->subscribe('test'); - $this->expectPromiseResolve($promise); - - $this->client->on('subscribe', $this->expectCallableOnce()); - $this->client->handleMessage(new MultiBulkReply(array(new BulkReply('subscribe'), new BulkReply('test'), new IntegerReply(1)))); - - return $this->client; - } - - /** - * @depends testPubsubSubscribe - * @param Client $client - */ - public function testPubsubPatternSubscribe(Client $client) - { - $promise = $client->psubscribe('demo_*'); - $this->expectPromiseResolve($promise); - - $client->on('psubscribe', $this->expectCallableOnce()); - $client->handleMessage(new MultiBulkReply(array(new BulkReply('psubscribe'), new BulkReply('demo_*'), new IntegerReply(1)))); - - return $client; - } - - /** - * @depends testPubsubPatternSubscribe - * @param Client $client - */ - public function testPubsubMessage(Client $client) - { - $client->on('message', $this->expectCallableOnce()); - $client->handleMessage(new MultiBulkReply(array(new BulkReply('message'), new BulkReply('test'), new BulkReply('payload')))); - } - - public function testPubsubSubscribeSingleOnly() - { - $this->expectPromiseReject($this->client->subscribe('a', 'b')); - $this->expectPromiseReject($this->client->unsubscribe('a', 'b')); - $this->expectPromiseReject($this->client->unsubscribe()); - } -} From 46e4cd295a1d8cfbe4812ff732d60191d9efdf2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Wed, 27 Feb 2019 11:16:15 +0100 Subject: [PATCH 3/6] Keep track of underlying connection and create new when connection lost --- README.md | 36 ++++++++++++------- src/LazyClient.php | 33 +++++++++--------- tests/LazyClientTest.php | 75 ++++++++++++++++++++++++++++++++++------ 3 files changed, 104 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 0abe1fa..e0553a4 100644 --- a/README.md +++ b/README.md @@ -212,24 +212,34 @@ $client->end(); This method immediately returns a "virtual" connection implementing the [`Client`](#client) that can be used to interface with your Redis database. -Internally, it lazily creates the underlying database connection (which may -take some time) only once the first request is invoked on this instance and -will queue all outstanding requests until the underlying connection is ready. +Internally, it lazily creates the underlying database connection only on +demand once the first request is invoked on this instance and will queue +all outstanding requests until the underlying connection is ready. +Additionally, it will keep track of this underlying connection and will +create a new underlying connection on demand when the current connection +is lost. From a consumer side this means that you can start sending commands to the -database right away while the actual connection may still be outstanding. -It will ensure that all commands will be executed in the order they are -enqueued once the connection is ready. If the database connection fails, -it will emit an `error` event, reject all outstanding commands and `close` -the connection as described in the `Client`. In other words, it behaves just -like a real connection and frees you from having to deal with its async -resolution. +database right away while the underlying connection may still be +outstanding. Because creating this underlying connection may take some +time, it will enqueue all oustanding commands and will ensure that all +commands will be executed in correct order once the connection is ready. +In other words, this "virtual" connection behaves just like a "real" +connection as described in the `Client` interface and frees you from having +to deal with its async resolution. + +If the underlying database connection fails, it will reject all +outstanding commands and will return to the initial "idle" state. This +means that you can keep sending additional commands at a later time which +will again try to open the underlying connection. Note that creating the underlying connection will be deferred until the first request is invoked. Accordingly, any eventual connection issues -will be detected once this instance is first used. Similarly, calling -`end()` on this instance before invoking any requests will succeed -immediately and will not wait for an actual underlying connection. +will be detected once this instance is first used. You can use the +`end()` method to ensure that the "virtual" connection will be soft-closed +and no further commands can be enqueued. Similarly, calling `end()` on +this instance before invoking any requests will succeed immediately and +will not wait for an actual underlying connection. Depending on your particular use case, you may prefer this method or the underlying `createClient()` which resolves with a promise. For many diff --git a/src/LazyClient.php b/src/LazyClient.php index a88175a..227e103 100644 --- a/src/LazyClient.php +++ b/src/LazyClient.php @@ -3,7 +3,6 @@ namespace Clue\React\Redis; use Evenement\EventEmitter; -use React\Promise\PromiseInterface; use React\Stream\Util; /** @@ -14,7 +13,6 @@ class LazyClient extends EventEmitter implements Client private $target; /** @var Factory */ private $factory; - private $ending = false; private $closed = false; private $promise; @@ -25,23 +23,26 @@ public function __construct($target, Factory $factory) { $this->target = $target; $this->factory = $factory; - - $this->on('close', array($this, 'removeAllListeners')); } private function client() { - if ($this->promise instanceof PromiseInterface) { + if ($this->promise !== null) { return $this->promise; } $self = $this; - return $this->promise = $this->factory->createClient($this->target)->then(function (Client $client) use ($self) { + $pending =& $this->promise; + return $pending = $this->factory->createClient($this->target)->then(function (Client $client) use ($self, &$pending) { + // connection completed => remember only until closed + $client->on('close', function () use (&$pending) { + $pending = null; + }); + Util::forwardEvents( $client, $self, array( - 'error', 'message', 'subscribe', 'unsubscribe', @@ -51,16 +52,10 @@ private function client() ) ); - $client->on('close', array($self, 'close')); - return $client; - }, function (\Exception $e) use ($self) { - // connection failed => emit error if connection is not already closed - if ($self->closed) { - return; - } - $self->emit('error', array($e)); - $self->close(); + }, function (\Exception $e) use (&$pending) { + // connection failed => discard connection attempt + $pending = null; throw $e; }); @@ -87,7 +82,11 @@ public function end() return; } - return $this->client()->then(function (Client $client) { + $that = $this; + return $this->client()->then(function (Client $client) use ($that) { + $client->on('close', function () use ($that) { + $that->close(); + }); $client->end(); }); } diff --git a/tests/LazyClientTest.php b/tests/LazyClientTest.php index 6ea2680..7c7a722 100644 --- a/tests/LazyClientTest.php +++ b/tests/LazyClientTest.php @@ -75,15 +75,15 @@ public function testPingWillRejectWhenUnderlyingClientRejectsPing() $promise->then(null, $this->expectCallableOnceWith($error)); } - public function testPingWillRejectAndEmitErrorAndCloseWhenFactoryRejectsUnderlyingClient() + public function testPingWillRejectAndNotEmitErrorOrCloseWhenFactoryRejectsUnderlyingClient() { $error = new \RuntimeException(); $deferred = new Deferred(); $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); - $this->client->on('error', $this->expectCallableOnceWith($error)); - $this->client->on('close', $this->expectCallableOnce()); + $this->client->on('error', $this->expectCallableNever()); + $this->client->on('close', $this->expectCallableNever()); $promise = $this->client->ping(); $deferred->reject($error); @@ -91,6 +91,38 @@ public function testPingWillRejectAndEmitErrorAndCloseWhenFactoryRejectsUnderlyi $promise->then(null, $this->expectCallableOnceWith($error)); } + public function testPingAfterPreviousFactoryRejectsUnderlyingClientWillCreateNewUnderlyingConnection() + { + $error = new \RuntimeException(); + + $deferred = new Deferred(); + $this->factory->expects($this->exactly(2))->method('createClient')->willReturnOnConsecutiveCalls( + $deferred->promise(), + new Promise(function () { }) + ); + + $this->client->ping(); + $deferred->reject($error); + + $this->client->ping(); + } + + public function testPingAfterPreviousUnderlyingClientAlreadyClosedWillCreateNewUnderlyingConnection() + { + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + + $this->factory->expects($this->exactly(2))->method('createClient')->willReturnOnConsecutiveCalls( + \React\Promise\resolve($client), + new Promise(function () { }) + ); + + $this->client->ping(); + $client->emit('close'); + + $this->client->ping(); + } + public function testPingAfterCloseWillRejectWithoutCreatingUnderlyingConnection() { $this->factory->expects($this->never())->method('createClient'); @@ -144,6 +176,7 @@ public function testCloseAfterPingWillEmitCloseWithoutErrorWhenUnderlyingClientC public function testCloseAfterPingWillCloseUnderlyingClientConnectionWhenAlreadyResolved() { $client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock(); + $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve()); $client->expects($this->once())->method('close'); $deferred = new Deferred(); @@ -174,11 +207,31 @@ public function testEndAfterPingWillEndUnderlyingClient() $this->client->end(); } - public function testEmitsErrorEventWhenUnderlyingClientEmitsError() + public function testEndAfterPingWillCloseClientWhenUnderlyingClientEmitsClose() + { + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call', 'end'))->getMock(); + //$client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock(); + $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + $client->expects($this->once())->method('end'); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->client->ping(); + $deferred->resolve($client); + + $this->client->on('close', $this->expectCallableOnce()); + $this->client->end(); + + $client->emit('close'); + } + + public function testEmitsNoErrorEventWhenUnderlyingClientEmitsError() { $error = new \RuntimeException(); - $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('close'))->getMock(); + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve()); $deferred = new Deferred(); $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); @@ -186,13 +239,14 @@ public function testEmitsErrorEventWhenUnderlyingClientEmitsError() $this->client->ping(); $deferred->resolve($client); - $this->client->on('error', $this->expectCallableOnceWith($error)); + $this->client->on('error', $this->expectCallableNever()); $client->emit('error', array($error)); } - public function testEmitsCloseEventWhenUnderlyingClientEmitsClose() + public function testEmitsNoCloseEventWhenUnderlyingClientEmitsClose() { - $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('close'))->getMock(); + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve()); $deferred = new Deferred(); $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); @@ -200,13 +254,14 @@ public function testEmitsCloseEventWhenUnderlyingClientEmitsClose() $this->client->ping(); $deferred->resolve($client); - $this->client->on('close', $this->expectCallableOnce()); + $this->client->on('close', $this->expectCallableNever()); $client->emit('close'); } public function testEmitsMessageEventWhenUnderlyingClientEmitsMessageForPubSubChannel() { - $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('close'))->getMock(); + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve()); $deferred = new Deferred(); $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); From 5e4a75c929ee54e5d176607b200eb6a4187c9a02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Wed, 27 Feb 2019 12:35:01 +0100 Subject: [PATCH 4/6] Automatically send unsubscribe/punsubscribe events for PubSub channels --- README.md | 7 +++++++ examples/subscribe.php | 16 ++++++++++++++++ src/LazyClient.php | 28 +++++++++++++++++++++++++++- tests/LazyClientTest.php | 30 ++++++++++++++++++++++++++++++ 4 files changed, 80 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e0553a4..c617914 100644 --- a/README.md +++ b/README.md @@ -233,6 +233,13 @@ outstanding commands and will return to the initial "idle" state. This means that you can keep sending additional commands at a later time which will again try to open the underlying connection. +If the underlying database connection drops while using PubSub channels +(see `SUBSCRIBE` and `PSUBSCRIBE` commands), it will automatically send the +appropriate `unsubscribe` and `punsubscribe` events for all currently active +channel and pattern subscriptions. This allows you to react to these +events and restore your subscriptions by creating a new underlying +connection with the above commands. + Note that creating the underlying connection will be deferred until the first request is invoked. Accordingly, any eventual connection issues will be detected once this instance is first used. You can use the diff --git a/examples/subscribe.php b/examples/subscribe.php index 1c17741..87695ff 100644 --- a/examples/subscribe.php +++ b/examples/subscribe.php @@ -12,10 +12,26 @@ $client = $factory->createLazyClient('localhost'); $client->subscribe($channel)->then(function () { echo 'Now subscribed to channel ' . PHP_EOL; +}, function (Exception $e) { + echo 'Unable to subscribe: ' . $e->getMessage() . PHP_EOL; }); $client->on('message', function ($channel, $message) { echo 'Message on ' . $channel . ': ' . $message . PHP_EOL; }); +// automatically re-subscribe to channel on connection issues +$client->on('unsubscribe', function ($channel) use ($client, $loop) { + echo 'Unsubscribed from ' . $channel . PHP_EOL; + + $loop->addPeriodicTimer(2.0, function ($timer) use ($client, $channel, $loop){ + $client->subscribe($channel)->then(function () use ($timer, $loop) { + echo 'Now subscribed again' . PHP_EOL; + $loop->cancelTimer($timer); + }, function (Exception $e) { + echo 'Unable to subscribe again: ' . $e->getMessage() . PHP_EOL; + }); + }); +}); + $loop->run(); diff --git a/src/LazyClient.php b/src/LazyClient.php index 227e103..8270310 100644 --- a/src/LazyClient.php +++ b/src/LazyClient.php @@ -35,8 +35,34 @@ private function client() $pending =& $this->promise; return $pending = $this->factory->createClient($this->target)->then(function (Client $client) use ($self, &$pending) { // connection completed => remember only until closed - $client->on('close', function () use (&$pending) { + $subscribed = array(); + $psubscribed = array(); + $client->on('close', function () use (&$pending, $self, &$subscribed, &$psubscribed) { $pending = null; + + // foward unsubscribe/punsubscribe events when underlying connection closes + $n = count($subscribed); + foreach ($subscribed as $channel => $_) { + $self->emit('unsubscribe', array($channel, --$n)); + } + $n = count($psubscribed); + foreach ($psubscribed as $pattern => $_) { + $self->emit('punsubscribe', array($pattern, --$n)); + } + }); + + // keep track of all channels and patterns this connection is subscribed to + $client->on('subscribe', function ($channel) use (&$subscribed) { + $subscribed[$channel] = true; + }); + $client->on('psubscribe', function ($pattern) use (&$psubscribed) { + $psubscribed[$pattern] = true; + }); + $client->on('unsubscribe', function ($channel) use (&$subscribed) { + unset($subscribed[$channel]); + }); + $client->on('punsubscribe', function ($pattern) use (&$psubscribed) { + unset($psubscribed[$pattern]); }); Util::forwardEvents( diff --git a/tests/LazyClientTest.php b/tests/LazyClientTest.php index 7c7a722..98d05d3 100644 --- a/tests/LazyClientTest.php +++ b/tests/LazyClientTest.php @@ -272,4 +272,34 @@ public function testEmitsMessageEventWhenUnderlyingClientEmitsMessageForPubSubCh $this->client->on('message', $this->expectCallableOnce()); $client->emit('message', array('foo', 'bar')); } + + public function testEmitsUnsubscribeAndPunsubscribeEventsWhenUnderlyingClientClosesWhileUsingPubSubChannel() + { + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->exactly(6))->method('__call')->willReturn(\React\Promise\resolve()); + + $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); + + $this->client->subscribe('foo'); + $client->emit('subscribe', array('foo', 1)); + + $this->client->subscribe('bar'); + $client->emit('subscribe', array('bar', 2)); + + $this->client->unsubscribe('bar'); + $client->emit('unsubscribe', array('bar', 1)); + + $this->client->psubscribe('foo*'); + $client->emit('psubscribe', array('foo*', 1)); + + $this->client->psubscribe('bar*'); + $client->emit('psubscribe', array('bar*', 2)); + + $this->client->punsubscribe('bar*'); + $client->emit('punsubscribe', array('bar*', 1)); + + $this->client->on('unsubscribe', $this->expectCallableOnce()); + $this->client->on('punsubscribe', $this->expectCallableOnce()); + $client->emit('close'); + } } From 64bd30dd3bfce7d87b63c8d9f97b08e090fafc3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Wed, 6 Mar 2019 10:56:49 +0100 Subject: [PATCH 5/6] Implement "idle" timeout to close underlying connection when unused --- README.md | 27 +++++-- composer.json | 2 +- src/Factory.php | 2 +- src/LazyClient.php | 79 +++++++++++++++++++-- tests/FunctionalTest.php | 24 +++++++ tests/LazyClientTest.php | 150 +++++++++++++++++++++++++++++++++++++-- 6 files changed, 267 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index c617914..91f2ad9 100644 --- a/README.md +++ b/README.md @@ -215,9 +215,9 @@ This method immediately returns a "virtual" connection implementing the Internally, it lazily creates the underlying database connection only on demand once the first request is invoked on this instance and will queue all outstanding requests until the underlying connection is ready. -Additionally, it will keep track of this underlying connection and will -create a new underlying connection on demand when the current connection -is lost. +Additionally, it will only keep this underlying connection in an "idle" state +for 60s by default and will automatically close the underlying connection when +it is no longer needed. From a consumer side this means that you can start sending commands to the database right away while the underlying connection may still be @@ -231,7 +231,9 @@ to deal with its async resolution. If the underlying database connection fails, it will reject all outstanding commands and will return to the initial "idle" state. This means that you can keep sending additional commands at a later time which -will again try to open the underlying connection. +will again try to open a new underlying connection. Note that this may +require special care if you're using transactions (`MULTI`/`EXEC`) that are kept +open for longer than the idle period. If the underlying database connection drops while using PubSub channels (see `SUBSCRIBE` and `PSUBSCRIBE` commands), it will automatically send the @@ -245,8 +247,8 @@ first request is invoked. Accordingly, any eventual connection issues will be detected once this instance is first used. You can use the `end()` method to ensure that the "virtual" connection will be soft-closed and no further commands can be enqueued. Similarly, calling `end()` on -this instance before invoking any requests will succeed immediately and -will not wait for an actual underlying connection. +this instance when not currently connected will succeed immediately and +will not have to wait for an actual underlying connection. Depending on your particular use case, you may prefer this method or the underlying `createClient()` which resolves with a promise. For many @@ -312,6 +314,19 @@ in seconds (or use a negative number to not apply a timeout) like this: $factory->createLazyClient('localhost?timeout=0.5'); ``` +By default, this method will keep "idle" connection open for 60s and will +then end the underlying connection. The next request after an "idle" +connection ended will automatically create a new underlying connection. +This ensure you always get a "fresh" connection and as such should not be +confused with a "keepalive" or "heartbeat" mechanism, as this will not +actively try to probe the connection. You can explicitly pass a custom +idle timeout value in seconds (or use a negative number to not apply a +timeout) like this: + +```php +$factory->createLazyClient('localhost?idle=0.1'); +``` + ### Client The `Client` is responsible for exchanging messages with Redis diff --git a/composer.json b/composer.json index b2b549e..a2346c4 100644 --- a/composer.json +++ b/composer.json @@ -14,7 +14,7 @@ "php": ">=5.3", "clue/redis-protocol": "0.3.*", "evenement/evenement": "^3.0 || ^2.0 || ^1.0", - "react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3", + "react/event-loop": "^1.0 || ^0.5", "react/promise": "^2.0 || ^1.1", "react/promise-timer": "^1.5", "react/socket": "^1.1" diff --git a/src/Factory.php b/src/Factory.php index c49f33b..d9491a5 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -123,7 +123,7 @@ function ($error) use ($client) { */ public function createLazyClient($target) { - return new LazyClient($target, $this); + return new LazyClient($target, $this, $this->loop); } /** diff --git a/src/LazyClient.php b/src/LazyClient.php index 8270310..9db3d12 100644 --- a/src/LazyClient.php +++ b/src/LazyClient.php @@ -4,6 +4,7 @@ use Evenement\EventEmitter; use React\Stream\Util; +use React\EventLoop\LoopInterface; /** * @internal @@ -16,13 +17,25 @@ class LazyClient extends EventEmitter implements Client private $closed = false; private $promise; + private $loop; + private $idlePeriod = 60.0; + private $idleTimer; + private $pending = 0; + /** * @param $target */ - public function __construct($target, Factory $factory) + public function __construct($target, Factory $factory, LoopInterface $loop) { + $args = array(); + \parse_str(\parse_url($target, \PHP_URL_QUERY), $args); + if (isset($args['idle'])) { + $this->idlePeriod = (float)$args['idle']; + } + $this->target = $target; $this->factory = $factory; + $this->loop = $loop; } private function client() @@ -33,11 +46,13 @@ private function client() $self = $this; $pending =& $this->promise; - return $pending = $this->factory->createClient($this->target)->then(function (Client $client) use ($self, &$pending) { + $idleTimer=& $this->idleTimer; + $loop = $this->loop; + return $pending = $this->factory->createClient($this->target)->then(function (Client $client) use ($self, &$pending, &$idleTimer, $loop) { // connection completed => remember only until closed $subscribed = array(); $psubscribed = array(); - $client->on('close', function () use (&$pending, $self, &$subscribed, &$psubscribed) { + $client->on('close', function () use (&$pending, $self, &$subscribed, &$psubscribed, &$idleTimer, $loop) { $pending = null; // foward unsubscribe/punsubscribe events when underlying connection closes @@ -49,6 +64,11 @@ private function client() foreach ($psubscribed as $pattern => $_) { $self->emit('punsubscribe', array($pattern, --$n)); } + + if ($idleTimer !== null) { + $loop->cancelTimer($idleTimer); + $idleTimer = null; + } }); // keep track of all channels and patterns this connection is subscribed to @@ -93,8 +113,19 @@ public function __call($name, $args) return \React\Promise\reject(new \RuntimeException('Connection closed')); } - return $this->client()->then(function (Client $client) use ($name, $args) { - return \call_user_func_array(array($client, $name), $args); + $that = $this; + return $this->client()->then(function (Client $client) use ($name, $args, $that) { + $that->awake(); + return \call_user_func_array(array($client, $name), $args)->then( + function ($result) use ($that) { + $that->idle(); + return $result; + }, + function ($error) use ($that) { + $that->idle(); + throw $error; + } + ); }); } @@ -134,7 +165,45 @@ public function close() $this->promise = null; } + if ($this->idleTimer !== null) { + $this->loop->cancelTimer($this->idleTimer); + $this->idleTimer = null; + } + $this->emit('close'); $this->removeAllListeners(); } + + /** + * @internal + */ + public function awake() + { + ++$this->pending; + + if ($this->idleTimer !== null) { + $this->loop->cancelTimer($this->idleTimer); + $this->idleTimer = null; + } + } + + /** + * @internal + */ + public function idle() + { + --$this->pending; + + if ($this->pending < 1 && $this->idlePeriod >= 0) { + $idleTimer =& $this->idleTimer; + $promise =& $this->promise; + $idleTimer = $this->loop->addTimer($this->idlePeriod, function () use (&$idleTimer, &$promise) { + $promise->then(function (Client $client) { + $client->close(); + }); + $promise = null; + $idleTimer = null; + }); + } + } } diff --git a/tests/FunctionalTest.php b/tests/FunctionalTest.php index 56f1867..b8b722b 100644 --- a/tests/FunctionalTest.php +++ b/tests/FunctionalTest.php @@ -51,6 +51,30 @@ public function testPingLazy() $this->assertEquals('PONG', $ret); } + /** + * @doesNotPerformAssertions + */ + public function testPingLazyWillNotBlockLoopWhenIdleTimeIsSmall() + { + $client = $this->factory->createLazyClient($this->uri . '?idle=0'); + + $client->ping(); + + $this->loop->run(); + } + + /** + * @doesNotPerformAssertions + */ + public function testLazyClientWithoutCommandsWillNotBlockLoop() + { + $client = $this->factory->createLazyClient($this->uri); + + $this->loop->run(); + + unset($client); + } + public function testMgetIsNotInterpretedAsSubMessage() { $client = $this->createClient($this->uri); diff --git a/tests/LazyClientTest.php b/tests/LazyClientTest.php index 98d05d3..3f05895 100644 --- a/tests/LazyClientTest.php +++ b/tests/LazyClientTest.php @@ -18,13 +18,15 @@ class LazyClientTest extends TestCase { private $factory; + private $loop; private $client; public function setUp() { $this->factory = $this->getMockBuilder('Clue\React\Redis\Factory')->disableOriginalConstructor()->getMock(); + $this->loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $this->client = new LazyClient('localhost', $this->factory); + $this->client = new LazyClient('localhost', $this->factory, $this->loop); } public function testPingWillCreateUnderlyingClientAndReturnPendingPromise() @@ -32,6 +34,8 @@ public function testPingWillCreateUnderlyingClientAndReturnPendingPromise() $promise = new Promise(function () { }); $this->factory->expects($this->once())->method('createClient')->willReturn($promise); + $this->loop->expects($this->never())->method('addTimer'); + $promise = $this->client->ping(); $promise->then($this->expectCallableNever()); @@ -46,7 +50,7 @@ public function testPingTwiceWillCreateOnceUnderlyingClient() $this->client->ping(); } - public function testPingWillResolveWhenUnderlyingClientResolvesPing() + public function testPingWillResolveWhenUnderlyingClientResolvesPingAndStartIdleTimer() { $client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock(); $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); @@ -54,13 +58,51 @@ public function testPingWillResolveWhenUnderlyingClientResolvesPing() $deferred = new Deferred(); $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + $this->loop->expects($this->once())->method('addTimer')->with(60.0, $this->anything()); + $promise = $this->client->ping(); $deferred->resolve($client); $promise->then($this->expectCallableOnceWith('PONG')); } - public function testPingWillRejectWhenUnderlyingClientRejectsPing() + public function testPingWillResolveWhenUnderlyingClientResolvesPingAndStartIdleTimerWithIdleTimeFromQueryParam() + { + $this->client = new LazyClient('localhost?idle=10', $this->factory, $this->loop); + + $client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock(); + $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->loop->expects($this->once())->method('addTimer')->with(10.0, $this->anything()); + + $promise = $this->client->ping(); + $deferred->resolve($client); + + $promise->then($this->expectCallableOnceWith('PONG')); + } + + public function testPingWillResolveWhenUnderlyingClientResolvesPingAndNotStartIdleTimerWhenIdleParamIsNegative() + { + $this->client = new LazyClient('localhost?idle=-1', $this->factory, $this->loop); + + $client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock(); + $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->loop->expects($this->never())->method('addTimer'); + + $promise = $this->client->ping(); + $deferred->resolve($client); + + $promise->then($this->expectCallableOnceWith('PONG')); + } + + public function testPingWillRejectWhenUnderlyingClientRejectsPingAndStartIdleTimer() { $error = new \RuntimeException(); $client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock(); @@ -69,6 +111,8 @@ public function testPingWillRejectWhenUnderlyingClientRejectsPing() $deferred = new Deferred(); $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + $this->loop->expects($this->once())->method('addTimer'); + $promise = $this->client->ping(); $deferred->resolve($client); @@ -133,6 +177,67 @@ public function testPingAfterCloseWillRejectWithoutCreatingUnderlyingConnection( $promise->then(null, $this->expectCallableOnce()); } + public function testPingAfterPingWillNotStartIdleTimerWhenFirstPingResolves() + { + $deferred = new Deferred(); + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->exactly(2))->method('__call')->willReturnOnConsecutiveCalls( + $deferred->promise(), + new Promise(function () { }) + ); + + $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); + + $this->loop->expects($this->never())->method('addTimer'); + + $this->client->ping(); + $this->client->ping(); + $deferred->resolve(); + } + + public function testPingAfterPingWillStartAndCancelIdleTimerWhenSecondPingStartsAfterFirstResolves() + { + $deferred = new Deferred(); + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->exactly(2))->method('__call')->willReturnOnConsecutiveCalls( + $deferred->promise(), + new Promise(function () { }) + ); + + $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); + + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $this->loop->expects($this->once())->method('addTimer')->willReturn($timer); + $this->loop->expects($this->once())->method('cancelTimer')->with($timer); + + $this->client->ping(); + $deferred->resolve(); + $this->client->ping(); + } + + public function testPingFollowedByIdleTimerWillCloseUnderlyingConnectionWithoutCloseEvent() + { + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call', 'close'))->getMock(); + $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve()); + $client->expects($this->once())->method('close')->willReturn(\React\Promise\resolve()); + + $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); + + $timeout = null; + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $this->loop->expects($this->once())->method('addTimer')->with($this->anything(), $this->callback(function ($cb) use (&$timeout) { + $timeout = $cb; + return true; + }))->willReturn($timer); + + $this->client->on('close', $this->expectCallableNever()); + + $this->client->ping(); + + $this->assertNotNull($timeout); + $timeout(); + } + public function testCloseWillEmitCloseEventWithoutCreatingUnderlyingClient() { $this->factory->expects($this->never())->method('createClient'); @@ -187,6 +292,24 @@ public function testCloseAfterPingWillCloseUnderlyingClientConnectionWhenAlready $this->client->close(); } + public function testCloseAfterPingWillCancelIdleTimerWhenPingIsAlreadyResolved() + { + $deferred = new Deferred(); + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call', 'close'))->getMock(); + $client->expects($this->once())->method('__call')->willReturn($deferred->promise()); + $client->expects($this->once())->method('close'); + + $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); + + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $this->loop->expects($this->once())->method('addTimer')->willReturn($timer); + $this->loop->expects($this->once())->method('cancelTimer')->with($timer); + + $this->client->ping(); + $deferred->resolve(); + $this->client->close(); + } + public function testEndWillCloseClientIfUnderlyingConnectionIsNotPending() { $this->client->on('close', $this->expectCallableOnce()); @@ -210,7 +333,6 @@ public function testEndAfterPingWillEndUnderlyingClient() public function testEndAfterPingWillCloseClientWhenUnderlyingClientEmitsClose() { $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call', 'end'))->getMock(); - //$client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock(); $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); $client->expects($this->once())->method('end'); @@ -258,6 +380,26 @@ public function testEmitsNoCloseEventWhenUnderlyingClientEmitsClose() $client->emit('close'); } + public function testEmitsNoCloseEventButWillCancelIdleTimerWhenUnderlyingConnectionEmitsCloseAfterPingIsAlreadyResolved() + { + $deferred = new Deferred(); + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->once())->method('__call')->willReturn($deferred->promise()); + + $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); + + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $this->loop->expects($this->once())->method('addTimer')->willReturn($timer); + $this->loop->expects($this->once())->method('cancelTimer')->with($timer); + + $this->client->on('close', $this->expectCallableNever()); + + $this->client->ping(); + $deferred->resolve(); + + $client->emit('close'); + } + public function testEmitsMessageEventWhenUnderlyingClientEmitsMessageForPubSubChannel() { $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); From 917585c3b750e37223ae13adb3aaa0592ba61a64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Fri, 8 Mar 2019 14:41:51 +0100 Subject: [PATCH 6/6] Do not enter "idle" state when using PubSub channels --- README.md | 13 +++++++------ src/LazyClient.php | 17 +++++++++++------ tests/LazyClientTest.php | 39 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 91f2ad9..e2c8289 100644 --- a/README.md +++ b/README.md @@ -235,12 +235,13 @@ will again try to open a new underlying connection. Note that this may require special care if you're using transactions (`MULTI`/`EXEC`) that are kept open for longer than the idle period. -If the underlying database connection drops while using PubSub channels -(see `SUBSCRIBE` and `PSUBSCRIBE` commands), it will automatically send the -appropriate `unsubscribe` and `punsubscribe` events for all currently active -channel and pattern subscriptions. This allows you to react to these -events and restore your subscriptions by creating a new underlying -connection with the above commands. +While using PubSub channels (see `SUBSCRIBE` and `PSUBSCRIBE` commands), this client +will never reach an "idle" state and will keep pending forever (or until the +underlying database connection is lost). Additionally, if the underlying +database connection drops, it will automatically send the appropriate `unsubscribe` +and `punsubscribe` events for all currently active channel and pattern subscriptions. +This allows you to react to these events and restore your subscriptions by +creating a new underlying connection repeating the above commands again. Note that creating the underlying connection will be deferred until the first request is invoked. Accordingly, any eventual connection issues diff --git a/src/LazyClient.php b/src/LazyClient.php index 9db3d12..17f80c0 100644 --- a/src/LazyClient.php +++ b/src/LazyClient.php @@ -22,6 +22,9 @@ class LazyClient extends EventEmitter implements Client private $idleTimer; private $pending = 0; + private $subscribed = array(); + private $psubscribed = array(); + /** * @param $target */ @@ -47,11 +50,11 @@ private function client() $self = $this; $pending =& $this->promise; $idleTimer=& $this->idleTimer; + $subscribed =& $this->subscribed; + $psubscribed =& $this->psubscribed; $loop = $this->loop; - return $pending = $this->factory->createClient($this->target)->then(function (Client $client) use ($self, &$pending, &$idleTimer, $loop) { + return $pending = $this->factory->createClient($this->target)->then(function (Client $client) use ($self, &$pending, &$idleTimer, &$subscribed, &$psubscribed, $loop) { // connection completed => remember only until closed - $subscribed = array(); - $psubscribed = array(); $client->on('close', function () use (&$pending, $self, &$subscribed, &$psubscribed, &$idleTimer, $loop) { $pending = null; @@ -64,6 +67,8 @@ private function client() foreach ($psubscribed as $pattern => $_) { $self->emit('punsubscribe', array($pattern, --$n)); } + $subscribed = array(); + $psubscribed = array(); if ($idleTimer !== null) { $loop->cancelTimer($idleTimer); @@ -194,15 +199,15 @@ public function idle() { --$this->pending; - if ($this->pending < 1 && $this->idlePeriod >= 0) { + if ($this->pending < 1 && $this->idlePeriod >= 0 && !$this->subscribed && !$this->psubscribed) { $idleTimer =& $this->idleTimer; $promise =& $this->promise; $idleTimer = $this->loop->addTimer($this->idlePeriod, function () use (&$idleTimer, &$promise) { $promise->then(function (Client $client) { $client->close(); }); - $promise = null; - $idleTimer = null; + $promise = null; + $idleTimer = null; }); } } diff --git a/tests/LazyClientTest.php b/tests/LazyClientTest.php index 3f05895..0d374d8 100644 --- a/tests/LazyClientTest.php +++ b/tests/LazyClientTest.php @@ -444,4 +444,43 @@ public function testEmitsUnsubscribeAndPunsubscribeEventsWhenUnderlyingClientClo $this->client->on('punsubscribe', $this->expectCallableOnce()); $client->emit('close'); } + + public function testSubscribeWillResolveWhenUnderlyingClientResolvesSubscribeAndNotStartIdleTimerWithIdleDueToSubscription() + { + $deferred = new Deferred(); + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->once())->method('__call')->with('subscribe')->willReturn($deferred->promise()); + + $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); + + $this->loop->expects($this->never())->method('addTimer'); + + $promise = $this->client->subscribe('foo'); + $client->emit('subscribe', array('foo', 1)); + $deferred->resolve(array('subscribe', 'foo', 1)); + + $promise->then($this->expectCallableOnceWith(array('subscribe', 'foo', 1))); + } + + public function testUnsubscribeAfterSubscribeWillResolveWhenUnderlyingClientResolvesUnsubscribeAndStartIdleTimerWhenSubscriptionStopped() + { + $deferredSubscribe = new Deferred(); + $deferredUnsubscribe = new Deferred(); + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->exactly(2))->method('__call')->willReturnOnConsecutiveCalls($deferredSubscribe->promise(), $deferredUnsubscribe->promise()); + + $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); + + $this->loop->expects($this->once())->method('addTimer'); + + $promise = $this->client->subscribe('foo'); + $client->emit('subscribe', array('foo', 1)); + $deferredSubscribe->resolve(array('subscribe', 'foo', 1)); + $promise->then($this->expectCallableOnceWith(array('subscribe', 'foo', 1))); + + $promise = $this->client->unsubscribe('foo'); + $client->emit('unsubscribe', array('foo', 0)); + $deferredUnsubscribe->resolve(array('unsubscribe', 'foo', 0)); + $promise->then($this->expectCallableOnceWith(array('unsubscribe', 'foo', 0))); + } }