diff --git a/src/Io/ChunkedEncoder.php b/src/Io/ChunkedEncoder.php new file mode 100644 index 0000000..3b74e0c --- /dev/null +++ b/src/Io/ChunkedEncoder.php @@ -0,0 +1,93 @@ +input = $input; + + $this->input->on('data', array($this, 'handleData')); + $this->input->on('end', array($this, 'handleEnd')); + $this->input->on('error', array($this, 'handleError')); + $this->input->on('close', array($this, 'close')); + } + + public function isReadable() + { + return !$this->closed && $this->input->isReadable(); + } + + public function pause() + { + $this->input->pause(); + } + + public function resume() + { + $this->input->resume(); + } + + public function pipe(WritableStreamInterface $dest, array $options = array()) + { + return Util::pipe($this, $dest, $options); + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->closed = true; + $this->input->close(); + + $this->emit('close'); + $this->removeAllListeners(); + } + + /** @internal */ + public function handleData($data) + { + if ($data !== '') { + $this->emit('data', array( + dechex(strlen($data)) . "\r\n" . $data . "\r\n" + )); + } + } + + /** @internal */ + public function handleError(\Exception $e) + { + $this->emit('error', array($e)); + $this->close(); + } + + /** @internal */ + public function handleEnd() + { + $this->emit('data', array("0\r\n\r\n")); + + if (!$this->closed) { + $this->emit('end'); + $this->close(); + } + } +} diff --git a/src/Io/Sender.php b/src/Io/Sender.php index f105d86..6b99b6f 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -124,19 +124,12 @@ public function send(RequestInterface $request) if ($body instanceof ReadableStreamInterface) { if ($body->isReadable()) { - if ($size !== null) { - // length is known => just write to request - $body->pipe($requestStream); - } else { - // length unknown => apply chunked transfer-encoding - // this should be moved somewhere else obviously - $body->on('data', function ($data) use ($requestStream) { - $requestStream->write(dechex(strlen($data)) . "\r\n" . $data . "\r\n"); - }); - $body->on('end', function() use ($requestStream) { - $requestStream->end("0\r\n\r\n"); - }); + // length unknown => apply chunked transfer-encoding + if ($size === null) { + $body = new ChunkedEncoder($body); } + + $body->pipe($requestStream); } else { // stream is not readable => end request without body $requestStream->end(); diff --git a/tests/FunctionalBrowserTest.php b/tests/FunctionalBrowserTest.php index fe4a5f2..fe24577 100644 --- a/tests/FunctionalBrowserTest.php +++ b/tests/FunctionalBrowserTest.php @@ -301,9 +301,7 @@ public function testPostStreamChunked() $stream = new ThroughStream(); $this->loop->addTimer(0.001, function () use ($stream) { - $stream->emit('data', array('hello world')); - $stream->emit('end'); - $stream->close(); + $stream->end('hello world'); }); $response = Block\await($this->browser->post($this->base . 'post', array(), $stream), $this->loop); diff --git a/tests/Io/ChunkedEncoderTest.php b/tests/Io/ChunkedEncoderTest.php new file mode 100644 index 0000000..c449033 --- /dev/null +++ b/tests/Io/ChunkedEncoderTest.php @@ -0,0 +1,120 @@ +input = new ThroughStream(); + $this->chunkedStream = new ChunkedEncoder($this->input); + } + + public function testChunked() + { + $this->chunkedStream->on('data', $this->expectCallableOnceWith("5\r\nhello\r\n")); + $this->input->emit('data', array('hello')); + } + + public function testEmptyString() + { + $this->chunkedStream->on('data', $this->expectCallableNever()); + $this->input->emit('data', array('')); + } + + public function testBiggerStringToCheckHexValue() + { + $this->chunkedStream->on('data', $this->expectCallableOnceWith("1a\r\nabcdefghijklmnopqrstuvwxyz\r\n")); + $this->input->emit('data', array('abcdefghijklmnopqrstuvwxyz')); + } + + public function testHandleClose() + { + $this->chunkedStream->on('close', $this->expectCallableOnce()); + + $this->input->close(); + + $this->assertFalse($this->chunkedStream->isReadable()); + } + + public function testHandleError() + { + $this->chunkedStream->on('error', $this->expectCallableOnce()); + $this->chunkedStream->on('close', $this->expectCallableOnce()); + + $this->input->emit('error', array(new \RuntimeException())); + + $this->assertFalse($this->chunkedStream->isReadable()); + } + + public function testPauseStream() + { + $input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $input->expects($this->once())->method('pause'); + + $parser = new ChunkedEncoder($input); + $parser->pause(); + } + + public function testResumeStream() + { + $input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $input->expects($this->once())->method('pause'); + + $parser = new ChunkedEncoder($input); + $parser->pause(); + $parser->resume(); + } + + public function testPipeStream() + { + $dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + + $ret = $this->chunkedStream->pipe($dest); + + $this->assertSame($dest, $ret); + } + + protected function expectCallableOnce() + { + $mock = $this->createCallableMock(); + $mock + ->expects($this->once()) + ->method('__invoke'); + + return $mock; + } + + protected function expectCallableOnceWith($value) + { + $mock = $this->createCallableMock(); + $mock + ->expects($this->once()) + ->method('__invoke') + ->with($value); + + return $mock; + } + + protected function expectCallableNever() + { + $mock = $this->createCallableMock(); + $mock + ->expects($this->never()) + ->method('__invoke'); + + return $mock; + } + + protected function createCallableMock() + { + return $this->getMockBuilder('stdClass')->setMethods(array('__invoke'))->getMock(); + } +} diff --git a/tests/Io/SenderTest.php b/tests/Io/SenderTest.php index be4eaa9..1deb0f2 100644 --- a/tests/Io/SenderTest.php +++ b/tests/Io/SenderTest.php @@ -111,6 +111,44 @@ public function testSendPostStreamWillAutomaticallySendTransferEncodingChunked() $sender->send($request); } + public function testSendPostStreamWillAutomaticallyPipeChunkEncodeBodyForWriteAndRespectRequestThrottling() + { + $outgoing = $this->getMockBuilder('React\HttpClient\Request')->disableOriginalConstructor()->getMock(); + $outgoing->expects($this->once())->method('isWritable')->willReturn(true); + $outgoing->expects($this->once())->method('write')->with("5\r\nhello\r\n")->willReturn(false); + + $client = $this->getMockBuilder('React\HttpClient\Client')->disableOriginalConstructor()->getMock(); + $client->expects($this->once())->method('request')->willReturn($outgoing); + + $sender = new Sender($client, $this->getMockBuilder('Clue\React\Buzz\Message\MessageFactory')->getMock()); + + $stream = new ThroughStream(); + $request = new Request('POST', 'http://www.google.com/', array(), new ReadableBodyStream($stream)); + $sender->send($request); + + $ret = $stream->write('hello'); + $this->assertFalse($ret); + } + + public function testSendPostStreamWillAutomaticallyPipeChunkEncodeBodyForEnd() + { + $outgoing = $this->getMockBuilder('React\HttpClient\Request')->disableOriginalConstructor()->getMock(); + $outgoing->expects($this->once())->method('isWritable')->willReturn(true); + $outgoing->expects($this->once())->method('write')->with("0\r\n\r\n")->willReturn(false); + $outgoing->expects($this->once())->method('end')->with(null); + + $client = $this->getMockBuilder('React\HttpClient\Client')->disableOriginalConstructor()->getMock(); + $client->expects($this->once())->method('request')->willReturn($outgoing); + + $sender = new Sender($client, $this->getMockBuilder('Clue\React\Buzz\Message\MessageFactory')->getMock()); + + $stream = new ThroughStream(); + $request = new Request('POST', 'http://www.google.com/', array(), new ReadableBodyStream($stream)); + $sender->send($request); + + $stream->end(); + } + public function testSendPostStreamWithExplicitContentLengthWillSendHeaderAsIs() { $client = $this->getMockBuilder('React\HttpClient\Client')->disableOriginalConstructor()->getMock();