This exception occurs when using multithreading

This exception occurs when using multithreading

    async fn read_socket(&self, target: &mut TcpStream) -> Result<Frame, SocketDError> {
        let _ = target.readable().await;
        let mut buf = BytesMut::with_capacity(4);
        let _= target.read(&mut buf).await;
        let len = buf.get_u32();
        if len == 0 {
            return  Err(SocketDError::Channel("read 0 bytes"));
        }
        let mut byte_buff = BytesMut::with_capacity(len as usize);
        byte_buff.put_u32(len);
        let buff_size = self.config.get_read_buffer_size() as usize;
        let mut read_buff = BytesMut::with_capacity(buff_size);
        loop {
            if read_buff.capacity().eq(&0) {
                read_buff.clear();
            }
            if target.try_read(&mut read_buff).unwrap() > 0 {
                byte_buff.put(&mut buf);
            } else {
                break;
            }
        }
        read_buff.clear();
        Coder::read(byte_buff)
    }
error: future cannot be sent between threads safely
  --> src\socketd\transport\core\channel.rs:61:88
   |
61 |       async fn read_socket(&self, target: &mut TcpStream) -> Result<Frame, SocketDError> {
   |  ________________________________________________________________________________________^
62 | |         let _ = target.readable().await;
63 | |         let mut buf = BytesMut::with_capacity(4);
64 | |         let _= target.read(&mut buf).await;
...  |
84 | |         Coder::read(byte_buff)
85 | |     }
   | |_____^ future created by async block is not `Send`
   |
   = help: the trait `Sync` is not implemented for `(dyn Config + 'static)`
note: captured value is not `Send` because `&` references cannot be sent unless their referent is `Sync`
  --> src\socketd\transport\core\channel.rs:61:27
   |
61 |     async fn read_socket(&self, target: &mut TcpStream) -> Result<Frame, SocketDError> {
   |                           ^^^^ has type `&DefaultChannelAssistant` which is not `Send`, because `DefaultChannelAssistant` is not `Sync`
   = note: required for the cast from `Pin<Box<{async block@src\socketd\transport\core\channel.rs:61:88: 85:6}>>` to `Pin<Box<(dyn Future<Output = Result<domain::Frame, SocketDError>> + Send + 'async_trait)>>`

Looks to me like your DefaultChannelAssistant isn't Sync.

Can you add a + Send + Sync bound to your dyn Config + 'static? Is that the type of one of DefaultChannelAssistant's fields?

pub trait Config {
    /**
     *  是否客户端模式
     */
    fn client_mode(&self) -> bool;

    /**
     *  获取流管理器
     */
    fn get_stream_manger(&self) -> bool;

    /**
     *  获取角色名
     */
    fn get_role_name(&self) -> String;

    /**
     * 获取字符集
     */
    fn get_charset(&self) -> &str;

    /**
     * 获取模式
     */
    fn get_schema(&self) -> &str;

    /**
     * 获取地址
     */
    fn get_address(&self) -> &str;

    /**
     * 获取写缓冲大小
     */
    fn get_pwrite_buffer_size(&self) -> i32;

    /**
     * 获取读缓冲大小
     */
    fn get_read_buffer_size(&self) -> i32;
    /**
     * 获取连接超时时间
     */
    fn get_connect_timeout(&self) -> i64;
}

/**
 * Id 生成器
 */
pub trait IdGenerator {
    /**
     * 生成
     */
    fn generate() -> String;
}

#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// 是否客户端模式
   pub client_mode: bool,
    /// 流管理器
    pub stream_manger: bool,
    // /// 编解码器
    // codec:bool,
    // /// id生成器
    // id_generator: dyn IdGenerator,
    // //分片处理
    // fragment_handler:String,
    // //分片大小
    pub fragment_size: i32,
    // //ssl 上下文
    // ssl_context:String,
    //
    // //字符集
    // charset:String,
    //
    // //内核线程数
    // core_threads:u32,
    // //最大线程数
    // max_threads:u32,
    //
    /// 读缓冲大小
    pub read_buffer_size: i32,
    /// 写缓冲大小
    pub pwrite_buffer_size: i32,
    //
    // //连接空闲超时
    pub idle_timeout: i32,
    // //请求默认超时
    pub request_timeout: i32,
    // //消息流超时(从发起到应答结束)
    // stream_timeout:u64,
    // //最大udp包大小
    // max_udp_size:u32,
    // //通讯架构(tcp, ws, udp)
    // schema:String,
    // //连接地址
    // link_url:String,
    // url:String,
    // uri:String,
    // port:u16,
    //
    /// 心跳间隔(毫秒)
    pub heartbeat_interval: i64,
    /// 连接越时(毫秒)
    pub connect_timeout: i64,
    /// 是否自动重链
    pub auto_reconnect: bool,
    /** 通讯架构 */
    pub schema: String,
    /** 连接地址 */
    pub link_url: String,
    /** 连接地址 */
    pub url: String,
    /** 协议地址 */
    pub address: String,
}
pub struct DefaultChannelAssistant {
    config: Box<dyn Config>,
}

