Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"hyperf/config": "^1.1",
"hyperf/di": "^1.1",
"hyperf/testing": "1.1.*",
"phpstan/phpstan": "^0.10.5",
"phpstan/phpstan": "^0.12",
"swoft/swoole-ide-helper": "dev-master"
},
"config": {
Expand Down
5 changes: 4 additions & 1 deletion example/Server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"github.com/reasno/gotask/pkg/gotask"
"io/ioutil"
"log"
)

// App sample
Expand Down Expand Up @@ -49,5 +50,7 @@ func (a *App) HelloError(name interface{}, r *interface{}) error {

func main() {
gotask.Register(new(App))
gotask.Run()
if err := gotask.Run(); err != nil {
log.Fatal(err)
}
}
4 changes: 2 additions & 2 deletions pkg/gotask/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ func GetAddress() string {

func Run() error {
var g run.Group
{
if *address == "" {
relay := goridge.NewPipeRelay(os.Stdin, os.Stdout)
codec := goridge.NewCodecWithRelay(relay)
g.Add(func() error {
rpc.ServeCodec(codec)
return nil
return fmt.Errorf("pipe is closed")
}, func(err error) {
_ = os.Stdin.Close()
_ = os.Stdout.Close()
Expand Down
4 changes: 2 additions & 2 deletions publish/gotask.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
}),
'pool' => [
'min_connections' => 1,
'max_connections' => 100,
'max_connections' => 30,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'wait_timeout' => 30.0,
'heartbeat' => -1,
'max_idle_time' => (float) env('GOTASK_MAX_IDLE_TIME', 60),
],
Expand Down
14 changes: 11 additions & 3 deletions src/LocalGoTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ public function call(string $method, $payload, int $flags = 0){
}
$returnChannel = new Channel(1);
$this->taskChannel->push([$method, $payload, $flags, $returnChannel]);
return $returnChannel->pop();
$result = $returnChannel->pop();
if ($result instanceof \Throwable){
throw $result;
}
return $result;
}

private function start()
Expand All @@ -47,8 +51,12 @@ private function start()
);
while (true) {
[$method, $payload, $flag, $returnChannel] = $this->taskChannel->pop();
$result = $task->call($method, $payload, $flag);
$returnChannel->push($result);
try{
$result = $task->call($method, $payload, $flag);
$returnChannel->push($result);
} catch (\Throwable $e){
$returnChannel->push($e);
}
}
}
}
37 changes: 37 additions & 0 deletions src/Relay/CoroutineSocketRelay.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use Spiral\Goridge\Exceptions\GoridgeException;
use Spiral\Goridge\Exceptions\InvalidArgumentException;
use Spiral\Goridge\Exceptions\RelayException;
use Swoole\Coroutine\Socket;

/**
Expand Down Expand Up @@ -87,6 +88,15 @@ public function __construct(string $address, int $port = null, int $type = self:
$this->type = $type;
}

public function __toString(): string
{
if ($this->type == self::SOCK_TCP) {
return "tcp://{$this->address}:{$this->port}";
}

return "unix://{$this->address}";
}

/**
* @return string
*/
Expand All @@ -111,6 +121,33 @@ public function getType(): int
return $this->type;
}


/**
* Ensure socket connection. Returns true if socket successfully connected
* or have already been connected.
*
* @throws RelayException
* @throws \Error when sockets are used in unsupported environment
*/
public function connect(): bool
{
if ($this->isConnected()) {
return true;
}

$this->socket = $this->createSocket();
try {
// Port type needs to be int, so we convert null to 0
if ($this->socket->connect($this->address, $this->port ?? 0) === false) {
throw new RelayException(sprintf('%s (%s)', $this->socket->errMsg, $this->socket->errCode));
}
} catch (\Exception $e) {
throw new RelayException("unable to establish connection {$this}", 0, $e);
}

return true;
}

/**
* @throws GoridgeException
* @return Socket
Expand Down
35 changes: 0 additions & 35 deletions src/Relay/SocketTransporter.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,6 @@ public function __destruct()
}
}

public function __toString(): string
{
if ($this->type == self::SOCK_TCP) {
return "tcp://{$this->address}:{$this->port}";
}

return "unix://{$this->address}";
}

/**
* {@inheritdoc}
*/
Expand Down Expand Up @@ -82,32 +73,6 @@ public function isConnected(): bool
return $this->socket != null;
}

/**
* Ensure socket connection. Returns true if socket successfully connected
* or have already been connected.
*
* @throws RelayException
* @throws \Error when sockets are used in unsupported environment
*/
public function connect(): bool
{
if ($this->isConnected()) {
return true;
}

$this->socket = $this->createSocket();
try {
// Port type needs to be int, so we convert null to 0
if ($this->socket->connect($this->address, $this->port ?? 0) === false) {
throw new RelayException(sprintf('%s (%s)', $this->socket->errMsg, $this->socket->errCode));
}
} catch (\Exception $e) {
throw new RelayException("unable to establish connection {$this}", 0, $e);
}

return true;
}

/**
* Close connection.
*
Expand Down
16 changes: 10 additions & 6 deletions tests/Cases/CoroutineSocketTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,28 @@
class CoroutineSocketTest extends AbstractTestCase
{
const UNIX_SOCKET = '/tmp/test.sock';

/**
* @var RPC
* @var Process
*/
private $task;

private $p;

public function setUp()
{
! defined('BASE_PATH') && define('BASE_PATH', dirname(__DIR__, 1));
@unlink(self::UNIX_SOCKET);
$p = new Process(function (Process $process) {
$this->p = new Process(function (Process $process) {
$process->exec(__DIR__ . '/../../app', ['-address', self::UNIX_SOCKET]);
});
$p->start();
$this->p->start();
sleep(1);
}

public function tearDown()
{
Process::kill($this->p->pid);
}


public function testOnCoroutine()
{
\Swoole\Coroutine\run(function () {
Expand Down
5 changes: 5 additions & 0 deletions tests/Cases/IPCRelayTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public function setUp()
sleep(1);
}

public function tearDown()
{
Process::kill($this->p->pid);
}

public function testOnCoroutine()
{
\Swoole\Coroutine\run(function () {
Expand Down