diff --git a/README.md b/README.md index b7f5b38..24bf52e 100644 --- a/README.md +++ b/README.md @@ -399,7 +399,7 @@ $streamingBrowser->get($url)->then(function (ResponseInterface $response) { }); ``` -See also the [stream bandwidth example](examples/91-stream-bandwidth.php) and +See also the [stream download example](examples/91-benchmark-download.php) and the [stream forwarding example](examples/21-stream-forwarding.php). You can invoke the following methods on the message body: diff --git a/examples/91-stream-bandwidth.php b/examples/91-benchmark-download.php similarity index 64% rename from examples/91-stream-bandwidth.php rename to examples/91-benchmark-download.php index 779a314..5d75afc 100644 --- a/examples/91-stream-bandwidth.php +++ b/examples/91-benchmark-download.php @@ -1,14 +1,29 @@ withOptions(array('streaming' => true))->get($url)->then(function (ResponseInterface $response) use ($loop) { echo 'Headers received' . PHP_EOL; - echo Psr7\str($response); + echo RingCentral\Psr7\str($response); $stream = $response->getBody(); if (!$stream instanceof ReadableStreamInterface) { @@ -41,7 +56,7 @@ $time = microtime(true) - $time; - echo "\r" . 'Downloaded ' . $bytes . ' bytes in ' . round($time, 3) . 's => ' . round($bytes / $time / 1024 / 1024, 1) . ' MiB/s' . PHP_EOL; + echo "\r" . 'Downloaded ' . $bytes . ' bytes in ' . round($time, 3) . 's => ' . round($bytes / $time / 1000000, 1) . ' MB/s' . PHP_EOL; }); }, 'printf'); diff --git a/examples/92-benchmark-upload.php b/examples/92-benchmark-upload.php new file mode 100644 index 0000000..0bef763 --- /dev/null +++ b/examples/92-benchmark-upload.php @@ -0,0 +1,125 @@ +chunk = $chunk; + $this->count = $count; + } + + public function pause() + { + $this->paused = true; + } + + public function resume() + { + if (!$this->paused || $this->closed) { + return; + } + + // keep emitting until stream is paused + $this->paused = false; + while ($this->position < $this->count && !$this->paused) { + ++$this->position; + $this->emit('data', array($this->chunk)); + } + + // end once the last chunk has been written + if ($this->position >= $this->count) { + $this->emit('end'); + $this->close(); + } + } + + public function pipe(WritableStreamInterface $dest, array $options = array()) + { + return Util::pipe($this, $dest, $options); + } + + public function isReadable() + { + return !$this->closed; + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->closed = true; + $this->count = 0; + $this->paused = true; + $this->emit('close'); + } + + public function getPosition() + { + return $this->position * strlen($this->chunk); + } +} + +$loop = React\EventLoop\Factory::create(); +$client = new Browser($loop); + +$url = isset($argv[1]) ? $argv[1] : 'http://httpbin.org/post'; +$n = isset($argv[2]) ? $argv[2] : 10; +$source = new ChunkRepeater(str_repeat('x', 1000000), $n); +$loop->futureTick(function () use ($source) { + $source->resume(); +}); + +echo 'POSTing ' . $n . ' MB to ' . $url . PHP_EOL; + +$start = microtime(true); +$report = $loop->addPeriodicTimer(0.05, function () use ($source, $start) { + printf("\r%d bytes in %0.3fs...", $source->getPosition(), microtime(true) - $start); +}); + +$client->post($url, array('Content-Length' => $n * 1000000), $source)->then(function (ResponseInterface $response) use ($source, $report, $loop, $start) { + $now = microtime(true); + $loop->cancelTimer($report); + + printf("\r%d bytes in %0.3fs => %.1f MB/s\n", $source->getPosition(), $now - $start, $source->getPosition() / ($now - $start) / 1000000); + + echo rtrim(preg_replace('/x{5,}/','x…', (string) $response->getBody()), PHP_EOL) . PHP_EOL; +}, function ($e) use ($loop, $report) { + $loop->cancelTimer($report); + echo 'Error: ' . $e->getMessage() . PHP_EOL; +}); + +$loop->run();