QBIT Java Micorservices LIB Учебники | QBIT Веб -сайт | QBIT использует REAKT | QBIB работает с Vert.x | Reakt Vertx
Java Microservice Lib. QBIT - это реагирующий программирование LIB для создания микросервисов - JSON, HTTP, WebSocket и REST. QBIT использует реактивное программирование для построения упругого отдыха, а также облачных веб -сервисов, основанных на веб -билете. SOA развивалась для мобильных и облачных. Обслуживание
Есть вопрос? Спросите здесь: qbit Google Group.
Все это очередь. У вас есть выбор. Вы можете принять его и контролировать. Вы можете оптимизировать для этого. Или вы можете спрятаться за абстракциями. QBIT открывает вас, чтобы заглянуть в то, что происходит, и позволяет вытащить рычаги, не продавая свою душу.
QBIT - это библиотека, а не структура. Вы можете смешать и сочетать QBIT с пружиной, Guice и т. Д.
QBIT теперь поддерживает Reakt, вызывшие обещания для локальных и удаленных клиентских прокси. Это дает хороший беглый API для асинхронного программирования.
employeeService . lookupEmployee ( "123" )
. then (( employee )-> {...}). catchError (...). invoke ();
Обратные вызовы QBIT теперь также являются Reakt Callbacks, не нарушая контракт QBIT для обратных вызовов.
Смотрите Reakt, вызывшие обещания для более подробной информации.
QBIT публикуется в Maven Public Repo.
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-admin</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency >
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-vertx</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency >
compile 'io.advantageous.qbit:qbit-admin:1.10.0.RELEASE'
compile 'io.advantageous.qbit:qbit-vertx:1.10.0.RELEASE'
Развернуто в нескольких крупных компаниях из списка Fortune 100. QBIT теперь работает с Vertx (автономным или встроенным). Вы также можете использовать QBIT для не QBIT Projects, это просто LIB.
Apache 2
QBIT имеет услуги Inproc, микросервисы REST и микросервисы WebSocket, а также автобус в области сервисного события (которая может быть на модуль или на приложение). Он поддерживает работников и услуги в памяти.
Прежде чем мы опишем больше, вот два образца службы:
@ RequestMapping ( "/todo-service" )
public class TodoService {
@ RequestMapping ( "/todo/count" )
public int size () {...
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {...
@ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {...
}
В конце дня QBIT - это простая библиотека, а не основа. Ваше приложение - это не приложение QBIT, а приложение Java, которое использует QBIT LIB. QBIT позволяет вам работать с Java Util, одновременно и не пытается скрыть это от вас. Просто пытаюсь вытащить из этого жал.
Мы использовали методы в Boon и QBIT с большим успехом в высококлассных, высокопроизводительных, высоко масштабируемых приложениях. Мы помогли клиентам справиться с нагрузкой в 10 раз с 1/10 -м серверами своих конкурентов, используя методы в QBIT. QBIT - это то, что мы надоели, от доступа к очереди на настройку и потоков.
Идеи для Буна и Кбита часто бывают со всего Интернета. Мы совершаем ошибки. Укажите им. Как разработчик Boon и Qbit, мы являемся попутчиками. Если у вас есть идея или техника, которой вы хотите поделиться, мы слушаем.
Большим вдохновением для Boon/Qbit были Vertx, Akka, Go каналы, активные объекты, резьба модели квартиры, актер и документы с механическим симпатием.
У QBIT есть идеи, которые похожи на многие рамки. Мы все читаем одни и те же статьи. QBIT получил вдохновение в документах LMAX Disruptor и в этом блоге о очереди передачи ссылок по сравнению с нарушением. У нас было несколько теорий о очереди, которые пост в блоге вдохновил нас на то, чтобы попробовать их. Некоторые из этих теорий развернуты в некоторых из самых больших бэкэндов промежуточного программного обеспечения и чьи названные бренды известны во всем мире. И, таким образом, QBIT родился.
QBIT также принял много вдохновения благодаря великой работе, проделанной Тимом Фоксом на Vertx. Первый проект, использующий что -то, что на самом деле можно назвать QBIT (хотя и раннее QBIT), использовал Vertx на веб -/мобильном микросервисе для приложения, в котором потенциально может быть 80 миллионов пользователей. Именно этот опыт работы с Vertx и ранним QBIT привел к развитию и эволюции QBIT. QBIT построен на плечах гигантов (Netty/Vertx).
Разрушитель пружины: Нет. Вы можете использовать QBIT для записи плагинов для пружинного нарушения, я полагаю, но QBIT не конкурирует с нарушением пружины. Spring Boot/Spring MVC: Нет. Мы используем те же аннотации, но QBIT предназначен для высокоскоростных микросервисов в памяти. Это больше похоже на Akka, чем на Spring Boot. QBIT имеет подмножество функций Spring MVC, предназначенных только для микросервисов, то есть, WebSocket RPC, REST, JSON Marshaling и т. Д. Akka: Нет. Ну, возможно,. У Акки есть похожие понятия, но они используют другой подход. QBIT больше сосредоточен на Java, а микросервисы (Rest, JSON, WebSocket), чем Akka. LMAX Disruptor: Нет. Фактически, мы можем использовать разрушитель, как и из очередей, которые QBIT использует под крышками.
(Ранние тесты были удалены. Они были здесь. QBIT стал намного быстрее. Брингеринг QBIT на данный момент является движущейся целью. Ссылки и отчеты будут созданы.)
Примеры кода
====
BasicQueue < Integer > queue = BasicQueue . create ( Integer . class , 1000 );
//Sending threads
SendQueue < Integer > sendQueue = queue . sendQueue ();
for ( int index = 0 ; index < amount ; index ++) {
sendQueue . send ( index );
}
sendQueue . flushSends ();
...
sendQueue . sendAndFlush ( code );
//other methods for sendQueue, writeBatch, writeMany
//Receiving Threads
ReceiveQueue < Integer > receiveQueue = queue . receiveQueue ();
Integer item = receiveQueue . take ();
//other methods poll(), pollWait(), readBatch(), readBatch(count)
QBIT - это библиотека очередей для микросервисов. Это похоже на многие другие проекты, такие как Akka, Spring Reactor и т. Д. Qbit - это просто библиотека, а не платформа. У QBIT есть библиотеки, чтобы поставить сервис за очередью. Вы можете использовать очередь QBIT напрямую или вы можете создать услугу. Услуги QBIT могут быть обнаружены WebSocket, HTTP, HTTP -конвейером и другими типами удаленного. Служба в QBIT - это класс Java, методы которых выполняются за очередью обслуживания. QBIT реализует потоки модели квартиры и похож на модель актера, или лучшее описание будет активными объектами. QBIT не использует разрушитель (но может). Он использует обычные очереди Java. QBIT может делать к северу от 100 миллионов вызовов Ping Pong в секунду, что является удивительной скоростью (наблюдается до 200 м). QBIT также поддерживает вызова службы через REST и WebSocket. QBIT-это микросервисы в чистом веб-смысле: JSON, HTTP, WebSocket и т. Д. QBIT использует микроорганизация для проталкивания сообщений через трубу (очередь, IO и т. Д.) Быстро быстрее, чтобы уменьшить передачу резьбы.
QBIT - это Java MicroService LIB, поддерживающий отдых, JSON и WebSocket. Это написано на Java, но мы могли бы однажды написать версию в Rust или Go или C# (но это потребует большой зарплаты).
Сервис Pojo (простой старый объект Java) за очередью, которая может получать вызовы методов с помощью прокси -вызовов или событий (может иметь одну потоку, управляющие событиями, вызовы методов и ответы или два для вызовов методов и другой для ответов, чтобы обработчики ответов обработчики ответов и обработки ответов. Не блокируйте обслуживание. Услуги могут использовать Spring MVC Style Annotations Rest, чтобы подвергнуть себя внешнему миру с помощью отдыха и WebSocket.
ServiceBundle Много Pojos за одной очередью ответов, и многие получают очереди. Там может быть один поток для всех ответов или нет. Они также могут быть одной очередью получения.
Очередь ветка, управляя очередью. Он поддерживает партии. У него есть события для пустого, достижения, запускаемого, idle. Вы можете выслушать эти события из услуг, которые сидят за очередью. Вам не нужно использовать услуги. Вы можете использовать прямую очередь. В QBIT у вас есть очереди отправителя и очереди приемников. Они разделены, чтобы поддержать микроэлементы.
ServiceEndPointServer ServiceBundle, который подвергается общению с отдыхом и WebSocket.
EventBus EventBus - это способ отправить много сообщений в услуги, которые могут быть свободно связаны.
ClientProxy ClientProxy - это способ вызвать услуги через Async Interface, Service может быть Inproc (тот же процесс) или удаленным через WebSocket.
Неблокирующий QBIT-это не блокирующий либера. Вы используете обратные вызовы через Java 8 Lambdas. Вы также можете отправить сообщения событий и получить ответы. Обмен сообщениями встроен в систему, поэтому вы можете легко координировать сложные задачи. QBIT использует объектно-ориентированный подход к разработке услуг, поэтому услуги выглядят как обычные услуги Java, которые вы уже пишете, но услуги живут за очередью/веткой. Это не новая концепция. Microsoft сделала это с DCOM/COM и назвала его активными объектами. Акка делает это с актерами и назвал их сильно напечатанными актерами. Важными понятиями является то, что вы получаете скорость реактивного и актерского обмена сообщениями, но вы развиваете естественный подход ООП. Qbit не первый. Qbit не единственный.
Скорость Qbit очень быстрая. Конечно, есть много возможностей для улучшения. Но уже 200 м+ TPS InProc Ping Pong, 10-миллионовно 20 м+ шины событий TPS, 500K TPS RPC вызовы по WebSocketocket/JSON и т. Д. Необходимо выполнить больше работы для улучшения скорости, но теперь это достаточно быстро, где мы сосредотачиваемся больше на пользователе. Поддержка JSON использует BOON по умолчанию, который в 4 раза быстрее, чем другие анализаторы JSON для остальных/JSON, вариант использования WebSocket/JSON.
Реактивное программирование QBIT обеспечивает реактор для управления асинхронными вызовами. Это позволяет обрабатывать обратные вызовы на том же потоке, который их называли, и обеспечивает время ожидания и обработки ошибок. Читать учебник по реактору для создания реактивного программирования микро службы
Обнаружение обслуживания встроено в поддержку Service Discovery . Это включает интеграцию с консулом.
СТАТИЗИЧЕСКИЕ СЛУЖБА Встроен в поддержку статистики. Стативное обслуживание может быть интегрировано с STATSD (графит, графана, датадог и т. Д.), Чтобы опубликовать пассивную статистику. Или вы можете запросить двигатель статистики и отреагировать на статистику (количество времени, времени и уровней). StatsService - это реактивная статистическая система, которую можно сгруппировать. Стативное обслуживание реактивно в том смысле, что ваши услуги могут публиковать его, запрашивать его и реагировать на основе результатов. Вы можете реализовать такие вещи, как ограничение скорости и отреагировать на увеличение скорости чего -либо. Система ServiceSiveSovery интегрируется с HealthSystem и Consul, чтобы свернуть каждую из ваших внутренних услуг, которые составляют вам микро обслуживание, и публикуют композит, доступный для вашего микроэлемента в одну конечную точку HTTP или переключатель Dead Mans In Consul (TTL).
Разговор дешево. Давайте посмотрим на какой -то код. Вы можете получить подробную прогулку в вики. У нас уже много документации.
Мы создадим услугу, которая выставлена через Rest/Json.
Чтобы запросить размер списка TODO:
curl localhost:8080/services/todo-service/todo/count
Чтобы добавить новый элемент Todo.
curl -X POST -H " Content-Type: application/json " -d
' {"name":"xyz","description":"xyz"} '
http://localhost:8080/services/todo-service/todo
Чтобы получить список предметов Todo
curl http://localhost:8080/services/todo-service/todo/
Пример TODO будет использовать и отслеживать элементы TODO.
package io . advantageous . qbit . examples ;
import java . util . Date ;
public class TodoItem {
private final String description ;
private final String name ;
private final Date due ;
ToDoService использует аннотации в стиле Spring MVC.
@ RequestMapping ( "/todo-service" )
public class TodoService {
private List < TodoItem > todoItemList = new ArrayList <>();
@ RequestMapping ( "/todo/count" )
public int size () {
return todoItemList . size ();
}
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {
return todoItemList ;
}
@ RequestMapping ( value = "/todo" , method = RequestMethod . POST )
public void add ( TodoItem item ) {
todoItemList . add ( item );
}
}
Вы можете опубликовать/поместить не JSON, и вы можете захватить тело в виде String
или как byte[]
. Если тип контента устанавливается на что-либо, кроме application/json
, а ваше тело определяется строкой или байтом []. Это работает автоматически. (Тип контента должен быть установлен.)
@ RequestMapping ( value = "/body/bytes" , method = RequestMethod . POST )
public boolean bodyPostBytes ( byte [] body ) {
String string = new String ( body , StandardCharsets . UTF_8 );
return string . equals ( "foo" );
}
@ RequestMapping ( value = "/body/string" , method = RequestMethod . POST )
public boolean bodyPostString ( String body ) {
return body . equals ( "foo" );
}
По умолчанию QBIT отправляет 200
(ОК) для невоидного вызова (вызов, который имеет возврат или обратный вызов). Если в остальной операции нет возврата или отсутствия обратного вызова, QBIT отправляет 202
(принято). Могут быть времена, когда вы хотите отправить 201 (созданный) или какой -то другой код, который не является исключением. Вы можете сделать это, установив code
на @RequestMapping
. По умолчанию код --1, что означает использование поведения по умолчанию (200 для успеха, 202 для одностороннего сообщения и 500 для ошибок).
@ RequestMapping ( value = "/helloj7" , code = 221 )
public void helloJSend7 ( Callback < JSendResponse < List < String >>> callback ) {
callback . returnThis ( JSendResponseBuilder . jSendResponseBuilder ( Lists . list (
"hello " + System . currentTimeMillis ())). build ());
}
Callbacks
также могут быть использованы для внутренних служб. Часто вы используете CallbackBuilder или QBIT -реактор для управления сервисными вызовами.
Вам не нужно возвращать вызовы в форме JSON. Вы можете вернуть любой двоичный или любой текст, используя HttpBinaryResponse
и HttpTextResponse
.
@ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpTextResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildTextResponse ());
}
@ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpBinaryResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildBinaryResponse ());
}
Почему мы выбрали аннотации весеннего стиля?
Теперь просто запустите это.
public static void main ( String ... args ) {
ServiceEndpointServer server = new EndpointServerBuilder (). build ();
server . initServices ( new TodoService ());
server . start ();
}
Вот и все. Существует также поддержка Box WebSocket с генерацией прокси -сервера клиента, поэтому вы можете позвонить в услуги по скорости миллионов вызовов в секунду.
@ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {
return a + b ;
}
}
Вы всегда можете вызвать услуги QBIT через прокси WebSocket. Преимущество прокси -прокси WebSocket заключается в том, что он позволяет вам выполнять 1M RPC+ второй (1 миллион удаленных вызовов каждую секунду).
/* Start QBit client for WebSocket calls. */
final Client client = clientBuilder ()
. setPort ( 7000 ). setRequestBatchSize ( 1 ). build ();
/* Create a proxy to the service. */
final AdderServiceClientInterface adderService =
client . createProxy ( AdderServiceClientInterface . class ,
"adder-service" );
client . start ();
/* Call the service */
adderService . add ( System . out :: println , 1 , 2 );
Вывод 3.
3
Выше использует интерфейс прокси -концерта WebSocket, чтобы вызвать службу Async.
interface AdderServiceClientInterface {
void add ( Callback < Integer > callback , int a , int b );
}
Создайте клиент Service Service, который обслуживается.
final Client client = clientBuilder . setServiceDiscovery ( serviceDiscovery , "echo" )
. setUri ( "/echo" ). setProtocolBatchSize ( 20 ). build (). startClient ();
final EchoAsync echoClient = client . createProxy ( EchoAsync . class , "echo" );
В настоящее время clientBuilder
загрузит все конечные точки сервиса, которые зарегистрированы под именем службы, и случайным образом выберет его.
ServiceedCovery включает в себя консул, просмотр файлов JSON на диске и DNS. Также легко написать свое собственное обнаружение и подключить его к QBIT.
В будущем мы сможем groundrobin звонки или наборы Shard в службу WebSocket и/или предоставлять автоматическое сбой, если соединение закрыто. Мы делаем это для автобуса Event, которая использует Service Discovery, но он еще не запечен в клиентские заглушки на основе WebSocket.
Последний пример клиента использует WebSocket. Вы также можете просто использовать отдых и фактически использовать параметры URI, которые мы настраиваем. Отдых хорош, но он будет медленнее, чем поддержка WebSocket.
QBIT отправляется с хорошим маленьким HTTP -клиентом. Мы можем использовать это.
Вы можете использовать его для отправки Async -вызовов и сообщений WebSocket с клиентом HTTP.
Здесь мы будем использовать клиент HTTP, чтобы вызвать наш удаленный метод:
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" )
. setPort ( 7000 ). build ();
httpClient . start ();
String results = httpClient
. get ( "/services/adder-service/add/2/2" ). body ();
System . out . println ( results );
Вывод 4.
4
Вы также можете получить доступ к сервису от curl.
$ curl http://localhost:7000/services/adder-service/add/2/2
Смотрите этот полный пример здесь: QBIT MicroService Учебное пособие.
QBIT URI Params и Proxy Proxy Client WebSocket
У QBIT есть библиотека для работы и написания асинхронных микросервисов, которые легки и интересны в использовании.
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build ();
/* Setup WebSocket Server support. */
httpServer . setWebSocketOnOpenConsumer ( webSocket -> {
webSocket . setTextMessageConsumer ( message -> {
webSocket . sendText ( "ECHO " + message );
});
});
/* Start the server. */
httpServer . start ();
/** CLIENT. */
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start ();
/* Setup the client websocket. */
WebSocket webSocket = httpClient
. createWebSocket ( "/websocket/rocket" );
/* Setup the text consumer. */
webSocket . setTextMessageConsumer ( message -> {
System . out . println ( message );
});
webSocket . openAndWait ();
/* Send some messages. */
webSocket . sendText ( "Hi mom" );
webSocket . sendText ( "Hello World!" );
ECHO Hi mom
ECHO Hello World!
Теперь остановите сервер и клиент. Довольно просто, а?
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build ();
/* Setting up a request Consumer with Java 8 Lambda expression. */
httpServer . setHttpRequestConsumer ( httpRequest -> {
Map < String , Object > results = new HashMap <>();
results . put ( "method" , httpRequest . getMethod ());
results . put ( "uri" , httpRequest . getUri ());
results . put ( "body" , httpRequest . getBodyAsString ());
results . put ( "headers" , httpRequest . getHeaders ());
results . put ( "params" , httpRequest . getParams ());
httpRequest . getReceiver ()
. response ( 200 , "application/json" , Boon . toJson ( results ));
});
/* Start the server. */
httpServer . start ();
Основное внимание уделяется простости использования и использованию Java 8 Lambdas для обратных вызовов, чтобы код был плотным и маленьким.
Узнайте больше о поддержке WebSocket в стиле микросервиса QBIT
Теперь давайте попробуем нашего HTTP -клиента.
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start ();
Вы просто передаете URL, порт, а затем звоните.
Теперь вы можете начать отправлять HTTP -запросы.
/* Send no param get. */
HttpResponse httpResponse = httpClient . get ( "/hello/mom" );
puts ( httpResponse );
Ответ HTTP просто содержит результаты с сервера.
public interface HttpResponse {
MultiMap < String , String > headers ();
int code ();
String contentType ();
String body ();
}
Существуют вспомогательные методы для синхронизации HTTP. Получите звонки.
/* Send one param get. */
httpResponse = httpClient . getWith1Param ( "/hello/singleParam" ,
"hi" , "mom" );
puts ( "single param" , httpResponse );
/* Send two param get. */
httpResponse = httpClient . getWith2Params ( "/hello/twoParams" ,
"hi" , "mom" , "hello" , "dad" );
puts ( "two params" , httpResponse );
...
/* Send five param get. */
httpResponse = httpClient . getWith5Params ( "/hello/5params" ,
"hi" , "mom" ,
"hello" , "dad" ,
"greetings" , "kids" ,
"yo" , "pets" ,
"hola" , "neighbors" );
puts ( "5 params" , httpResponse );
Метод POTS - это вспомогательный метод, который он выполняет System.out.println, кстати, более или менее.
Первые пять параметров покрыты. Помимо пяти, вы должны использовать Httpbuilder.
/* Send six params with get. */
final HttpRequest httpRequest = httpRequestBuilder ()
. addParam ( "hi" , "mom" )
. addParam ( "hello" , "dad" )
. addParam ( "greetings" , "kids" )
. addParam ( "yo" , "pets" )
. addParam ( "hola" , "pets" )
. addParam ( "salutations" , "all" ). build ();
httpResponse = httpClient . sendRequestAndWait ( httpRequest );
puts ( "6 params" , httpResponse );
Есть асинхронные звонки для получения.
/* Using Async support with lambda. */
httpClient . getAsync ( "/hi/async" , ( code , contentType , body ) -> {
puts ( "Async text with lambda" , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith1Param ( "/hi/async" , "hi" , "mom" , ( code , contentType , body ) -> {
puts ( "Async text with lambda 1 param n " , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith2Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 2 params n " , body );
});
Sys . sleep ( 100 );
...
/* Using Async support with lambda. */
httpClient . getAsyncWith5Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
"p3" , "v3" ,
"p4" , "v4" ,
"p5" , "v5" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 5 params n " , body );
});
Sys . sleep ( 100 );
[Найдите больше о простых в использовании, быстрое MicroService http-клиент здесь] (https://github.com/advantageous/qbit/wiki/%5BDOC%5D-USING-QBIT-MicroService-lib's-httpclient-get, post,, -et-al, -json, -java-8-lambda).
QBIT позволяет запускать и запуск услуг, стоящих за очередями.
/* POJO service. */
final TodoManager todoManagerImpl = new TodoManager ();
/*
Create the service which manages async calls to todoManagerImpl.
*/
final Service service = serviceBuilder ()
. setServiceObject ( todoManagerImpl )
. build (). startServiceQueue ();
/* Create Asynchronous proxy over Synchronous service. */
final TodoManagerClientInterface todoManager =
service . createProxy ( TodoManagerClientInterface . class );
service . startCallBackHandler ();
System . out . println ( "This is an async call" );
/* Asynchronous method call. */
todoManager . add ( new Todo ( "Call Mom" , "Give Mom a call" ));
AtomicInteger countTracker = new AtomicInteger ();
//Hold count from async call to service... for testing and showing it is an async callback
System . out . println ( "This is an async call to count" );
todoManager . count ( count -> {
System . out . println ( "This lambda expression is the callback " + count );
countTracker . set ( count );
});
todoManager . clientProxyFlush (); //Flush all methods. It batches calls.
Sys . sleep ( 100 );
System . out . printf ( "This is the count back from the server %d n " , countTracker . get ());
Подробное руководство по услугам в области продуктов написано.
QBIT Event Bus более подробный пример
QBIT также имеет автобус с сервисным мероприятием. Этот пример является примером услуг услуг сотрудников.
У нас есть два канала.
public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";
public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";
Объект сотрудника выглядит так:
public static class Employee {
final String firstName ;
final int employeeId ;
В этом примере есть три услуги: employeeeeehiringservice, льготы и заработную плату.
Эти услуги являются услугами Inproc. QBIT также поддерживает удаленные услуги WebSocket, HTTP и REST, но пока давайте сосредоточимся на услугах Inproc. Если вы понимаете Inproc, вы поймете удаленное.
Сотрудники -работники фактически выпускают мероприятия в две другие услуги.
public class EmployeeHiringService {
public void hireEmployee ( final Employee employee ) {
int salary = 100 ;
System . out . printf ( "Hired employee %s n " , employee );
//Does stuff to hire employee
//Sends events
final EventManager eventManager =
serviceContext (). eventManager ();
eventManager . send ( NEW_HIRE_CHANNEL , employee );
eventManager . sendArray ( PAYROLL_ADJUSTMENT_CHANNEL ,
employee , salary );
}
}
Обратите внимание, что мы звоним Sendarray, чтобы отправить сотрудника и его зарплату. Слушатель для PayRoll_Adjustment_Channel должен будет обрабатывать как сотрудника, так и Int, которая представляет собой зарплату новых сотрудников. Вы также можете использовать прокси -серверы автобусов событий, чтобы вам вообще не приходилось звонить в автобус событий.
Преимущества Service слушает для нанятых новых сотрудников, чтобы он мог зарегистрировать их в систему льгот.
public static class BenefitsService {
@ OnEvent ( NEW_HIRE_CHANNEL )
public void enroll ( final Employee employee ) {
System . out . printf ( "Employee enrolled into benefits system employee %s %d n " ,
employee . getFirstName (), employee . getEmployeeId ());
}
Папа должен получить оплату.
public static class PayrollService {
@ OnEvent ( PAYROLL_ADJUSTMENT_CHANNEL )
public void addEmployeeToPayroll ( final Employee employee , int salary ) {
System . out . printf ( "Employee added to payroll %s %d %d n " ,
employee . getFirstName (), employee . getEmployeeId (), salary );
}
}
Сотрудник является объектом сотрудника из работника.
Таким образом, вы можете получить свои льготы и заплатить!
Найдите здесь более подробную информацию:
QBIT Event Bus более подробный пример
Вы можете определить свой собственный интерфейс на автобусе событий, и вы можете использовать свои собственные шины событий с помощью QBIT. Каждый модуль в вашем сервисе может иметь свой собственный внутренний автобус.
Чтобы узнать больше о прочтении: QBIT MicroService, работая с автобусом частного события и QBIT Java MicroService LIB, используя свой собственный интерфейс с шиной события.
Чтобы по -настоящему понять QBIT, нужно понять концепции обратного вызова.
Обратный вызов - это способ получить асинхронный ответ в QBIT.
Вы называете метод службы, и он вызывает вас обратно.
Клиентские прокси могут иметь обратные вызовы:
public interface RecommendationServiceClient {
void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName );
}
Обратные вызовы - это потребители Java 8 с дополнительной дополнительной обработкой ошибок.
public interface Callback < T > extends java . util . function . Consumer < T > {
default void onError ( java . lang . Throwable error ) { /* compiled code */ }
}
Услуги, которые могут блокировать, должны использовать обратные вызовы. Таким образом, если LoadUser заблокировал в следующем примере, он действительно должен использовать обратный вызов вместо возврата значения.
Рекомендации публичного класса {
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
public List < Recommendation > recommend ( final String userName ) {
User user = users . get ( userName );
if ( user == null ) {
user = loadUser ( userName );
}
return runRulesEngineAgainstUser ( user );
}
Давайте притворяемся, что loadUser
должен смотреть в локальный кэш, и если пользователь не найден, посмотрите в кэше вне HEAP и, если не найдено, он должен попросить пользователя из пользователя пользователя, который должен проверить его кэши и, возможно, запасной на загрузку Пользовательские данные из базы данных или из других служб. Другими словами, loadUser
может потенциально блокировать в io.
Наш клиент не блокирует, но наш сервис делает. Возвращаясь к нашей RecommendationService
. Если мы получим много ударов в кеш для пользовательских нагрузок, возможно, блок не будет таким длинным, но он будет там, и каждый раз, когда нам придется винить в пользователе, вся система появляется. То, что мы хотим иметь в состоянии сделать, это если мы не можем обработать запрос на рекомендацию, мы идем вперед и сделаем асинхронный звонок в UserDataService
. Когда этот асинхронный обратный вызов возвращается, мы обрабатываем этот запрос. В то же время мы обрабатываем перечисления рекомендаций как можно быстрее. Мы никогда не блокируем.
Итак, давайте вернемся к услуге. Первое, что мы собираемся сделать, это заставить метод обслуживания провести обратный вызов. Прежде чем мы это сделаем, давайте установим некоторые правила.
public class RecommendationService {
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
Теперь мы проводим обратный вызов, и мы можем решить, когда хотим обработать этот запрос на получение рекомендаций. Мы можем сделать это сразу, если пользовательские данные, которые нам нужны, находятся в памяти или мы можем отложить их.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in user cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
...
} else {
/* Call the callback now because we can handle the callback now. */
recommendationsCallback . accept ( runRulesEngineAgainstUser ( user ));
}
}
Обратите внимание, что если пользователь найден в кэше, мы запускаем наши правила рекомендации в памяти и сразу же называем вызов recommendationsCallback.accept(runRulesEngineAgainstUser(user))
.
Интересно, что мы делаем, если не загружаем пользователя.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using Callback. */
userDataService . loadUser ( new Callback < User >() {
@ Override
public void accept ( final User loadedUser ) {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}
}, userName );
}
...
Здесь мы используем обратный вызов для загрузки пользователя, и когда пользователь загружается, мы называем handleLoadFromUserDataService
, который добавляет некоторое управление по поводу обращения с обратным вызовом, чтобы мы все еще могли обрабатывать этот вызов, но не сейчас.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using lambda expression. */
userDataService . loadUser (
loadedUser -> {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}, userName );
}
...
Использование Lambdas, подобных этому, делает код более читабельным и территочным, но помните, что не гнездитесь о Lambda выражения, иначе вы создадите кошмар для обслуживания кода. Используйте их разумно.
Мы хотим обрабатывать запрос на рекомендации после того, как система пользователя загружает пользователя из своего магазина.
public class RecommendationService {
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
private UserDataServiceClient userDataService ;
private BlockingQueue < Runnable > callbacks =
new ArrayBlockingQueue < Runnable >( 10_000 );
...
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
...
}
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks queue. */
callbacks . add ( new Runnable () {
@ Override
public void run () {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
}
});
}
public class RecommendationService {
...
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks list. */
callbacks . add (() -> {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
});
}
Важной частью является то, что каждый раз, когда мы получаем обратный вызов от UserDataService
, мы затем выполняем наши правила интенсивной рекомендации ЦП и обратный вызыв. Ну, не совсем то, что мы делаем, так это внедрение в нашу очередь обратных вызовов, а затем мы будем повторять их, кроме когда?
RecommendationService
можно уведомлять, когда его очередь пуст, она запустила новую партию и когда она достигла партийного предела. Это все хорошие времена для обработки обратных вызовов от UserDataService
.
@ QueueCallback ({
QueueCallbackType . EMPTY ,
QueueCallbackType . START_BATCH ,
QueueCallbackType . LIMIT })
private void handleCallbacks () {
flushServiceProxy ( userDataService );
Runnable runnable = callbacks . poll ();
while ( runnable != null ) {
runnable . run ();
runnable = callbacks . poll ();
}
}
Важно помнить при обработке обратных вызовов от другого микросервиса, который вы хотите обработать обратные вызовы от другой услуги, прежде чем вы обрабатываете больше запросов на комплексные от вас, клиенты. По сути, у вас есть клиенты, которые ждали (асинхронное ожидание, но все же), и эти клиенты могут представлять открытое соединение TCP/IP, например, http Вокруг открытого подключения для пользователей для загрузки пользовательской службы.
Чтобы узнать больше о обратных вызовах, Plesae Read [Qbit Java Microservice Lib Callback Основы] ([грубое разрезание] Qbit Microservice Lib Работает с обратными вызовами).
public class ServiceWorkers {
public static RoundRobinServiceDispatcher workers () {...
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {...
Вы можете составить оскорбленные работников (для в памяти, безопасных поток, интенсивных услуг процессора) или работников для ввода ИО или общения с иностранными услугами или иностранными автобусами.
Вот пример, который использует работник с тремя работниками обслуживания:
Допустим, у вас есть услуга, которая что -то делает:
//Your POJO
public class MultiWorker {
void doSomeWork (...) {
...
}
}
Теперь это делает какой -то IO, и вы хотите, чтобы это банк этого не только был одним из них, так что вы можете сделать IO параллельно. После некоторого тестирования производительности вы узнали, что три - это волшебное число.
Вы хотите использовать свой API для доступа к этой услуге:
public interface MultiWorkerClient {
void doSomeWork (...);
}
Теперь давайте создадим его банк и используем его.
Сначала создайте сервисы QBIT, которые добавляют поток/очередь/MicroBatch.
/* Create a service builder. */
final ServiceBuilder serviceBuilder = serviceBuilder ();
/* Create some qbit services. */
final Service service1 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service2 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service3 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
Теперь добавьте их в объект работников обслуживания.
ServiceWorkers dispatcher ;
dispatcher = workers (); //Create a round robin service dispatcher
dispatcher . addServices ( service1 , service2 , service3 );
dispatcher . start (); // start up the workers
Вы можете добавить услуги, POJO и потребителей методов, диспетчеры метода в пакет услуг. Служба -пакет является точкой интеграции в QBIT.
Давайте добавим наших новых работников обслуживания. ServiceWorkers - это ServiceMethodDispatcher.
/* Add the dispatcher to a service bundle. */
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();
Мы, вероятно, собираемся добавить вспомогательный метод в пакет обслуживания, чтобы большая часть этого может произойти за один вызов.
Теперь вы можете начать использовать своих работников.
/* Start using the workers. */
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );
Теперь вы можете использовать Spring или Guice для настройки строителей и пакета обслуживания. Но вы можете просто сделать это, как приведенное выше, что полезно для тестирования и понимания внутренних внутренних участников QBIT.
QBIT также поддерживает концепцию Sharded Services, которая полезна для ресурсов Sharding, таких как CPU (запустите двигатель правил на каждом ядре ЦП для механизма рекомендаций пользователей).
QBIT не знает, как положить свои услуги, вы должны дать им намек. Вы делаете это через правило Shard.
public interface ShardRule {
int shard ( String methodName , Object [] args , int numWorkers );
}
Мы работали над приложением, где первым аргументом в Сервисах было имя пользователя, а затем мы использовали его для призывов Shard в интенсивный процессор интенсивного в памяти. Эта техника работает. :)
У класса обслуживания есть метод создания бассейна работников Sharded.
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {
...
}
Чтобы использовать вас, просто проходите клавиш Shard, когда вы создаете работников обслуживания.
dispatcher = shardedWorkers (( methodName , methodArgs , numWorkers ) -> {
String userName = methodArgs [ 0 ]. toString ();
int shardKey = userName . hashCode () % numWorkers ;
return shardKey ;
});
Затем добавьте свои услуги в композицию работников.
int workerCount = Runtime . getRuntime (). availableProcessors ();
for ( int index = 0 ; index < workerCount ; index ++) {
final Service service = serviceBuilder
. setServiceObject ( new ContentRulesEngine ()). build ();
dispatcher . addServices ( service );
}
Затем добавьте его в пакет обслуживания, как и раньше.
dispatcher . start ();
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();
Тогда просто используйте его:
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );
for ( int index = 0 ; index < 100 ; index ++) {
String userName = "rickhigh" + index ;
worker . pickSuggestions ( userName );
}
public class ServiceWorkers {
...
public static ShardedMethodDispatcher shardOnFirstArgumentWorkers () {
...
}
...
public static ShardedMethodDispatcher shardOnFifthArgumentWorkers () {
...
}
public static ShardedMethodDispatcher shardOnBeanPath ( final String beanPath ) {
...
}
Shardonbeanpath позволяет вам создать сложный вызов навигации по пути и использовать его свойство для Shard.
/* shard on 2nd arg which is an employee
Use the employees department's id property. */
dispatcher = shardOnBeanPath ( "[1].department.id" );
/* Same as above. */
dispatcher = shardOnBeanPath ( "1/department/id" );
Узнайте больше о службах Sharding и Service Wordings здесь
Вы можете найти гораздо больше в вики. Также следуйте за коммитами. Мы были заняты бобр. Qbit the microservice lib для java - json, rest, websocket.