소개 : Connect UDP, 사용자 정의 네트워크 프로토콜(간단한 요청 응답 프로토콜) 및 Reactor 네트워크 모델(스레드당 하나의 루프 + 스레드 풀)을 기반으로 하는 경량 RPC 프레임워크입니다!
C++ 17
표준 메모리 풀 채택千级访问
동시성에는 문제가 없습니다!内网穿透
에 주로 사용되는 RPC 요청을 적극적으로 시작하도록 서버를 지원합니다!留言
.현재 사용량 :
# 需要提前安装zlib库
# 本人开发环境 GCC 11.3 CMake 3.25 clion ubuntu 22.04
git clone [email protected]:sorise/muse-rpc.git
cd muse-rpc
cmake -S . -B build
cmake --build
cd build
./muse #启动
아키텍처 다이어그램 :
기본 구성 내용은 다음과 같습니다
int main () {
// 启动配置
// 4 设置 线程池最小线程数
// 4 设置 线程池最大线程数
// 4096 线程池任务缓存队列长度
// 3000ms; 动态线程空闲时间 3 秒
// 日志目录
// 是否将日志打印到控制台
muse::rpc::Disposition::Server_Configure ( 4 , 4 , 4096 , 3000ms, " /home/remix/log " , true );
//绑定方法的例子
Normal normal ( 10 , " remix " ); //用户自定义类
// 同步,意味着这个方法一次只能由一个线程执行,不能多个线程同时执行这个方法
muse_bind_sync ( " normal " , &Normal::addValue, & normal ); //绑定成员函数
muse_bind_async ( " test_fun1 " , test_fun1); // test_fun1、test_fun2 是函数指针
muse_bind_async ( " test_fun2 " , test_fun2);
// 开一个线程启动反应堆,等待请求
// 绑定端口 15000, 启动两个从反应堆,每个反应堆最多维持 1500虚链接
// ReactorRuntimeThread::Asynchronous 指定主反应堆新开一个线程运行,而不是阻塞当前线程
Reactor reactor ( 15000 , 2 , 1500 , ReactorRuntimeThread::Asynchronous);
try {
//开始运行
reactor. start ();
} catch ( const ReactorException &ex){
SPDLOG_ERROR ( " Main-Reactor start failed! " );
}
/*
* 当前线程的其他任务
* */
//程序结束
spdlog::default_logger ()-> flush (); //刷新日志
}
시작 후:
muse_bind_sync 및 muse_bind_async 매크로를 사용하세요. 전자는 SynchronousRegistry를 호출하고 후자는 Registry를 호출합니다. 프로토타입은 다음과 같습니다.
# include < iostream >
# include " rpc/rpc.hpp "
using namespace muse ::rpc ;
using namespace muse ::pool ;
using namespace std ::chrono_literals ;
//绑定方法的例子
Normal normal ( 10 , " remix " );
// 绑定类的成员函数、使用 只同步方法 绑定
muse_bind_sync ( " normal " , &Normal::addValue, & normal );
// 绑定函数指针
muse_bind_async ( " test_fun1 " , test_fun1);
// 绑定 lambda 表达式
muse_bind_async ( " lambda test " , []( int val)->int{
printf ( " why call me n " );
return 10 + val;
});
서버측 등록 방법에는 Registry 및 동기식Registry 개체를 사용해야 합니다.
동기식 등록 설명: 값은 addValue 메소드를 요청한 클라이언트 수를 기록하는 데 사용되는 필드입니다. 1000개의 클라이언트가 addValue를 요청하고 레지스트리를 사용하여 등록한 경우 값에 대한 액세스가 스레드로부터 안전하지 않기 때문에 값이 1000이 아닐 수 있습니다. SynchronousRegistry를 사용하여 등록된 경우 1000이어야 합니다.
class Counter {
public:
Counter ():value( 0 ){}
void addValue (){
this -> value ++;
}
private:
long value;
};
등록방법 : 두 매크로의 정의는 다음과 같습니다.
# define muse_bind_async (...)
Singleton<Registry>()-> Bind (__VA_ARGS__);
//同步方法,一次只能一个线程执行此方法
# define muse_bind_sync (...)
Singleton<SynchronousRegistry>()-> Bind (__VA_ARGS__);
클라이언트는 Outcome<R> 객체를 반환하는 Client 객체를 사용하고 isOK 메소드는 반환이 성공적인지 여부를 나타냅니다! false가 반환되면 해당 멤버 프로토콜Reason은 네트워크 이상이 있는지 여부를 나타내고, 응답 멤버는 rpc 요청 및 응답 오류인지 여부를 나타냅니다.
# include " rpc/rpc.hpp "
# include " rpc/client/client.hpp "
using namespace muse ::rpc ;
using namespace muse ::timer ;
using namespace std ::chrono_literals ;
int main{
// //启动客户端配置
muse::rpc::Disposition::Client_Configure ();
// MemoryPoolSingleton 返回一个 std::shared_ptr<std::pmr::synchronized_pool_resource>
//你可以自己定一个内存池
//传入 服务器地址和服务端端口号、一个C++ 17 标准内存池
Client remix ( " 127.0.0.1 " , 15000 , MemoryPoolSingleton ());
//调用远程方法
Outcome<std::vector< double >> result = remix. call <std::vector< double >>( " test_fun2 " ,scores);
std::cout << result. value . size () << std::endl;
//调用 无参无返回值方法
Outcome< void > result =remix. call < void >( " normal " );
if (result. isOK ()){
std::printf ( " success n " );
} else {
std::printf ( " failed n " );
}
//调用
auto ri = remix. call < int >( " test_fun1 " , 590 );
std::cout << ri. value << std::endl; // 600
};
오류 처리: 일반적인 상황에서는 isOk 메서드가 true인지 여부에만 주의하면 됩니다. 오류의 세부 정보를 알아야 하는 경우 두 가지 열거 개체인 FailureReason 및 RpcFailureReason을 사용하여 네트워크 오류를 지적할 수 있습니다. 및 RPC 요청 오류가 각각 발생합니다.
auto resp = remix.call< int >( " test_fun1 " , 590 );
if (resp.isOK()){
//调用成功
std::cout << " request success n " << std::endl; // 600
std::cout << ri. value << std::endl; // 600
} else {
//调用失败
if (resp. protocolReason == FailureReason::OK){
//错误原因是RPC错误
std::printf ( " rpc error n " );
std::cout << resp. response . getReason () << std::endl;
//返回 int 值对应 枚举 RpcFailureReason
} else {
//错误原因是网络通信过程中的错误
std::printf ( " internet error n " );
std::cout << ( short )resp. protocolReason << std::endl; //错误原因
}
}
// resp.protocolReason() 返回 枚举FailureReason
enum class FailureReason : short {
OK, //没有失败
TheServerResourcesExhausted, //服务器资源耗尽,请勿链接
NetworkTimeout, //网络连接超时
TheRunningLogicOfTheServerIncorrect, //服务器运行逻辑错误,返回的报文并非所需
};
// resp.response.getReason() 返回值 是 int
enum class RpcFailureReason : int {
Success = 0 , // 成功
ParameterError = 1 , // 参数错误,
MethodNotExist = 2 , // 指定方法不存在
ClientInnerException = 3 , // 客户端内部异常,请求还没有到服务器
ServerInnerException = 4 , // 服务器内部异常,请求到服务器了,但是处理过程有异常
MethodExecutionError = 5 , // 方法执行错误
UnexpectedReturnValue = 6 , //返回值非预期
};
비차단 요청은 요청 작업을 설정하고 요청 결과를 처리하기 위한 콜백 함수만 등록하면 전송 프로세스가 Transmitter 개체에 의해 처리되므로 네트워크 문제로 인해 현재 스레드가 차단되지 않습니다. 처리는 콜백을 기반으로 처리되며 많은 수의 요청을 처리하는 데 사용될 수 있습니다. 여기서는 Transmitter 개체가 필요하며 전송 작업은 TransmitterEvent를 통해 설정됩니다.
참고 : 단일 송신기가 동시에 전송하는 작업 수는 100개 미만이어야 합니다. 전송된 요청 작업 수가 100개를 초과하는 경우 다음 인터페이스를 호출하여 작업 시간을 설정할 수 있습니다.
void test_v (){
//启动客户端配置
muse::rpc::Disposition::Client_Configure ();
Transmitter transmitter ( 14500 );
// transmitter.set_request_timeout(1500); //设置请求阶段的等待超时时间
// transmitter.set_response_timeout(2000); //设置响应阶段的等待超时时间
//测试参数
std::vector< double > score = {
100.526 , 95.84 , 75.86 , 99.515 , 6315.484 , 944.5 , 98.2 , 99898.26 ,
9645.54 , 484.1456 , 8974.4654 , 4894.156 , 89 , 12 , 0.56 , 95.56 , 41
};
std::string name {
" asdasd54986198456h487s1as8d7as5d1w877y98j34512g98ad "
" sf3488as31c98aasdasd54986198sdasdasd456h487s1as8d7a "
" s5d1w877y98j34512g98ad "
};
for ( int i = 0 ; i < 1000 ; ++i) {
TransmitterEvent event ( " 127.0.0.1 " , 15000 ); //指定远程IP 、Port
event. call < int >( " read_str " , name,score); //指定方法
event. set_callBack ([](Outcome< int > t){ //设置回调
if (t. isOK ()){
printf ( " OK lambda %d n " , t. value );
} else {
printf ( " fail lambda n " );
}
});
transmitter. send ( std::move (event));
}
//异步启动发射器,将会新开一个线程持续发送
transmitter. start (TransmitterThreadType::Asynchronous);
//停止发射器,这是个阻塞方法,如果发送器还有任务没有处理完,将会等待
transmitter. stop ();
//如果想直接停止可以使用 transmitter.stop_immediately()方法
}
코어 스레드 수, 최대 스레드 수, 작업 캐시 대기열 길이 및 동적 스레드 유휴 시간 구성을 지원합니다!
ThreadPoolSetting::MinThreadCount = 4 ; //设置 核心线程数
ThreadPoolSetting::MaxThreadCount = 4 ; //设置 核心线程数
ThreadPoolSetting::TaskQueueLength = 4096 ; //设置 任务缓存队列长度
ThreadPoolSetting::DynamicThreadVacantMillisecond = 3000ms; //动态线程空闲时间
소개 : SR2P 프로토콜(Simple Request Response Protocol)은 RPC용으로 특별히 맞춤화된 2단계 프로토콜로 링크를 설정하지 않고 요청과 응답의 두 단계로 나뉩니다.
프로토콜 필드는 다음과 같으며, 프로토콜 헤더는 26바이트, 필드 바이트 순서는 빅 엔디안, 데이터 부분은 MTU 제한으로 인해 네트워크 표준 MTU는 576, 데이터 부분은 최대 522입니다. 자세한 내용은 Protocol.pdf를 참조하세요.
11110000
, 1바이트입니다.SR2P 프로토콜은 생성된 데이터 양에 따라 매번 생성되는 데이터그램 수를 결정합니다. 다음은 기본적으로 일반적인 상황에서 한 번에 2개의 데이터그램을 예로 들어 보겠습니다.
기타 상황 처리 과정에 대한 자세한 내용은 Protocol.pdf 문서를 참고하세요.