diff --git a/dcli/lib/src/process/environment.dart b/dcli/lib/src/process/environment.dart index eaed5695..5bcdfaf4 100644 --- a/dcli/lib/src/process/environment.dart +++ b/dcli/lib/src/process/environment.dart @@ -1,12 +1,11 @@ import 'package:dcli_core/dcli_core.dart'; -import 'process/process_sync.dart'; +// import 'process/process_sync.dart'; /// Used internally to pass environment variables across an isolate /// boundary when using [ProcessSync] to synchronously call a process. class ProcessEnvironment { - factory ProcessEnvironment() => - ProcessEnvironment._(envs, Env().caseSensitive); + factory ProcessEnvironment() => ProcessEnvironment._(envs, Env().caseSensitive); ProcessEnvironment._(this.envVars, this.caseSensitive); diff --git a/dcli/lib/src/process/process/mailbox.dart b/dcli/lib/src/process/process/mailbox.dart deleted file mode 100644 index 09b8649d..00000000 --- a/dcli/lib/src/process/process/mailbox.dart +++ /dev/null @@ -1,241 +0,0 @@ -// @dart=3.0 - -// ignore_for_file: non_constant_identifier_names - -import 'dart:ffi'; -import 'dart:typed_data'; - -import 'package:dcli_core/dcli_core.dart'; -import 'package:ffi/ffi.dart'; - -import 'message.dart'; -// -// POSIX threading primitives -// - -/// Represents `pthread_mutex_t` -final class PthreadMutex extends Opaque {} - -/// Represents `pthread_cond_t` -final class PthreadCond extends Opaque {} - -@Native, Pointer)>() -external int pthread_mutex_init( - Pointer mutex, Pointer attrs); - -@Native)>() -external int pthread_mutex_lock(Pointer mutex); - -@Native)>() -external int pthread_mutex_unlock(Pointer mutex); - -@Native, Pointer)>() -external int pthread_cond_init(Pointer cond, Pointer attrs); - -@Native, Pointer)>() -external int pthread_cond_wait( - Pointer cond, Pointer mutex); - -@Native)>() -external int pthread_cond_signal(Pointer cond); - -/// Runs [body] with [mutex] locked. -R lock(Pointer mutex, R Function() body) { - check(pthread_mutex_lock(mutex)); - try { - return body(); - } finally { - check(pthread_mutex_unlock(mutex)); - } -} - -void check(int retval) { - if (retval != 0) { - throw MailboxException('FFI operaton failed'); - } -} - -// -// Single producer single consumer mailbox for synchronous communication -// between two isolates. -// - -final class _MailboxRepr extends Struct { - external Pointer buffer; - - @Int32() - external int bufferLength; - - @Int32() - external int state; -} - -extension on Pointer<_MailboxRepr> { - Pointer get mutex => - Pointer.fromAddress(address + Mailbox.mutexOffs); - Pointer get condRequest => - Pointer.fromAddress(address + Mailbox.condRequestOffs); - Pointer get condResponse => - Pointer.fromAddress(address + Mailbox.condResponseOffs); -} - -/// This class allows two isolates (a worker and a dispatcher isolate which -/// spawned it) to communicate synchronously. Dispatcher sends a request to the -/// worker and synchronously waits for response to arrive. -class Mailbox { - /// Create a new mailbox for communication between dispatcher and the worker. - Mailbox() : _mailbox = calloc.allocate(Mailbox.totalSize) { - check(pthread_mutex_init(_mailbox.mutex, nullptr)); - check(pthread_cond_init(_mailbox.condRequest, nullptr)); - check(pthread_cond_init(_mailbox.condResponse, nullptr)); - } - - /// Create a mailbox pointing to an already existing mailbox. - Mailbox.fromAddress(int address) : _mailbox = Pointer.fromAddress(address); - static const int mutexSize = 64; - static const int condSize = 64; - static final int headerSize = sizeOf<_MailboxRepr>(); - static final int mutexOffs = headerSize; - static final int condRequestOffs = mutexOffs + mutexSize; - static final int condResponseOffs = condRequestOffs + condSize; - static final int totalSize = condResponseOffs + condSize; - - final Pointer<_MailboxRepr> _mailbox; - bool isRunning = true; - - static const stateNone = 0; - static const stateRequest = 1; - static const stateResponse = 2; - - /// Send the given [message] to the worker isolate and wait for it to - /// produce a response. - /// - /// Performance note: [message] is copied into native memory and response is - /// copied from native memory into the Dart heap. - Uint8List sendRequest(Uint8List message) { - final buffer = _toBuffer(message); - return _toList(lock(_mailbox.mutex, () { - if (_mailbox.ref.state != stateNone) { - throw MailboxException('Illegal Mailbox state'); - } - - _mailbox.ref.state = stateRequest; - _mailbox.ref.buffer = buffer; - _mailbox.ref.bufferLength = message.length; - - // Wake the worker. - pthread_cond_signal(_mailbox.condRequest); - - // Wait for it to produce the result. - while (_mailbox.ref.state != stateResponse) { - pthread_cond_wait(_mailbox.condResponse, _mailbox.mutex); - } - - // Handle the result. - _mailbox.ref.state = stateNone; - final response = - (buffer: _mailbox.ref.buffer, length: _mailbox.ref.bufferLength); - _mailbox.ref.buffer = nullptr; - _mailbox.ref.bufferLength = 0; - return response; - })); - } - - void respond(Uint8List? message) { - if (_mailbox.ref.state != stateNone) { - throw MailboxException('Invalid state: ${_mailbox.ref.state}'); - } - - final buffer = message != null ? _toBuffer(message) : nullptr; - lock(_mailbox.mutex, () { - if (_mailbox.ref.state != stateNone) { - throw MailboxException('Invalid state: ${_mailbox.ref.state}'); - } - - _mailbox.ref.state = stateResponse; - _mailbox.ref.buffer = buffer; - _mailbox.ref.bufferLength = message?.length ?? 0; - pthread_cond_signal(_mailbox.condResponse); - }); - } - - static final _emptyResponse = Uint8List(0); - - /// Takes one [MessageResponse] from the mailbox. - /// This method will not return until a message arrives. - Uint8List takeOneMessage() => lock(_mailbox.mutex, () { - // Wait for request to arrive. - while (_mailbox.ref.state != stateResponse) { - pthread_cond_wait(_mailbox.condResponse, _mailbox.mutex); - } - - final result = _toList( - (buffer: _mailbox.ref.buffer, length: _mailbox.ref.bufferLength)); - - _mailbox.ref.state = stateNone; - _mailbox.ref.buffer = nullptr; - _mailbox.ref.bufferLength = 0; - return result; - }); - - /// Process messages which arrive to this mailbox. - /// - /// Calls [handleMessage] for each incoming message and then sends the - // ignore: comment_references - /// response it produces back to the requestor. [msg] buffer is only valid - /// for the duration of the [handleMessage] callback. - /// - /// Performance note: copies response to the native memory. - void messageLoop( - Uint8List Function(Mailbox mailbox, Uint8List msg) handleMessage) { - lock(_mailbox.mutex, () { - while (isRunning) { - // Wait for request to arrive. - while (_mailbox.ref.state != stateRequest) { - pthread_cond_wait(_mailbox.condRequest, _mailbox.mutex); - } - - final response = handleMessage( - this, _mailbox.ref.buffer.asTypedList(_mailbox.ref.bufferLength)); - malloc.free(_mailbox.ref.buffer); - - _mailbox.ref.state = stateResponse; - _mailbox.ref.buffer = _toBuffer(response); - _mailbox.ref.bufferLength = response.length; - pthread_cond_signal(_mailbox.condResponse); - } - }); - } - - int get rawAddress => _mailbox.address; - - static Uint8List _toList(({Pointer buffer, int length}) data) { - if (data.length == 0) { - return _emptyResponse; - } - - // Ideally we would like just to do `buffer.asTypedList(length)` and - // have finaliser take care of freeing, but we currently can't express - // this in pure Dart in a reliable way without some hacks - because - // [Finalizer] only runs callbacks at the top of the event loop and - // [NativeFinalizer] does not accept Dart functions as a finalizer. - final list = Uint8List(data.length); - for (var i = 0; i < data.length; i++) { - list[i] = data.buffer[i]; - } - malloc.free(data.buffer); - return list; - } - - static Pointer _toBuffer(Uint8List list) { - final buffer = malloc.allocate(list.length); - for (var i = 0; i < list.length; i++) { - buffer[i] = list[i]; - } - return buffer; - } -} - -class MailboxException extends DCliException { - MailboxException(super.message); -} diff --git a/dcli/lib/src/process/process/native_calls.dart b/dcli/lib/src/process/process/native_calls.dart index e91a0c42..421774f7 100644 --- a/dcli/lib/src/process/process/native_calls.dart +++ b/dcli/lib/src/process/process/native_calls.dart @@ -24,18 +24,14 @@ class NativeCalls { static final Object Function(int) _connectToPort = _initNativeConnectToPort(); /// Don't really know why but we go and find the - /// native dart method to connect to a prot. + /// native dart method to connect to a port. static Object Function(int) _initNativeConnectToPort() { - final functions = - NativeApi.initializeApiDLData.cast<_DartApi>().ref.functions; + final functions = NativeApi.initializeApiDLData.cast<_DartApi>().ref.functions; late Object Function(int) connectToPort; for (var i = 0; functions[i].name != nullptr; i++) { if (functions[i].name.toDartString() == 'Dart_NewSendPort') { - connectToPort = functions[i] - .function - .cast>() - .asFunction(); + connectToPort = functions[i].function.cast>().asFunction(); break; } } diff --git a/dcli/lib/src/process/process/pipe_sync.dart b/dcli/lib/src/process/process/pipe_sync.dart index 5ae5a37d..6f02bf77 100644 --- a/dcli/lib/src/process/process/pipe_sync.dart +++ b/dcli/lib/src/process/process/pipe_sync.dart @@ -30,7 +30,7 @@ class PipeSync { /// exit code do we return? int? get exitCode => _rhsChannel.exitCode; - /// Run the two given process as defined by [lhsSettings] and [rhsSettings] + /// Run the two given process as defined by [lhsSettings] (left-hand-side settings) and [rhsSettings] (right-hand-side settings). /// piping the input from the [lhsSettings] process into the [rhsSettings] /// process. /// diff --git a/dcli/lib/src/process/process/process_channel.dart b/dcli/lib/src/process/process/process_channel.dart index a9a7bf5d..6716be14 100644 --- a/dcli/lib/src/process/process/process_channel.dart +++ b/dcli/lib/src/process/process/process_channel.dart @@ -7,9 +7,12 @@ import 'dart:convert'; import 'dart:isolate'; import 'dart:typed_data'; -import 'mailbox.dart'; +import 'package:native_synchronization/mailbox.dart'; + import 'message.dart'; -import 'process_sync.dart'; +// import 'mailbox.dart'; +import 'pipe_sync.dart'; +// import 'process_sync.dart'; /// Send and Receive data to/from a process /// running in an isolate using a pair of mailboxes. @@ -82,12 +85,12 @@ class ProcessChannel { late final StringConversionSink splitter; late final ByteConversionSink decoder; -// TODO: this probably need to be int arrays so +// TODO DONE?: this probably need to be int arrays so // we can handly binary data. final List> stdoutLines = >[]; final List> stderrLines = >[]; - int get sendAddress => send.rawAddress; - int get responseAddress => response.rawAddress; + // int get sendAddress => send.rawAddress; + // int get responseAddress => response.rawAddress; int? _exitCode; int? get exitCode => _exitCode; @@ -147,10 +150,9 @@ class ProcessChannel { /// check the data has been sent to the spawned process /// before we return - final response = send.takeOneMessage(); + final response = send.take(); if (response.isEmpty || response[0] != RECEIVED) { - throw ProcessSyncException( - 'Expecting a write confirmation: got $response'); + throw ProcessSyncException('Expecting a write confirmation: got $response'); } } @@ -158,7 +160,7 @@ class ProcessChannel { Uint8List bytes; /// drain the mailbox - bytes = response.takeOneMessage(); + bytes = response.take(); _recieveFromIsolate(bytes); // decoder.add(bytes); diff --git a/dcli/lib/src/process/process/process_in_isolate.dart b/dcli/lib/src/process/process/process_in_isolate.dart index 30ba97ec..dd248bfa 100644 --- a/dcli/lib/src/process/process/process_in_isolate.dart +++ b/dcli/lib/src/process/process/process_in_isolate.dart @@ -6,13 +6,17 @@ import 'dart:ffi'; import 'dart:isolate'; import 'dart:typed_data'; +import 'package:native_synchronization/mailbox.dart'; +import 'package:native_synchronization/sendable.dart'; + import 'in_isolate/runner.dart'; -import 'mailbox.dart'; +// import 'mailbox.dart'; import 'message.dart'; import 'native_calls.dart'; import 'process_channel.dart'; import 'process_settings.dart'; import 'process_sync.dart'; +// import 'process_sync.dart'; void startIsolate(ProcessSettings settings, ProcessChannel channel) { // print('starting isolate'); @@ -27,18 +31,19 @@ void startIsolate(ProcessSettings settings, ProcessChannel channel) { SendPort _connectSendPort(ProcessChannel channel) { /// take the initial message which contains /// the channels sendPort id. - final msg = channel.send.takeOneMessage(); + final msg = channel.send.take(); + return NativeCalls.connectToPort(msg); } /// Starts an isolate that spawns the command. void _startIsolate(ProcessSettings processSettings, ProcessChannel channel) { - unawaited(Isolate.spawn((mailboxAddrs) async { + unawaited(Isolate.spawn>>((mailboxes) async { // print('isoalte has started'); /// This code runs in the isolate. - final sendMailbox = Mailbox.fromAddress(mailboxAddrs[0]); - final responseMailbox = Mailbox.fromAddress(mailboxAddrs[1]); + final sendMailbox = mailboxes.first.materialize(); + final responseMailbox = mailboxes.last.materialize(); final runner = ProcessRunner(processSettings); // print('starting process ${processSettings.command} in isolate'); @@ -65,7 +70,7 @@ void _startIsolate(ProcessSettings processSettings, ProcessChannel channel) { /// The tell the sender that we got their data and /// sent it to stdin - sendMailbox.respond(Uint8List.fromList([ProcessChannel.RECEIVED])); + sendMailbox.put(Uint8List.fromList([ProcessChannel.RECEIVED])); } else { throw ProcessSyncException('Wrong message: $message'); } @@ -74,7 +79,7 @@ void _startIsolate(ProcessSettings processSettings, ProcessChannel channel) { /// Tell the primary isolate what our native port address is /// so it can send stuff to use sychronously. final msg = Int64List(1)..[0] = port.sendPort.nativePort; - sendMailbox.respond(msg.buffer.asUint8List()); + sendMailbox.put(msg.buffer.asUint8List()); /// used to wait for the stdout stream to finish streaming final stdoutStreamDone = Completer(); @@ -84,7 +89,7 @@ void _startIsolate(ProcessSettings processSettings, ProcessChannel channel) { stdoutSub = process!.stdout.listen((data) { stdoutSub.pause(); // print('writting to stdout: ${utf8.decode(data)}'); - responseMailbox.respond(Message.stdout(data as Uint8List).message); + responseMailbox.put(Message.stdout(data as Uint8List).message); }, onDone: () { stdoutStreamDone.complete(); // print('marking stdout in isolate done'); @@ -97,7 +102,7 @@ void _startIsolate(ProcessSettings processSettings, ProcessChannel channel) { /// it back to the parent isolate stderrSub = process.stderr.listen((data) { stderrSub.pause(); - responseMailbox.respond(Message.stderr(data as Uint8List).message); + responseMailbox.put(Message.stderr(data as Uint8List).message); }, onDone: () { stderrStreamDone.complete(); // print('marking stderr in isolate done'); @@ -115,14 +120,15 @@ void _startIsolate(ProcessSettings processSettings, ProcessChannel channel) { /// the parent isolate final exitCode = await process.exitCode; // print('process has exited with exitCode: $exitCode'); - responseMailbox.respond(Message.exit(exitCode).message); + responseMailbox.put(Message.exit(exitCode).message); await stdoutSub.cancel(); await stderrSub.cancel(); }, // pass list of mailbox addresses into the isolate entry point. - [ - channel.sendAddress, - channel.responseAddress, - ])); + List>.from([ + channel.send.asSendable, + channel.response.asSendable, + ]), + debugName: 'ProcessInIsolate')); } diff --git a/dcli/lib/src/process/process/process_sync.dart b/dcli/lib/src/process/process/process_sync.dart index 1140f002..7c191beb 100644 --- a/dcli/lib/src/process/process/process_sync.dart +++ b/dcli/lib/src/process/process/process_sync.dart @@ -20,7 +20,7 @@ class ProcessSync { void listenStdout(void Function(List) callback) { _channel.listenStdout((data) { - print('processSync recieved data from channel'); + // print('processSync recieved data from channel'); callback(data); }); } @@ -53,8 +53,7 @@ class ProcessSync { /// Start the process but redirect stdout and stderr to /// [stdout] and [stderr] respectively. - void pipe(ProcessSettings settings, Stream> stdin, - Sink> stdout) { + void pipe(ProcessSettings settings, Stream> stdin, Sink> stdout) { _channel = ProcessChannel.pipe(stdin, stdout); startIsolate(settings, _channel); diff --git a/dcli/pubspec.yaml b/dcli/pubspec.yaml index 529a8676..995e3d50 100644 --- a/dcli/pubspec.yaml +++ b/dcli/pubspec.yaml @@ -27,6 +27,7 @@ dependencies: logging: ^1.0.2 meta: ^1.3.0 mime: ^1.0.1 + native_synchronization: ^0.2.0 path: ^1.8.0 posix: ^6.0.1 pub_semver: ^2.0.0