impl DefaultChannelAssistant {
    pub fn init(config: impl Config)->Self{
        DefaultChannelAssistant {
            config: Box::new(config),
        }
    }
}

I'm unable to reproduce the error from your code snippets. Could you provide a minimal reproducible example for us, please?

I'll reorganize it. Thank you very much

use crate::socketd::transport::core::constants::MAX_SIZE_DATA;

pub trait Config {
    /**
     *  是否客户端模式
     */
    fn client_mode(&self) -> bool;

    /**
     *  获取流管理器
     */
    fn get_stream_manger(&self) -> bool;

    /**
     *  获取角色名
     */
    fn get_role_name(&self) -> String;

    /**
     * 获取字符集
     */
    fn get_charset(&self) -> &str;

    /**
     * 获取模式
     */
    fn get_schema(&self) -> &str;

    /**
     * 获取地址
     */
    fn get_address(&self) -> &str;

    /**
     * 获取写缓冲大小
     */
    fn get_pwrite_buffer_size(&self) -> i32;

    /**
     * 获取读缓冲大小
     */
    fn get_read_buffer_size(&self) -> i32;
    /**
     * 获取连接超时时间
     */
    fn get_connect_timeout(&self) -> i64;
}

/**
 * Id 生成器
 */
pub trait IdGenerator {
    /**
     * 生成
     */
    fn generate() -> String;
}

#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// 是否客户端模式
   pub client_mode: bool,
    /// 流管理器
    pub stream_manger: bool,
    // /// 编解码器
    // codec:bool,
    // /// id生成器
    // id_generator: dyn IdGenerator,
    // //分片处理
    // fragment_handler:String,
    // //分片大小
    pub fragment_size: i32,
    // //ssl 上下文
    // ssl_context:String,
    //
    // //字符集
    // charset:String,
    //
    // //内核线程数
    // core_threads:u32,
    // //最大线程数
    // max_threads:u32,
    //
    /// 读缓冲大小
    pub read_buffer_size: i32,
    /// 写缓冲大小
    pub pwrite_buffer_size: i32,
    //
    // //连接空闲超时
    pub idle_timeout: i32,
    // //请求默认超时
    pub request_timeout: i32,
    // //消息流超时(从发起到应答结束)
    // stream_timeout:u64,
    // //最大udp包大小
    // max_udp_size:u32,
    // //通讯架构(tcp, ws, udp)
    // schema:String,
    // //连接地址
    // link_url:String,
    // url:String,
    // uri:String,
    // port:u16,
    //
    /// 心跳间隔(毫秒)
    pub heartbeat_interval: i64,
    /// 连接越时(毫秒)
    pub connect_timeout: i64,
    /// 是否自动重链
    pub auto_reconnect: bool,
    /** 通讯架构 */
    pub schema: String,
    /** 连接地址 */
    pub link_url: String,
    /** 连接地址 */
    pub url: String,
    /** 协议地址 */
    pub address: String,
}

