I'm looking to replace my Python ftp proxy script with some rust on Axum. Any pointer would be grand as I have no idea in which I direction I should go.
I can connect to the ftp server and fetch metadata about the files just fine. What I don't get is how I can stream the data back, without read the whole file in into either mem or a temp file.
A big thanks up front for any input!
-Roel
The gist of the Python script is as follows:
import aioftp
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.get('/fetch/{filename}', name='fetch')
async def ftp_fetch(request: Request, filename: str):
return StreamingResponse(stream_file(filename))
async def stream_file(filename):
async with aioftp.Client.context("insert credentials here") as client:
async with client.download_stream(filename) as stream:
async for block in stream.iter_by_block():
yield block
Ideally I'd end up with something in the lines of
async fn stream_ftp_file ... {
let mut ftp_conn = AnFtpThingy::connect("hostname");
let ftp_conn.login("username", "password");
let data_stream = ftp_stream.retr_as_stream(filename);
return Axum::body::StreamBody(data_stream);
}
#[tokio::main]
async fn main() {
let app = Router::new()
.route("/path/:filename", get(stream_ftp_file));
// more details omitted.
}
Not unless you wanted to decode responses manually. You'd have to do some work to bridge the blocking reader(s) to an async stream, but retr or get should be workable.
I'm using suppaftp - Rust as it have async support, the label on the package claimed.
What I've made so far is:
async fn stream_ftp_file(
Path(filename): Path<String>,
) -> StreamBody<impl Stream<Item = Result<_, _>>> {
let mut ftp_stream = AsyncFtpStream::connect("hostname")
.await
.unwrap();
ftp_stream.login("username", "password").await.unwrap();
ftp_stream.transfer_type(FileType::Binary).await;
// Either I can take a stream, which I somehow have to close after sending all data. Or I can use
// ftp_stream.retr(filename, some_reader_func). I have no idea how to make a function that will read
// from the stream and output it inside the StreamBody?
// This can't be the way to go.
let mut data_stream = ftp_stream.retr_as_stream(filename).await.unwrap();
let le_stream = data_stream.get_ref();
StreamBody::new(le_stream)
}
Maybe I'm to create a Stream that will open the ftp thingy, read it?
Maybe I should just mount the ftp server on the FS and read that directory instead?
Any Input would be grand while I'm messing this up a bit more!
First you need something to bridge from AsyncRead to the Stream trait. I wasn't able to find a simple way to do that just with the futures crate, but tokio_util has a ReaderStream type which does just that. Since that's a tokio type, you have to also convert the type implementing futures::io::AsyncRead to tokio's AsyncRead, which you can do with the FuturesAsyncReadCompatExt trait.
I added this to my Cargo.toml
tokio-util = { version = "0.7.8", features = ["io", "compat"] }
and was able to get your code to compile with those changes
use axum::{
body::{Bytes, StreamBody},
extract::Path,
};
use futures::stream::Stream;
use suppaftp::{types::FileType, AsyncFtpStream};
use tokio_util::{compat::FuturesAsyncReadCompatExt, io::ReaderStream};
async fn stream_ftp_file(
Path(filename): Path<String>,
) -> StreamBody<impl Stream<Item = std::io::Result<Bytes>>> {
let mut ftp_stream = AsyncFtpStream::connect("hostname").await.unwrap();
ftp_stream.login("username", "password").await.unwrap();
ftp_stream.transfer_type(FileType::Binary).await.unwrap();
let data_stream = ftp_stream.retr_as_stream(filename).await.unwrap();
StreamBody::new(ReaderStream::new(data_stream.compat()))
}