diff --git a/example/advanced/multi_isolate.dart b/example/advanced/multi_isolate.dart
index abce96bc..663ef817 100644
--- a/example/advanced/multi_isolate.dart
+++ b/example/advanced/multi_isolate.dart
@@ -12,17 +12,21 @@ void main() async {
 
   // Spawn all isolates concurrently and wait for them to start.
   log('Starting $isolateCount isolates');
-  final isolates = await Future.wait(
-    List.generate(
-      isolateCount,
-      (final index) =>
-          Isolate.spawn((final _) => _serve(), null, debugName: '$index'),
-    ),
+  final isolates =
+      await List.generate(
+        isolateCount,
+        (final index) =>
+            Isolate.spawn((final _) => _serve(), null, debugName: '$index'),
+      ).wait;
+
+  // Wait for SIGINT (Ctrl-C) or SIGTERM signal before shutting down.
+  await Future.any(
+    [
+      ProcessSignal.sigterm,
+      ProcessSignal.sigint,
+    ].map((final s) => s.watch().first),
   );
 
-  // Wait for SIGINT (Ctrl-C) signal before shutting down.
-  await ProcessSignal.sigint.watch().first;
-
   // Gracefully terminate all spawned isolates.
   for (final i in isolates) {
     i.kill(priority: Isolate.immediate);
diff --git a/test/isolated_object/isolated_object_evaluate_test.dart b/test/isolated_object/isolated_object_evaluate_test.dart
index 3b06bc47..36bce4b3 100644
--- a/test/isolated_object/isolated_object_evaluate_test.dart
+++ b/test/isolated_object/isolated_object_evaluate_test.dart
@@ -54,7 +54,7 @@ void main() {
         (final i) => isolated.evaluate((final counter) => counter.increment()),
       );
 
-      await Future.wait(futures);
+      await futures.wait;
 
       final result = await isolated.evaluate((final counter) => counter.value);
       expect(result, 10);
@@ -190,7 +190,7 @@ void main() {
         );
       }
 
-      final results = await Future.wait(futures);
+      final results = await futures.wait;
 
       // All operations should complete
       expect(results.length, 100);
diff --git a/test/relic_server_graceful_shutdown_test.dart b/test/relic_server_graceful_shutdown_test.dart
new file mode 100644
index 00000000..1790191e
--- /dev/null
+++ b/test/relic_server_graceful_shutdown_test.dart
@@ -0,0 +1,268 @@
+import 'dart:async';
+import 'dart:io';
+
+import 'package:http/http.dart' as http;
+import 'package:relic/io_adapter.dart';
+import 'package:relic/relic.dart';
+import 'package:test/test.dart';
+
+/// Creates a handler that signals when processing starts and waits for
+/// a completer before responding.
+///
+/// [onRequestStarted] is called when the request starts processing.
+/// [canComplete] is a completer that the handler waits for before responding.
+Handler _createSignalingHandler({
+  required final void Function() onRequestStarted,
+  required final Completer canComplete,
+}) {
+  return (final req) async {
+    onRequestStarted();
+    await canComplete.future;
+    return Response.ok(body: Body.fromString('Completed'));
+  };
+}
+
+/// Creates a handler that delays for the specified duration before responding.
+/// Used for multi-isolate tests where Completers cannot cross isolate boundaries.
+Handler _createDelayedHandler(final Duration delay) {
+  return (final req) async {
+    await Future.delayed(delay);
+    return Response.ok(body: Body.fromString('Completed'));
+  };
+}
+
+void main() {
+  group('Given a RelicServer with in-flight requests', () {
+    late RelicServer server;
+    bool serverClosed = false;
+
+    setUp(() async {
+      serverClosed = false;
+      server = RelicServer(
+        () => IOAdapter.bind(InternetAddress.loopbackIPv4, port: 0),
+      );
+    });
+
+    tearDown(() async {
+      // Server may already be closed by the test
+      if (!serverClosed) {
+        try {
+          await server.close();
+        } catch (_) {}
+      }
+    });
+
+    test(
+      'when server.close() is called during an in-flight request, '
+      'then the request completes successfully before server shuts down',
+      () async {
+        final requestStarted = Completer();
+        final canComplete = Completer();
+
+        await server.mountAndStart(
+          _createSignalingHandler(
+            onRequestStarted: requestStarted.complete,
+            canComplete: canComplete,
+          ),
+        );
+
+        // Start a request
+        final responseFuture = http.get(Uri.http('localhost:${server.port}'));
+
+        // Wait for the request to start processing
+        await requestStarted.future;
+
+        // Verify request is in-flight
+        final infoBeforeClose = await server.connectionsInfo();
+        expect(
+          infoBeforeClose.active + infoBeforeClose.idle,
+          greaterThan(0),
+          reason: 'Expected at least one active or idle connection',
+        );
+
+        // Close the server while request is in-flight
+        final closeFuture = server.close();
+        serverClosed = true;
+
+        // Allow the request to complete
+        canComplete.complete();
+
+        // Wait for both the response and server close to complete
+        final (response, _) = await (responseFuture, closeFuture).wait;
+
+        // Verify the in-flight request completed successfully
+        expect(response.statusCode, HttpStatus.ok);
+        expect(response.body, 'Completed');
+      },
+    );
+
+    test(
+      'when server.close() is called with multiple concurrent in-flight requests, '
+      'then all requests complete successfully',
+      () async {
+        const numberOfRequests = 5;
+        var requestsStarted = 0;
+        final allRequestsStarted = Completer();
+        final canComplete = Completer();
+
+        await server.mountAndStart(
+          _createSignalingHandler(
+            onRequestStarted: () {
+              requestsStarted++;
+              if (requestsStarted == numberOfRequests) {
+                allRequestsStarted.complete();
+              }
+            },
+            canComplete: canComplete,
+          ),
+        );
+
+        // Start multiple concurrent requests
+        final responseFutures = List.generate(
+          numberOfRequests,
+          (_) => http.get(Uri.http('localhost:${server.port}')),
+        );
+
+        // Wait for all requests to start processing
+        await allRequestsStarted.future;
+
+        // Close the server while requests are in-flight
+        final closeFuture = server.close();
+        serverClosed = true;
+
+        // Allow the requests to complete
+        canComplete.complete();
+
+        // Wait for all responses and server close at the same time
+        final (responses, _) = await (responseFutures.wait, closeFuture).wait;
+
+        // Verify all requests completed successfully
+        for (var i = 0; i < responses.length; i++) {
+          expect(
+            responses[i].statusCode,
+            HttpStatus.ok,
+            reason: 'Request $i should have completed with 200 OK',
+          );
+          expect(
+            responses[i].body,
+            'Completed',
+            reason: 'Request $i should have the expected body',
+          );
+        }
+      },
+    );
+
+    test('when server.close() is called, '
+        'then new requests are not accepted after close begins', () async {
+      final requestStarted = Completer();
+      final canComplete = Completer();
+
+      await server.mountAndStart(
+        _createSignalingHandler(
+          onRequestStarted: () {
+            if (!requestStarted.isCompleted) {
+              requestStarted.complete();
+            }
+          },
+          canComplete: canComplete,
+        ),
+      );
+
+      // Start an in-flight request
+      final inFlightRequest = http.get(Uri.http('localhost:${server.port}'));
+
+      // Wait for the request to start processing
+      await requestStarted.future;
+
+      // Close the server
+      final closeFuture = server.close();
+
+      // Try to start a new request after close is initiated
+      // (This should fail or be rejected)
+      http.Response? newRequestResponse;
+      Object? newRequestError;
+      try {
+        newRequestResponse = await http.get(
+          Uri.http('localhost:${server.port}'),
+        );
+      } catch (e) {
+        newRequestError = e;
+      }
+
+      // Allow the in-flight request to complete
+      canComplete.complete();
+
+      // Wait for close and in-flight request to complete
+      await (inFlightRequest, closeFuture).wait;
+
+      // The new request should either have failed with a connection error
+      // or not have received a 200 OK response. The exact behavior depends
+      // on timing and the underlying HTTP server.
+      expect(
+        newRequestError != null || newRequestResponse?.statusCode != 200,
+        isTrue,
+        reason: 'New requests should be rejected after server begins closing',
+      );
+    });
+  });
+
+  group('Given a RelicServer with multi-isolate configuration', () {
+    late RelicServer server;
+    bool serverClosed = false;
+
+    setUp(() async {
+      serverClosed = false;
+      server = RelicServer(
+        () => IOAdapter.bind(InternetAddress.loopbackIPv4, port: 0),
+        noOfIsolates: 2,
+      );
+    });
+
+    tearDown(() async {
+      if (!serverClosed) {
+        try {
+          await server.close();
+        } catch (_) {}
+      }
+    });
+
+    test(
+      'when server.close() is called during in-flight requests across isolates, '
+      'then all requests complete successfully',
+      () async {
+        // Use delay-based handler because Completers cannot cross isolate
+        // boundaries
+        const requestDelay = Duration(milliseconds: 300);
+        const numberOfRequests = 4;
+
+        await server.mountAndStart(_createDelayedHandler(requestDelay));
+
+        // Start multiple concurrent requests that will be distributed
+        // across isolates
+        final responseFutures = List.generate(
+          numberOfRequests,
+          (_) => http.get(Uri.http('localhost:${server.port}')),
+        );
+
+        // Give requests time to start processing
+        await Future.delayed(const Duration(milliseconds: 50));
+
+        // Close the server while requests are in-flight
+        final closeFuture = server.close();
+        serverClosed = true;
+
+        // Wait for all responses and server close at the same time
+        final (responses, _) = await (responseFutures.wait, closeFuture).wait;
+
+        // Verify all requests completed successfully
+        for (var i = 0; i < responses.length; i++) {
+          expect(
+            responses[i].statusCode,
+            HttpStatus.ok,
+            reason: 'Request $i should have completed with 200 OK',
+          );
+        }
+      },
+    );
+  });
+}
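
Note for reviewers: a minimal, standalone sketch of the dart:async .wait getters (Dart 3.0 and later) that this change migrates to. Iterable<Future<T>>.wait replaces Future.wait(...), and the record form backs expressions such as (responseFuture, closeFuture).wait in the new test. The literal values below are illustrative only and are not part of the change.

import 'dart:async';

Future<void> main() async {
  // Iterable<Future<T>>.wait awaits every future and returns a List<T>.
  // If any future fails, it throws a ParallelWaitError once all have settled,
  // whereas Future.wait reports only the first error.
  final futures = [Future.value(1), Future.value(2), Future.value(3)];
  final values = await futures.wait;
  print(values); // [1, 2, 3]

  // The record form awaits futures of different types and destructures the
  // results, as in `final (response, _) = await (responseFuture, closeFuture).wait;`.
  final (text, number) = await (Future.value('done'), Future.value(42)).wait;
  print('$text $number'); // done 42
}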