Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.2.0

- Refactor namings
- Support threads

## 0.1.0

- Initial version.
8 changes: 7 additions & 1 deletion lib/src/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import 'package:uuid/uuid.dart';
import 'connection.dart';
import 'message.dart';

/// Utopia Queue client
///
/// Client to connect and add items to the queue
class Client {
final String queue;
final String namespace;
Expand All @@ -14,17 +17,19 @@ class Client {
this.namespace = 'utopia-queue',
});

/// Add item to the queue
Future<bool> enqueue(Map<String, dynamic> payload) async {
final data = {
'pid': Uuid().v1(),
'queue': queue,
'timestamp': DateTime.now().millisecondsSinceEpoch,
'payload': payload
};
await connection.leftPushArray('$namespace.queue.$queue', data);
await connection.leftPushJson('$namespace.queue.$queue', data);
return true;
}

/// Retry failed jobs in the queue
void retry([int? limit]) async {
int processed = 0;
final start = DateTime.now().millisecondsSinceEpoch;
Expand Down Expand Up @@ -53,6 +58,7 @@ class Client {
}
}

/// Get a job
Future<Message?> getJob(String pid) async {
final value = await connection.get('$namespace.jobs.$queue.$pid');

Expand Down
8 changes: 4 additions & 4 deletions lib/src/connection.dart
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
abstract class Connection {
Future<bool> rightPushArray(String queue, Map<String, dynamic> payload);
Future<Map<String, dynamic>?> rightPopArray(String queue, int timeout);
Future<bool> leftPushArray(String queue, Map<String, dynamic> payload);
Future<bool> rightPushJson(String queue, Map<String, dynamic> payload);
Future<Map<String, dynamic>?> rightPopJson(String queue, int timeout);
Future<bool> leftPushJson(String queue, Map<String, dynamic> payload);
Future<bool> leftPush(String queue, String value);
Future<Map<String, dynamic>?> leftPopArray(String queue, int timeout);
Future<Map<String, dynamic>?> leftPopJson(String queue, int timeout);
Future<dynamic> leftPop(String queue, int timeout);
Future<dynamic> rightPop(String queue, int timeout);
Future get(String key);
Expand Down
21 changes: 16 additions & 5 deletions lib/src/connection/redis.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import 'package:redis/redis.dart';

import '../connection.dart';

/// Connection redis
///
/// Used to connect to redis server
/// and manage queue
class ConnectionRedis extends Connection {
final String host;
final int port;
Expand All @@ -26,8 +30,9 @@ class ConnectionRedis extends Connection {
return redis!;
}

/// Left pop json item from the queue
@override
Future<Map<String, dynamic>?> leftPopArray(String queue, int timeout) async {
Future<Map<String, dynamic>?> leftPopJson(String queue, int timeout) async {
final res =
await (await _getRedis()).send_object(['BLPOP', queue, timeout]);
if (res == null) {
Expand All @@ -39,8 +44,9 @@ class ConnectionRedis extends Connection {
return jsonDecode(res[1]);
}

/// Left push json item to the queue
@override
Future<bool> leftPushArray(String queue, Map<String, dynamic> payload) async {
Future<bool> leftPushJson(String queue, Map<String, dynamic> payload) async {
try {
await (await _getRedis())
.send_object(['LPUSH', queue, jsonEncode(payload)]);
Expand All @@ -50,6 +56,7 @@ class ConnectionRedis extends Connection {
}
}

/// Left push item to the queue
@override
Future<bool> leftPush(String queue, String value) async {
try {
Expand All @@ -60,8 +67,9 @@ class ConnectionRedis extends Connection {
}
}

/// Right pop json item from the queue
@override
Future<Map<String, dynamic>?> rightPopArray(String queue, int timeout) async {
Future<Map<String, dynamic>?> rightPopJson(String queue, int timeout) async {
final res =
await (await _getRedis()).send_object(['BRPOP', queue, timeout]);

Expand All @@ -74,9 +82,9 @@ class ConnectionRedis extends Connection {
return jsonDecode(res[1]);
}

/// Right push json item to the queue
@override
Future<bool> rightPushArray(
String queue, Map<String, dynamic> payload) async {
Future<bool> rightPushJson(String queue, Map<String, dynamic> payload) async {
try {
await (await _getRedis())
.send_object(['RPUSH', queue, jsonEncode(payload)]);
Expand All @@ -86,6 +94,7 @@ class ConnectionRedis extends Connection {
}
}

/// Left pop item from the queue
@override
Future leftPop(String queue, int timeout) async {
final res =
Expand All @@ -96,6 +105,7 @@ class ConnectionRedis extends Connection {
return res[1];
}

/// Right pop item from the queue
@override
Future rightPop(String queue, int timeout) async {
final res =
Expand All @@ -106,6 +116,7 @@ class ConnectionRedis extends Connection {
return res[1];
}

/// Get a key from redis
@override
Future get(String key) async {
return (await _getRedis()).send_object(['GET', key]);
Expand Down
1 change: 1 addition & 0 deletions lib/src/job.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'package:utopia_di/utopia_di.dart';

/// Queue Job
class Job extends Hook {
bool hook = true;
}
1 change: 1 addition & 0 deletions lib/src/message.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'dart:convert';

/// Queue message data model
class Message {
final String pid;
final String queue;
Expand Down
28 changes: 23 additions & 5 deletions lib/src/server.dart
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import 'dart:io';
import 'dart:isolate' as iso;
import 'package:utopia_di/utopia_di.dart';
import 'package:utopia_queue/src/message.dart';

import 'connection.dart';
import 'job.dart';

/// Queue server
///
/// Runs and listens to the queue and processes
/// jobs as theyare received
class Server {
final Connection connection;
final String queue;
Expand All @@ -24,33 +27,47 @@ class Server {
this.namespace = 'utopia-queue',
});

/// Set resource
void setResource(
String name,
Function callback, {
List<String> injections = const [],
}) =>
di.set(name, callback, injections: injections);

dynamic getResource(String name, {bool fresh = false}) =>
di.get(name, fresh: fresh);
/// Get resource
dynamic getResource<T>(String name, {bool fresh = false}) =>
di.get<T>(name, fresh: fresh);

/// Set job handler
Job job() {
_job = Job();
return _job;
}

/// Setup init hooks
///
/// Init hooks are executed before the job
/// is executed
Hook init() {
final hook = Hook()..groups(['*']);
_init.add(hook);
return hook;
}

/// Setup shutdown hooks
///
/// Shutdown hooks are executed after the job
/// is executed
Hook shutdown() {
final hook = Hook()..groups(['*']);
_shutdown.add(hook);
return hook;
}

/// Error hooks
///
/// Error hooks are executed for each error
Hook error() {
final hook = Hook()..groups(['*']);
_errors.add(hook);
Expand All @@ -62,7 +79,7 @@ class Server {
print('Server $id waiting for queue');
while (true) {
var nextMessage =
await connection.rightPopArray('$namespace.queue.$queue', 5);
await connection.rightPopJson('$namespace.queue.$queue', 5);

if (nextMessage == null) {
continue;
Expand Down Expand Up @@ -115,9 +132,10 @@ class Server {
}
}

/// Start queue server
Future<void> start({int threads = 1}) async {
iso.ReceivePort();
await _spawnOffIsolates(threads);
stdin.readByteSync();
}

Map<String, dynamic> _getArguments(
Expand Down
4 changes: 2 additions & 2 deletions pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,10 @@ packages:
dependency: "direct main"
description:
name: utopia_di
sha256: fbd0a0324412b806671385d3204eb1692d37f50d1a3bc28378025ff4ba3f58c4
sha256: f4a618e0278346e08b6cec8c88ae0135e279b6f4aaeed97739302d014c4024df
url: "https://pub.dev"
source: hosted
version: "0.0.2"
version: "0.2.0"
uuid:
dependency: "direct main"
description:
Expand Down
5 changes: 2 additions & 3 deletions pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
name: utopia_queue
description: Light and easy to use queue library for Dart server projects
version: 0.1.0
version: 0.2.0
repository: https://github.com/utopia-dart/utopia_queue

environment:
sdk: ^3.3.0

# Add regular dependencies here.
dependencies:
redis: ^4.0.0
utopia_di: ^0.0.2
utopia_di: ^0.2.0
uuid: ^4.3.3

dev_dependencies:
Expand Down