diff --git a/.travis.yml b/.travis.yml index 31eaeab..d0d53d2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,7 +24,6 @@ before_install: install: - cd $TRAVIS_BUILD_DIR - - go build -o app example/Server.go - bash ./tests/swoole.install.sh - phpenv config-rm xdebug.ini || echo "xdebug not available" - phpenv config-add ./tests/ci.ini diff --git a/README.md b/README.md index 8fddeb7..1e9308f 100644 --- a/README.md +++ b/README.md @@ -53,16 +53,13 @@ func main() { ```php call("App.Hi", "Reasno")); // 打印 [ "hello" => "Reasno" ] }); @@ -101,14 +98,14 @@ declare(strict_types=1); namespace App\Controller; -use Reasno\GoTask\RemoteGoTask; +use Reasno\GoTask\GoTask; class IndexController extends AbstractController { /** * @return array */ - public function index(RemoteGoTask $task) + public function index(GoTask $task) { return $task->call('App.Hi', ['Swoole is Awesome,', 'So is Go!']); } diff --git a/cmd/app.go b/cmd/app.go index eeb0c8b..e8154b2 100644 --- a/cmd/app.go +++ b/cmd/app.go @@ -19,6 +19,6 @@ func (a *App) Hi(name interface{}, r *interface{}) error { func main() { gotask.Register(new(App)) if err := gotask.Run(); err != nil { - log.Fatal(err) + log.Fatalln(err) } } diff --git a/composer.json b/composer.json index 44f5c74..98613ce 100644 --- a/composer.json +++ b/composer.json @@ -36,7 +36,7 @@ "sort-packages": true }, "scripts": { - "test": "phpunit -c phpunit.xml --colors=always", + "test": "go build -o app example/Server.go && phpunit -c phpunit.xml --colors=always", "analyse": "phpstan analyse --memory-limit 300M -l 0 ./src", "cs-fix": "php-cs-fixer fix $1" }, diff --git a/example/Client.php b/example/Client.php index ccd8fbf..273313c 100644 --- a/example/Client.php +++ b/example/Client.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask @@ -10,17 +10,24 @@ * @license https://github.com/hyperf/hyperf/blob/master/LICENSE */ -use Reasno\GoTask\Relay\CoroutineSocketRelay; -use Spiral\Goridge\RelayInterface; -use Spiral\Goridge\RPC; +use Reasno\GoTask\GoTask; +use Reasno\GoTask\IPC\SocketIPC; +use Swoole\Process; use function Swoole\Coroutine\run; require '../vendor/autoload.php'; +const ADDR = '127.0.0.1:6001'; + +$process = new Process(function (Process $process) { + $process->exec(__DIR__ . '/../app', ['-address', ADDR]); +}); +$process->start(); + +sleep(1); + run(function () { - $task = new RPC( - new CoroutineSocketRelay('127.0.0.1', 6001) - ); + $task = new SocketIPC(ADDR); var_dump($task->call('App.HelloString', 'Reasno')); var_dump($task->call('App.HelloInterface', ['jack', 'jill'])); var_dump($task->call('App.HelloStruct', [ @@ -28,7 +35,7 @@ 'lastName' => 'James', 'id' => 23, ])); - var_dump($task->call('App.HelloBytes', base64_encode('My Bytes'), RelayInterface::PAYLOAD_RAW)); + var_dump($task->call('App.HelloBytes', base64_encode('My Bytes'), GoTask::PAYLOAD_RAW)); try { $task->call('App.HelloError', 'Reasno'); } catch (\Throwable $e) { diff --git a/example/Server.go b/example/Server.go index 123d152..d70aefd 100644 --- a/example/Server.go +++ b/example/Server.go @@ -51,6 +51,6 @@ func (a *App) HelloError(name interface{}, r *interface{}) error { func main() { gotask.Register(new(App)) if err := gotask.Run(); err != nil { - log.Fatal(err) + log.Fatalln(err) } } diff --git a/publish/gotask.php b/publish/gotask.php index 0624412..0bb583c 100644 --- a/publish/gotask.php +++ b/publish/gotask.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask diff --git a/src/ConfigProvider.php b/src/ConfigProvider.php index f5fac80..a45a3c6 100644 --- a/src/ConfigProvider.php +++ b/src/ConfigProvider.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask @@ -12,8 +12,8 @@ namespace Reasno\GoTask; +use Reasno\GoTask\Listener\BootApplicationListener; use Reasno\GoTask\Process\GoTaskProcess; -use Spiral\Goridge\RPC; class ConfigProvider { @@ -21,13 +21,16 @@ public function __invoke(): array { return [ 'dependencies' => [ - GoTask::class => RemoteGoTask::class, + GoTask::class => GoTaskFactory::class, ], 'commands' => [ ], 'processes' => [ GoTaskProcess::class, ], + 'listener' => [ + BootApplicationListener::class, + ], 'annotations' => [ 'scan' => [ 'paths' => [ diff --git a/src/Exception/InvalidGoTaskConnectionException.php b/src/Exception/InvalidGoTaskConnectionException.php index 94e3a0e..4cebb4c 100644 --- a/src/Exception/InvalidGoTaskConnectionException.php +++ b/src/Exception/InvalidGoTaskConnectionException.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask diff --git a/src/GoTask.php b/src/GoTask.php index c2c486a..44efd5d 100644 --- a/src/GoTask.php +++ b/src/GoTask.php @@ -1,10 +1,29 @@ factory = $factory; diff --git a/src/GoTaskConnectionPool.php b/src/GoTaskConnectionPool.php index 0f08a80..552f8f7 100644 --- a/src/GoTaskConnectionPool.php +++ b/src/GoTaskConnectionPool.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask diff --git a/src/GoTaskFactory.php b/src/GoTaskFactory.php new file mode 100644 index 0000000..af8bd3b --- /dev/null +++ b/src/GoTaskFactory.php @@ -0,0 +1,28 @@ +get(ConfigInterface::class); + if ($config->get('gotask.socket_address', false)) { + return $container->get(SocketGoTask::class); + } + return $container->get(PipeGoTask::class); + } +} diff --git a/src/IPC/IPCInterface.php b/src/IPC/IPCInterface.php new file mode 100644 index 0000000..59a7880 --- /dev/null +++ b/src/IPC/IPCInterface.php @@ -0,0 +1,17 @@ +handler = new RPC( + new IPCRelay($process) + ); + } + + public function __call($name, $arguments) + { + $this->handler->{$name}(...$arguments); + } + + public function call(string $method, $payload, int $flags = 0) + { + return $this->handler->call($method, $payload, $flags); + } +} diff --git a/src/IPC/SocketIPC.php b/src/IPC/SocketIPC.php new file mode 100644 index 0000000..aae2e0e --- /dev/null +++ b/src/IPC/SocketIPC.php @@ -0,0 +1,54 @@ +handler = new RPC( + new CoroutineSocketRelay($split[0], 0, CoroutineSocketRelay::SOCK_UNIX) + ); + return; + } + [$host, $port] = $split; + $this->handler = new RPC( + new CoroutineSocketRelay($host, (int) $port, CoroutineSocketRelay::SOCK_TCP) + ); + } + + public function __call($name, $arguments) + { + $this->handler->{$name}(...$arguments); + } + + public function call(string $method, $payload, int $flags = 0) + { + return $this->handler->call($method, $payload, $flags); + } +} diff --git a/src/Listener/BootApplicationListener.php b/src/Listener/BootApplicationListener.php new file mode 100644 index 0000000..f9f8cb6 --- /dev/null +++ b/src/Listener/BootApplicationListener.php @@ -0,0 +1,39 @@ +process= $process; + $this->process = $process; } - public function call(string $method, $payload, int $flags = 0){ - if ($this->taskChannel == null){ + public function call(string $method, $payload, int $flags = 0) + { + if ($this->taskChannel == null) { $this->taskChannel = new Channel(100); - go(function(){ + go(function () { $this->start(); }); } $returnChannel = new Channel(1); $this->taskChannel->push([$method, $payload, $flags, $returnChannel]); $result = $returnChannel->pop(); - if ($result instanceof \Throwable){ + if ($result instanceof \Throwable) { throw $result; } return $result; @@ -43,19 +59,20 @@ public function call(string $method, $payload, int $flags = 0){ private function start() { - if ($this->process == null){ + if ($this->process == null) { $this->process = ProcessCollector::get('gotask')[0]; } - $task = new RPC( - new IPCRelay($this->process) - ); + $task = make(PipeIPC::class, ['process' => $this->process]); while (true) { [$method, $payload, $flag, $returnChannel] = $this->taskChannel->pop(); - try{ + self::$lock->lock(); + try { $result = $task->call($method, $payload, $flag); $returnChannel->push($result); - } catch (\Throwable $e){ + } catch (\Throwable $e) { $returnChannel->push($e); + } finally { + self::$lock->unlock(); } } } diff --git a/src/Process/GoTaskProcess.php b/src/Process/GoTaskProcess.php index 5736059..9cefbb9 100644 --- a/src/Process/GoTaskProcess.php +++ b/src/Process/GoTaskProcess.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask @@ -15,7 +15,6 @@ use Hyperf\Contract\ConfigInterface; use Hyperf\Process\AbstractProcess; use Psr\Container\ContainerInterface; -use Swoole\Atomic; class GoTaskProcess extends AbstractProcess { @@ -23,10 +22,6 @@ class GoTaskProcess extends AbstractProcess * @var string */ public $name = 'gotask'; - /** - * @var Atomic - */ - public static $taskPid; /** * @var ConfigInterface @@ -37,7 +32,7 @@ public function __construct(ContainerInterface $container) { parent::__construct($container); $this->config = $container->get(ConfigInterface::class); - self::$taskPid = new Atomic(); + $this->redirectStdinStdout = empty($this->config->get('gotask.socket_address')) ?? false; } public function isEnable(): bool @@ -53,7 +48,6 @@ public function handle(): void $executable = $this->config->get('gotask.executable', BASE_PATH . '/app'); $address = $this->config->get('gotask.socket_address', '/tmp/gotask.sock'); $args = $this->config->get('gotask.args', []); - self::$taskPid->set($this->process->pid); $argArr = ['-address', $address]; array_push($argArr, ...$args); $this->process->exec($executable, $argArr); diff --git a/src/Relay/CoroutineSocketRelay.php b/src/Relay/CoroutineSocketRelay.php index 7388a29..6598fc0 100644 --- a/src/Relay/CoroutineSocketRelay.php +++ b/src/Relay/CoroutineSocketRelay.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask @@ -97,31 +97,24 @@ public function __toString(): string return "unix://{$this->address}"; } - /** - * @return string - */ public function getAddress(): string { return $this->address; } /** - * @return int|null + * @return null|int */ public function getPort() { return $this->port; } - /** - * @return int - */ public function getType(): int { return $this->type; } - /** * Ensure socket connection. Returns true if socket successfully connected * or have already been connected. diff --git a/src/Relay/IPCRelay.php b/src/Relay/IPCRelay.php index ba1d30a..eee5a33 100644 --- a/src/Relay/IPCRelay.php +++ b/src/Relay/IPCRelay.php @@ -1,5 +1,14 @@ process->exportSocket(); } } - diff --git a/src/Relay/RelayInterface.php b/src/Relay/RelayInterface.php index 4202069..bf497df 100644 --- a/src/Relay/RelayInterface.php +++ b/src/Relay/RelayInterface.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask diff --git a/src/Relay/SocketTransporter.php b/src/Relay/SocketTransporter.php index 8b9f191..5939a31 100644 --- a/src/Relay/SocketTransporter.php +++ b/src/Relay/SocketTransporter.php @@ -1,9 +1,17 @@ container->get(ConfigInterface::class); $address = $config->get('gotask.socket_address', '/tmp/gotask.sock'); - $split = explode(':', $address, 2); - if (count($split) === 1) { - return new RPC( - new CoroutineSocketRelay($split[0], 0, CoroutineSocketRelay::SOCK_UNIX) - ); - } - [$host, $port] = $split; - return new RPC( - new CoroutineSocketRelay($host, (int) $port, CoroutineSocketRelay::SOCK_TCP) - ); + return make(SocketIPC::class, ['address' => $address]); } } diff --git a/tests/Cases/AbstractTestCase.php b/tests/Cases/AbstractTestCase.php index 1f9af83..88811b4 100644 --- a/tests/Cases/AbstractTestCase.php +++ b/tests/Cases/AbstractTestCase.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask diff --git a/tests/Cases/CoroutineSocketTest.php b/tests/Cases/CoroutineSocketTest.php index 497bfe0..4990d1d 100644 --- a/tests/Cases/CoroutineSocketTest.php +++ b/tests/Cases/CoroutineSocketTest.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask @@ -21,10 +21,10 @@ use Hyperf\Framework\Logger\StdoutLogger; use Hyperf\Utils\ApplicationContext; use Hyperf\Utils\WaitGroup; -use Reasno\GoTask\RemoteGoTask; use Reasno\GoTask\Relay\CoroutineSocketRelay; use Reasno\GoTask\Relay\RelayInterface; -use Reasno\GoTask\RPCFactory; +use Reasno\GoTask\SocketGoTask; +use Reasno\GoTask\SocketIPCFactory; use Spiral\Goridge\Exceptions\ServiceException; use Spiral\Goridge\RPC; use Swoole\Process; @@ -36,6 +36,7 @@ class CoroutineSocketTest extends AbstractTestCase { const UNIX_SOCKET = '/tmp/test.sock'; + /** * @var Process */ @@ -45,7 +46,7 @@ public function setUp() { ! defined('BASE_PATH') && define('BASE_PATH', dirname(__DIR__, 1)); @unlink(self::UNIX_SOCKET); - $this->p = new Process(function (Process $process) { + $this->p = new Process(function (Process $process) { $process->exec(__DIR__ . '/../../app', ['-address', self::UNIX_SOCKET]); }); $this->p->start(); @@ -57,7 +58,6 @@ public function tearDown() Process::kill($this->p->pid); } - public function testOnCoroutine() { \Swoole\Coroutine\run(function () { @@ -68,8 +68,8 @@ public function testOnCoroutine() }); } - public function testConcurrently(){ - + public function testConcurrently() + { $container = new Container(new DefinitionSource([], new ScanConfig())); $container->set(ConfigInterface::class, new Config([ 'gotask' => [ @@ -83,25 +83,27 @@ public function testConcurrently(){ 'heartbeat' => -1, 'max_idle_time' => (float) env('GOTASK_MAX_IDLE_TIME', 60), ], - ] + ], ])); - $container->define(RPC::class,RPCFactory::class); - $container->define(StdoutLoggerInterface::class, - StdoutLogger::class); + $container->define(RPC::class, SocketIPCFactory::class); + $container->define( + StdoutLoggerInterface::class, + StdoutLogger::class + ); ApplicationContext::setContainer($container); \Swoole\Coroutine\run(function () { sleep(1); - $task = make(RemoteGoTask::class); + $task = make(SocketGoTask::class); $wg = new WaitGroup(); $wg->add(); $this->baseExample($task); - go(function() use ($wg, $task) { + go(function () use ($wg, $task) { $this->baseExample($task); $wg->done(); }); $wg->add(); - go(function() use ($wg, $task) { + go(function () use ($wg, $task) { $this->baseExample($task); $wg->done(); }); @@ -109,7 +111,8 @@ public function testConcurrently(){ }); } - public function baseExample($task){ + public function baseExample($task) + { $this->assertEquals( 'Hello, Reasno!', $task->call('App.HelloString', 'Reasno') diff --git a/tests/Cases/IPCRelayTest.php b/tests/Cases/IPCRelayTest.php index 8d90ac1..1a3b200 100644 --- a/tests/Cases/IPCRelayTest.php +++ b/tests/Cases/IPCRelayTest.php @@ -1,18 +1,30 @@ p = new Process(function (Process $process) { $process->exec(__DIR__ . '/../../app', []); }, true); @@ -48,17 +61,17 @@ public function testConcurrently() { \Swoole\Coroutine\run(function () { sleep(1); - $task = make(LocalGoTask::class, [ + $task = make(PipeGoTask::class, [ 'process' => $this->p, ]); $wg = new WaitGroup(); $wg->add(); - go(function() use ($wg, $task) { + go(function () use ($wg, $task) { $this->baseExample($task); $wg->done(); }); $wg->add(); - go(function() use ($wg, $task) { + go(function () use ($wg, $task) { $this->baseExample($task); $wg->done(); }); @@ -66,7 +79,8 @@ public function testConcurrently() }); } - public function baseExample($task){ + public function baseExample($task) + { $this->assertEquals( 'Hello, Reasno!', $task->call('App.HelloString', 'Reasno') diff --git a/tests/bootstrap.php b/tests/bootstrap.php index 01edac3..26133e8 100644 --- a/tests/bootstrap.php +++ b/tests/bootstrap.php @@ -2,7 +2,7 @@ declare(strict_types=1); /** - * This file is part of Reasno/RemoteGoTask. + * This file is part of Reasno/GoTask. * * @link https://www.github.com/reasno/gotask * @document https://www.github.com/reasno/gotask