Stream from FTP server to HTTP client

'ello,

I'm looking to replace my Python FTP proxy script with some Rust on Axum. Any pointers would be grand, as I have no idea which direction I should go in.

I can connect to the FTP server and fetch metadata about the files just fine. What I don't get is how I can stream the data back without reading the whole file into either memory or a temp file.

A big thanks up front for any input!

-Roel

The gist of the Python script is as follows:

import aioftp
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get('/fetch/{filename}', name='fetch')
async def ftp_fetch(request: Request, filename: str):
    return StreamingResponse(stream_file(filename))

async def stream_file(filename):
    async with aioftp.Client.context("insert credentials here") as client:
        async with client.download_stream(filename) as stream:
            async for block in stream.iter_by_block():
                yield block

Ideally I'd end up with something along the lines of

async fn stream_ftp_file ... {
  let mut ftp_conn = AnFtpThingy::connect("hostname");
  ftp_conn.login("username", "password");
  let data_stream = ftp_conn.retr_as_stream(filename);
  return axum::body::StreamBody(data_stream);
}

#[tokio::main]
async fn main() {
  let app = Router::new()
    .route("/path/:filename", get(stream_ftp_file));
  // more details omitted.
}

What are you using to connect to the FTP server? It looks to me like the ftp crate has several methods for reading data in smaller chunks.


I guess he'd need to use ftp_stream.get_ref(), which returns a &std::net::TcpStream and use tcp_stream.read()?

Not unless you wanted to decode responses manually. You'd have to do some work to bridge the blocking reader(s) to an async stream, but retr or get should be workable.
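
To make the bridging idea a bit more concrete, here is a rough sketch using only the standard library: a dedicated thread does the blocking reads and pushes chunks through a bounded channel. The function name chunks_from_reader and the queue depth of 4 are made up for illustration. In an actual Axum handler you would presumably swap the thread and channel for their tokio counterparts and wrap the receiving end in a Stream, so treat this as the shape of the idea rather than a drop-in solution.

```rust
use std::io::{self, Read};
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical helper: reads `reader` on its own thread and forwards
/// chunks of at most `chunk_size` bytes through a bounded channel, so
/// the consumer never has to hold the whole file in memory.
fn chunks_from_reader<R>(mut reader: R, chunk_size: usize) -> Receiver<io::Result<Vec<u8>>>
where
    R: Read + Send + 'static,
{
    // Bounded channel: once 4 chunks are queued, the reader thread blocks
    // until the consumer catches up (simple backpressure).
    let (tx, rx) = sync_channel(4);

    thread::spawn(move || {
        let mut buf = vec![0u8; chunk_size];
        loop {
            match reader.read(&mut buf) {
                // A zero-length read means end of file.
                Ok(0) => break,
                Ok(n) => {
                    // Stop reading if the consumer has gone away.
                    if tx.send(Ok(buf[..n].to_vec())).is_err() {
                        break;
                    }
                }
                // Interrupted reads are safe to retry.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => {
                    // Pass the error on to the consumer, then give up.
                    let _ = tx.send(Err(e));
                    break;
                }
            }
        }
    });

    rx
}
```

The returned Receiver can be iterated with a plain for loop, and each item is one chunk (or an I/O error). Dropping the receiver makes the reader thread stop at its next send.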


I'm using suppaftp, as it has async support, or so the label on the package claims.

What I've made so far is:

  async fn stream_ftp_file(
    Path(filename): Path<String>,
  ) -> StreamBody<impl Stream<Item = Result<_, _>>> {
    let mut ftp_stream = AsyncFtpStream::connect("hostname")
      .await
      .unwrap();
    ftp_stream.login("username", "password").await.unwrap();
    ftp_stream.transfer_type(FileType::Binary).await;

    // Either I can take a stream, which I somehow have to close after sending all data. Or I can use
    // ftp_stream.retr(filename, some_reader_func). I have no idea how to make a function that will read
    // from the stream and output it inside the StreamBody?

    // This can't be the way to go.
    let mut data_stream = ftp_stream.retr_as_stream(filename).await.unwrap();
    let le_stream = data_stream.get_ref();
    StreamBody::new(le_stream)
  }

Maybe I need to create a Stream that opens the FTP thingy and reads from it?
Maybe I should just mount the FTP server on the FS and read that directory instead?

Any input would be grand while I'm messing this up a bit more! :smiley:

First you need something to bridge from AsyncRead to the Stream trait. I wasn't able to find a simple way to do that just with the futures crate, but tokio_util has a ReaderStream type which does just that. Since that's a tokio type, you have to also convert the type implementing futures::io::AsyncRead to tokio's AsyncRead, which you can do with the FuturesAsyncReadCompatExt trait.

I added this to my Cargo.toml

tokio-util = { version = "0.7.8", features = ["io", "compat"] }

and was able to get your code to compile with those changes

use axum::{
    body::{Bytes, StreamBody},
    extract::Path,
};
use futures::stream::Stream;
use suppaftp::{types::FileType, AsyncFtpStream};
use tokio_util::{compat::FuturesAsyncReadCompatExt, io::ReaderStream};

async fn stream_ftp_file(
    Path(filename): Path<String>,
) -> StreamBody<impl Stream<Item = std::io::Result<Bytes>>> {
    let mut ftp_stream = AsyncFtpStream::connect("hostname").await.unwrap();
    ftp_stream.login("username", "password").await.unwrap();
    ftp_stream.transfer_type(FileType::Binary).await.unwrap();

    let data_stream = ftp_stream.retr_as_stream(filename).await.unwrap();

    StreamBody::new(ReaderStream::new(data_stream.compat()))
}

Thank you @semicoleon for helping me out. The Rust promise "if it compiles, it will do something" is correct!

I get a nice 32KB out of the roughly 137MB file, so I have a new thing to look into tomorrow :smiley:

curl http://localhost/uzg/filename.mp3

2023-07-17T20:09:02.572Z DEBUG [hyper::proto::h1::io] flushed 31812 bytes
2023-07-17T20:09:02.573Z DEBUG [hyper::proto::h1::conn] read eof

Cheers!
