From de5aac473f80ed1720e1a730b01b3c011bf77ff4 Mon Sep 17 00:00:00 2001 From: Damodar Lohani Date: Mon, 11 Mar 2024 18:24:40 +0545 Subject: [PATCH 1/2] improve doc comments --- CHANGELOG.md | 5 +++++ lib/src/client.dart | 8 +++++++- lib/src/connection.dart | 8 ++++---- lib/src/connection/redis.dart | 21 ++++++++++++++++----- lib/src/job.dart | 1 + lib/src/message.dart | 1 + lib/src/server.dart | 27 +++++++++++++++++++++++---- pubspec.lock | 4 ++-- pubspec.yaml | 5 ++--- 9 files changed, 61 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a0712a7..101d6ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.2.0 + +- Refactor namings +- Support threads + ## 0.1.0 - Initial version. diff --git a/lib/src/client.dart b/lib/src/client.dart index 93ef2f1..b94effe 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -3,6 +3,9 @@ import 'package:uuid/uuid.dart'; import 'connection.dart'; import 'message.dart'; +/// Utopia Queue client +/// +/// Client to connect and add items to the queue class Client { final String queue; final String namespace; @@ -14,6 +17,7 @@ class Client { this.namespace = 'utopia-queue', }); + /// Add item to the queue Future enqueue(Map payload) async { final data = { 'pid': Uuid().v1(), @@ -21,10 +25,11 @@ class Client { 'timestamp': DateTime.now().millisecondsSinceEpoch, 'payload': payload }; - await connection.leftPushArray('$namespace.queue.$queue', data); + await connection.leftPushJson('$namespace.queue.$queue', data); return true; } + /// Retry failed jobs in the queue void retry([int? limit]) async { int processed = 0; final start = DateTime.now().millisecondsSinceEpoch; @@ -53,6 +58,7 @@ class Client { } } + /// Get a job Future getJob(String pid) async { final value = await connection.get('$namespace.jobs.$queue.$pid'); diff --git a/lib/src/connection.dart b/lib/src/connection.dart index 741a3a6..fbd1697 100644 --- a/lib/src/connection.dart +++ b/lib/src/connection.dart @@ -1,9 +1,9 @@ abstract class Connection { - Future rightPushArray(String queue, Map payload); - Future?> rightPopArray(String queue, int timeout); - Future leftPushArray(String queue, Map payload); + Future rightPushJson(String queue, Map payload); + Future?> rightPopJson(String queue, int timeout); + Future leftPushJson(String queue, Map payload); Future leftPush(String queue, String value); - Future?> leftPopArray(String queue, int timeout); + Future?> leftPopJson(String queue, int timeout); Future leftPop(String queue, int timeout); Future rightPop(String queue, int timeout); Future get(String key); diff --git a/lib/src/connection/redis.dart b/lib/src/connection/redis.dart index 129a5bc..2c53863 100644 --- a/lib/src/connection/redis.dart +++ b/lib/src/connection/redis.dart @@ -4,6 +4,10 @@ import 'package:redis/redis.dart'; import '../connection.dart'; +/// Connection redis +/// +/// Used to connect to redis server +/// and manage queue class ConnectionRedis extends Connection { final String host; final int port; @@ -26,8 +30,9 @@ class ConnectionRedis extends Connection { return redis!; } + /// Left pop json item from the queue @override - Future?> leftPopArray(String queue, int timeout) async { + Future?> leftPopJson(String queue, int timeout) async { final res = await (await _getRedis()).send_object(['BLPOP', queue, timeout]); if (res == null) { @@ -39,8 +44,9 @@ class ConnectionRedis extends Connection { return jsonDecode(res[1]); } + /// Left push json item to the queue @override - Future leftPushArray(String queue, Map payload) async { + Future leftPushJson(String queue, Map payload) async { try { await (await _getRedis()) .send_object(['LPUSH', queue, jsonEncode(payload)]); @@ -50,6 +56,7 @@ class ConnectionRedis extends Connection { } } + /// Left push item to the queue @override Future leftPush(String queue, String value) async { try { @@ -60,8 +67,9 @@ class ConnectionRedis extends Connection { } } + /// Right pop json item from the queue @override - Future?> rightPopArray(String queue, int timeout) async { + Future?> rightPopJson(String queue, int timeout) async { final res = await (await _getRedis()).send_object(['BRPOP', queue, timeout]); @@ -74,9 +82,9 @@ class ConnectionRedis extends Connection { return jsonDecode(res[1]); } + /// Right push json item to the queue @override - Future rightPushArray( - String queue, Map payload) async { + Future rightPushJson(String queue, Map payload) async { try { await (await _getRedis()) .send_object(['RPUSH', queue, jsonEncode(payload)]); @@ -86,6 +94,7 @@ class ConnectionRedis extends Connection { } } + /// Left pop item from the queue @override Future leftPop(String queue, int timeout) async { final res = @@ -96,6 +105,7 @@ class ConnectionRedis extends Connection { return res[1]; } + /// Right pop item from the queue @override Future rightPop(String queue, int timeout) async { final res = @@ -106,6 +116,7 @@ class ConnectionRedis extends Connection { return res[1]; } + /// Get a key from redis @override Future get(String key) async { return (await _getRedis()).send_object(['GET', key]); diff --git a/lib/src/job.dart b/lib/src/job.dart index ee5a268..c691b5c 100644 --- a/lib/src/job.dart +++ b/lib/src/job.dart @@ -1,5 +1,6 @@ import 'package:utopia_di/utopia_di.dart'; +/// Queue Job class Job extends Hook { bool hook = true; } diff --git a/lib/src/message.dart b/lib/src/message.dart index 399564d..ecdc72f 100644 --- a/lib/src/message.dart +++ b/lib/src/message.dart @@ -1,5 +1,6 @@ import 'dart:convert'; +/// Queue message data model class Message { final String pid; final String queue; diff --git a/lib/src/server.dart b/lib/src/server.dart index 58ec55e..5904f82 100644 --- a/lib/src/server.dart +++ b/lib/src/server.dart @@ -6,6 +6,10 @@ import 'package:utopia_queue/src/message.dart'; import 'connection.dart'; import 'job.dart'; +/// Queue server +/// +/// Runs and listens to the queue and processes +/// jobs as theyare received class Server { final Connection connection; final String queue; @@ -24,6 +28,7 @@ class Server { this.namespace = 'utopia-queue', }); + /// Set resource void setResource( String name, Function callback, { @@ -31,26 +36,39 @@ class Server { }) => di.set(name, callback, injections: injections); - dynamic getResource(String name, {bool fresh = false}) => - di.get(name, fresh: fresh); + /// Get resource + dynamic getResource(String name, {bool fresh = false}) => + di.get(name, fresh: fresh); + /// Set job handler Job job() { _job = Job(); return _job; } + /// Setup init hooks + /// + /// Init hooks are executed before the job + /// is executed Hook init() { final hook = Hook()..groups(['*']); _init.add(hook); return hook; } + /// Setup shutdown hooks + /// + /// Shutdown hooks are executed after the job + /// is executed Hook shutdown() { final hook = Hook()..groups(['*']); _shutdown.add(hook); return hook; } + /// Error hooks + /// + /// Error hooks are executed for each error Hook error() { final hook = Hook()..groups(['*']); _errors.add(hook); @@ -62,7 +80,7 @@ class Server { print('Server $id waiting for queue'); while (true) { var nextMessage = - await connection.rightPopArray('$namespace.queue.$queue', 5); + await connection.rightPopJson('$namespace.queue.$queue', 5); if (nextMessage == null) { continue; @@ -115,9 +133,10 @@ class Server { } } + /// Start queue server Future start({int threads = 1}) async { + iso.ReceivePort(); await _spawnOffIsolates(threads); - stdin.readByteSync(); } Map _getArguments( diff --git a/pubspec.lock b/pubspec.lock index 007cff0..2fe5595 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -357,10 +357,10 @@ packages: dependency: "direct main" description: name: utopia_di - sha256: fbd0a0324412b806671385d3204eb1692d37f50d1a3bc28378025ff4ba3f58c4 + sha256: f4a618e0278346e08b6cec8c88ae0135e279b6f4aaeed97739302d014c4024df url: "https://pub.dev" source: hosted - version: "0.0.2" + version: "0.2.0" uuid: dependency: "direct main" description: diff --git a/pubspec.yaml b/pubspec.yaml index 054937c..531b801 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,15 +1,14 @@ name: utopia_queue description: Light and easy to use queue library for Dart server projects -version: 0.1.0 +version: 0.2.0 repository: https://github.com/utopia-dart/utopia_queue environment: sdk: ^3.3.0 -# Add regular dependencies here. dependencies: redis: ^4.0.0 - utopia_di: ^0.0.2 + utopia_di: ^0.2.0 uuid: ^4.3.3 dev_dependencies: From 49722f27d1d02df81c064ffe00c4796cb5a31a4c Mon Sep 17 00:00:00 2001 From: Damodar Lohani Date: Mon, 11 Mar 2024 18:27:45 +0545 Subject: [PATCH 2/2] remove unused import --- lib/src/server.dart | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/src/server.dart b/lib/src/server.dart index 5904f82..a7af05c 100644 --- a/lib/src/server.dart +++ b/lib/src/server.dart @@ -1,4 +1,3 @@ -import 'dart:io'; import 'dart:isolate' as iso; import 'package:utopia_di/utopia_di.dart'; import 'package:utopia_queue/src/message.dart';