Parse request body as string in h2


#1

Hi, I am trying to create REST api’s in HTTP 2 protocol using h2, but i am not able to find any way to parse the request body as string.

I am quite a newbie in rust.
It would be great if someone can point me to the right direction.

Thanks!

extern crate futures;
extern crate h2;
extern crate http;
extern crate tokio_core;

use futures::{Future, Stream};
use h2::server;
use http::{Response, StatusCode};
use tokio_core::reactor;
use tokio_core::net::TcpListener;

pub fn main () {
    let mut core = reactor::Core::new().unwrap();
    let handle = core.handle();

    let addr = "127.0.0.1:5928".parse().unwrap();
    let listener = TcpListener::bind(&addr, &handle).unwrap();

    core.run({
        // Accept all incoming TCP connections.
        listener.incoming().for_each(move |(socket, _)| {
            // Spawn a new task to process each connection.
            handle.spawn({
                // Start the HTTP/2.0 connection handshake
                server::handshake(socket)
                    .and_then(|h2| {
                        // Accept all inbound HTTP/2.0 streams sent over the
                        // connection.
                        h2.for_each(|(request, mut respond)| {
                            println!("Received request: {:?}", request);
                            let (head, mut body) = request.into_parts();
                            println!("head==> {:?}", head);
                            println!("body==> {:?}", body);

                            // Build a response with no body
                            let response = Response::builder()
                                .status(StatusCode::OK)
                                .body(())
                                .unwrap();

                            // Send the response back to the client
                            respond.send_response(response, true)
                                .unwrap();

                            Ok(())
                        })
                    })
                    .map_err(|e| panic!("unexpected error = {:?}", e))
            });

            Ok(())
        })
    }).ok().expect("failed to run HTTP/2.0 server");
}

#2

I’ve not used h2 but I believe it works similar to crates like hyper in this regard.

let (head, mut body) = request.into_parts();

body is something that implements Stream<Item=Bytes, ...>. You’d need to use something like Stream::concat2() on it; this returns a Future that folds all the individual Bytes in the stream into one Bytes instance. This Bytes is the full body of the request. Bytes, in turn, can be viewed as a byte slice (ie &[u8]) which you can then use to parse a string from.

I’m on mobile so here is a hyper example using concat2 which is similar except it concats into a Chunk whereas h2 would concat into Bytes. You can then use the various from_utf8[_xyz] methods on str to parse a string slice out.


#3

Hi,
Thanks for the suggestion.
I tried like this but the body.concat2() part does not return and the response is not send (client connection is not closed)

let mut core = reactor::Core::new().unwrap();
let handle = core.handle();

let ip_addr = "[::]";
let port = "8080";
let addr = format!("{}:{}", ip_addr, port).parse().unwrap();
let listener = TcpListener::bind(&addr, &handle).unwrap();

core.run({
    // Accept all incoming TCP connections.
    listener.incoming().for_each(move |(socket, _)| {
        // Spawn a new task to process each connection.
        handle.spawn({
            // Start the HTTP/2.0 connection handshake
            server::handshake(socket)
                .and_then(|h2| {
                    // Accept all inbound HTTP/2.0 streams sent over the
                    // connection.
                    h2.for_each(|(request, mut respond)| {
                        println!("Processing Request==> {:?}\n", request);
                        let (head, mut body) = request.into_parts();
                        println!("head==> {:?}", head);
                        println!("body==> {:?}", body);
                        println!("body.is_end_stream==> {:?}", body.is_end_stream());
                        println!("body.poll_trailers==> {:?}", body.poll_trailers());
                  
                        body.concat2().and_then(move |chunk| {
                            let req_str: Value = serde_json::from_slice(&chunk).unwrap();
                            println!("concat req_str==> {:?}", req_str);


                            // Build a response with no body
                            let response = Response::builder()
                                .header("Content-Type", "text/html")
                                //.status(StatusCode::OK)
                                .status(StatusCode::NOT_FOUND)
                                .body(())
                                .unwrap();

                            // Send the response back to the client
                            respond.send_response(response, true)
                                .unwrap();
                            Ok(())
                        })
                    })
                })
                .map_err(|e| println!("error in handshake = {:?}", e))
        });
        Ok(())
    })
}).ok().expect("failed to run HTTP/2.0 server");
}

The Output when i run this

