บทนำ : เฟรมเวิร์ก RPC น้ำหนักเบาที่ใช้ Connect UDP, โปรโตคอลเครือข่ายแบบกำหนดเอง (โปรโตคอลตอบสนองคำขออย่างง่าย) และโมเดลเครือข่าย Reactor (หนึ่งลูปต่อเธรด + เธรดพูล)!
C++ 17
千级访问
ในเครื่องปฏิกรณ์ทาสเดี่ยวและเธรดผู้ปฏิบัติงานสี่เธรด!内网穿透
ในกรณีของ IP ที่ไม่ใช่แบบสาธารณะ!留言
ข้อความไว้การใช้งานปัจจุบัน :
# 需要提前安装zlib库
# 本人开发环境 GCC 11.3 CMake 3.25 clion ubuntu 22.04
git clone [email protected]:sorise/muse-rpc.git
cd muse-rpc
cmake -S . -B build
cmake --build
cd build
./muse #启动
แผนภาพสถาปัตยกรรม :
เนื้อหาการกำหนดค่าพื้นฐานมีดังนี้
int main () {
// 启动配置
// 4 设置 线程池最小线程数
// 4 设置 线程池最大线程数
// 4096 线程池任务缓存队列长度
// 3000ms; 动态线程空闲时间 3 秒
// 日志目录
// 是否将日志打印到控制台
muse::rpc::Disposition::Server_Configure ( 4 , 4 , 4096 , 3000ms, " /home/remix/log " , true );
//绑定方法的例子
Normal normal ( 10 , " remix " ); //用户自定义类
// 同步,意味着这个方法一次只能由一个线程执行,不能多个线程同时执行这个方法
muse_bind_sync ( " normal " , &Normal::addValue, & normal ); //绑定成员函数
muse_bind_async ( " test_fun1 " , test_fun1); // test_fun1、test_fun2 是函数指针
muse_bind_async ( " test_fun2 " , test_fun2);
// 开一个线程启动反应堆,等待请求
// 绑定端口 15000, 启动两个从反应堆,每个反应堆最多维持 1500虚链接
// ReactorRuntimeThread::Asynchronous 指定主反应堆新开一个线程运行,而不是阻塞当前线程
Reactor reactor ( 15000 , 2 , 1500 , ReactorRuntimeThread::Asynchronous);
try {
//开始运行
reactor. start ();
} catch ( const ReactorException &ex){
SPDLOG_ERROR ( " Main-Reactor start failed! " );
}
/*
* 当前线程的其他任务
* */
//程序结束
spdlog::default_logger ()-> flush (); //刷新日志
}
หลังจากเริ่มต้น:
ใช้มาโคร muse_bind_sync และ muse_bind_async ก่อนหน้านี้เรียก SynchronousRegistry และอย่างหลังเรียก Registry ต้นแบบมีดังนี้
# include < iostream >
# include " rpc/rpc.hpp "
using namespace muse ::rpc ;
using namespace muse ::pool ;
using namespace std ::chrono_literals ;
//绑定方法的例子
Normal normal ( 10 , " remix " );
// 绑定类的成员函数、使用 只同步方法 绑定
muse_bind_sync ( " normal " , &Normal::addValue, & normal );
// 绑定函数指针
muse_bind_async ( " test_fun1 " , test_fun1);
// 绑定 lambda 表达式
muse_bind_async ( " lambda test " , []( int val)->int{
printf ( " why call me n " );
return 10 + val;
});
วิธีการลงทะเบียนทางฝั่งเซิร์ฟเวอร์จำเป็นต้องใช้วัตถุ Registry และ SynchronousRegistry
คำอธิบาย SynchronousRegistry: value คือฟิลด์ที่ใช้บันทึกจำนวนไคลเอ็นต์ที่ร้องขอเมธอด addValue หากมีไคลเอ็นต์ 1,000 รายร้องขอ addValue และลงทะเบียนโดยใช้ Registry ค่าของค่าอาจไม่เท่ากับ 1,000 เนื่องจากการเข้าถึงค่าไม่ปลอดภัยสำหรับเธรด ที่ลงทะเบียนโดยใช้ SynchronousRegistry จะต้องเป็น 1,000
class Counter {
public:
Counter ():value( 0 ){}
void addValue (){
this -> value ++;
}
private:
long value;
};
วิธีการลงทะเบียน : คำจำกัดความของมาโครทั้งสองมีดังนี้
# define muse_bind_async (...)
Singleton<Registry>()-> Bind (__VA_ARGS__);
//同步方法,一次只能一个线程执行此方法
# define muse_bind_sync (...)
Singleton<SynchronousRegistry>()-> Bind (__VA_ARGS__);
ไคลเอนต์ใช้วัตถุไคลเอนต์ซึ่งจะส่งคืนวัตถุ ผลลัพธ์<R> และวิธีการ isOK จะระบุว่าการส่งคืนสำเร็จหรือไม่! หากส่งคืนค่า false สมาชิก protocolReason จะระบุว่ามีความผิดปกติของเครือข่ายหรือไม่ และสมาชิกการตอบสนองจะระบุว่าเป็นคำขอ rpc และข้อผิดพลาดในการตอบสนองหรือไม่
# include " rpc/rpc.hpp "
# include " rpc/client/client.hpp "
using namespace muse ::rpc ;
using namespace muse ::timer ;
using namespace std ::chrono_literals ;
int main{
// //启动客户端配置
muse::rpc::Disposition::Client_Configure ();
// MemoryPoolSingleton 返回一个 std::shared_ptr<std::pmr::synchronized_pool_resource>
//你可以自己定一个内存池
//传入 服务器地址和服务端端口号、一个C++ 17 标准内存池
Client remix ( " 127.0.0.1 " , 15000 , MemoryPoolSingleton ());
//调用远程方法
Outcome<std::vector< double >> result = remix. call <std::vector< double >>( " test_fun2 " ,scores);
std::cout << result. value . size () << std::endl;
//调用 无参无返回值方法
Outcome< void > result =remix. call < void >( " normal " );
if (result. isOK ()){
std::printf ( " success n " );
} else {
std::printf ( " failed n " );
}
//调用
auto ri = remix. call < int >( " test_fun1 " , 590 );
std::cout << ri. value << std::endl; // 600
};
การจัดการข้อผิดพลาด: ภายใต้สถานการณ์ปกติ คุณจะต้องสนใจว่าเมธอด isOk เป็นจริงหรือไม่ หากคุณต้องการทราบรายละเอียดของข้อผิดพลาด คุณสามารถใช้เมธอดต่อไปนี้ได้ FailureReason และ RpcFailureReason และข้อผิดพลาดคำขอ RPC ตามลำดับ
auto resp = remix.call< int >( " test_fun1 " , 590 );
if (resp.isOK()){
//调用成功
std::cout << " request success n " << std::endl; // 600
std::cout << ri. value << std::endl; // 600
} else {
//调用失败
if (resp. protocolReason == FailureReason::OK){
//错误原因是RPC错误
std::printf ( " rpc error n " );
std::cout << resp. response . getReason () << std::endl;
//返回 int 值对应 枚举 RpcFailureReason
} else {
//错误原因是网络通信过程中的错误
std::printf ( " internet error n " );
std::cout << ( short )resp. protocolReason << std::endl; //错误原因
}
}
// resp.protocolReason() 返回 枚举FailureReason
enum class FailureReason : short {
OK, //没有失败
TheServerResourcesExhausted, //服务器资源耗尽,请勿链接
NetworkTimeout, //网络连接超时
TheRunningLogicOfTheServerIncorrect, //服务器运行逻辑错误,返回的报文并非所需
};
// resp.response.getReason() 返回值 是 int
enum class RpcFailureReason : int {
Success = 0 , // 成功
ParameterError = 1 , // 参数错误,
MethodNotExist = 2 , // 指定方法不存在
ClientInnerException = 3 , // 客户端内部异常,请求还没有到服务器
ServerInnerException = 4 , // 服务器内部异常,请求到服务器了,但是处理过程有异常
MethodExecutionError = 5 , // 方法执行错误
UnexpectedReturnValue = 6 , //返回值非预期
};
คำขอที่ไม่บล็อกหมายความว่าคุณจะต้องตั้งค่างานคำขอและลงทะเบียนฟังก์ชันการโทรกลับเพื่อประมวลผลผลลัพธ์คำขอ กระบวนการส่งได้รับการจัดการโดยอ็อบเจ็กต์ตัวส่ง ดังนั้นเธรดปัจจุบันจะไม่ถูกบล็อกเนื่องจากเหตุผลด้านเครือข่ายและ การประมวลผล การบล็อกจะได้รับการจัดการตามการโทรกลับและสามารถใช้เพื่อจัดการกับคำขอจำนวนมาก ที่นี่เราต้องการออบเจ็กต์ตัวส่งสัญญาณและงานการส่งได้รับการตั้งค่าผ่าน TransmitterEvent!
หมายเหตุ : จำนวนงานที่ส่งโดยตัวส่งสัญญาณเดียวในเวลาเดียวกันควรน้อยกว่า 100 หากจำนวนคำขอที่ส่งเกิน 100 จะต้องเพิ่มการหมดเวลา คุณสามารถเรียกใช้อินเทอร์เฟซต่อไปนี้เพื่อตั้งเวลาการทำงาน
void test_v (){
//启动客户端配置
muse::rpc::Disposition::Client_Configure ();
Transmitter transmitter ( 14500 );
// transmitter.set_request_timeout(1500); //设置请求阶段的等待超时时间
// transmitter.set_response_timeout(2000); //设置响应阶段的等待超时时间
//测试参数
std::vector< double > score = {
100.526 , 95.84 , 75.86 , 99.515 , 6315.484 , 944.5 , 98.2 , 99898.26 ,
9645.54 , 484.1456 , 8974.4654 , 4894.156 , 89 , 12 , 0.56 , 95.56 , 41
};
std::string name {
" asdasd54986198456h487s1as8d7as5d1w877y98j34512g98ad "
" sf3488as31c98aasdasd54986198sdasdasd456h487s1as8d7a "
" s5d1w877y98j34512g98ad "
};
for ( int i = 0 ; i < 1000 ; ++i) {
TransmitterEvent event ( " 127.0.0.1 " , 15000 ); //指定远程IP 、Port
event. call < int >( " read_str " , name,score); //指定方法
event. set_callBack ([](Outcome< int > t){ //设置回调
if (t. isOK ()){
printf ( " OK lambda %d n " , t. value );
} else {
printf ( " fail lambda n " );
}
});
transmitter. send ( std::move (event));
}
//异步启动发射器,将会新开一个线程持续发送
transmitter. start (TransmitterThreadType::Asynchronous);
//停止发射器,这是个阻塞方法,如果发送器还有任务没有处理完,将会等待
transmitter. stop ();
//如果想直接停止可以使用 transmitter.stop_immediately()方法
}
รองรับการกำหนดค่าจำนวนคอร์เธรด จำนวนเธรดสูงสุด ความยาวคิวแคชงาน และเวลาว่างของเธรดแบบไดนามิก!
ThreadPoolSetting::MinThreadCount = 4 ; //设置 核心线程数
ThreadPoolSetting::MaxThreadCount = 4 ; //设置 核心线程数
ThreadPoolSetting::TaskQueueLength = 4096 ; //设置 任务缓存队列长度
ThreadPoolSetting::DynamicThreadVacantMillisecond = 3000ms; //动态线程空闲时间
บทนำ : Simple Request Response Protocol (SR2P protocol) เป็นโปรโตคอลสองเฟสที่ปรับแต่งเป็นพิเศษสำหรับ RPC โดยแบ่งออกเป็นสองขั้นตอน: คำขอและการตอบสนองโดยไม่ต้องสร้างลิงก์
ฟิลด์โปรโตคอลมีดังนี้ ส่วนหัวของโปรโตคอลคือ 26 ไบต์ ลำดับไบต์ของฟิลด์คือ big endian และส่วนข้อมูลเป็น little endian เนื่องจากข้อจำกัดของ MTU MTU มาตรฐานของเครือข่ายคือ 576 และส่วนข้อมูลสูงถึง 522 ไบต์ เพิ่มเติม โปรดดู Protocol.pdf
11110000
หนึ่งไบต์โปรโตคอล SR2P จะกำหนดจำนวนดาตาแกรมที่ถูกสร้างขึ้นในแต่ละครั้งตามจำนวนข้อมูลที่สร้างขึ้น ต่อไปนี้จะใช้ 2 เดตาแกรมต่อครั้งเป็นตัวอย่าง
สำหรับรายละเอียดเกี่ยวกับกระบวนการจัดการสถานการณ์อื่นๆ โปรดดูเอกสาร Protocol.pdf