impl ClientConfig {
    pub fn new_server_url(server_url: &str) -> Self {
        // 校验合法性
        let sd_idx = server_url.find("sd").expect("No socketd client providers were found.");
        let idx = match server_url.to_string().find("://") {
            None => { panic!("The serverUrl invalid: {0}", server_url) }
            Some(idx) => { idx }
        };
        if idx < 2 {
            panic!("The serverUrl invalid: {0}", server_url);
        }
        ClientConfig {
            client_mode: true,
            stream_manger: false,
            fragment_size: MAX_SIZE_DATA,
            read_buffer_size: 512,
            pwrite_buffer_size: 512,
            idle_timeout: 60_000,//60秒(心跳默认为20秒)
            request_timeout: 10_000,//10秒(默认与连接超时同)
            heartbeat_interval: 20_000,
            connect_timeout: 10_000,
            auto_reconnect: false,
            schema: server_url[sd_idx + 3..idx].to_string(),
            link_url: server_url.to_string(),
            url: server_url[sd_idx + 3..].to_string(),
            address: server_url[idx + 3..].to_string(),
        }
    }
}

impl Default for ClientConfig {
    fn default() -> Self {
        ClientConfig {
            client_mode: true,
            stream_manger: false,
            fragment_size: 0,
            read_buffer_size: 0,
            pwrite_buffer_size: 0,
            idle_timeout: 0,
            request_timeout: 0,
            heartbeat_interval: 0,
            connect_timeout: 0,
            auto_reconnect: false,
            schema: "".to_string(),
            link_url: "".to_string(),
            url: "".to_string(),
            address: "".to_string(),
        }
    }
}


impl Config for ClientConfig {
    fn client_mode(&self) -> bool {
        self.client_mode
    }

    fn get_stream_manger(&self) -> bool {
        self.stream_manger
    }

    fn get_role_name(&self) -> String {
        match self.client_mode() {
            true => { String::from("Client") }
            false => { String::from("Server") }
        }
    }

    fn get_charset(&self) -> &str {
        "demo"
    }

    fn get_schema(&self) -> &str {
        &self.schema
    }

    fn get_address(&self) -> &str {
        &self.address
    }
    fn get_pwrite_buffer_size(&self) -> i32 {
        self.pwrite_buffer_size
    }

    fn get_read_buffer_size(&self) -> i32 {
        self.read_buffer_size
    }

    fn get_connect_timeout(&self) -> i64 {
        self.connect_timeout
    }
}

use crate::socketd::transport::core::error::SocketDError;
use crate::socketd::transport::core::Frame;
use bytes::BytesMut;

pub struct Coder {}

impl Coder {
    /// <code lang="cn"> 编码器 </<code>
    /// <code lang="en"> encoder </<code>
    ///
    /// <code lang="cn">用于将Tcp流转化为Frame</code>
    /// <code lang="en">Used to convert Tcp stream to Frame</code>
    ///
    /// # Examples
    /// ```
    /// use bytes::BytesMut;
    /// use socketd_rust::socketd::transport::core::codec::Coder;
    /// let  read = Coder::read(BytesMut::new());
    /// assert!(read.is_err());
    /// ```
    pub fn read(mut source: BytesMut) -> Result<Frame, SocketDError> {
        let frame_size = match source.first() {
            None => {0i32}
            Some(i) => *i
        };
        if frame_size > (source.len() + 4) as i32 {
            return Err(SocketDError::Channel("Illegal frame stack"));
        }
        let flag = source.get(2).expect("flag is null");
        if frame_size.eq(&8) {
            //len[int] + flag[int]
            // Fream结构与我的不一致需要重新改一下
            let mut frame = Frame::default();
            frame.flag= *flag as i32;
            Ok(frame)
        } else {
            todo!("需要对frame结构体进行组装")
        }
    }
}

use async_trait::async_trait;
use bytes::{Buf, BufMut, BytesMut};
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use crate::socketd::transport::core::codec::Coder;
use crate::socketd::transport::core::config::Config;
use crate::socketd::transport::core::error::SocketDError;
use crate::socketd::transport::core::Frame;

pub trait Channel {}


pub struct ChannelDefault {}

/**
 * 通道助理
 *
 * @return 通道
 */
#[async_trait]
pub trait ChannelAssistant<T> {

    /**
     * 读取
     *
     * @param target 目标
     * @param frame  帧
     */
    async fn read_socket(&self, target: &mut T) -> Result<Frame, SocketDError>;
}


pub struct DefaultChannelAssistant {
    config: Box<dyn Config>,
}

impl DefaultChannelAssistant {
    pub fn init(config: impl Config)->Self{
        DefaultChannelAssistant {
            config: Box::new(config),
        }
    }
}

