I'll reorganize it. Thank you very much
use crate::socketd::transport::core::constants::MAX_SIZE_DATA;
pub trait Config {
/**
* 是否客户端模式
*/
fn client_mode(&self) -> bool;
/**
* 获取流管理器
*/
fn get_stream_manger(&self) -> bool;
/**
* 获取角色名
*/
fn get_role_name(&self) -> String;
/**
* 获取字符集
*/
fn get_charset(&self) -> &str;
/**
* 获取模式
*/
fn get_schema(&self) -> &str;
/**
* 获取地址
*/
fn get_address(&self) -> &str;
/**
* 获取写缓冲大小
*/
fn get_pwrite_buffer_size(&self) -> i32;
/**
* 获取读缓冲大小
*/
fn get_read_buffer_size(&self) -> i32;
/**
* 获取连接超时时间
*/
fn get_connect_timeout(&self) -> i64;
}
/**
* Id 生成器
*/
pub trait IdGenerator {
/**
* 生成
*/
fn generate() -> String;
}
#[derive(Debug, Clone)]
pub struct ClientConfig {
/// 是否客户端模式
pub client_mode: bool,
/// 流管理器
pub stream_manger: bool,
// /// 编解码器
// codec:bool,
// /// id生成器
// id_generator: dyn IdGenerator,
// //分片处理
// fragment_handler:String,
// //分片大小
pub fragment_size: i32,
// //ssl 上下文
// ssl_context:String,
//
// //字符集
// charset:String,
//
// //内核线程数
// core_threads:u32,
// //最大线程数
// max_threads:u32,
//
/// 读缓冲大小
pub read_buffer_size: i32,
/// 写缓冲大小
pub pwrite_buffer_size: i32,
//
// //连接空闲超时
pub idle_timeout: i32,
// //请求默认超时
pub request_timeout: i32,
// //消息流超时(从发起到应答结束)
// stream_timeout:u64,
// //最大udp包大小
// max_udp_size:u32,
// //通讯架构(tcp, ws, udp)
// schema:String,
// //连接地址
// link_url:String,
// url:String,
// uri:String,
// port:u16,
//
/// 心跳间隔(毫秒)
pub heartbeat_interval: i64,
/// 连接越时(毫秒)
pub connect_timeout: i64,
/// 是否自动重链
pub auto_reconnect: bool,
/** 通讯架构 */
pub schema: String,
/** 连接地址 */
pub link_url: String,
/** 连接地址 */
pub url: String,
/** 协议地址 */
pub address: String,
}
impl ClientConfig {
pub fn new_server_url(server_url: &str) -> Self {
// 校验合法性
let sd_idx = server_url.find("sd").expect("No socketd client providers were found.");
let idx = match server_url.to_string().find("://") {
None => { panic!("The serverUrl invalid: {0}", server_url) }
Some(idx) => { idx }
};
if idx < 2 {
panic!("The serverUrl invalid: {0}", server_url);
}
ClientConfig {
client_mode: true,
stream_manger: false,
fragment_size: MAX_SIZE_DATA,
read_buffer_size: 512,
pwrite_buffer_size: 512,
idle_timeout: 60_000,//60秒(心跳默认为20秒)
request_timeout: 10_000,//10秒(默认与连接超时同)
heartbeat_interval: 20_000,
connect_timeout: 10_000,
auto_reconnect: false,
schema: server_url[sd_idx + 3..idx].to_string(),
link_url: server_url.to_string(),
url: server_url[sd_idx + 3..].to_string(),
address: server_url[idx + 3..].to_string(),
}
}
}
impl Default for ClientConfig {
fn default() -> Self {
ClientConfig {
client_mode: true,
stream_manger: false,
fragment_size: 0,
read_buffer_size: 0,
pwrite_buffer_size: 0,
idle_timeout: 0,
request_timeout: 0,
heartbeat_interval: 0,
connect_timeout: 0,
auto_reconnect: false,
schema: "".to_string(),
link_url: "".to_string(),
url: "".to_string(),
address: "".to_string(),
}
}
}
impl Config for ClientConfig {
fn client_mode(&self) -> bool {
self.client_mode
}
fn get_stream_manger(&self) -> bool {
self.stream_manger
}
fn get_role_name(&self) -> String {
match self.client_mode() {
true => { String::from("Client") }
false => { String::from("Server") }
}
}
fn get_charset(&self) -> &str {
"demo"
}
fn get_schema(&self) -> &str {
&self.schema
}
fn get_address(&self) -> &str {
&self.address
}
fn get_pwrite_buffer_size(&self) -> i32 {
self.pwrite_buffer_size
}
fn get_read_buffer_size(&self) -> i32 {
self.read_buffer_size
}
fn get_connect_timeout(&self) -> i64 {
self.connect_timeout
}
}
use crate::socketd::transport::core::error::SocketDError;
use crate::socketd::transport::core::Frame;
use bytes::BytesMut;
pub struct Coder {}
impl Coder {
/// <code lang="cn"> 编码器 </<code>
/// <code lang="en"> encoder </<code>
///
/// <code lang="cn">用于将Tcp流转化为Frame</code>
/// <code lang="en">Used to convert Tcp stream to Frame</code>
///
/// # Examples
/// ```
/// use bytes::BytesMut;
/// use socketd_rust::socketd::transport::core::codec::Coder;
/// let read = Coder::read(BytesMut::new());
/// assert!(read.is_err());
/// ```
pub fn read(mut source: BytesMut) -> Result<Frame, SocketDError> {
let frame_size = match source.first() {
None => {0i32}
Some(i) => *i
};
if frame_size > (source.len() + 4) as i32 {
return Err(SocketDError::Channel("Illegal frame stack"));
}
let flag = source.get(2).expect("flag is null");
if frame_size.eq(&8) {
//len[int] + flag[int]
// Fream结构与我的不一致需要重新改一下
let mut frame = Frame::default();
frame.flag= *flag as i32;
Ok(frame)
} else {
todo!("需要对frame结构体进行组装")
}
}
}
use async_trait::async_trait;
use bytes::{Buf, BufMut, BytesMut};
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use crate::socketd::transport::core::codec::Coder;
use crate::socketd::transport::core::config::Config;
use crate::socketd::transport::core::error::SocketDError;
use crate::socketd::transport::core::Frame;
pub trait Channel {}
pub struct ChannelDefault {}
/**
* 通道助理
*
* @return 通道
*/
#[async_trait]
pub trait ChannelAssistant<T> {
/**
* 读取
*
* @param target 目标
* @param frame 帧
*/
async fn read_socket(&self, target: &mut T) -> Result<Frame, SocketDError>;
}
pub struct DefaultChannelAssistant {
config: Box<dyn Config>,
}
impl DefaultChannelAssistant {
pub fn init(config: impl Config)->Self{
DefaultChannelAssistant {
config: Box::new(config),
}
}
}
#[async_trait]
impl ChannelAssistant<TcpStream> for DefaultChannelAssistant {
/// <code lang="cn"> 从tcpStream中读取流并解析为Frame结构体 </<code>
/// <code lang="en"> Read the stream from tcpStream and parse it into a Frame structure </<code>
///
/// # Examples
/// ```
/// use tokio::net::TcpStream;
/// use socketd_rust::socketd::transport::core::channel::DefaultChannelAssistant;
/// use socketd_rust::socketd::transport::core::config::{ClientConfig, Config};
/// let config=ClientConfig::new_server_url("sd:tcp://127.0.0.1:8602");
/// let channel_assistant=DefaultChannelAssistant::init(config);
/// let stream = TcpStream::connect(config.get_address()).await.unwrap();
/// ```
async fn read_socket(&self, target: &mut TcpStream) -> Result<Frame, SocketDError> {
let _ = target.readable().await;
let mut buf = BytesMut::with_capacity(4);
let _= target.read(&mut buf).await;
let len = buf.get_u32();
if len == 0 {
return Err(SocketDError::Channel("read 0 bytes"));
}
let mut byte_buff = BytesMut::with_capacity(len as usize);
byte_buff.put_u32(len);
let buff_size = self.config.get_read_buffer_size() as usize;
let mut read_buff = BytesMut::with_capacity(buff_size);
loop {
if read_buff.capacity().eq(&0) {
read_buff.clear();
}
if target.try_read(&mut read_buff).unwrap() > 0 {
byte_buff.put(&mut buf);
} else {
break;
}
}
read_buff.clear();
Coder::read(byte_buff)
}
}
#[cfg(test)]
mod tests {
use tokio::net::TcpStream;
use crate::socketd::transport::core::channel::{ChannelAssistant, DefaultChannelAssistant};
use crate::socketd::transport::core::config::{ClientConfig, Config};
#[tokio::test]
async fn test_read_socket() {
let config=ClientConfig::new_server_url("sd:tcp://127.0.0.1:8602");
let channel_assistant=DefaultChannelAssistant::init(config);
let mut stream = TcpStream::connect(config.get_address()).await.unwrap();
let r=channel_assistant.read_socket(&mut stream).await;
assert!(r.is_err())
}
}