From 0d114759b3529458cb1b7ff61ee5901254dc6f0d Mon Sep 17 00:00:00 2001 From: reasno Date: Fri, 27 Mar 2020 23:08:57 +0800 Subject: [PATCH 1/4] Create GoTaskFactory, who is reponsible for creating GoTask implementation --- .travis.yml | 1 - composer.json | 2 +- example/Client.php | 2 +- publish/gotask.php | 2 +- src/ConfigProvider.php | 5 ++-- .../InvalidGoTaskConnectionException.php | 2 +- src/GoTask.php | 10 ++++++- src/GoTaskConnection.php | 3 +- src/GoTaskConnectionPool.php | 2 +- src/GoTaskFactory.php | 28 ++++++++++++++++++ src/LocalGoTask.php | 29 +++++++++++++------ src/Process/GoTaskProcess.php | 10 ++----- src/RPCFactory.php | 2 +- src/Relay/CoroutineSocketRelay.php | 11 ++----- src/Relay/IPCRelay.php | 15 +++++++--- src/Relay/RelayInterface.php | 2 +- src/Relay/SocketTransporter.php | 11 +++++-- src/RemoteGoTask.php | 2 +- tests/Cases/AbstractTestCase.php | 2 +- tests/Cases/CoroutineSocketTest.php | 29 ++++++++++--------- tests/Cases/IPCRelayTest.php | 22 ++++++++++---- tests/bootstrap.php | 2 +- 22 files changed, 128 insertions(+), 66 deletions(-) create mode 100644 src/GoTaskFactory.php diff --git a/.travis.yml b/.travis.yml index 31eaeab..d0d53d2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,7 +24,6 @@ before_install: install: - cd $TRAVIS_BUILD_DIR - - go build -o app example/Server.go - bash ./tests/swoole.install.sh - phpenv config-rm xdebug.ini || echo "xdebug not available" - phpenv config-add ./tests/ci.ini diff --git a/composer.json b/composer.json index 44f5c74..98613ce 100644 --- a/composer.json +++ b/composer.json @@ -36,7 +36,7 @@ "sort-packages": true }, "scripts": { - "test": "phpunit -c phpunit.xml --colors=always", + "test": "go build -o app example/Server.go && phpunit -c phpunit.xml --colors=always", "analyse": "phpstan analyse --memory-limit 300M -l 0 ./src", "cs-fix": "php-cs-fixer fix $1" }, diff --git a/example/Client.php b/example/Client.php index ccd8fbf..37e2534 100644 --- a/example/Client.php +++ b/example/Client.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask diff --git a/publish/gotask.php b/publish/gotask.php index 0624412..0bb583c 100644 --- a/publish/gotask.php +++ b/publish/gotask.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask diff --git a/src/ConfigProvider.php b/src/ConfigProvider.php index f5fac80..afd9794 100644 --- a/src/ConfigProvider.php +++ b/src/ConfigProvider.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask @@ -13,7 +13,6 @@ namespace Reasno\GoTask; use Reasno\GoTask\Process\GoTaskProcess; -use Spiral\Goridge\RPC; class ConfigProvider { @@ -21,7 +20,7 @@ public function __invoke(): array { return [ 'dependencies' => [ - GoTask::class => RemoteGoTask::class, + GoTask::class => GoTaskFactory::class, ], 'commands' => [ ], diff --git a/src/Exception/InvalidGoTaskConnectionException.php b/src/Exception/InvalidGoTaskConnectionException.php index 94e3a0e..4cebb4c 100644 --- a/src/Exception/InvalidGoTaskConnectionException.php +++ b/src/Exception/InvalidGoTaskConnectionException.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask diff --git a/src/GoTask.php b/src/GoTask.php index c2c486a..3dc6f1a 100644 --- a/src/GoTask.php +++ b/src/GoTask.php @@ -1,9 +1,17 @@ get(ConfigInterface::class); + if ($config->get('gotask.socket_address', false)) { + return $container->get(RemoteGoTask::class); + } + return $container->get(LocalGoTask::class); + } +} diff --git a/src/LocalGoTask.php b/src/LocalGoTask.php index 9b0530a..d7e255b 100644 --- a/src/LocalGoTask.php +++ b/src/LocalGoTask.php @@ -1,5 +1,14 @@ process= $process; + $this->process = $process; } - public function call(string $method, $payload, int $flags = 0){ - if ($this->taskChannel == null){ + public function call(string $method, $payload, int $flags = 0) + { + if ($this->taskChannel == null) { $this->taskChannel = new Channel(100); - go(function(){ + go(function () { $this->start(); }); } $returnChannel = new Channel(1); $this->taskChannel->push([$method, $payload, $flags, $returnChannel]); $result = $returnChannel->pop(); - if ($result instanceof \Throwable){ + if ($result instanceof \Throwable) { throw $result; } return $result; @@ -43,7 +54,7 @@ public function call(string $method, $payload, int $flags = 0){ private function start() { - if ($this->process == null){ + if ($this->process == null) { $this->process = ProcessCollector::get('gotask')[0]; } $task = new RPC( @@ -51,10 +62,10 @@ private function start() ); while (true) { [$method, $payload, $flag, $returnChannel] = $this->taskChannel->pop(); - try{ + try { $result = $task->call($method, $payload, $flag); $returnChannel->push($result); - } catch (\Throwable $e){ + } catch (\Throwable $e) { $returnChannel->push($e); } } diff --git a/src/Process/GoTaskProcess.php b/src/Process/GoTaskProcess.php index 5736059..9cefbb9 100644 --- a/src/Process/GoTaskProcess.php +++ b/src/Process/GoTaskProcess.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask @@ -15,7 +15,6 @@ use Hyperf\Contract\ConfigInterface; use Hyperf\Process\AbstractProcess; use Psr\Container\ContainerInterface; -use Swoole\Atomic; class GoTaskProcess extends AbstractProcess { @@ -23,10 +22,6 @@ class GoTaskProcess extends AbstractProcess * @var string */ public $name = 'gotask'; - /** - * @var Atomic - */ - public static $taskPid; /** * @var ConfigInterface @@ -37,7 +32,7 @@ public function __construct(ContainerInterface $container) { parent::__construct($container); $this->config = $container->get(ConfigInterface::class); - self::$taskPid = new Atomic(); + $this->redirectStdinStdout = empty($this->config->get('gotask.socket_address')) ?? false; } public function isEnable(): bool @@ -53,7 +48,6 @@ public function handle(): void $executable = $this->config->get('gotask.executable', BASE_PATH . '/app'); $address = $this->config->get('gotask.socket_address', '/tmp/gotask.sock'); $args = $this->config->get('gotask.args', []); - self::$taskPid->set($this->process->pid); $argArr = ['-address', $address]; array_push($argArr, ...$args); $this->process->exec($executable, $argArr); diff --git a/src/RPCFactory.php b/src/RPCFactory.php index dba01d3..8322603 100644 --- a/src/RPCFactory.php +++ b/src/RPCFactory.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask diff --git a/src/Relay/CoroutineSocketRelay.php b/src/Relay/CoroutineSocketRelay.php index 7388a29..6598fc0 100644 --- a/src/Relay/CoroutineSocketRelay.php +++ b/src/Relay/CoroutineSocketRelay.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask @@ -97,31 +97,24 @@ public function __toString(): string return "unix://{$this->address}"; } - /** - * @return string - */ public function getAddress(): string { return $this->address; } /** - * @return int|null + * @return null|int */ public function getPort() { return $this->port; } - /** - * @return int - */ public function getType(): int { return $this->type; } - /** * Ensure socket connection. Returns true if socket successfully connected * or have already been connected. diff --git a/src/Relay/IPCRelay.php b/src/Relay/IPCRelay.php index ba1d30a..eee5a33 100644 --- a/src/Relay/IPCRelay.php +++ b/src/Relay/IPCRelay.php @@ -1,5 +1,14 @@ process->exportSocket(); } } - diff --git a/src/Relay/RelayInterface.php b/src/Relay/RelayInterface.php index 4202069..bf497df 100644 --- a/src/Relay/RelayInterface.php +++ b/src/Relay/RelayInterface.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask diff --git a/src/Relay/SocketTransporter.php b/src/Relay/SocketTransporter.php index 8b9f191..5939a31 100644 --- a/src/Relay/SocketTransporter.php +++ b/src/Relay/SocketTransporter.php @@ -1,9 +1,17 @@ p = new Process(function (Process $process) { + $this->p = new Process(function (Process $process) { $process->exec(__DIR__ . '/../../app', ['-address', self::UNIX_SOCKET]); }); $this->p->start(); @@ -57,7 +58,6 @@ public function tearDown() Process::kill($this->p->pid); } - public function testOnCoroutine() { \Swoole\Coroutine\run(function () { @@ -68,8 +68,8 @@ public function testOnCoroutine() }); } - public function testConcurrently(){ - + public function testConcurrently() + { $container = new Container(new DefinitionSource([], new ScanConfig())); $container->set(ConfigInterface::class, new Config([ 'gotask' => [ @@ -83,11 +83,13 @@ public function testConcurrently(){ 'heartbeat' => -1, 'max_idle_time' => (float) env('GOTASK_MAX_IDLE_TIME', 60), ], - ] + ], ])); - $container->define(RPC::class,RPCFactory::class); - $container->define(StdoutLoggerInterface::class, - StdoutLogger::class); + $container->define(RPC::class, RPCFactory::class); + $container->define( + StdoutLoggerInterface::class, + StdoutLogger::class + ); ApplicationContext::setContainer($container); \Swoole\Coroutine\run(function () { @@ -96,12 +98,12 @@ public function testConcurrently(){ $wg = new WaitGroup(); $wg->add(); $this->baseExample($task); - go(function() use ($wg, $task) { + go(function () use ($wg, $task) { $this->baseExample($task); $wg->done(); }); $wg->add(); - go(function() use ($wg, $task) { + go(function () use ($wg, $task) { $this->baseExample($task); $wg->done(); }); @@ -109,7 +111,8 @@ public function testConcurrently(){ }); } - public function baseExample($task){ + public function baseExample($task) + { $this->assertEquals( 'Hello, Reasno!', $task->call('App.HelloString', 'Reasno') diff --git a/tests/Cases/IPCRelayTest.php b/tests/Cases/IPCRelayTest.php index 8d90ac1..ba738b2 100644 --- a/tests/Cases/IPCRelayTest.php +++ b/tests/Cases/IPCRelayTest.php @@ -1,18 +1,29 @@ add(); - go(function() use ($wg, $task) { + go(function () use ($wg, $task) { $this->baseExample($task); $wg->done(); }); $wg->add(); - go(function() use ($wg, $task) { + go(function () use ($wg, $task) { $this->baseExample($task); $wg->done(); }); @@ -66,7 +77,8 @@ public function testConcurrently() }); } - public function baseExample($task){ + public function baseExample($task) + { $this->assertEquals( 'Hello, Reasno!', $task->call('App.HelloString', 'Reasno') diff --git a/tests/bootstrap.php b/tests/bootstrap.php index 01edac3..26133e8 100644 --- a/tests/bootstrap.php +++ b/tests/bootstrap.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask From e6561ebfd489e0f4492b9c19c0150e65aa639422 Mon Sep 17 00:00:00 2001 From: reasno Date: Fri, 27 Mar 2020 23:13:38 +0800 Subject: [PATCH 2/4] Use standard interface in README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 8fddeb7..aa07d40 100644 --- a/README.md +++ b/README.md @@ -101,14 +101,14 @@ declare(strict_types=1); namespace App\Controller; -use Reasno\GoTask\RemoteGoTask; +use Reasno\GoTask\GoTask; class IndexController extends AbstractController { /** * @return array */ - public function index(RemoteGoTask $task) + public function index(GoTask $task) { return $task->call('App.Hi', ['Swoole is Awesome,', 'So is Go!']); } From 9655ebfd984c9d4ee2aea453085476f85d9f1a1c Mon Sep 17 00:00:00 2001 From: reasno Date: Sat, 28 Mar 2020 00:20:27 +0800 Subject: [PATCH 3/4] Add Swoole lock to LocalGoTask.php --- src/ConfigProvider.php | 4 +++ src/Listener/BootApplicationListener.php | 39 ++++++++++++++++++++++++ src/LocalGoTask.php | 9 ++++++ tests/Cases/IPCRelayTest.php | 2 ++ 4 files changed, 54 insertions(+) create mode 100644 src/Listener/BootApplicationListener.php diff --git a/src/ConfigProvider.php b/src/ConfigProvider.php index afd9794..a45a3c6 100644 --- a/src/ConfigProvider.php +++ b/src/ConfigProvider.php @@ -12,6 +12,7 @@ namespace Reasno\GoTask; +use Reasno\GoTask\Listener\BootApplicationListener; use Reasno\GoTask\Process\GoTaskProcess; class ConfigProvider @@ -27,6 +28,9 @@ public function __invoke(): array 'processes' => [ GoTaskProcess::class, ], + 'listener' => [ + BootApplicationListener::class, + ], 'annotations' => [ 'scan' => [ 'paths' => [ diff --git a/src/Listener/BootApplicationListener.php b/src/Listener/BootApplicationListener.php new file mode 100644 index 0000000..ca179b9 --- /dev/null +++ b/src/Listener/BootApplicationListener.php @@ -0,0 +1,39 @@ +taskChannel->pop(); + self::$lock->lock(); try { $result = $task->call($method, $payload, $flag); $returnChannel->push($result); } catch (\Throwable $e) { $returnChannel->push($e); + } finally { + self::$lock->unlock(); } } } diff --git a/tests/Cases/IPCRelayTest.php b/tests/Cases/IPCRelayTest.php index ba738b2..e33e25d 100644 --- a/tests/Cases/IPCRelayTest.php +++ b/tests/Cases/IPCRelayTest.php @@ -18,6 +18,7 @@ use Reasno\GoTask\Relay\RelayInterface; use Spiral\Goridge\Exceptions\ServiceException; use Spiral\Goridge\RPC; +use Swoole\Lock; use Swoole\Process; /** @@ -33,6 +34,7 @@ class IPCRelayTest extends AbstractTestCase public function setUp() { + LocalGoTask::$lock = new Lock(SWOOLE_SEM); $this->p = new Process(function (Process $process) { $process->exec(__DIR__ . '/../../app', []); }, true); From f399b7015c6af5b6ed2d871ba25ed594f834ebbc Mon Sep 17 00:00:00 2001 From: reasno Date: Sat, 28 Mar 2020 19:16:15 +0800 Subject: [PATCH 4/4] Rename classes. --- README.md | 7 +-- cmd/app.go | 2 +- example/Client.php | 21 +++++--- example/Server.go | 2 +- src/GoTask.php | 11 ++++ src/GoTaskConnection.php | 4 +- src/GoTaskFactory.php | 4 +- src/IPC/IPCInterface.php | 17 ++++++ src/IPC/PipeIPC.php | 47 +++++++++++++++++ src/IPC/SocketIPC.php | 54 ++++++++++++++++++++ src/Listener/BootApplicationListener.php | 4 +- src/{LocalGoTask.php => PipeGoTask.php} | 9 ++-- src/{RemoteGoTask.php => SocketGoTask.php} | 2 +- src/{RPCFactory.php => SocketIPCFactory.php} | 16 ++---- tests/Cases/CoroutineSocketTest.php | 8 +-- tests/Cases/IPCRelayTest.php | 6 +-- 16 files changed, 167 insertions(+), 47 deletions(-) create mode 100644 src/IPC/IPCInterface.php create mode 100644 src/IPC/PipeIPC.php create mode 100644 src/IPC/SocketIPC.php rename src/{LocalGoTask.php => PipeGoTask.php} (91%) rename src/{RemoteGoTask.php => SocketGoTask.php} (98%) rename src/{RPCFactory.php => SocketIPCFactory.php} (61%) diff --git a/README.md b/README.md index aa07d40..1e9308f 100644 --- a/README.md +++ b/README.md @@ -53,16 +53,13 @@ func main() { ```php call("App.Hi", "Reasno")); // 打印 [ "hello" => "Reasno" ] }); diff --git a/cmd/app.go b/cmd/app.go index eeb0c8b..e8154b2 100644 --- a/cmd/app.go +++ b/cmd/app.go @@ -19,6 +19,6 @@ func (a *App) Hi(name interface{}, r *interface{}) error { func main() { gotask.Register(new(App)) if err := gotask.Run(); err != nil { - log.Fatal(err) + log.Fatalln(err) } } diff --git a/example/Client.php b/example/Client.php index 37e2534..273313c 100644 --- a/example/Client.php +++ b/example/Client.php @@ -10,17 +10,24 @@ * @license https://github.com/hyperf/hyperf/blob/master/LICENSE */ -use Reasno\GoTask\Relay\CoroutineSocketRelay; -use Spiral\Goridge\RelayInterface; -use Spiral\Goridge\RPC; +use Reasno\GoTask\GoTask; +use Reasno\GoTask\IPC\SocketIPC; +use Swoole\Process; use function Swoole\Coroutine\run; require '../vendor/autoload.php'; +const ADDR = '127.0.0.1:6001'; + +$process = new Process(function (Process $process) { + $process->exec(__DIR__ . '/../app', ['-address', ADDR]); +}); +$process->start(); + +sleep(1); + run(function () { - $task = new RPC( - new CoroutineSocketRelay('127.0.0.1', 6001) - ); + $task = new SocketIPC(ADDR); var_dump($task->call('App.HelloString', 'Reasno')); var_dump($task->call('App.HelloInterface', ['jack', 'jill'])); var_dump($task->call('App.HelloStruct', [ @@ -28,7 +35,7 @@ 'lastName' => 'James', 'id' => 23, ])); - var_dump($task->call('App.HelloBytes', base64_encode('My Bytes'), RelayInterface::PAYLOAD_RAW)); + var_dump($task->call('App.HelloBytes', base64_encode('My Bytes'), GoTask::PAYLOAD_RAW)); try { $task->call('App.HelloError', 'Reasno'); } catch (\Throwable $e) { diff --git a/example/Server.go b/example/Server.go index 123d152..d70aefd 100644 --- a/example/Server.go +++ b/example/Server.go @@ -51,6 +51,6 @@ func (a *App) HelloError(name interface{}, r *interface{}) error { func main() { gotask.Register(new(App)) if err := gotask.Run(); err != nil { - log.Fatal(err) + log.Fatalln(err) } } diff --git a/src/GoTask.php b/src/GoTask.php index 3dc6f1a..44efd5d 100644 --- a/src/GoTask.php +++ b/src/GoTask.php @@ -14,5 +14,16 @@ interface GoTask { + /** Payload flags.*/ + const PAYLOAD_NONE = 2; + const PAYLOAD_RAW = 4; + const PAYLOAD_ERROR = 8; + const PAYLOAD_CONTROL = 16; + /** + * @param mixed $payload an binary data or array of arguments for complex types + * @param int $flags payload control flags + * + * @return mixed + */ public function call(string $method, $payload, int $flags = 0); } diff --git a/src/GoTaskConnection.php b/src/GoTaskConnection.php index 3d9190a..41f6259 100644 --- a/src/GoTaskConnection.php +++ b/src/GoTaskConnection.php @@ -32,11 +32,11 @@ class GoTaskConnection extends Connection implements ConnectionInterface private $connection; /** - * @var RPCFactory + * @var SocketIPCFactory */ private $factory; - public function __construct(ContainerInterface $container, Pool $pool, RPCFactory $factory) + public function __construct(ContainerInterface $container, Pool $pool, SocketIPCFactory $factory) { parent::__construct($container, $pool); $this->factory = $factory; diff --git a/src/GoTaskFactory.php b/src/GoTaskFactory.php index f773089..af8bd3b 100644 --- a/src/GoTaskFactory.php +++ b/src/GoTaskFactory.php @@ -21,8 +21,8 @@ public function __invoke(ContainerInterface $container) { $config = $container->get(ConfigInterface::class); if ($config->get('gotask.socket_address', false)) { - return $container->get(RemoteGoTask::class); + return $container->get(SocketGoTask::class); } - return $container->get(LocalGoTask::class); + return $container->get(PipeGoTask::class); } } diff --git a/src/IPC/IPCInterface.php b/src/IPC/IPCInterface.php new file mode 100644 index 0000000..59a7880 --- /dev/null +++ b/src/IPC/IPCInterface.php @@ -0,0 +1,17 @@ +handler = new RPC( + new IPCRelay($process) + ); + } + + public function __call($name, $arguments) + { + $this->handler->{$name}(...$arguments); + } + + public function call(string $method, $payload, int $flags = 0) + { + return $this->handler->call($method, $payload, $flags); + } +} diff --git a/src/IPC/SocketIPC.php b/src/IPC/SocketIPC.php new file mode 100644 index 0000000..aae2e0e --- /dev/null +++ b/src/IPC/SocketIPC.php @@ -0,0 +1,54 @@ +handler = new RPC( + new CoroutineSocketRelay($split[0], 0, CoroutineSocketRelay::SOCK_UNIX) + ); + return; + } + [$host, $port] = $split; + $this->handler = new RPC( + new CoroutineSocketRelay($host, (int) $port, CoroutineSocketRelay::SOCK_TCP) + ); + } + + public function __call($name, $arguments) + { + $this->handler->{$name}(...$arguments); + } + + public function call(string $method, $payload, int $flags = 0) + { + return $this->handler->call($method, $payload, $flags); + } +} diff --git a/src/Listener/BootApplicationListener.php b/src/Listener/BootApplicationListener.php index ca179b9..f9f8cb6 100644 --- a/src/Listener/BootApplicationListener.php +++ b/src/Listener/BootApplicationListener.php @@ -13,7 +13,7 @@ namespace Reasno\GoTask\Listener; use Hyperf\Framework\Event\BootApplication; -use Reasno\GoTask\LocalGoTask; +use Reasno\GoTask\PipeGoTask; use Swoole\Lock; class BootApplicationListener @@ -34,6 +34,6 @@ public function listen(): array */ public function process(object $event) { - LocalGoTask::$lock = new Lock(SWOOLE_SEM); + PipeGoTask::$lock = new Lock(SWOOLE_SEM); } } diff --git a/src/LocalGoTask.php b/src/PipeGoTask.php similarity index 91% rename from src/LocalGoTask.php rename to src/PipeGoTask.php index e462eee..9f4697a 100644 --- a/src/LocalGoTask.php +++ b/src/PipeGoTask.php @@ -13,13 +13,12 @@ namespace Reasno\GoTask; use Hyperf\Process\ProcessCollector; -use Reasno\GoTask\Relay\IPCRelay; -use Spiral\Goridge\RPC; +use Reasno\GoTask\IPC\PipeIPC; use Swoole\Coroutine\Channel; use Swoole\Lock; use Swoole\Process; -class LocalGoTask implements GoTask +class PipeGoTask implements GoTask { /** * @var Lock @@ -63,9 +62,7 @@ private function start() if ($this->process == null) { $this->process = ProcessCollector::get('gotask')[0]; } - $task = new RPC( - new IPCRelay($this->process) - ); + $task = make(PipeIPC::class, ['process' => $this->process]); while (true) { [$method, $payload, $flag, $returnChannel] = $this->taskChannel->pop(); self::$lock->lock(); diff --git a/src/RemoteGoTask.php b/src/SocketGoTask.php similarity index 98% rename from src/RemoteGoTask.php rename to src/SocketGoTask.php index 4770ef9..89fd8c3 100644 --- a/src/RemoteGoTask.php +++ b/src/SocketGoTask.php @@ -15,7 +15,7 @@ use Hyperf\Utils\Context; use Reasno\GoTask\Exception\InvalidGoTaskConnectionException; -class RemoteGoTask implements GoTask +class SocketGoTask implements GoTask { /** * @var GoTaskConnectionPool diff --git a/src/RPCFactory.php b/src/SocketIPCFactory.php similarity index 61% rename from src/RPCFactory.php rename to src/SocketIPCFactory.php index 8322603..e1068a9 100644 --- a/src/RPCFactory.php +++ b/src/SocketIPCFactory.php @@ -14,10 +14,9 @@ use Hyperf\Contract\ConfigInterface; use Psr\Container\ContainerInterface; -use Reasno\GoTask\Relay\CoroutineSocketRelay; -use Spiral\Goridge\RPC; +use Reasno\GoTask\IPC\SocketIPC; -class RPCFactory +class SocketIPCFactory { /** * @var ContainerInterface @@ -33,15 +32,6 @@ public function make() { $config = $this->container->get(ConfigInterface::class); $address = $config->get('gotask.socket_address', '/tmp/gotask.sock'); - $split = explode(':', $address, 2); - if (count($split) === 1) { - return new RPC( - new CoroutineSocketRelay($split[0], 0, CoroutineSocketRelay::SOCK_UNIX) - ); - } - [$host, $port] = $split; - return new RPC( - new CoroutineSocketRelay($host, (int) $port, CoroutineSocketRelay::SOCK_TCP) - ); + return make(SocketIPC::class, ['address' => $address]); } } diff --git a/tests/Cases/CoroutineSocketTest.php b/tests/Cases/CoroutineSocketTest.php index f6e8f45..4990d1d 100644 --- a/tests/Cases/CoroutineSocketTest.php +++ b/tests/Cases/CoroutineSocketTest.php @@ -23,8 +23,8 @@ use Hyperf\Utils\WaitGroup; use Reasno\GoTask\Relay\CoroutineSocketRelay; use Reasno\GoTask\Relay\RelayInterface; -use Reasno\GoTask\RemoteGoTask; -use Reasno\GoTask\RPCFactory; +use Reasno\GoTask\SocketGoTask; +use Reasno\GoTask\SocketIPCFactory; use Spiral\Goridge\Exceptions\ServiceException; use Spiral\Goridge\RPC; use Swoole\Process; @@ -85,7 +85,7 @@ public function testConcurrently() ], ], ])); - $container->define(RPC::class, RPCFactory::class); + $container->define(RPC::class, SocketIPCFactory::class); $container->define( StdoutLoggerInterface::class, StdoutLogger::class @@ -94,7 +94,7 @@ public function testConcurrently() \Swoole\Coroutine\run(function () { sleep(1); - $task = make(RemoteGoTask::class); + $task = make(SocketGoTask::class); $wg = new WaitGroup(); $wg->add(); $this->baseExample($task); diff --git a/tests/Cases/IPCRelayTest.php b/tests/Cases/IPCRelayTest.php index e33e25d..1a3b200 100644 --- a/tests/Cases/IPCRelayTest.php +++ b/tests/Cases/IPCRelayTest.php @@ -13,7 +13,7 @@ namespace HyperfTest\Cases; use Hyperf\Utils\WaitGroup; -use Reasno\GoTask\LocalGoTask; +use Reasno\GoTask\PipeGoTask; use Reasno\GoTask\Relay\IPCRelay; use Reasno\GoTask\Relay\RelayInterface; use Spiral\Goridge\Exceptions\ServiceException; @@ -34,7 +34,7 @@ class IPCRelayTest extends AbstractTestCase public function setUp() { - LocalGoTask::$lock = new Lock(SWOOLE_SEM); + PipeGoTask::$lock = new Lock(SWOOLE_SEM); $this->p = new Process(function (Process $process) { $process->exec(__DIR__ . '/../../app', []); }, true); @@ -61,7 +61,7 @@ public function testConcurrently() { \Swoole\Coroutine\run(function () { sleep(1); - $task = make(LocalGoTask::class, [ + $task = make(PipeGoTask::class, [ 'process' => $this->p, ]); $wg = new WaitGroup();