Skip to content
22 changes: 13 additions & 9 deletions example/advanced/multi_isolate.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,21 @@ void main() async {

// Spawn all isolates concurrently and wait for them to start.
log('Starting $isolateCount isolates');
final isolates = await Future.wait(
List.generate(
isolateCount,
(final index) =>
Isolate.spawn((final _) => _serve(), null, debugName: '$index'),
),
final isolates =
await List.generate(
isolateCount,
(final index) =>
Isolate.spawn((final _) => _serve(), null, debugName: '$index'),
).wait;

// Wait for SIGINT (Ctrl-C) or SIGTERM signal before shutting down.
await Future.any(
[
ProcessSignal.sigterm,
ProcessSignal.sigint,
].map((final s) => s.watch().first),
);

// Wait for SIGINT (Ctrl-C) signal before shutting down.
await ProcessSignal.sigint.watch().first;

// Gracefully terminate all spawned isolates.
for (final i in isolates) {
i.kill(priority: Isolate.immediate);
Expand Down
4 changes: 2 additions & 2 deletions test/isolated_object/isolated_object_evaluate_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void main() {
(final i) => isolated.evaluate((final counter) => counter.increment()),
);

await Future.wait(futures);
await futures.wait;

final result = await isolated.evaluate((final counter) => counter.value);
expect(result, 10);
Expand Down Expand Up @@ -190,7 +190,7 @@ void main() {
);
}

final results = await Future.wait(futures);
final results = await futures.wait;

// All operations should complete
expect(results.length, 100);
Expand Down
268 changes: 268 additions & 0 deletions test/relic_server_graceful_shutdown_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
import 'dart:async';
import 'dart:io';

import 'package:http/http.dart' as http;
import 'package:relic/io_adapter.dart';
import 'package:relic/relic.dart';
import 'package:test/test.dart';

/// Creates a handler that signals when processing starts and waits for
/// a completer before responding.
///
/// [onRequestStarted] is called when the request starts processing.
/// [canComplete] is a completer that the handler waits for before responding.
Handler _createSignalingHandler({
required final void Function() onRequestStarted,
required final Completer<void> canComplete,
}) {
return (final req) async {
onRequestStarted();
await canComplete.future;
return Response.ok(body: Body.fromString('Completed'));
};
}

/// Creates a handler that delays for the specified duration before responding.
/// Used for multi-isolate tests where Completers cannot cross isolate boundaries.
Handler _createDelayedHandler(final Duration delay) {
return (final req) async {
await Future<void>.delayed(delay);
return Response.ok(body: Body.fromString('Completed'));
};
}

void main() {
group('Given a RelicServer with in-flight requests', () {
late RelicServer server;
bool serverClosed = false;

setUp(() async {
serverClosed = false;
server = RelicServer(
() => IOAdapter.bind(InternetAddress.loopbackIPv4, port: 0),
);
});

tearDown(() async {
// Server may already be closed by the test
if (!serverClosed) {
try {
await server.close();
} catch (_) {}
}
});

test(
'when server.close() is called during an in-flight request, '
'then the request completes successfully before server shuts down',
() async {
final requestStarted = Completer<void>();
final canComplete = Completer<void>();

await server.mountAndStart(
_createSignalingHandler(
onRequestStarted: requestStarted.complete,
canComplete: canComplete,
),
);

// Start a request
final responseFuture = http.get(Uri.http('localhost:${server.port}'));

// Wait for the request to start processing
await requestStarted.future;

// Verify request is in-flight
final infoBeforeClose = await server.connectionsInfo();
expect(
infoBeforeClose.active + infoBeforeClose.idle,
greaterThan(0),
reason: 'Expected at least one active or idle connection',
);

// Close the server while request is in-flight
final closeFuture = server.close();
serverClosed = true;

// Allow the request to complete
canComplete.complete();

// Wait for both the response and server close to complete
final (response, _) = await (responseFuture, closeFuture).wait;

// Verify the in-flight request completed successfully
expect(response.statusCode, HttpStatus.ok);
expect(response.body, 'Completed');
},
);

test(
'when server.close() is called with multiple concurrent in-flight requests, '
'then all requests complete successfully',
() async {
const numberOfRequests = 5;
var requestsStarted = 0;
final allRequestsStarted = Completer<void>();
final canComplete = Completer<void>();

await server.mountAndStart(
_createSignalingHandler(
onRequestStarted: () {
requestsStarted++;
if (requestsStarted == numberOfRequests) {
allRequestsStarted.complete();
}
},
canComplete: canComplete,
),
);

// Start multiple concurrent requests
final responseFutures = List.generate(
numberOfRequests,
(_) => http.get(Uri.http('localhost:${server.port}')),
);

// Wait for all requests to start processing
await allRequestsStarted.future;

// Close the server while requests are in-flight
final closeFuture = server.close();
serverClosed = true;

// Allow the requests to complete
canComplete.complete();

// Wait for all responses and server close at the same time
final (responses, _) = await (responseFutures.wait, closeFuture).wait;

// Verify all requests completed successfully
for (var i = 0; i < responses.length; i++) {
expect(
responses[i].statusCode,
HttpStatus.ok,
reason: 'Request $i should have completed with 200 OK',
);
expect(
responses[i].body,
'Completed',
reason: 'Request $i should have the expected body',
);
}
},
);

test('when server.close() is called, '
'then new requests are not accepted after close begins', () async {
final requestStarted = Completer<void>();
final canComplete = Completer<void>();

await server.mountAndStart(
_createSignalingHandler(
onRequestStarted: () {
if (!requestStarted.isCompleted) {
requestStarted.complete();
}
},
canComplete: canComplete,
),
);

// Start an in-flight request
final inFlightRequest = http.get(Uri.http('localhost:${server.port}'));

// Wait for the request to start processing
await requestStarted.future;

// Close the server
final closeFuture = server.close();

// Try to start a new request after close is initiated
// (This should fail or be rejected)
late http.Response? newRequestResponse;
Object? newRequestError;
try {
newRequestResponse = await http.get(
Uri.http('localhost:${server.port}'),
);
} catch (e) {
newRequestError = e;
}

// Allow the in-flight request to complete
canComplete.complete();

// Wait for close and in-flight request to complete
await (inFlightRequest, closeFuture).wait;

// New request should have either failed with an error
// or received a connection refused/reset error
// The exact behavior depends on timing and the underlying HTTP server
expect(
newRequestError != null || newRequestResponse?.statusCode != 200,
isTrue,
reason: 'New requests should be rejected after server begins closing',
);
});
});

group('Given a RelicServer with multi-isolate configuration', () {
late RelicServer server;
bool serverClosed = false;

setUp(() async {
serverClosed = false;
server = RelicServer(
() => IOAdapter.bind(InternetAddress.loopbackIPv4, port: 0),
noOfIsolates: 2,
);
});

tearDown(() async {
if (!serverClosed) {
try {
await server.close();
} catch (_) {}
}
});

test(
'when server.close() is called during in-flight requests across isolates, '
'then all requests complete successfully',
() async {
// Use delay-based handler because Completers cannot cross isolate
// boundaries
const requestDelay = Duration(milliseconds: 300);
const numberOfRequests = 4;

await server.mountAndStart(_createDelayedHandler(requestDelay));

// Start multiple concurrent requests that will be distributed
// across isolates
final responseFutures = List.generate(
numberOfRequests,
(_) => http.get(Uri.http('localhost:${server.port}')),
);

// Give requests time to start processing
await Future<void>.delayed(const Duration(milliseconds: 50));

// Close the server while requests are in-flight
final closeFuture = server.close();
serverClosed = true;

// Wait for all responses and server close at the same time
final (responses, _) = await (responseFutures.wait, closeFuture).wait;

// Verify all requests completed successfully
for (var i = 0; i < responses.length; i++) {
expect(
responses[i].statusCode,
HttpStatus.ok,
reason: 'Request $i should have completed with 200 OK',
);
}
},
);
});
}
Loading