Processing Request==> Request { method: PUT, uri: http://127.0.0.1:8080/, version: HTTP/2.0, headers: {"user-agent": "curl/7.59.0", "accept": "*/*", "content-type": "application/json", "content-length": "1648"}, body: RecvStream { inner: ReleaseCapacity { inner: OpaqueStreamRef { stream_id: StreamId(1), ref_count: 2 } } } }

head==> Parts { method: PUT, uri: http://127.0.0.1:8080/, version: HTTP/2.0, headers: {"user-agent": "curl/7.59.0", "accept": "*/*", "content-type": "application/json", "content-length": "1648"} }
body==> RecvStream { inner: ReleaseCapacity { inner: OpaqueStreamRef { stream_id: StreamId(1), ref_count: 2 } } }
body.is_end_stream==> false
body.poll_trailers==> Ok(NotReady)

The client call using curl cmd:

curl --http2-prior-knowledge -v -X PUT "http://127.0.0.1:8080" -d @/path/to/files/smf1.json -H 'content-type: application/json'
* Rebuilt URL to: http://127.0.0.1:8080/
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to 127.0.0.1 (127.0.0.1) port 8080 (#0)
* Using HTTP2, server supports multi-use
* Connection state changed (HTTP/2 confirmed)
* Copying HTTP/2 data in stream buffer to connection buffer after upgrade: len=0
* Using Stream ID: 1 (easy handle 0x25e5fc0)
> PUT / HTTP/2
> Host: 127.0.0.1:8080
> User-Agent: curl/7.59.0
> Accept: */*
> content-type: application/json
> Content-Length: 1648
> 
* Connection state changed (MAX_CONCURRENT_STREAMS == 100)!
* We are completely uploaded and fine

#4

Try adding debug/trace level logging and see what h2 and lower layers (eg tokio, mio) are reporting.


#5

Executed with log level trace, sometimes code executes successfully but sometimes not.

**Working log:**

TRACE 2018-05-08T14:23:17Z: tokio_reactor: event Readable Token(0)
DEBUG 2018-05-08T14:23:17Z: tokio_reactor: loop process - 1 events, 0.000s
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop poll - Duration { secs: 485, nanos: 656175912 }
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop time - Instant { tv_sec: 27892, tv_nsec: 670699210 }
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop process, Duration { secs: 0, nanos: 81946 }
DEBUG 2018-05-08T14:23:17Z: h2::codec::framed_write: send; frame=Frame::Settings(Settings { flags: SettingsFlags(0), header_table_size: None, enable_push: None, max_concurrent_streams: None, initial_window_size: None, max_frame_size: None, max_header_list_size: None })
TRACE 2018-05-08T14:23:17Z: h2::frame::settings: encoding SETTINGS; len=0
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_write: encoded settings; rem=9
TRACE 2018-05-08T14:23:17Z: h2::server: Handshake::poll(); state=Handshaking::Flushing(_);
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_write: flush
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_write:   -> not a queued data frame
TRACE 2018-05-08T14:23:17Z: mio::poll: registering with poller
TRACE 2018-05-08T14:23:17Z: h2::server: Handshake::poll(); flush.poll()=NotReady
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 1576324 }
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop time - Instant { tv_sec: 27892, tv_nsec: 672806628 }
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop process, Duration { secs: 0, nanos: 531551 }
TRACE 2018-05-08T14:23:17Z: tokio_reactor: event Writable Token(4194305)
DEBUG 2018-05-08T14:23:17Z: tokio_reactor: loop process - 1 events, 0.001s
TRACE 2018-05-08T14:23:17Z: h2::server: Handshake::poll(); state=Handshaking::Flushing(_);
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_write: flush
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_write:   -> not a queued data frame
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_write: flushing buffer
TRACE 2018-05-08T14:23:17Z: h2::server: Handshake::poll(); flush.poll()=Ready
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 1451213 }
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop time - Instant { tv_sec: 27892, tv_nsec: 674860668 }
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop process, Duration { secs: 0, nanos: 165525 }
TRACE 2018-05-08T14:23:17Z: tokio_reactor: event Readable | Writable Token(4194305)
DEBUG 2018-05-08T14:23:17Z: tokio_reactor: loop process - 1 events, 0.000s
TRACE 2018-05-08T14:23:17Z: h2::server: Handshake::poll(); state=Handshaking::ReadingPreface(_);
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::flow_control: inc_window; sz=65535; old=0; new=65535
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::flow_control: inc_window; sz=65535; old=0; new=65535
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: Prioritize::new; flow=FlowControl { window_size: Window(65535), available: Window(65535) }
TRACE 2018-05-08T14:23:17Z: h2::server: Handshake::poll(); connection established!
TRACE 2018-05-08T14:23:17Z: h2::proto::settings: send_pending_ack; pending=None
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read: poll
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: try reclaim frame
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: poll_complete
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: schedule_pending_open
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: pop_frame
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_write: flush
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_write: flushing buffer
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: try reclaim frame
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 2611730 }
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop time - Instant { tv_sec: 27892, tv_nsec: 677683802 }
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop process, Duration { secs: 0, nanos: 54145 }
TRACE 2018-05-08T14:23:17Z: tokio_reactor: event Readable | Writable Token(4194305)
DEBUG 2018-05-08T14:23:17Z: tokio_reactor: loop process - 1 events, 0.000s
TRACE 2018-05-08T14:23:17Z: h2::proto::settings: send_pending_ack; pending=None
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read: poll
TRACE 2018-05-08T14:23:17Z: tokio_io::framed_read: attempting to decode a frame
TRACE 2018-05-08T14:23:17Z: tokio_io::framed_read: frame decoded from buffer
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read: poll; bytes=27B
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read: decoding frame from 27B
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read:     -> kind=Settings
DEBUG 2018-05-08T14:23:17Z: h2::codec::framed_read: received; frame=Frame::Settings(Settings { flags: SettingsFlags(0), header_table_size: None, enable_push: Some(0), max_concurrent_streams: Some(100), initial_window_size: Some(1073741824), max_frame_size: None, max_header_list_size: None })
TRACE 2018-05-08T14:23:17Z: h2::proto::connection: recv SETTINGS; frame=Settings { flags: SettingsFlags(0), header_table_size: None, enable_push: Some(0), max_concurrent_streams: Some(100), initial_window_size: Some(1073741824), max_frame_size: None, max_header_list_size: None }
TRACE 2018-05-08T14:23:17Z: h2::proto::settings: send_pending_ack; pending=Some(Settings { flags: SettingsFlags(0), header_table_size: None, enable_push: Some(0), max_concurrent_streams: Some(100), initial_window_size: Some(1073741824), max_frame_size: None, max_header_list_size: None })
DEBUG 2018-05-08T14:23:17Z: h2::codec::framed_write: send; frame=Frame::Settings(Settings { flags: SettingsFlags(1), header_table_size: None, enable_push: None, max_concurrent_streams: None, initial_window_size: None, max_frame_size: None, max_header_list_size: None })
TRACE 2018-05-08T14:23:17Z: h2::frame::settings: encoding SETTINGS; len=0
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_write: encoded settings; rem=9
TRACE 2018-05-08T14:23:17Z: h2::proto::settings: ACK sent; applying settings
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read: poll
TRACE 2018-05-08T14:23:17Z: tokio_io::framed_read: attempting to decode a frame
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: try reclaim frame
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: poll_complete
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: schedule_pending_open
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: pop_frame
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_write: flush
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_write:   -> not a queued data frame
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_write: flushing buffer
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: try reclaim frame
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 2456198 }
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop time - Instant { tv_sec: 27892, tv_nsec: 680238220 }
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop process, Duration { secs: 0, nanos: 269567 }
TRACE 2018-05-08T14:23:17Z: tokio_reactor: event Readable | Writable Token(4194305)
DEBUG 2018-05-08T14:23:17Z: tokio_reactor: loop process - 1 events, 0.000s
TRACE 2018-05-08T14:23:17Z: h2::proto::settings: send_pending_ack; pending=None
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read: poll
TRACE 2018-05-08T14:23:17Z: tokio_io::framed_read: attempting to decode a frame
TRACE 2018-05-08T14:23:17Z: tokio_io::framed_read: frame decoded from buffer
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read: poll; bytes=62B
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read: decoding frame from 62B
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read:     -> kind=Headers
TRACE 2018-05-08T14:23:17Z: h2::frame::headers: loading headers; flags=HeadersFlag { end_stream: false, end_headers: true, padded: false, priority: false }
TRACE 2018-05-08T14:23:17Z: h2::hpack::decoder: decode
TRACE 2018-05-08T14:23:17Z: h2::hpack::decoder:     LiteralWithIndexing; rem=53
TRACE 2018-05-08T14:23:17Z: h2::hpack::decoder:     Indexed; rem=48
TRACE 2018-05-08T14:23:17Z: tokio_reactor: event Readable | Writable Token(4194305)
DEBUG 2018-05-08T14:23:17Z: tokio_reactor: loop process - 1 events, 0.000s
TRACE 2018-05-08T14:23:17Z: tokio_reactor: event Readable | Writable Token(4194305)
DEBUG 2018-05-08T14:23:17Z: tokio_reactor: loop process - 1 events, 0.000s
TRACE 2018-05-08T14:23:17Z: h2::hpack::decoder:     Indexed; rem=47
TRACE 2018-05-08T14:23:17Z: h2::hpack::decoder:     LiteralWithIndexing; rem=46
TRACE 2018-05-08T14:23:17Z: h2::hpack::decoder:     LiteralWithIndexing; rem=34
TRACE 2018-05-08T14:23:17Z: h2::hpack::decoder:     LiteralWithIndexing; rem=24
TRACE 2018-05-08T14:23:17Z: h2::hpack::decoder:     LiteralWithIndexing; rem=19
TRACE 2018-05-08T14:23:17Z: h2::hpack::decoder:     LiteralWithoutIndexing; rem=6
DEBUG 2018-05-08T14:23:17Z: h2::codec::framed_read: received; frame=Frame::Headers(Headers { stream_id: StreamId(1), stream_dep: None, flags: HeadersFlag { end_stream: false, end_headers: true, padded: false, priority: false } })
TRACE 2018-05-08T14:23:17Z: h2::proto::connection: recv HEADERS; frame=Headers { stream_id: StreamId(1), stream_dep: None, flags: HeadersFlag { end_stream: false, end_headers: true, padded: false, priority: false } }
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::flow_control: inc_window; sz=65535; old=0; new=65535
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::flow_control: inc_window; sz=1073741824; old=0; new=1073741824
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::streams: recv_headers; stream=StreamId(1); state=State { inner: Idle }
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::recv: opening stream; init_window=65535
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::store: Queue::push
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::store:  -> first entry
TRACE 2018-05-08T14:23:17Z: h2::proto::settings: send_pending_ack; pending=None
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read: poll
TRACE 2018-05-08T14:23:17Z: tokio_io::framed_read: attempting to decode a frame
TRACE 2018-05-08T14:23:17Z: tokio_io::framed_read: attempting to decode a frame
TRACE 2018-05-08T14:23:17Z: tokio_io::framed_read: frame decoded from buffer
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read: poll; bytes=9B
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read: decoding frame from 9B
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read:     -> kind=Settings
DEBUG 2018-05-08T14:23:17Z: h2::codec::framed_read: received; frame=Frame::Settings(Settings { flags: SettingsFlags(1), header_table_size: None, enable_push: None, max_concurrent_streams: None, initial_window_size: None, max_frame_size: None, max_header_list_size: None })
TRACE 2018-05-08T14:23:17Z: h2::proto::connection: recv SETTINGS; frame=Settings { flags: SettingsFlags(1), header_table_size: None, enable_push: None, max_concurrent_streams: None, initial_window_size: None, max_frame_size: None, max_header_list_size: None }
DEBUG 2018-05-08T14:23:17Z: h2::proto::settings: received remote settings ack
TRACE 2018-05-08T14:23:17Z: h2::proto::settings: send_pending_ack; pending=None
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read: poll
TRACE 2018-05-08T14:23:17Z: tokio_io::framed_read: attempting to decode a frame
TRACE 2018-05-08T14:23:17Z: tokio_io::framed_read: frame decoded from buffer
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read: poll; bytes=1657B
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read: decoding frame from 1657B
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read:     -> kind=Data
DEBUG 2018-05-08T14:23:17Z: h2::codec::framed_read: received; frame=Frame::Data(Data { stream_id: StreamId(1), flags: DataFlags { end_stream: true }, pad_len: None })
TRACE 2018-05-08T14:23:17Z: h2::proto::connection: recv DATA; frame=Data { stream_id: StreamId(1), flags: DataFlags { end_stream: true }, pad_len: None }
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::recv: recv_data; size=1648; connection=65535; stream=65535
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::flow_control: send_data; sz=1648; window=65535; available=65535
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::flow_control: send_data; sz=1648; window=65535; available=65535
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::state: recv_close: Open => HalfClosedRemote(AwaitingHeaders)
TRACE 2018-05-08T14:23:17Z: h2::proto::settings: send_pending_ack; pending=None
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read: poll
TRACE 2018-05-08T14:23:17Z: tokio_io::framed_read: attempting to decode a frame
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: try reclaim frame
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: poll_complete
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: schedule_pending_open
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: pop_frame
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_write: flush
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_write: flushing buffer
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: try reclaim frame
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::streams: next_incoming; id=StreamId(1), state=State { inner: HalfClosedRemote(AwaitingHeaders) }
TRACE 2018-05-08T14:23:17Z: h2::server: received incoming
Processing Request==> Request { method: PUT, uri: http://127.0.0.1:8080/, version: HTTP/2.0, headers: {"user-agent": "curl/7.59.0", "accept": "*/*", "content-type": "application/json", "content-length": "1648"}, body: RecvStream { inner: ReleaseCapacity { inner: OpaqueStreamRef { stream_id: StreamId(1), ref_count: 2 } } } }

TRACE 2018-05-08T14:23:17Z: nnrf: logging Processing Request==> Request { method: PUT, uri: http://127.0.0.1:8080/, version: HTTP/2.0, headers: {"user-agent": "curl/7.59.0", "accept": "*/*", "content-type": "application/json", "content-length": "1648"}, body: RecvStream { inner: ReleaseCapacity { inner: OpaqueStreamRef { stream_id: StreamId(1), ref_count: 2 } } } }

head==> Parts { method: PUT, uri: http://127.0.0.1:8080/, version: HTTP/2.0, headers: {"user-agent": "curl/7.59.0", "accept": "*/*", "content-type": "application/json", "content-length": "1648"} }
body==> RecvStream { inner: ReleaseCapacity { inner: OpaqueStreamRef { stream_id: StreamId(1), ref_count: 2 } } }
body.is_end_stream==> false
body.poll_trailers==> Ok(NotReady)
concat req_str==> Object({"capacity": Number(1000), "fqdn": String("smf1.service"), "ipAddress": String("192.168.0.243"), "nfInstanceID": String("smf1"), "nfServiceList": Array([Object({"allowedNfTypes": Array([String("AMF")]), "allowedNssais": Array([Object({"sd": Number(34), "sst": Number(12)})]), "allowedPlmns": Array([String("50502"), String("50503")]), "apiPrefix": String(""), "callbackUri": String("http://192.168.0.243/nsmf-pdu-session"), "fqdn": String("nsmf-pdu-session.service"), "ipAddress": String("192.168.0.249"), "port": Number(7080), "schema": String("1"), "serviceInstanceID": String("nsmf-pdu-session1"), "serviceName": String("Nsmf_PDUSession"), "version": String("1")}), Object({"allowedNfTypes": Array([String("AMF"), String("PCF"), String("UPF")]), "allowedNssais": Array([Object({"sd": Number(34), "sst": Number(12)})]), "allowedPlmns": Array([String("50502"), String("50503")]), "apiPrefix": String(""), "callbackUri": String("http://192.168.0.243/nsmf-pdu-session"), "fqdn": String("nsmf-event-exposure.service"), "ipAddress": String("192.168.0.249"), "port": Number(7080), "schema": String("1"), "serviceInstanceID": String("nsmf-event-exposure1"), "serviceName": String("Nsmf_EventExposure"), "version": String("1")})]), "nfType": String("SMF"), "plmn": String("50502"), "sNssai": Array([Object({"sd": Number(34), "sst": Number(12)})])})
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::recv: release_capacity; size=1648
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::recv: release_connection_capacity; size=1648
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::send: send_headers; frame=Headers { stream_id: StreamId(1), stream_dep: None, flags: HeadersFlag { end_stream: true, end_headers: true, padded: false, priority: false } }; init_window=1073741824
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: schedule_send; StreamId(1)
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::store: Queue::push
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::store:  -> first entry
TRACE 2018-05-08T14:23:17Z: h2::proto::settings: send_pending_ack; pending=None
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read: poll
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: try reclaim frame
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: poll_complete
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: schedule_pending_open
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: pop_frame
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: pop_frame; stream=StreamId(1); stream.state=State { inner: Closed(EndStream) }
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize:  --> stream=StreamId(1); is_pending_reset=false;
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: pop_frame; frame=Frame::Headers(Headers { stream_id: StreamId(1), stream_dep: None, flags: HeadersFlag { end_stream: true, end_headers: true, padded: false, priority: false } })
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: writing frame=Frame::Headers(Headers { stream_id: StreamId(1), stream_dep: None, flags: HeadersFlag { end_stream: true, end_headers: true, padded: false, priority: false } })
DEBUG 2018-05-08T14:23:17Z: h2::codec::framed_write: send; frame=Frame::Headers(Headers { stream_id: StreamId(1), stream_dep: None, flags: HeadersFlag { end_stream: true, end_headers: true, padded: false, priority: false } })
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: try reclaim frame
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: schedule_pending_open
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: pop_frame
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_write: flush
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_write:   -> not a queued data frame
TRACE 2018-05-08T14:23:17Z: tokio_reactor: event Readable | Writable | Hup Token(4194305)
DEBUG 2018-05-08T14:23:17Z: tokio_reactor: loop process - 1 events, 0.000s
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_write: flushing buffer
TRACE 2018-05-08T14:23:17Z: h2::proto::streams::prioritize: try reclaim frame
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 30692248 }
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop time - Instant { tv_sec: 27892, tv_nsec: 711248176 }
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop process, Duration { secs: 0, nanos: 185734 }
TRACE 2018-05-08T14:23:17Z: h2::proto::settings: send_pending_ack; pending=None
TRACE 2018-05-08T14:23:17Z: h2::codec::framed_read: poll
TRACE 2018-05-08T14:23:17Z: h2::proto::connection: codec closed
TRACE 2018-05-08T14:23:17Z: mio::poll: deregistering handle with poller
DEBUG 2018-05-08T14:23:17Z: tokio_reactor: dropping I/O source: 1
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 465701 }
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop time - Instant { tv_sec: 27892, tv_nsec: 711993078 }
DEBUG 2018-05-08T14:23:17Z: tokio_core::reactor: loop process, Duration { secs: 0, nanos: 140690 }

================================================================================

**Not Working log:**

TRACE 2018-05-08T14:24:30Z: tokio_reactor: event Readable Token(0)
DEBUG 2018-05-08T14:24:30Z: tokio_reactor: loop process - 1 events, 0.000s
DEBUG 2018-05-08T14:24:30Z: tokio_core::reactor: loop poll - Duration { secs: 73, nanos: 332091391 }
DEBUG 2018-05-08T14:24:30Z: tokio_core::reactor: loop time - Instant { tv_sec: 27966, tv_nsec: 44299584 }
DEBUG 2018-05-08T14:24:30Z: tokio_core::reactor: loop process, Duration { secs: 0, nanos: 136032 }
DEBUG 2018-05-08T14:24:30Z: h2::codec::framed_write: send; frame=Frame::Settings(Settings { flags: SettingsFlags(0), header_table_size: None, enable_push: None, max_concurrent_streams: None, initial_window_size: None, max_frame_size: None, max_header_list_size: None })
TRACE 2018-05-08T14:24:30Z: h2::frame::settings: encoding SETTINGS; len=0
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_write: encoded settings; rem=9
TRACE 2018-05-08T14:24:30Z: h2::server: Handshake::poll(); state=Handshaking::Flushing(_);
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_write: flush
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_write:   -> not a queued data frame
TRACE 2018-05-08T14:24:30Z: mio::poll: registering with poller
TRACE 2018-05-08T14:24:30Z: h2::server: Handshake::poll(); flush.poll()=NotReady
DEBUG 2018-05-08T14:24:30Z: tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 742094 }
DEBUG 2018-05-08T14:24:30Z: tokio_core::reactor: loop time - Instant { tv_sec: 27966, tv_nsec: 45554366 }
DEBUG 2018-05-08T14:24:30Z: tokio_core::reactor: loop process, Duration { secs: 0, nanos: 53762 }
TRACE 2018-05-08T14:24:30Z: tokio_reactor: event Readable | Writable Token(8388609)
DEBUG 2018-05-08T14:24:30Z: tokio_reactor: loop process - 1 events, 0.000s
TRACE 2018-05-08T14:24:30Z: h2::server: Handshake::poll(); state=Handshaking::Flushing(_);
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_write: flush
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_write:   -> not a queued data frame
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_write: flushing buffer
TRACE 2018-05-08T14:24:30Z: h2::server: Handshake::poll(); flush.poll()=Ready
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::flow_control: inc_window; sz=65535; old=0; new=65535
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::flow_control: inc_window; sz=65535; old=0; new=65535
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::prioritize: Prioritize::new; flow=FlowControl { window_size: Window(65535), available: Window(65535) }
TRACE 2018-05-08T14:24:30Z: h2::server: Handshake::poll(); connection established!
TRACE 2018-05-08T14:24:30Z: h2::proto::settings: send_pending_ack; pending=None
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_read: poll
TRACE 2018-05-08T14:24:30Z: tokio_io::framed_read: attempting to decode a frame
TRACE 2018-05-08T14:24:30Z: tokio_io::framed_read: frame decoded from buffer
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_read: poll; bytes=27B
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_read: decoding frame from 27B
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_read:     -> kind=Settings
DEBUG 2018-05-08T14:24:30Z: h2::codec::framed_read: received; frame=Frame::Settings(Settings { flags: SettingsFlags(0), header_table_size: None, enable_push: Some(0), max_concurrent_streams: Some(100), initial_window_size: Some(1073741824), max_frame_size: None, max_header_list_size: None })
TRACE 2018-05-08T14:24:30Z: h2::proto::connection: recv SETTINGS; frame=Settings { flags: SettingsFlags(0), header_table_size: None, enable_push: Some(0), max_concurrent_streams: Some(100), initial_window_size: Some(1073741824), max_frame_size: None, max_header_list_size: None }
TRACE 2018-05-08T14:24:30Z: h2::proto::settings: send_pending_ack; pending=Some(Settings { flags: SettingsFlags(0), header_table_size: None, enable_push: Some(0), max_concurrent_streams: Some(100), initial_window_size: Some(1073741824), max_frame_size: None, max_header_list_size: None })
DEBUG 2018-05-08T14:24:30Z: h2::codec::framed_write: send; frame=Frame::Settings(Settings { flags: SettingsFlags(1), header_table_size: None, enable_push: None, max_concurrent_streams: None, initial_window_size: None, max_frame_size: None, max_header_list_size: None })
TRACE 2018-05-08T14:24:30Z: h2::frame::settings: encoding SETTINGS; len=0
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_write: encoded settings; rem=9
TRACE 2018-05-08T14:24:30Z: h2::proto::settings: ACK sent; applying settings
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_read: poll
TRACE 2018-05-08T14:24:30Z: tokio_io::framed_read: attempting to decode a frame
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::prioritize: try reclaim frame
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::prioritize: poll_complete
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::prioritize: schedule_pending_open
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::prioritize: pop_frame
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_write: flush
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_write:   -> not a queued data frame
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_write: flushing buffer
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::prioritize: try reclaim frame
DEBUG 2018-05-08T14:24:30Z: tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 1041592 }
DEBUG 2018-05-08T14:24:30Z: tokio_core::reactor: loop time - Instant { tv_sec: 27966, tv_nsec: 46683717 }
DEBUG 2018-05-08T14:24:30Z: tokio_core::reactor: loop process, Duration { secs: 0, nanos: 49985 }
TRACE 2018-05-08T14:24:30Z: tokio_reactor: event Readable | Writable Token(8388609)
DEBUG 2018-05-08T14:24:30Z: tokio_reactor: loop process - 1 events, 0.000s
TRACE 2018-05-08T14:24:30Z: h2::proto::settings: send_pending_ack; pending=None
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_read: poll
TRACE 2018-05-08T14:24:30Z: tokio_io::framed_read: attempting to decode a frame
TRACE 2018-05-08T14:24:30Z: tokio_io::framed_read: frame decoded from buffer
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_read: poll; bytes=62B
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_read: decoding frame from 62B
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_read:     -> kind=Headers
TRACE 2018-05-08T14:24:30Z: h2::frame::headers: loading headers; flags=HeadersFlag { end_stream: false, end_headers: true, padded: false, priority: false }
TRACE 2018-05-08T14:24:30Z: h2::hpack::decoder: decode
TRACE 2018-05-08T14:24:30Z: h2::hpack::decoder:     LiteralWithIndexing; rem=53
TRACE 2018-05-08T14:24:30Z: h2::hpack::decoder:     Indexed; rem=48
TRACE 2018-05-08T14:24:30Z: h2::hpack::decoder:     Indexed; rem=47
TRACE 2018-05-08T14:24:30Z: h2::hpack::decoder:     LiteralWithIndexing; rem=46
TRACE 2018-05-08T14:24:30Z: h2::hpack::decoder:     LiteralWithIndexing; rem=34
TRACE 2018-05-08T14:24:30Z: h2::hpack::decoder:     LiteralWithIndexing; rem=24
TRACE 2018-05-08T14:24:30Z: h2::hpack::decoder:     LiteralWithIndexing; rem=19
TRACE 2018-05-08T14:24:30Z: h2::hpack::decoder:     LiteralWithoutIndexing; rem=6
DEBUG 2018-05-08T14:24:30Z: h2::codec::framed_read: received; frame=Frame::Headers(Headers { stream_id: StreamId(1), stream_dep: None, flags: HeadersFlag { end_stream: false, end_headers: true, padded: false, priority: false } })
TRACE 2018-05-08T14:24:30Z: h2::proto::connection: recv HEADERS; frame=Headers { stream_id: StreamId(1), stream_dep: None, flags: HeadersFlag { end_stream: false, end_headers: true, padded: false, priority: false } }
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::flow_control: inc_window; sz=65535; old=0; new=65535
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::flow_control: inc_window; sz=1073741824; old=0; new=1073741824
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::streams: recv_headers; stream=StreamId(1); state=State { inner: Idle }
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::recv: opening stream; init_window=65535
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::store: Queue::push
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::store:  -> first entry
TRACE 2018-05-08T14:24:30Z: h2::proto::settings: send_pending_ack; pending=None
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_read: poll
TRACE 2018-05-08T14:24:30Z: tokio_io::framed_read: attempting to decode a frame
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::prioritize: try reclaim frame
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::prioritize: poll_complete
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::prioritize: schedule_pending_open
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::prioritize: pop_frame
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_write: flush
TRACE 2018-05-08T14:24:30Z: h2::codec::framed_write: flushing buffer
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::prioritize: try reclaim frame
TRACE 2018-05-08T14:24:30Z: h2::proto::streams::streams: next_incoming; id=StreamId(1), state=State { inner: Open { local: AwaitingHeaders, remote: Streaming } }
TRACE 2018-05-08T14:24:30Z: h2::server: received incoming
Processing Request==> Request { method: PUT, uri: http://127.0.0.1:8080/, version: HTTP/2.0, headers: {"user-agent": "curl/7.59.0", "accept": "*/*", "content-type": "application/json", "content-length": "1648"}, body: RecvStream { inner: ReleaseCapacity { inner: OpaqueStreamRef { stream_id: StreamId(1), ref_count: 2 } } } }

TRACE 2018-05-08T14:24:30Z: nnrf: logging Processing Request==> Request { method: PUT, uri: http://127.0.0.1:8080/, version: HTTP/2.0, headers: {"user-agent": "curl/7.59.0", "accept": "*/*", "content-type": "application/json", "content-length": "1648"}, body: RecvStream { inner: ReleaseCapacity { inner: OpaqueStreamRef { stream_id: StreamId(1), ref_count: 2 } } } }

head==> Parts { method: PUT, uri: http://127.0.0.1:8080/, version: HTTP/2.0, headers: {"user-agent": "curl/7.59.0", "accept": "*/*", "content-type": "application/json", "content-length": "1648"} }
body==> RecvStream { inner: ReleaseCapacity { inner: OpaqueStreamRef { stream_id: StreamId(1), ref_count: 2 } } }
body.is_end_stream==> false
body.poll_trailers==> Ok(NotReady)
DEBUG 2018-05-08T14:24:30Z: tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 1978036 }
DEBUG 2018-05-08T14:24:30Z: tokio_core::reactor: loop time - Instant { tv_sec: 27966, tv_nsec: 48743577 }
DEBUG 2018-05-08T14:24:30Z: tokio_core::reactor: loop process, Duration { secs: 0, nanos: 66267 }
TRACE 2018-05-08T14:24:30Z: tokio_reactor: event Readable | Writable Token(8388609)
DEBUG 2018-05-08T14:24:30Z: tokio_reactor: loop process - 1 events, 0.000s
DEBUG 2018-05-08T14:24:30Z: tokio_core::reactor: loop poll - Duration { secs: 0, nanos: 1004201 }
DEBUG 2018-05-08T14:24:30Z: tokio_core::reactor: loop time - Instant { tv_sec: 27966, tv_nsec: 49955265 }
TRACE 2018-05-08T14:24:30Z: tokio_reactor: event Readable | Writable Token(8388609)
DEBUG 2018-05-08T14:24:30Z: tokio_reactor: loop process - 1 events, 0.000s
DEBUG 2018-05-08T14:24:30Z: tokio_core::reactor: loop process, Duration { secs: 0, nanos: 159609 }




Ctrl+c (curl cmd)

TRACE 2018-05-08T14:25:12Z: tokio_reactor: event Readable | Writable | Hup Token(8388609)
DEBUG 2018-05-08T14:25:12Z: tokio_reactor: loop process - 1 events, 0.000s

#6

Strange, but I’m not an h2 expert unfortunately. @carllerche visits here occasionally - maybe he’ll have an idea. If not, you can try asking on their github.


#7

HTTP/2.0 has flow control built in. This means that the remote will not send more data until you tell it to.

See: https://docs.rs/h2/0.1.6/h2/struct.RecvStream.html and https://docs.rs/h2/0.1.6/h2/struct.ReleaseCapacity.html

The h2 crate is intended to be lower level and exposes all the knobs of H2. Hyper provides HTTP/2.0 support (via the h2 crate) and handles flow control for you.


#8

@carllerche, I thought it was flow control related as well but the amount of data being sent here is way below the window size. It’s also odd that it appears to work some times but not always with the (what looks like) same inputs/requests.