Skip to content

Commit 507fd74

Browse files
committed
Improve Linux builds, support venv and uvloop, and use pyo3_async_runtimes
1 parent c01233c commit 507fd74

File tree

4 files changed

+149
-85
lines changed

4 files changed

+149
-85
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ http-rewriter = { git = "ssh://git@github.com/platformatic/http-rewriter.git" }
2727
# Default enable napi4 feature, see https://nodejs.org/api/n-api.html#node-api-version-matrix
2828
napi = { version = "3.0.0-beta.8", default-features = false, features = ["napi4"], optional = true }
2929
napi-derive = { version = "3.0.0-beta.8", optional = true }
30-
pyo3 = { version = "0.25.1", features = ["auto-initialize", "experimental-async"] }
30+
pyo3 = { version = "0.25.1", features = ["experimental-async"] }
3131
pyo3-async-runtimes = { version = "0.25.0", features = ["tokio-runtime"] }
3232
thiserror = "2.0.12"
3333
tokio = { version = "1.45.1", features = ["full"] }

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"name": "python-node",
2+
"name": "@platformatic/python-node",
33
"description": "Run ASGI-compatible Python apps in Node.js",
44
"private": true,
55
"version": "0.0.0",

src/asgi/mod.rs

Lines changed: 128 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{
2-
env::current_dir,
2+
env::{current_dir, var},
33
ffi::CString,
4-
fs::read_to_string,
4+
fs::{read_dir, read_to_string},
55
path::{Path, PathBuf},
66
};
77

@@ -13,11 +13,6 @@ unsafe extern "C" {
1313
fn dlopen(filename: *const i8, flag: i32) -> *mut c_void;
1414
}
1515

16-
#[cfg(target_os = "linux")]
17-
const RTLD_GLOBAL: i32 = 0x100;
18-
#[cfg(target_os = "linux")]
19-
const RTLD_NOW: i32 = 0x2;
20-
2116
use bytes::BytesMut;
2217
use http_handler::{Handler, Request, RequestExt, Response, extensions::DocumentRoot};
2318
use pyo3::exceptions::PyRuntimeError;
@@ -48,6 +43,34 @@ pub use websocket::{
4843
WebSocketConnectionScope, WebSocketReceiveMessage, WebSocketSendException, WebSocketSendMessage,
4944
};
5045

46+
/// Find all Python site-packages directories in a virtual environment
47+
fn find_python_site_packages(venv_path: &Path) -> Vec<PathBuf> {
48+
let mut site_packages_paths = Vec::new();
49+
50+
// Check both lib and lib64 directories
51+
for lib_dir in &["lib", "lib64"] {
52+
let lib_path = venv_path.join(lib_dir);
53+
if let Ok(entries) = read_dir(lib_path) {
54+
for entry in entries.flatten() {
55+
let entry_path = entry.path();
56+
if entry_path.is_dir() {
57+
if let Some(dir_name) = entry_path.file_name().and_then(|n| n.to_str()) {
58+
// Look for directories matching python3.* pattern
59+
if dir_name.starts_with("python3.") {
60+
let site_packages = entry_path.join("site-packages");
61+
if site_packages.exists() {
62+
site_packages_paths.push(site_packages);
63+
}
64+
}
65+
}
66+
}
67+
}
68+
}
69+
}
70+
71+
site_packages_paths
72+
}
73+
5174
/// Load Python library with RTLD_GLOBAL on Linux to make symbols available
5275
#[cfg(target_os = "linux")]
5376
fn ensure_python_symbols_global() {
@@ -80,28 +103,22 @@ fn ensure_python_symbols_global() {
80103
}
81104
}
82105

83-
// Fallback to common library names if find command fails
84-
let python_libs = [
85-
"libpython3.12.so.1.0\0",
86-
"libpython3.11.so.1.0\0",
87-
"libpython3.10.so.1.0\0",
88-
"libpython3.9.so.1.0\0",
89-
"libpython3.8.so.1.0\0",
90-
];
106+
const RTLD_GLOBAL: i32 = 0x100;
107+
const RTLD_NOW: i32 = 0x2;
91108

92-
for lib_name in &python_libs {
109+
// Fallback to trying common library names if find command fails
110+
// Try a range of Python versions (3.9 to 3.100 should cover future versions)
111+
for minor in 9..=100 {
112+
let lib_name = format!("libpython3.{}.so.1.0\0", minor);
93113
let handle = dlopen(lib_name.as_ptr() as *const i8, RTLD_NOW | RTLD_GLOBAL);
94114
if !handle.is_null() {
95115
// Successfully loaded Python library with RTLD_GLOBAL
96-
break;
116+
return;
97117
}
98118
}
99-
}
100-
}
101119

102-
#[cfg(not(target_os = "linux"))]
103-
fn ensure_python_symbols_global() {
104-
// On non-Linux platforms, this is typically not needed
120+
eprintln!("Failed to locate system Python library");
121+
}
105122
}
106123

