Qbit Java Micorservices lib教程| QBIT網站| Qbit使用Reakt | QBIT與Vert.x |一起使用Reakt Vertx
Java微服務自由。 QBIT是用於構建微服務的反應性編程LIB -JSON,HTTP,WebSocket和REST。 QBIT使用反應性編程來構建彈性休息和基於Websocket的雲友好,Web服務。 SOA用於移動和雲。維修,健康,反應性統計服務,事件,用於微服務的Java慣用反應性編程。
有問題嗎?在這裡詢問:QBIT Google Group。
一切都是隊列。你有選擇。您可以擁抱並控制它。您可以為此進行優化。或者,您可以躲在抽像後面。 QBIT讓您窺視正在發生的事情,並讓您在不出售靈魂的情況下拉動一些槓桿。
Qbit是庫而不是框架。您可以將Qbit與春季,Guice等混合匹配。
QBIT現在支持REAKT引用的本地和遠程客戶端代理的承諾。這為異步編程提供了一個不錯的流利API。
employeeService . lookupEmployee ( "123" )
. then (( employee )-> {...}). catchError (...). invoke ();
QBIT回調現在也已成為REAKT回調,而無需打破QBIT合同以進行回調。
有關更多詳細信息,請參見Reakt引起的承諾。
QBIT已發布給Maven Public Repo。
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-admin</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency >
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-vertx</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency >
compile 'io.advantageous.qbit:qbit-admin:1.10.0.RELEASE'
compile 'io.advantageous.qbit:qbit-vertx:1.10.0.RELEASE'
部署在幾家大型財富100大公司中。 QBIT現在與VERTX(獨立或嵌入式)一起使用。您也可以在非QBIT項目上使用QBIT,這只是一個LIB。
Apache 2
QBIT具有INPROC服務,REST微服務和Websocket微服務以及程序內服務事件總線(可以是每個模塊或每個應用程序)。它支持工人和內存服務。
在我們描述更多之前,這裡有兩個示例服務:
@ RequestMapping ( "/todo-service" )
public class TodoService {
@ RequestMapping ( "/todo/count" )
public int size () {...
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {...
@ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {...
}
歸根結底,Qbit是一個簡單的庫而不是一個框架。您的應用不是QBIT應用程序,而是使用QBIT LIB的Java應用程序。 QBIT允許您與Java Util並發合作,並且不會努力向您隱藏它。只是試圖將刺痛從中取出。
我們已經在Boon和QBIT中使用了技術,在高端高性能,高尺度的應用程序中取得了巨大成功。我們用QBIT中的技術幫助客戶通過1/10的競爭對手服務器來處理10倍負載。 Qbit是我們厭倦了手工調整隊列訪問和線程。
Boon和Qbit的想法通常來自網絡。我們犯錯。指出他們。作為Boon和Qbit的開發商,我們是旅行者。如果您想分享一個想法或技術,我們會聽。
Boon/Qbit的一個很大的靈感是Vertx,Akka,Go頻道,活動對象,公寓模型螺紋,演員和機械同情論文。
Qbit的想法與許多框架相似。我們都在閱讀相同的論文。 QBIT從LMAX DISPRUEATER論文中獲得了啟發,以及有關鏈接傳輸隊列與破壞者的博客文章。我們有一些關於隊列的理論,博客文章啟發了我們嘗試一下。其中一些理論部署在一些最大的中間件後端,其名稱品牌在世界範圍內廣為人知。因此Qbit誕生了。
Qbit還為蒂姆·福克斯(Tim Fox)在VERTX上完成的偉大工作汲取了很多靈感。第一個使用實際上稱為QBIT的項目(儘管早期QBIT)是在Web/Mobile Microservice上使用VERTX來用於可能有8000萬用戶的應用程序。正是這種在VERTX和早期QBIT的經歷導致了QBIT的發展和進化。 QBIT建立在巨人(Netty/Vertx)的肩膀上。
春季破壞者:不。我想您可以使用Qbit為彈簧破壞者編寫插件,但是QBIT並未與彈簧破壞者競爭。春季啟動/春季MVC:否。我們使用相同的註釋,但QBIT適用於高速內存微服務。它更像是Akka,而不是Spring Boot。 QBIT具有僅適用於微服務的Spring MVC功能的子集,即,Websocket RPC,REST,JSON MASSHALING等。AKKA:不,也許。 Akka具有類似的概念,但他們採用了不同的方法。 QBIT比Akka更專注於Java和Microservices(REST,JSON,Websocket)。 lmax顛覆器:不。實際上,我們可以使用disruptor作為Qbit在蓋下使用的隊列。
(已經刪除了早期基準。它們在這裡。QBIT的速度更快。基準測試QBIT目前是一個移動的目標。將創建鏈接和報告。)
代碼示例
====
BasicQueue < Integer > queue = BasicQueue . create ( Integer . class , 1000 );
//Sending threads
SendQueue < Integer > sendQueue = queue . sendQueue ();
for ( int index = 0 ; index < amount ; index ++) {
sendQueue . send ( index );
}
sendQueue . flushSends ();
...
sendQueue . sendAndFlush ( code );
//other methods for sendQueue, writeBatch, writeMany
//Receiving Threads
ReceiveQueue < Integer > receiveQueue = queue . receiveQueue ();
Integer item = receiveQueue . take ();
//other methods poll(), pollWait(), readBatch(), readBatch(count)
QBIT是用於微服務的排隊庫。它類似於其他許多項目,例如Akka,Spring Reactor等。QBIT只是一個庫而不是平台。 Qbit有圖書館可以在隊列後面提供服務。您可以直接使用QBIT隊列,也可以創建服務。 QBIT服務可以由Websocket,HTTP,HTTP管道和其他類型的遠程服務公開。 QBIT中的服務是Java類,其方法是在服務隊列後面執行的。 QBIT實施公寓模型螺紋,與Actor模型相似,或者更好的描述將是活動對象。 QBIT不使用破壞者(但可以)。它使用常規的Java隊列。 Qbit可以在每秒1億次乒乓球的北部進行,這是一個驚人的速度(高達200m)。 QBIT還通過REST和Websocket支持呼叫服務。 QBIT是純Web意義上的微服務:JSON,HTTP,WebSocket等。QBIT使用微批次來通過管道推動消息(隊列,IO等)更快地減少線程交換。
QBIT是支持REST,JSON和WebSocket的Java微服務LIB。它是用Java編寫的,但有一天我們可以用Rust或Go或C#寫一個版本(但這需要大的發薪日)。
服務pojo(普通舊的java對象)在隊列後面可以通過代理呼叫或事件接收方法調用(可能有一個線程管理事件,方法呼叫和兩個響應或兩個用於方法呼叫和事件,而另一個用於響應除非響應塊,否則請勿阻止服務。服務可以使用Spring MVC樣式REST註釋通過Rest和Websocket暴露於外界。
ServiceBundle在一個響應隊列後面的許多Pojos,許多Pojos都會收到隊列。所有響應是否都可能有一個線程。他們也可以是一個接收隊列。
排隊管理隊列的線程。它支持批處理。它有空的事件,到達,到達Limit,startBatch,閒置。您可以從排隊後面的服務中收聽這些事件。您不必使用服務。您可以使用隊列的直接。在QBIT中,您有發件人的隊列和接收器隊列。它們分開以支持微批量。
ServiceEndPointServer ServiceBundle,可暴露於REST和WESTOCKECT通信。
EventBus EventBus是一種向可能鬆散耦合的服務發送大量消息的方式。
ClientProxy ClientProxy是通過異步接口調用服務的一種方式,服務可以是INPROC(相同的過程)或通過WebSocket遠離。
非阻滯QBIT是一種非阻滯性LIB。您可以通過Java 8 Lambdas使用回調。您還可以發送事件消息並獲得答复。消息傳遞已內置在系統中,因此您可以輕鬆地協調複雜的任務。 QBIT採用以對象為導向的服務開發方法,因此服務看起來像您已經編寫的普通Java服務,但是服務在隊列/線程後面存在。這不是一個新概念。 Microsoft使用DCOM/COM進行了此操作,並將其稱為Active對象。 Akka與演員一起做這件事,並稱他們為強烈打字的演員。重要的概念是,您可以獲得反應性和演員風格的消息傳遞的速度,但以自然的OOP方法發展。 Qbit不是第一個。 Qbit並不是唯一的。
QBIT非常快。當然有很多改進的空間。但是,已經200m+ TPS Inproc ping Pong,10m-20m+ TPS事件總線,500K TPS RPC通過WebSocket/JSOND呼叫。 JSON支持使用BOON默認情況下,其剩下的JSON/JSON,WebSocket/JSON用例的其他JSON解析器的速度最高4倍。
反應性編程QBIT提供了一個反應器來管理異步調用。這允許在稱呼它們的同一線程上處理回調,並提供超時和錯誤處理。讀取用於創建反應性微服務編程的反應器教程
服務發現,以支持服務發現的支持。這包括與領事的集成。
STATSERVICE構建以支持統計數據。 STATSERVICE可以與Statsd (Graphite,Grafana,DataDog等)集成以發布被動統計數據。或者,您可以查詢統計引擎並對統計數據(計數,時間和級別)做出反應。 STATSSERVICE是一個可以聚類的反應性統計系統。 STATSERVICE具有反應性,因為您的服務可以發布並根據結果進行查詢並做出反應。您可以實施諸如限制速率的事情,並對提高的事物做出反應。 Serviciscovery系統與Healthsystem集成在一起,並彙總您的每項內部服務,這些內部服務構成了您的微服務,並將您的微服務的複合材料發佈到單個HTTP端點或領事(TTL)中的Dead Mans Switch。
談話很便宜。讓我們看一些代碼。您可以在Wiki中進行詳細的步行。我們已經有很多文檔了。
我們將創建通過REST/JSON暴露的服務。
查詢待辦事項列表的大小:
curl localhost:8080/services/todo-service/todo/count
添加一個新的湯託物品。
curl -X POST -H " Content-Type: application/json " -d
' {"name":"xyz","description":"xyz"} '
http://localhost:8080/services/todo-service/todo
獲取待辦事項清單
curl http://localhost:8080/services/todo-service/todo/
待辦事項將使用和跟踪待辦事項。
package io . advantageous . qbit . examples ;
import java . util . Date ;
public class TodoItem {
private final String description ;
private final String name ;
private final Date due ;
TodoService使用Spring MVC樣式註釋。
@ RequestMapping ( "/todo-service" )
public class TodoService {
private List < TodoItem > todoItemList = new ArrayList <>();
@ RequestMapping ( "/todo/count" )
public int size () {
return todoItemList . size ();
}
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {
return todoItemList ;
}
@ RequestMapping ( value = "/todo" , method = RequestMethod . POST )
public void add ( TodoItem item ) {
todoItemList . add ( item );
}
}
您可以發布/放置非json,並且可以將其捕獲為String
或byte[]
。如果內容類型設置為application/json
,並且您的身體被定義為字符串或字節[]。這會自動起作用。 (必須設置內容類型。)
@ RequestMapping ( value = "/body/bytes" , method = RequestMethod . POST )
public boolean bodyPostBytes ( byte [] body ) {
String string = new String ( body , StandardCharsets . UTF_8 );
return string . equals ( "foo" );
}
@ RequestMapping ( value = "/body/string" , method = RequestMethod . POST )
public boolean bodyPostString ( String body ) {
return body . equals ( "foo" );
}
默認情況下,QBIT將發送200
(確定)以進行非空隙呼叫(帶有返回或回調的呼叫)。如果其餘操作沒有返回或沒有回調,則QBIT發送202
(接受)。有時候,您可能要發送201(創建)或其他不例外的代碼。您可以通過在@RequestMapping
上設置code
來做到這一點。默認情況下,代碼為-1,這意味著使用默認行為(成功的200,單向消息202,錯誤500)。
@ RequestMapping ( value = "/helloj7" , code = 221 )
public void helloJSend7 ( Callback < JSendResponse < List < String >>> callback ) {
callback . returnThis ( JSendResponseBuilder . jSendResponseBuilder ( Lists . list (
"hello " + System . currentTimeMillis ())). build ());
}
Callbacks
也可用於內部服務。通常,您使用回調佈置器或QBIT反應器來管理服務呼叫。
您不必返回JSON表格rest呼叫。您可以使用HttpBinaryResponse
和HttpTextResponse
返回任何二進製或任何文本。
@ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpTextResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildTextResponse ());
}
@ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpBinaryResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildBinaryResponse ());
}
我們為什麼選擇春季風格的註釋?
現在就開始。
public static void main ( String ... args ) {
ServiceEndpointServer server = new EndpointServerBuilder (). build ();
server . initServices ( new TodoService ());
server . start ();
}
就是這樣。還提供了帶有客戶端代理生成的Box Websocket支持,因此您可以以每秒數百萬個電話的速度撥打服務。
@ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {
return a + b ;
}
}
您始終可以通過Websocket代理調用QBIT服務。 WebSocket代理的優點是,它允許您執行1M RPC+一秒鐘(每秒100萬個遙控電話)。
/* Start QBit client for WebSocket calls. */
final Client client = clientBuilder ()
. setPort ( 7000 ). setRequestBatchSize ( 1 ). build ();
/* Create a proxy to the service. */
final AdderServiceClientInterface adderService =
client . createProxy ( AdderServiceClientInterface . class ,
"adder-service" );
client . start ();
/* Call the service */
adderService . add ( System . out :: println , 1 , 2 );
輸出為3。
3
以上使用WebSocket代理接口來調用服務異步。
interface AdderServiceClientInterface {
void add ( Callback < Integer > callback , int a , int b );
}
創建Websocket服務客戶端的客戶端口。
final Client client = clientBuilder . setServiceDiscovery ( serviceDiscovery , "echo" )
. setUri ( "/echo" ). setProtocolBatchSize ( 20 ). build (). startClient ();
final EchoAsync echoClient = client . createProxy ( EchoAsync . class , "echo" );
當前, clientBuilder
將加載以服務名稱註冊的所有服務端點,並隨機選擇一個。
ServiceScovery包括基於領事的,在磁盤上觀看JSON文件和DNS。也很容易編寫自己的服務發現並將其插入QBIT。
將來,如果連接已關閉,我們可以通話或撥打Websocket服務和/或提供自動失敗。我們為使用服務發現的事件總線這樣做,但尚未烘烤到基於Websocket的客戶端存根中。
最後一個客戶端示例使用websocket。您也可以只使用REST,然後實際使用我們設置的URI參數。休息很好,但是比WebSocket的支持要慢。
QBIT與一個漂亮的小型HTTP客戶端發貨。我們可以使用它。
您可以使用它與HTTP客戶端一起發送異步電話和Websocket消息。
在這裡,我們將使用HTTP客戶端調用我們的遠程方法:
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" )
. setPort ( 7000 ). build ();
httpClient . start ();
String results = httpClient
. get ( "/services/adder-service/add/2/2" ). body ();
System . out . println ( results );
輸出為4。
4
您也可以從Curl訪問服務。
$ curl http://localhost:7000/services/adder-service/add/2/2
在此處查看此完整示例:QBIT微服務入門教程。
QBIT URI參數和WebSocket代理客戶端
QBIT有一個可以使用和編寫異步微服務的庫,使用的是輕巧且有趣的。
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build ();
/* Setup WebSocket Server support. */
httpServer . setWebSocketOnOpenConsumer ( webSocket -> {
webSocket . setTextMessageConsumer ( message -> {
webSocket . sendText ( "ECHO " + message );
});
});
/* Start the server. */
httpServer . start ();
/** CLIENT. */
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start ();
/* Setup the client websocket. */
WebSocket webSocket = httpClient
. createWebSocket ( "/websocket/rocket" );
/* Setup the text consumer. */
webSocket . setTextMessageConsumer ( message -> {
System . out . println ( message );
});
webSocket . openAndWait ();
/* Send some messages. */
webSocket . sendText ( "Hi mom" );
webSocket . sendText ( "Hello World!" );
ECHO Hi mom
ECHO Hello World!
現在停止服務器和客戶端。非常容易嗎?
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build ();
/* Setting up a request Consumer with Java 8 Lambda expression. */
httpServer . setHttpRequestConsumer ( httpRequest -> {
Map < String , Object > results = new HashMap <>();
results . put ( "method" , httpRequest . getMethod ());
results . put ( "uri" , httpRequest . getUri ());
results . put ( "body" , httpRequest . getBodyAsString ());
results . put ( "headers" , httpRequest . getHeaders ());
results . put ( "params" , httpRequest . getParams ());
httpRequest . getReceiver ()
. response ( 200 , "application/json" , Boon . toJson ( results ));
});
/* Start the server. */
httpServer . start ();
重點是易用性,並使用Java 8 lambdas進行回調,因此代碼緊密而小。
在此處查找有關QBIT的微服務樣式Websocket支持的更多信息
現在,讓我們嘗試一下我們的HTTP客戶端。
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start ();
您只需通過URL,端口,然後致電啟動即可。
現在,您可以開始發送HTTP請求。
/* Send no param get. */
HttpResponse httpResponse = httpClient . get ( "/hello/mom" );
puts ( httpResponse );
HTTP響應僅包含服務器的結果。
public interface HttpResponse {
MultiMap < String , String > headers ();
int code ();
String contentType ();
String body ();
}
有同步HTTP GET呼叫的輔助方法。
/* Send one param get. */
httpResponse = httpClient . getWith1Param ( "/hello/singleParam" ,
"hi" , "mom" );
puts ( "single param" , httpResponse );
/* Send two param get. */
httpResponse = httpClient . getWith2Params ( "/hello/twoParams" ,
"hi" , "mom" , "hello" , "dad" );
puts ( "two params" , httpResponse );
...
/* Send five param get. */
httpResponse = httpClient . getWith5Params ( "/hello/5params" ,
"hi" , "mom" ,
"hello" , "dad" ,
"greetings" , "kids" ,
"yo" , "pets" ,
"hola" , "neighbors" );
puts ( "5 params" , httpResponse );
puts方法是助手方法,它或多或少或多或少。
前五個參數被涵蓋。超過五個,您必須使用httpbuilder。
/* Send six params with get. */
final HttpRequest httpRequest = httpRequestBuilder ()
. addParam ( "hi" , "mom" )
. addParam ( "hello" , "dad" )
. addParam ( "greetings" , "kids" )
. addParam ( "yo" , "pets" )
. addParam ( "hola" , "pets" )
. addParam ( "salutations" , "all" ). build ();
httpResponse = httpClient . sendRequestAndWait ( httpRequest );
puts ( "6 params" , httpResponse );
也有異步呼籲獲得。
/* Using Async support with lambda. */
httpClient . getAsync ( "/hi/async" , ( code , contentType , body ) -> {
puts ( "Async text with lambda" , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith1Param ( "/hi/async" , "hi" , "mom" , ( code , contentType , body ) -> {
puts ( "Async text with lambda 1 param n " , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith2Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 2 params n " , body );
});
Sys . sleep ( 100 );
...
/* Using Async support with lambda. */
httpClient . getAsyncWith5Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
"p3" , "v3" ,
"p4" , "v4" ,
"p5" , "v5" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 5 params n " , body );
});
Sys . sleep ( 100 );
[在此處找到有關易於使用的易於使用的有關易於使用的易於使用的微服務http client](https://github.com/advantage/qbit/qbit/wiki/%5bdoc%5d-usis-roservice- microservice-microservice-lib's-httpclient-get-post,post,post, -et-al,-json,-java-8-lambda)。
QBIT允許在程序內運行隊列後面的服務。
/* POJO service. */
final TodoManager todoManagerImpl = new TodoManager ();
/*
Create the service which manages async calls to todoManagerImpl.
*/
final Service service = serviceBuilder ()
. setServiceObject ( todoManagerImpl )
. build (). startServiceQueue ();
/* Create Asynchronous proxy over Synchronous service. */
final TodoManagerClientInterface todoManager =
service . createProxy ( TodoManagerClientInterface . class );
service . startCallBackHandler ();
System . out . println ( "This is an async call" );
/* Asynchronous method call. */
todoManager . add ( new Todo ( "Call Mom" , "Give Mom a call" ));
AtomicInteger countTracker = new AtomicInteger ();
//Hold count from async call to service... for testing and showing it is an async callback
System . out . println ( "This is an async call to count" );
todoManager . count ( count -> {
System . out . println ( "This lambda expression is the callback " + count );
countTracker . set ( count );
});
todoManager . clientProxyFlush (); //Flush all methods. It batches calls.
Sys . sleep ( 100 );
System . out . printf ( "This is the count back from the server %d n " , countTracker . get ());
正在編寫有關程序內服務的詳細教程。
QBIT事件總線更詳細的示例
QBIT還擁有服務事件總線。此示例是一個員工福利服務示例。
我們有兩個渠道。
public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";
public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";
員工對像看起來像:
public static class Employee {
final String firstName ;
final int employeeId ;
此示例有三個服務:員工HiringService,Benefitservice和PayrollService。
這些服務是INPROC服務。 QBIT也支持WebSocket,HTTP和REST遠程服務,但就目前而言,讓我們專注於INPROC服務。如果您了解INPROC,那麼您將了解遠程。
員工Hiringservice實際上向其他兩項服務發射了事件。
public class EmployeeHiringService {
public void hireEmployee ( final Employee employee ) {
int salary = 100 ;
System . out . printf ( "Hired employee %s n " , employee );
//Does stuff to hire employee
//Sends events
final EventManager eventManager =
serviceContext (). eventManager ();
eventManager . send ( NEW_HIRE_CHANNEL , employee );
eventManager . sendArray ( PAYROLL_ADJUSTMENT_CHANNEL ,
employee , salary );
}
}
請注意,我們致電Sendarray,以便我們派遣員工及其工資。 PayRoll_Adjustment_Channel的聽眾將必須同時處理代表新員工薪水的INT。您也可以使用Event Bus代理,因此您根本不必撥打活動總線。
福利服務聆聽被雇用的新員工,因此可以將他們納入福利系統。
public static class BenefitsService {
@ OnEvent ( NEW_HIRE_CHANNEL )
public void enroll ( final Employee employee ) {
System . out . printf ( "Employee enrolled into benefits system employee %s %d n " ,
employee . getFirstName (), employee . getEmployeeId ());
}
爸爸需要獲得報酬。
public static class PayrollService {
@ OnEvent ( PAYROLL_ADJUSTMENT_CHANNEL )
public void addEmployeeToPayroll ( final Employee employee , int salary ) {
System . out . printf ( "Employee added to payroll %s %d %d n " ,
employee . getFirstName (), employee . getEmployeeId (), salary );
}
}
員工是員工對象的僱員對象。
這樣您就可以獲得自己的福利並付款!
在此處找到更多詳細信息:
QBIT事件總線更詳細的示例
您可以將自己的接口定義到活動總線,可以使用QBIT自己的事件總線。您服務中的每個模塊都可以擁有自己的內部活動總線。
要了解更多閱讀:QBIT微服務與私人事件總線和QBIT Java微服務LIB一起使用您自己的活動總線。
為了真正掌握Qbit,必須掌握回調的概念。
回調是在QBIT中獲得異步響應的一種方式。
您調用服務方法,然後回電。
客戶代理可以有回調:
public interface RecommendationServiceClient {
void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName );
}
回調是Java 8消費者,具有一些可選的額外錯誤處理。
public interface Callback < T > extends java . util . function . Consumer < T > {
default void onError ( java . lang . Throwable error ) { /* compiled code */ }
}
可以阻止的服務應使用回調。因此,如果在下面的示例中阻止了加載器,則應真正使用回調而不是返回值。
公共類建議服務{
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
public List < Recommendation > recommend ( final String userName ) {
User user = users . get ( userName );
if ( user == null ) {
user = loadUser ( userName );
}
return runRulesEngineAgainstUser ( user );
}
讓我們假裝loadUser
必須在本地緩存中查看,如果找不到用戶,請查看離子緩存,如果找不到的話,必須從UserService中詢問用戶,該用戶必須檢查其緩存,也許是後備來自數據庫或其他服務的用戶數據。換句話說, loadUser
可能會阻止IO。
我們的客戶不會阻止,但我們的服務確實可以。回到我們的RecommendationService
。如果我們為用戶負載獲得了很多緩存命中,也許塊不會那麼長,但是它會在那裡,每次我們必須在用戶中犯錯時,整個系統都會被填充。我們想做的是,如果我們無法處理推薦請求,我們就可以繼續對UserDataService
進行異步。當該異步回調返回時,我們就會處理該請求。同時,我們盡快處理建議列表請求。我們永遠不會阻止。
因此,讓我們重新審視服務。我們要做的第一件事是讓服務方法進行回調。在這樣做之前,讓我們制定一些規則。
public class RecommendationService {
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
現在,我們正在回電,我們可以決定何時要處理此推薦生成請求。如果我們需要的用戶數據是內存的,或者我們可以延遲它,我們可以立即執行此操作。
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in user cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
...
} else {
/* Call the callback now because we can handle the callback now. */
recommendationsCallback . accept ( runRulesEngineAgainstUser ( user ));
}
}
請注意,如果在緩存中找到了用戶,我們將在內存中運行建議規則,並立即致電callback recommendationsCallback.accept(runRulesEngineAgainstUser(user))
。
有趣的部分是如果沒有加載用戶,我們該怎麼辦。
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using Callback. */
userDataService . loadUser ( new Callback < User >() {
@ Override
public void accept ( final User loadedUser ) {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}
}, userName );
}
...
在這裡,我們使用回調來加載用戶,當用戶加載時,我們調用handleLoadFromUserDataService
,該fromuserdataservice添加了有關處理回調的一些管理,以便我們仍然可以處理此調用,而不是現在。
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using lambda expression. */
userDataService . loadUser (
loadedUser -> {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}, userName );
}
...
使用這樣的lambdas使代碼更可讀性和簡潔,但請記住,不要深深地嵌套lambda表達式,否則您將創建代碼維護噩夢。明智地使用它們。
我們想要的是處理用戶從其商店加載用戶加載用戶後的建議請求。
public class RecommendationService {
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
private UserDataServiceClient userDataService ;
private BlockingQueue < Runnable > callbacks =
new ArrayBlockingQueue < Runnable >( 10_000 );
...
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
...
}
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks queue. */
callbacks . add ( new Runnable () {
@ Override
public void run () {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
}
});
}
public class RecommendationService {
...
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks list. */
callbacks . add (() -> {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
});
}
重要的是,每次我們從UserDataService
收到回調電話時,我們都會執行CPU密集的建議規則並回電我們的呼叫者。好吧,我們要做的是在我們的回調隊列中加入一個可運行的,後來我們會迭代這些何時?
建議服務的隊列為空時,可以通知RecommendationService
,啟動了新批次,並且已達到批處理限制。這些都是從UserDataService
處理回調的好時機。
@ QueueCallback ({
QueueCallbackType . EMPTY ,
QueueCallbackType . START_BATCH ,
QueueCallbackType . LIMIT })
private void handleCallbacks () {
flushServiceProxy ( userDataService );
Runnable runnable = callbacks . poll ();
while ( runnable != null ) {
runnable . run ();
runnable = callbacks . poll ();
}
}
重要的是要記住在處理另一個服務的另一個微服務的回調時,在處理客戶端的更多不斷訪問請求之前。本質上,您的客戶一直在等待(異步等待但仍在等待),這些客戶可能會像HTTP調用一樣代表開放的TCP/IP連接,因此最好在處理更多請求之前將其關閉,就像我們說他們已經在等待周圍有一個開放連接供用戶加載形式的用戶服務。
要了解有關回調的更多信息,PLESAE讀取[QBIT Java Microservice LIB回調基礎]([粗切] Qbit Microservice lib與回調一起工作)。
public class ServiceWorkers {
public static RoundRobinServiceDispatcher workers () {...
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {...
您可以組成碎片工人(用於內存,線程安全,CPU密集型服務)或IO工人或與外國服務或外國公共汽車交談。
這是一個使用其中三名服務人員的工人池的示例:
假設您的服務可以做點什麼:
//Your POJO
public class MultiWorker {
void doSomeWork (...) {
...
}
}
現在,這可以做些IO,您想擁有一定的運行庫,而不僅僅是一個運行,以便您可以並行進行IO。經過一些性能測試,您發現三個是魔術數字。
您想使用API訪問此服務:
public interface MultiWorkerClient {
void doSomeWork (...);
}
現在,讓我們創建這些銀行並使用它。
首先創建添加線程/隊列/微鍵的QBIT服務。
/* Create a service builder. */
final ServiceBuilder serviceBuilder = serviceBuilder ();
/* Create some qbit services. */
final Service service1 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service2 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service3 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
現在將它們添加到服務工作人員對像中。
ServiceWorkers dispatcher ;
dispatcher = workers (); //Create a round robin service dispatcher
dispatcher . addServices ( service1 , service2 , service3 );
dispatcher . start (); // start up the workers
您可以將服務,POJOS和方法消費者,方法調度程序添加到服務捆綁包中。服務捆綁包是QBIT的集成點。
讓我們添加我們的新服務人員。 ServiceWorkers是ServiceMethodDisPatcher。
/* Add the dispatcher to a service bundle. */
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();
我們可能會在服務捆綁包中添加輔助方法,因此大部分可以在單個呼叫中發生。
現在您可以開始使用工人。
/* Start using the workers. */
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );
現在,您可以使用Spring或Guice配置構建器和服務捆綁包。但是,您可以像上面的那樣做到這一點,這對於測試和理解QBIT內部詞非常有用。
QBIT還支持碎片服務的概念,該概念非常適合CPU等碎片資源(為用戶推薦引擎運行每個CPU核心上的規則引擎)。
QBIT不知道如何分解您的服務,您必須提示它。您可以通過碎片規則進行此操作。
public interface ShardRule {
int shard ( String methodName , Object [] args , int numWorkers );
}
我們在一個應用程序上工作,在該應用程序中,對服務的第一個論點是用戶名,然後我們將其用於將CPU密集型內存中的內存性規則引擎打電話。該技術有效。 :)
ServiceWorkers類有一種創建碎片工人池的方法。
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {
...
}
要使用您的服務工作人員時,只需通過碎片鍵即可。
dispatcher = shardedWorkers (( methodName , methodArgs , numWorkers ) -> {
String userName = methodArgs [ 0 ]. toString ();
int shardKey = userName . hashCode () % numWorkers ;
return shardKey ;
});
然後將您的服務添加到服務工作人員組成中。
int workerCount = Runtime . getRuntime (). availableProcessors ();
for ( int index = 0 ; index < workerCount ; index ++) {
final Service service = serviceBuilder
. setServiceObject ( new ContentRulesEngine ()). build ();
dispatcher . addServices ( service );
}
然後像以前一樣將其添加到服務捆綁包中。
dispatcher . start ();
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();
然後只使用它:
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );
for ( int index = 0 ; index < 100 ; index ++) {
String userName = "rickhigh" + index ;
worker . pickSuggestions ( userName );
}
public class ServiceWorkers {
...
public static ShardedMethodDispatcher shardOnFirstArgumentWorkers () {
...
}
...
public static ShardedMethodDispatcher shardOnFifthArgumentWorkers () {
...
}
public static ShardedMethodDispatcher shardOnBeanPath ( final String beanPath ) {
...
}
ShardonBeanPath使您可以創建一個複雜的Bean路徑導航調用,並使用其屬性進行碎片。
/* shard on 2nd arg which is an employee
Use the employees department's id property. */
dispatcher = shardOnBeanPath ( "[1].department.id" );
/* Same as above. */
dispatcher = shardOnBeanPath ( "1/department/id" );
在此處閱讀有關服務碎片和服務工作人員的更多信息
您可以在Wiki中找到更多。也遵循提交。我們一直很忙。 QBIT用於Java -JSON,REST,WEBSOCKECT的微服務lib。