diff --git a/composer.json b/composer.json index d0deb92..44f5c74 100644 --- a/composer.json +++ b/composer.json @@ -29,7 +29,7 @@ "hyperf/config": "^1.1", "hyperf/di": "^1.1", "hyperf/testing": "1.1.*", - "phpstan/phpstan": "^0.10.5", + "phpstan/phpstan": "^0.12", "swoft/swoole-ide-helper": "dev-master" }, "config": { diff --git a/example/Server.go b/example/Server.go index 41f063b..123d152 100644 --- a/example/Server.go +++ b/example/Server.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/reasno/gotask/pkg/gotask" "io/ioutil" + "log" ) // App sample @@ -49,5 +50,7 @@ func (a *App) HelloError(name interface{}, r *interface{}) error { func main() { gotask.Register(new(App)) - gotask.Run() + if err := gotask.Run(); err != nil { + log.Fatal(err) + } } diff --git a/pkg/gotask/server.go b/pkg/gotask/server.go index 68053c7..39e0869 100644 --- a/pkg/gotask/server.go +++ b/pkg/gotask/server.go @@ -56,12 +56,12 @@ func GetAddress() string { func Run() error { var g run.Group - { + if *address == "" { relay := goridge.NewPipeRelay(os.Stdin, os.Stdout) codec := goridge.NewCodecWithRelay(relay) g.Add(func() error { rpc.ServeCodec(codec) - return nil + return fmt.Errorf("pipe is closed") }, func(err error) { _ = os.Stdin.Close() _ = os.Stdout.Close() diff --git a/publish/gotask.php b/publish/gotask.php index 8ff84cd..0624412 100644 --- a/publish/gotask.php +++ b/publish/gotask.php @@ -20,9 +20,9 @@ }), 'pool' => [ 'min_connections' => 1, - 'max_connections' => 100, + 'max_connections' => 30, 'connect_timeout' => 10.0, - 'wait_timeout' => 3.0, + 'wait_timeout' => 30.0, 'heartbeat' => -1, 'max_idle_time' => (float) env('GOTASK_MAX_IDLE_TIME', 60), ], diff --git a/src/LocalGoTask.php b/src/LocalGoTask.php index 5fb0fab..9b0530a 100644 --- a/src/LocalGoTask.php +++ b/src/LocalGoTask.php @@ -34,7 +34,11 @@ public function call(string $method, $payload, int $flags = 0){ } $returnChannel = new Channel(1); $this->taskChannel->push([$method, $payload, $flags, $returnChannel]); - return $returnChannel->pop(); + $result = $returnChannel->pop(); + if ($result instanceof \Throwable){ + throw $result; + } + return $result; } private function start() @@ -47,8 +51,12 @@ private function start() ); while (true) { [$method, $payload, $flag, $returnChannel] = $this->taskChannel->pop(); - $result = $task->call($method, $payload, $flag); - $returnChannel->push($result); + try{ + $result = $task->call($method, $payload, $flag); + $returnChannel->push($result); + } catch (\Throwable $e){ + $returnChannel->push($e); + } } } } diff --git a/src/Relay/CoroutineSocketRelay.php b/src/Relay/CoroutineSocketRelay.php index b4dd993..7388a29 100644 --- a/src/Relay/CoroutineSocketRelay.php +++ b/src/Relay/CoroutineSocketRelay.php @@ -14,6 +14,7 @@ use Spiral\Goridge\Exceptions\GoridgeException; use Spiral\Goridge\Exceptions\InvalidArgumentException; +use Spiral\Goridge\Exceptions\RelayException; use Swoole\Coroutine\Socket; /** @@ -87,6 +88,15 @@ public function __construct(string $address, int $port = null, int $type = self: $this->type = $type; } + public function __toString(): string + { + if ($this->type == self::SOCK_TCP) { + return "tcp://{$this->address}:{$this->port}"; + } + + return "unix://{$this->address}"; + } + /** * @return string */ @@ -111,6 +121,33 @@ public function getType(): int return $this->type; } + + /** + * Ensure socket connection. Returns true if socket successfully connected + * or have already been connected. + * + * @throws RelayException + * @throws \Error when sockets are used in unsupported environment + */ + public function connect(): bool + { + if ($this->isConnected()) { + return true; + } + + $this->socket = $this->createSocket(); + try { + // Port type needs to be int, so we convert null to 0 + if ($this->socket->connect($this->address, $this->port ?? 0) === false) { + throw new RelayException(sprintf('%s (%s)', $this->socket->errMsg, $this->socket->errCode)); + } + } catch (\Exception $e) { + throw new RelayException("unable to establish connection {$this}", 0, $e); + } + + return true; + } + /** * @throws GoridgeException * @return Socket diff --git a/src/Relay/SocketTransporter.php b/src/Relay/SocketTransporter.php index c793366..8b9f191 100644 --- a/src/Relay/SocketTransporter.php +++ b/src/Relay/SocketTransporter.php @@ -20,15 +20,6 @@ public function __destruct() } } - public function __toString(): string - { - if ($this->type == self::SOCK_TCP) { - return "tcp://{$this->address}:{$this->port}"; - } - - return "unix://{$this->address}"; - } - /** * {@inheritdoc} */ @@ -82,32 +73,6 @@ public function isConnected(): bool return $this->socket != null; } - /** - * Ensure socket connection. Returns true if socket successfully connected - * or have already been connected. - * - * @throws RelayException - * @throws \Error when sockets are used in unsupported environment - */ - public function connect(): bool - { - if ($this->isConnected()) { - return true; - } - - $this->socket = $this->createSocket(); - try { - // Port type needs to be int, so we convert null to 0 - if ($this->socket->connect($this->address, $this->port ?? 0) === false) { - throw new RelayException(sprintf('%s (%s)', $this->socket->errMsg, $this->socket->errCode)); - } - } catch (\Exception $e) { - throw new RelayException("unable to establish connection {$this}", 0, $e); - } - - return true; - } - /** * Close connection. * diff --git a/tests/Cases/CoroutineSocketTest.php b/tests/Cases/CoroutineSocketTest.php index c336b66..497bfe0 100644 --- a/tests/Cases/CoroutineSocketTest.php +++ b/tests/Cases/CoroutineSocketTest.php @@ -36,24 +36,28 @@ class CoroutineSocketTest extends AbstractTestCase { const UNIX_SOCKET = '/tmp/test.sock'; - /** - * @var RPC + * @var Process */ - private $task; - + private $p; public function setUp() { ! defined('BASE_PATH') && define('BASE_PATH', dirname(__DIR__, 1)); @unlink(self::UNIX_SOCKET); - $p = new Process(function (Process $process) { + $this->p = new Process(function (Process $process) { $process->exec(__DIR__ . '/../../app', ['-address', self::UNIX_SOCKET]); }); - $p->start(); + $this->p->start(); sleep(1); } + public function tearDown() + { + Process::kill($this->p->pid); + } + + public function testOnCoroutine() { \Swoole\Coroutine\run(function () { diff --git a/tests/Cases/IPCRelayTest.php b/tests/Cases/IPCRelayTest.php index dcf623b..8d90ac1 100644 --- a/tests/Cases/IPCRelayTest.php +++ b/tests/Cases/IPCRelayTest.php @@ -29,6 +29,11 @@ public function setUp() sleep(1); } + public function tearDown() + { + Process::kill($this->p->pid); + } + public function testOnCoroutine() { \Swoole\Coroutine\run(function () {