I took a look at the unstable feature `unstable-streams` to convert a stream of bytes into an `AsyncRead`. My attempt looks roughly like this:
use bytes::Bytes;
use futures::StreamExt; // assumed source of `.then()`; tokio_stream::StreamExt also provides it
use pyo3::prelude::*;
use tokio::io::AsyncRead;

fn internal_function_takes_async_read<T: AsyncRead>(reader: T) {}

#[pyclass]
pub struct AsyncGenBytes {
    /// An async generator that produces byte arrays
    py_async_gen: Py<PyAny>,
}

// Does not really compile because AsyncGenBytes does not implement PyFunctionArgument (not important for this example)
#[pyfunction]
fn takes_async_gen_bytes_exposed_to_python(py_async_gen: AsyncGenBytes) -> PyResult<()> {
    // Turn the Python async generator into a Rust stream of Python objects
    let stream = Python::with_gil(|py| {
        let bound = py_async_gen.py_async_gen.into_bound(py);
        pyo3_async_runtimes::tokio::into_stream_v1(bound)
    })?;

    // Convert each yielded object into Bytes, mapping every failure to std::io::Error
    let byte_stream = stream.then(|res| async move {
        Python::with_gil(|py| match res {
            Ok(obj) => match obj.extract::<Vec<u8>>(py) {
                Ok(vec) => Ok(Bytes::from(vec)),
                Err(e) => Err(std::io::Error::other(format!(
                    "Failed to extract bytes: {e}"
                ))),
            },
            Err(e) => Err(std::io::Error::other(format!("Python error: {e}"))),
        })
    });

    // StreamReader implements AsyncRead
    let stream_reader = tokio_util::io::StreamReader::new(byte_stream);

    // do the things you want to do
    internal_function_takes_async_read(stream_reader);
    Ok(())
}
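For context on the last step: as far as I understand it, `StreamReader` holds on to the current chunk and pulls the next one from the stream once that chunk is drained. Below is a minimal synchronous, std-only sketch of that idea. `ChunkReader` and `read_all_chunks` are hypothetical names of mine (not part of tokio_util), and `Iterator` and `std::io::Read` stand in for `Stream` and `AsyncRead`.

```rust
use std::io::{self, Read};

/// Hypothetical std-only analogue of `StreamReader`: adapts an iterator of
/// byte chunks into a `Read` implementation.
pub struct ChunkReader<I> {
    chunks: I,
    current: Vec<u8>, // chunk currently being drained
    pos: usize,       // number of bytes of `current` already handed out
}

impl<I> ChunkReader<I>
where
    I: Iterator<Item = io::Result<Vec<u8>>>,
{
    pub fn new(chunks: I) -> Self {
        Self { chunks, current: Vec::new(), pos: 0 }
    }
}

impl<I> Read for ChunkReader<I>
where
    I: Iterator<Item = io::Result<Vec<u8>>>,
{
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Pull chunks until one has unread bytes; empty chunks are skipped.
        while self.pos == self.current.len() {
            match self.chunks.next() {
                Some(Ok(chunk)) => {
                    self.current = chunk;
                    self.pos = 0;
                }
                // A failed chunk surfaces as a read error.
                Some(Err(e)) => return Err(e),
                // Source exhausted: report end of input.
                None => return Ok(0),
            }
        }
        // Copy as much of the current chunk as fits into the caller's buffer.
        let n = buf.len().min(self.current.len() - self.pos);
        buf[..n].copy_from_slice(&self.current[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Drains a list of chunks through the reader and returns the joined bytes.
pub fn read_all_chunks(chunks: Vec<io::Result<Vec<u8>>>) -> io::Result<Vec<u8>> {
    let mut reader = ChunkReader::new(chunks.into_iter());
    let mut out = Vec::new();
    reader.read_to_end(&mut out)?;
    Ok(out)
}
```

Passing the chunks `b"he"` and `b"llo"` to `read_all_chunks` yields the bytes of `hello`, while an `Err` chunk comes back as a read error. In the async version, that error path is where a Python exception raised inside the generator would reach the consumer, since each failure is mapped to `std::io::Error` before it enters `StreamReader`.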
This feels to me like hacking around to get things done rather than the intended way.
Suggestions on how to do it the right way would be much appreciated.
Thanks all for this GREAT project!