107124
/// Core ASGI handler that loads and manages a Python ASGI application
@@ -116,7 +133,10 @@ impl Asgi {
116133
docroot: Option<String>,
117134
app_target: Option<PythonHandlerTarget>,
118135
) -> Result<Self, HandlerError> {
136+
pyo3::prepare_freethreaded_python();
137+
119138
// Ensure Python symbols are globally available before initializing
139+
#[cfg(target_os = "linux")]
120140
ensure_python_symbols_global();
121141

122142
// Determine document root
@@ -155,6 +175,29 @@ impl Asgi {
155175
let module_name = CString::new(target.file.clone()).map_err(HandlerError::StringCovertError)?;
156176

157177
Python::with_gil(|py| -> PyResult<PyObject> {
178+
// Set up sys.path with docroot and virtual environment paths
179+
let sys = py.import("sys")?;
180+
let path = sys.getattr("path")?;
181+
182+
// Add docroot to sys.path
183+
path.call_method1("insert", (0, docroot.to_string_lossy()))?;
184+
185+
// Check for VIRTUAL_ENV and add virtual environment paths
186+
if let Ok(virtual_env) = var("VIRTUAL_ENV") {
187+
let venv_path = PathBuf::from(&virtual_env);
188+
189+
// Dynamically find all Python site-packages directories
190+
let site_packages_paths = find_python_site_packages(&venv_path);
191+
192+
// Add all found site-packages paths to sys.path
193+
for site_packages in &site_packages_paths {
194+
path.call_method1("insert", (0, site_packages.to_string_lossy()))?;
195+
}
196+
197+
// Also add the virtual environment root
198+
path.call_method1("insert", (0, virtual_env))?;
199+
}
200+
158201
let module = PyModule::from_code(py, &code, &file_name, &module_name)?;
159202
Ok(module.getattr(&target.function)?.unbind())
160203
})
@@ -165,13 +208,51 @@ impl Asgi {
165208
pub fn docroot(&self) -> &Path {
166209
&self.docroot
167210
}
211+
212+
/// Handle a request synchronously using the pyo3_async_runtimes managed runtime
213+
pub fn handle_sync(&self, request: Request) -> Result<Response, HandlerError> {
214+
pyo3_async_runtimes::tokio::get_runtime().block_on(self.handle(request))
215+
}
216+
217+
/// Install an event loop for this thread, using uvloop if available
218+
pub fn install_loop(&self) -> Result<(), HandlerError> {
219+
Python::with_gil(|py| -> PyResult<()> {
220+
let asyncio = py.import("asyncio")?;
221+
222+
// Check if there's already an event loop on this thread
223+
let needs_new_loop = match asyncio.call_method0("get_event_loop") {
224+
Ok(existing_loop) => {
225+
// Check if the existing loop is closed
226+
existing_loop.call_method0("is_closed")?.extract::<bool>()?
227+
}
228+
Err(_) => true, // No event loop exists
229+
};
230+
231+
if needs_new_loop {
232+
// Set up event loop for this thread, using uvloop if available
233+
let loop_ = if let Ok(uvloop) = py.import("uvloop") {
234+
// Install uvloop policy if not already installed
235+
let _ = uvloop.call_method0("install");
236+
uvloop.call_method0("new_event_loop")?
237+
} else {
238+
asyncio.call_method0("new_event_loop")?
239+
};
240+
asyncio.call_method1("set_event_loop", (&loop_,))?;
241+
}
242+
243+
Ok(())
244+
}).map_err(HandlerError::PythonError)
245+
}
168246
}
169247

170248
#[async_trait::async_trait]
171249
impl Handler for Asgi {
172250
type Error = HandlerError;
173251

174252
async fn handle(&self, request: Request) -> Result<Response, Self::Error> {
253+
// Ensure the event loop is installed
254+
self.install_loop()?;
255+
175256
// Set document root extension
176257
let mut request = request;
177258
request.set_document_root(DocumentRoot {
@@ -237,40 +318,40 @@ impl Handler for Asgi {
237318

238319
// Channel closed without complete response
239320
if response_started {
240-
let _ = response_tx.send(Err("Response started but not completed"));
321+
let _ = response_tx.send(Err(HandlerError::ResponseInterrupted));
241322
} else {
242-
let _ = response_tx.send(Err("No response received"));
323+
let _ = response_tx.send(Err(HandlerError::NoResponse));
243324
}
244325
});
245326

246327
// Execute Python
247328
let py_func = Python::with_gil(|py| self.app_function.clone_ref(py));
248-
let python_result = tokio::task::spawn_blocking(move || {
249-
execute_coroutine(py_func, scope, rx_receiver, tx_sender)
250-
})
251-
.await
252-
.map_err(|_| HandlerError::PythonError(PyRuntimeError::new_err("Python task panicked")))?;
253-
254-
// Check if Python execution had errors
255-
if let Err(e) = python_result {
256-
return Err(HandlerError::PythonError(e));
257-
}
258329

259-
// If the channel is closed without a response, return an error
260-
let response = match response_rx.await {
261-
Err(_) => {
262-
return Err(HandlerError::PythonError(PyRuntimeError::new_err(
263-
"Response channel error",
264-
)));
265-
}
266-
Ok(response) => response,
267-
};
330+
// Now create the coroutine and convert it to a future
331+
let coroutine = Python::with_gil(|py| {
332+
let scope_py = scope.into_pyobject(py)?;
333+
py_func.call1(py, (scope_py, rx_receiver, tx_sender))
334+
})?;
335+
336+
// TODO: This will block the current thread until the coroutine completes.
337+
// We should see if there's a way to execute coroutines concurrently.
338+
// Blocking in an async function is not great as tokio will assume the
339+
// function should yield control when it's not busy, so we're wasting a
340+
// thread here. Likely we should implement `Stream` around a coroutine
341+
// wrapper to poll it instead. The `run` is internally running the
342+
// `run_until_complete` method, which blocks the current thread until
343+
// the coroutine completes.
344+
Python::with_gil(|py| {
345+
pyo3_async_runtimes::tokio::run(py, async move {
346+
Python::with_gil(|py| {
347+
pyo3_async_runtimes::tokio::into_future(coroutine.into_bound(py))
348+
})?.await
349+
})
350+
})?;
268351

269352
// If an error was sent through the channel, return it
270-
let (status, headers, body) = match response {
271-
Ok(res) => res,
272-
Err(e) => return Err(HandlerError::PythonError(PyRuntimeError::new_err(e))),
273-
};
353+
let maybe_response = response_rx.await?;
354+
let (status, headers, body) = maybe_response?;
274355

275356
// If we reach here, we have a valid response
276357
let mut builder = http_handler::response::Builder::new().status(status);
@@ -284,25 +365,3 @@ impl Handler for Asgi {
284365
.map_err(HandlerError::HttpHandlerError)
285366
}
286367
}
287-
288-
// Execute the Python coroutine for the ASGI app via asyncio
289-
fn execute_coroutine(
290-
py_func: PyObject,
291-
scope: HttpConnectionScope,
292-
rx_receiver: Receiver,
293-
tx_sender: Sender,
294-
) -> PyResult<()> {
295-
Python::with_gil(|py| {
296-
let scope_py = scope.into_pyobject(py)?;
297-
let coroutine = py_func.call1(py, (scope_py, rx_receiver, tx_sender))?;
298-
299-
let asyncio = py.import("asyncio")?;
300-
let loop_ = asyncio.call_method0("new_event_loop")?;
301-
asyncio.call_method1("set_event_loop", (&loop_,))?;
302-
303-
loop_.call_method1("run_until_complete", (coroutine,))?;
304-
loop_.call_method0("close")?;
305-
306-
Ok(())
307-
})
308-
}

src/lib.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::sync::Arc;
1414
#[cfg(feature = "napi-support")]
1515
use http_handler::napi::{Request as NapiRequest, Response as NapiResponse};
1616
#[cfg(feature = "napi-support")]
17-
use http_handler::{Handler, Request, Response};
17+
use http_handler::{Request, Response};
1818
#[cfg(feature = "napi-support")]
1919
#[allow(unused_imports)]
2020
use http_rewriter::napi::Rewriter;
@@ -23,11 +23,10 @@ use http_rewriter::napi::Rewriter;
2323
extern crate napi_derive;
2424
#[cfg(feature = "napi-support")]
2525
use napi::bindgen_prelude::*;
26-
#[cfg(feature = "napi-support")]
27-
use tokio::runtime::Runtime;
2826

2927
mod asgi;
3028
pub use asgi::Asgi;
29+
use tokio::sync::oneshot::error::RecvError;
3130

3231
/// The Python module and function for handling requests.
3332
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -306,6 +305,18 @@ pub enum HandlerError {
306305
#[error("Python error: {0}")]
307306
PythonError(#[from] pyo3::prelude::PyErr),
308307

308+
/// Error when response channel is closed before sending a response.
309+
#[error("No response sent")]
310+
NoResponse,
311+
312+
/// Error when response is interrupted.
313+
#[error("Response interrupted")]
314+
ResponseInterrupted,
315+
316+
/// Error when response channel is closed.
317+
#[error("Response channel closed")]
318+
ResponseChannelClosed(#[from] RecvError),
319+
309320
/// Error when creating an HTTP response fails.
310321
#[error("Failed to create response: {0}")]
311322
HttpHandlerError(#[from] http_handler::Error),
@@ -317,18 +328,12 @@ impl Task for PythonRequestTask {
317328
type Output = Response;
318329
type JsValue = NapiResponse;
319330

320-
// Handle the PHP request in the worker thread.
331+
// Handle the Python request in the worker thread.
321332
fn compute(&mut self) -> Result<Self::Output> {
322-
// Can't use Handle::current() as this thread won't have a runtime configured.
323-
let runtime = Runtime::new().map_err(|err| Error::from_reason(err.to_string()))?;
324-
325-
runtime.block_on(async {
326-
self
327-
.asgi
328-
.handle(self.request.clone())
329-
.await
330-
.map_err(|err| Error::from_reason(err.to_string()))
331-
})
333+
self
334+
.asgi
335+
.handle_sync(self.request.clone())
336+
.map_err(|err| Error::from_reason(err.to_string()))
332337
}
333338

334339
// Handle converting the PHP response to a JavaScript response in the main thread.

0 commit comments

Comments
 (0)