From 90761f579eb2ffa709167e8d08ca25132153b52f Mon Sep 17 00:00:00 2001 From: Damodar Lohani Date: Wed, 6 Mar 2024 20:08:52 +0545 Subject: [PATCH 1/4] support threads using isolates --- example/example_client.dart | 8 ++++++++ example/example_server.dart | 2 +- lib/src/server.dart | 27 ++++++++++++++++++++++----- 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/example/example_client.dart b/example/example_client.dart index 685a820..fe1e1da 100644 --- a/example/example_client.dart +++ b/example/example_client.dart @@ -6,6 +6,14 @@ import 'package:utopia_queue/utopia_queue.dart'; void main() async { final connection = ConnectionRedis('localhost', 6379); final client = Client(connection, queue: 'myqueue'); + await client + .enqueue({'user': Random().nextInt(20), 'name': 'Damodar Lohani'}); + await client + .enqueue({'user': Random().nextInt(20), 'name': 'Damodar Lohani'}); + await client + .enqueue({'user': Random().nextInt(20), 'name': 'Damodar Lohani'}); + await client + .enqueue({'user': Random().nextInt(20), 'name': 'Damodar Lohani'}); await client .enqueue({'user': Random().nextInt(20), 'name': 'Damodar Lohani'}); print('enqueued'); diff --git a/example/example_server.dart b/example/example_server.dart index 7126fb6..a8adc96 100644 --- a/example/example_server.dart +++ b/example/example_server.dart @@ -7,5 +7,5 @@ void main(List arguments) async { server.job().inject('message').action((Message message) { print(message.toMap()); }); - server.start(); + server.start(threads: 2); } diff --git a/lib/src/server.dart b/lib/src/server.dart index 96258ae..58ec55e 100644 --- a/lib/src/server.dart +++ b/lib/src/server.dart @@ -1,3 +1,5 @@ +import 'dart:io'; +import 'dart:isolate' as iso; import 'package:utopia_di/utopia_di.dart'; import 'package:utopia_queue/src/message.dart'; @@ -12,6 +14,7 @@ class Server { final List _errors = []; final List _init = []; final List _shutdown = []; + static Map threads = {}; Job _job = Job(); @@ -54,7 +57,9 @@ class Server { return hook; } - Future start() async { + Future _onIsolateMain((Connection, int) args) async { + final (connection, id) = args; + print('Server $id waiting for queue'); while (true) { var nextMessage = await connection.rightPopArray('$namespace.queue.$queue', 5); @@ -65,7 +70,7 @@ class Server { final message = Message.fromMap(nextMessage); setResource('message', () => message); - print('Job received ${message.pid}'); + print('$id: Job received ${message.pid}'); try { final groups = _job.getGroups(); @@ -88,11 +93,11 @@ class Server { globalHook: true, ); } - print('job ${message.pid} successfully run'); + print('$id: Job ${message.pid} successfully run'); } catch (e) { await connection.leftPush('$namespace.failed.$queue', message.pid); - print('Error: Job ${message.pid} failed to run'); - print('Error: ${e.toString()}'); + print('$id: Error: Job ${message.pid} failed to run'); + print('$id: Error: ${e.toString()}'); setResource('error', () => e); _executeHooks( _errors, @@ -103,6 +108,18 @@ class Server { } } + Future _spawnOffIsolates(int num) async { + for (var i = 0; i < num; i++) { + await iso.Isolate.spawn<(Connection, int)>( + _onIsolateMain, (connection, i)); + } + } + + Future start({int threads = 1}) async { + await _spawnOffIsolates(threads); + stdin.readByteSync(); + } + Map _getArguments( Hook hook, Map payload, From 265732c66528c385cba9ed21dad8289bfd806ee6 Mon Sep 17 00:00:00 2001 From: Damodar Lohani Date: Thu, 7 Mar 2024 20:43:46 +0545 Subject: [PATCH 2/4] fix exports --- lib/utopia_queue.dart | 1 + pubspec.lock | 6 +++--- pubspec.yaml | 7 +++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/utopia_queue.dart b/lib/utopia_queue.dart index ac0d744..ad2a87b 100644 --- a/lib/utopia_queue.dart +++ b/lib/utopia_queue.dart @@ -3,3 +3,4 @@ export 'src/server.dart'; export 'src/connection.dart'; export 'src/connection/redis.dart'; export 'src/message.dart'; +export 'src/job.dart'; diff --git a/pubspec.lock b/pubspec.lock index 74d7658..007cff0 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -18,7 +18,7 @@ packages: source: hosted version: "6.4.1" args: - dependency: "direct main" + dependency: transitive description: name: args sha256: eef6c46b622e0494a36c5a12d10d77fb4e855501a91c1b9ef9339326e58f0596 @@ -389,10 +389,10 @@ packages: dependency: transitive description: name: web - sha256: "1d9158c616048c38f712a6646e317a3426da10e884447626167240d45209cbad" + sha256: "97da13628db363c635202ad97068d47c5b8aa555808e7a9411963c533b449b27" url: "https://pub.dev" source: hosted - version: "0.5.0" + version: "0.5.1" web_socket_channel: dependency: transitive description: diff --git a/pubspec.yaml b/pubspec.yaml index b2fedc3..054937c 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: utopia_queue -description: Light and fast queue library for Dart server projects -version: 0.0.1 +description: Light and easy to use queue library for Dart server projects +version: 0.1.0 repository: https://github.com/utopia-dart/utopia_queue environment: @@ -8,11 +8,10 @@ environment: # Add regular dependencies here. dependencies: - args: ^2.4.2 redis: ^4.0.0 utopia_di: ^0.0.2 uuid: ^4.3.3 dev_dependencies: lints: ^3.0.0 - test: ^1.24.0 + test: ^1.25.2 From 1b8547e77658f1ed79236aa742457e2f7ec273f0 Mon Sep 17 00:00:00 2001 From: Damodar Lohani Date: Thu, 7 Mar 2024 20:44:18 +0545 Subject: [PATCH 3/4] add license and changelog --- CHANGELOG.md | 2 +- LICENSE | 21 +++++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) create mode 100644 LICENSE diff --git a/CHANGELOG.md b/CHANGELOG.md index effe43c..a0712a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,3 @@ -## 1.0.0 +## 0.1.0 - Initial version. diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..cbc4798 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2022 utopia-dart + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. From f7133b2a9aa3590f73475fc65e10c557f8dd8d59 Mon Sep 17 00:00:00 2001 From: Damodar Lohani Date: Thu, 7 Mar 2024 20:45:23 +0545 Subject: [PATCH 4/4] analyze and publish workflow --- .github/workflows/analyze.yml | 22 ++++++++++++++++++++++ .github/workflows/publish.yml | 26 ++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) create mode 100644 .github/workflows/analyze.yml create mode 100644 .github/workflows/publish.yml diff --git a/.github/workflows/analyze.yml b/.github/workflows/analyze.yml new file mode 100644 index 0000000..8f350bb --- /dev/null +++ b/.github/workflows/analyze.yml @@ -0,0 +1,22 @@ +name: Analyze + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + + # Allows you to run this workflow manually from the Actions tab + workflow_dispatch: + +jobs: + analyze: + runs-on: ubuntu-latest + container: + image: dart:latest + steps: + - uses: actions/checkout@v1 + - name: Install dependencies + run: dart pub get + - name: analyze + run: dart analyze \ No newline at end of file diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..0758fbd --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,26 @@ +name: Publish to pub.dev +on: + release: + types: [published] + +jobs: + build: + runs-on: ubuntu-latest + container: + image: google/dart:latest + steps: + - uses: actions/checkout@v1 + - name: Setup credentials + run: | + mkdir -p ~/.config/dart + cat < ~/.config/dart/pub-credentials.json + { + "accessToken":"${{ secrets.OAUTH_ACCESS_TOKEN }}", + "refreshToken":"${{ secrets.OAUTH_REFRESH_TOKEN }}", + "tokenEndpoint":"https://accounts.google.com/o/oauth2/token", + "scopes": [ "openid", "https://www.googleapis.com/auth/userinfo.email" ], + "expiration": 1584628470088 + } + EOF + - name: Publish package + run: dart pub publish -f \ No newline at end of file