概要: Connect UDP、カスタム ネットワーク プロトコル (単純な要求応答プロトコル)、および Reactor ネットワーク モデル (スレッドごとに 1 つのループ + スレッド プール) に基づく軽量 RPC フレームワーク!
C++ 17
標準メモリプールを採用千级访问
に問題はありません。内网穿透
に使用される RPC リクエストをアクティブに開始するサーバーをサポートします。留言
ください。現在の使用状況:
# 需要提前安装zlib库
# 本人开发环境 GCC 11.3 CMake 3.25 clion ubuntu 22.04
git clone [email protected]:sorise/muse-rpc.git
cd muse-rpc
cmake -S . -B build
cmake --build
cd build
./muse #启动
アーキテクチャ図:
基本的な設定内容は以下の通りです
int main () {
// 启动配置
// 4 设置 线程池最小线程数
// 4 设置 线程池最大线程数
// 4096 线程池任务缓存队列长度
// 3000ms; 动态线程空闲时间 3 秒
// 日志目录
// 是否将日志打印到控制台
muse::rpc::Disposition::Server_Configure ( 4 , 4 , 4096 , 3000ms, " /home/remix/log " , true );
//绑定方法的例子
Normal normal ( 10 , " remix " ); //用户自定义类
// 同步,意味着这个方法一次只能由一个线程执行,不能多个线程同时执行这个方法
muse_bind_sync ( " normal " , &Normal::addValue, & normal ); //绑定成员函数
muse_bind_async ( " test_fun1 " , test_fun1); // test_fun1、test_fun2 是函数指针
muse_bind_async ( " test_fun2 " , test_fun2);
// 开一个线程启动反应堆,等待请求
// 绑定端口 15000, 启动两个从反应堆,每个反应堆最多维持 1500虚链接
// ReactorRuntimeThread::Asynchronous 指定主反应堆新开一个线程运行,而不是阻塞当前线程
Reactor reactor ( 15000 , 2 , 1500 , ReactorRuntimeThread::Asynchronous);
try {
//开始运行
reactor. start ();
} catch ( const ReactorException &ex){
SPDLOG_ERROR ( " Main-Reactor start failed! " );
}
/*
* 当前线程的其他任务
* */
//程序结束
spdlog::default_logger ()-> flush (); //刷新日志
}
起動後:
マクロmuse_bind_syncおよびmuse_bind_asyncを使用します。前者は SynchronousRegistry を呼び出し、後者は Registry を呼び出します。 プロトタイプは次のとおりです。
# include < iostream >
# include " rpc/rpc.hpp "
using namespace muse ::rpc ;
using namespace muse ::pool ;
using namespace std ::chrono_literals ;
//绑定方法的例子
Normal normal ( 10 , " remix " );
// 绑定类的成员函数、使用 只同步方法 绑定
muse_bind_sync ( " normal " , &Normal::addValue, & normal );
// 绑定函数指针
muse_bind_async ( " test_fun1 " , test_fun1);
// 绑定 lambda 表达式
muse_bind_async ( " lambda test " , []( int val)->int{
printf ( " why call me n " );
return 10 + val;
});
サーバー側の登録メソッドでは、Registry オブジェクトと SynchronousRegistry オブジェクトを使用する必要があります。
SynchronousRegistry の説明: value は、addValue メソッドを要求したクライアントの数を記録するために使用されるフィールドです。1000 個のクライアントが addValue を要求し、レジストリを使用して登録されている場合、value へのアクセスはスレッドセーフではないため、value の値は 1000 にならない可能性があります。 SynchronousRegistry を使用して登録されている場合は、1000 である必要があります。
class Counter {
public:
Counter ():value( 0 ){}
void addValue (){
this -> value ++;
}
private:
long value;
};
登録方法:2つのマクロの定義は以下の通りです。
# define muse_bind_async (...)
Singleton<Registry>()-> Bind (__VA_ARGS__);
//同步方法,一次只能一个线程执行此方法
# define muse_bind_sync (...)
Singleton<SynchronousRegistry>()-> Bind (__VA_ARGS__);
クライアントは、 Outcome<R>オブジェクトを返す Client オブジェクトを使用します。isOK メソッドは、戻りが成功したかどうかを示します。 false が返された場合、そのメンバーのprotocolReasonはネットワーク異常の有無を示し、responseメンバーはrpcリクエストとレスポンスのエラーかどうかを示します。
# include " rpc/rpc.hpp "
# include " rpc/client/client.hpp "
using namespace muse ::rpc ;
using namespace muse ::timer ;
using namespace std ::chrono_literals ;
int main{
// //启动客户端配置
muse::rpc::Disposition::Client_Configure ();
// MemoryPoolSingleton 返回一个 std::shared_ptr<std::pmr::synchronized_pool_resource>
//你可以自己定一个内存池
//传入 服务器地址和服务端端口号、一个C++ 17 标准内存池
Client remix ( " 127.0.0.1 " , 15000 , MemoryPoolSingleton ());
//调用远程方法
Outcome<std::vector< double >> result = remix. call <std::vector< double >>( " test_fun2 " ,scores);
std::cout << result. value . size () << std::endl;
//调用 无参无返回值方法
Outcome< void > result =remix. call < void >( " normal " );
if (result. isOK ()){
std::printf ( " success n " );
} else {
std::printf ( " failed n " );
}
//调用
auto ri = remix. call < int >( " test_fun1 " , 590 );
std::cout << ri. value << std::endl; // 600
};
エラー処理:通常の状況では、isOk メソッドが true であるかどうかに注意するだけで済みます。エラーの詳細を知る必要がある場合は、次の 2 つの列挙オブジェクト FailureReason と RpcFailureReason を使用してネットワーク エラーを指摘できます。および RPC 要求エラーがそれぞれ発生します。
auto resp = remix.call< int >( " test_fun1 " , 590 );
if (resp.isOK()){
//调用成功
std::cout << " request success n " << std::endl; // 600
std::cout << ri. value << std::endl; // 600
} else {
//调用失败
if (resp. protocolReason == FailureReason::OK){
//错误原因是RPC错误
std::printf ( " rpc error n " );
std::cout << resp. response . getReason () << std::endl;
//返回 int 值对应 枚举 RpcFailureReason
} else {
//错误原因是网络通信过程中的错误
std::printf ( " internet error n " );
std::cout << ( short )resp. protocolReason << std::endl; //错误原因
}
}
// resp.protocolReason() 返回 枚举FailureReason
enum class FailureReason : short {
OK, //没有失败
TheServerResourcesExhausted, //服务器资源耗尽,请勿链接
NetworkTimeout, //网络连接超时
TheRunningLogicOfTheServerIncorrect, //服务器运行逻辑错误,返回的报文并非所需
};
// resp.response.getReason() 返回值 是 int
enum class RpcFailureReason : int {
Success = 0 , // 成功
ParameterError = 1 , // 参数错误,
MethodNotExist = 2 , // 指定方法不存在
ClientInnerException = 3 , // 客户端内部异常,请求还没有到服务器
ServerInnerException = 4 , // 服务器内部异常,请求到服务器了,但是处理过程有异常
MethodExecutionError = 5 , // 方法执行错误
UnexpectedReturnValue = 6 , //返回值非预期
};
ノンブロッキング リクエストとは、リクエスト タスクを設定し、リクエスト結果を処理するコールバック関数を登録するだけで済むことを意味し、送信プロセスは Transmitter オブジェクトによって処理されるため、ネットワーク上の理由により現在のスレッドがブロックされることはありません。ブロックはコールバックに基づいて処理され、多数のリクエストを処理するために使用できます。ここでは Transmitter オブジェクトが必要であり、送信タスクは TransmitterEvent を通じて設定されます。
注: 1 つのトランスミッターによって同時に送信されるタスクの数は 100 未満である必要があります。送信されるリクエスト タスクの数が 100 を超える場合は、タイムアウトを増やす必要があります。次のインターフェイスを呼び出して動作時間を設定できます。
void test_v (){
//启动客户端配置
muse::rpc::Disposition::Client_Configure ();
Transmitter transmitter ( 14500 );
// transmitter.set_request_timeout(1500); //设置请求阶段的等待超时时间
// transmitter.set_response_timeout(2000); //设置响应阶段的等待超时时间
//测试参数
std::vector< double > score = {
100.526 , 95.84 , 75.86 , 99.515 , 6315.484 , 944.5 , 98.2 , 99898.26 ,
9645.54 , 484.1456 , 8974.4654 , 4894.156 , 89 , 12 , 0.56 , 95.56 , 41
};
std::string name {
" asdasd54986198456h487s1as8d7as5d1w877y98j34512g98ad "
" sf3488as31c98aasdasd54986198sdasdasd456h487s1as8d7a "
" s5d1w877y98j34512g98ad "
};
for ( int i = 0 ; i < 1000 ; ++i) {
TransmitterEvent event ( " 127.0.0.1 " , 15000 ); //指定远程IP 、Port
event. call < int >( " read_str " , name,score); //指定方法
event. set_callBack ([](Outcome< int > t){ //设置回调
if (t. isOK ()){
printf ( " OK lambda %d n " , t. value );
} else {
printf ( " fail lambda n " );
}
});
transmitter. send ( std::move (event));
}
//异步启动发射器,将会新开一个线程持续发送
transmitter. start (TransmitterThreadType::Asynchronous);
//停止发射器,这是个阻塞方法,如果发送器还有任务没有处理完,将会等待
transmitter. stop ();
//如果想直接停止可以使用 transmitter.stop_immediately()方法
}
コア スレッドの数、スレッドの最大数、タスク キャッシュ キューの長さ、および動的スレッド アイドル時間の構成をサポートします。
ThreadPoolSetting::MinThreadCount = 4 ; //设置 核心线程数
ThreadPoolSetting::MaxThreadCount = 4 ; //设置 核心线程数
ThreadPoolSetting::TaskQueueLength = 4096 ; //设置 任务缓存队列长度
ThreadPoolSetting::DynamicThreadVacantMillisecond = 3000ms; //动态线程空闲时间
はじめに: Simple Request Response Protocol (SR2P プロトコル) は、RPC 用に特別にカスタマイズされた 2 フェーズのプロトコルで、リンクを確立しない要求と応答の 2 段階に分かれています。
プロトコル フィールドは次のとおりです。プロトコル ヘッダーは 26 バイト、フィールドのバイト オーダーはビッグ エンディアン、データ部分はリトル エンディアンです。MTU の制限により、ネットワーク標準の MTU は 576、データ部分は最大 522 です。詳細については、Protocol.pdf を参照してください。
11110000
、1 バイトです。SR2P プロトコルは、生成されるデータ量に基づいて毎回生成されるデータグラムの数を決定します。次に、基本的に通常の状況でのリクエスト フローチャートを例として示します。
他の状況の処理プロセスの詳細については、Protocol.pdf ドキュメントを参照してください。