#[async_trait]
impl ChannelAssistant<TcpStream> for DefaultChannelAssistant {


    /// <code lang="cn"> 从tcpStream中读取流并解析为Frame结构体 </<code>
    /// <code lang="en"> Read the stream from tcpStream and parse it into a Frame structure </<code>
    ///
    /// # Examples
    /// ```
    /// use tokio::net::TcpStream;
    /// use socketd_rust::socketd::transport::core::channel::DefaultChannelAssistant;
    /// use socketd_rust::socketd::transport::core::config::{ClientConfig, Config};
    /// let config=ClientConfig::new_server_url("sd:tcp://127.0.0.1:8602");
    /// let channel_assistant=DefaultChannelAssistant::init(config);
    /// let stream = TcpStream::connect(config.get_address()).await.unwrap();
    /// ```
    async fn read_socket(&self, target: &mut TcpStream) -> Result<Frame, SocketDError> {
        let _ = target.readable().await;
        let mut buf = BytesMut::with_capacity(4);
        let _= target.read(&mut buf).await;
        let len = buf.get_u32();
        if len == 0 {
            return  Err(SocketDError::Channel("read 0 bytes"));
        }
        let mut byte_buff = BytesMut::with_capacity(len as usize);
        byte_buff.put_u32(len);
        let buff_size = self.config.get_read_buffer_size() as usize;
        let mut read_buff = BytesMut::with_capacity(buff_size);
        loop {
            if read_buff.capacity().eq(&0) {
                read_buff.clear();
            }
            if target.try_read(&mut read_buff).unwrap() > 0 {
                byte_buff.put(&mut buf);
            } else {
                break;
            }
        }
        read_buff.clear();
        Coder::read(byte_buff)
    }
}

#[cfg(test)]
mod tests {
    use tokio::net::TcpStream;
    use crate::socketd::transport::core::channel::{ChannelAssistant, DefaultChannelAssistant};
    use crate::socketd::transport::core::config::{ClientConfig, Config};

    #[tokio::test]
    async fn test_read_socket() {
        let config=ClientConfig::new_server_url("sd:tcp://127.0.0.1:8602");
        let channel_assistant=DefaultChannelAssistant::init(config);
        let mut stream = TcpStream::connect(config.get_address()).await.unwrap();
        let r=channel_assistant.read_socket(&mut stream).await;
        assert!(r.is_err())
    }
}

Here's a playground that compiles. As I expected, adding Send + Sync bounds to dyn Config solved your compile error:

pub struct DefaultChannelAssistant {
    config: Box<dyn Config + Send + Sync + 'static>,
}

impl DefaultChannelAssistant {
    pub fn init(config: impl Config + Send + Sync + 'static) -> Self {
        DefaultChannelAssistant {
            config: Box::new(config),
        }
    }
}

But my config does not implement the traits of send and sync, how do they need to implement them,I saw that you used static, but the configuration here is controlled uniformly from somewhere, and I don't want to obtain ownership here. What should I do

As long as your type doesn't contain fields of types that don't implement Send or Sync (written as !Send and !Sync), it will be Send and Sync, too. Send and Sync are so called auto traits, because they are automatically implemented for your type (if all fields recursively implement that trait, too). !Send and !Sync types are Rc and anything that contains an UnsafeCell, like Cell, RefCell and such. Since your ClientConfig doesn't contain any of these, it is Send and Sync as well. Read more about it here:

Send and Sync are also automatically derived traits. This means that, unlike every other trait, if a type is composed entirely of Send or Sync types, then it is Send or Sync. Almost all primitives are Send and Sync, and as a consequence pretty much all types you'll ever interact with are Send and Sync.

2 Likes

When working with async, you pretty much have to. It is impossible to track lifetimes across futures as they might be executed after something captured by reference in the future is dropped. If you are worried about the cost of cloning you can wrap your shared ClientConfig in an Arc. If you need to mutate the config, you can wrap it in an Arc<Mutex<ClientConfig>> or Arc<RwLock<ClientConfig>> or such.

Okay, I'm trying to fix it

I want to know how to use asynchronous code in the # Examples section of document comments

Wrap it in

#[tokio::main]
async fn main() {

}

thanks

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.