QBIT Java Micorservices Lib Tutorials | Sitio web de QBIT | Qbit usa Reakt | Qbit funciona con vert.x | Reakt vertx
El microservicio de Java Lib. QBIT es una programación reactiva LIB para construir microservicios: JSON, HTTP, WebSocket y REST. QBIT utiliza la programación reactiva para crear un descanso elástico y los servicios web amigables con la nube basados en la nube basados en WebSockets. SOA evolucionó para dispositivos móviles y nubes. Servicio de servicios, salud, estadísticas reactivas, eventos, programación reactiva idiomática Java para microservicios.
¿Tienes una pregunta? Pregunte aquí: QBIT Google Group.
Todo es una cola. Tienes una opción. Puedes abrazarlo y controlarlo. Puedes optimizarlo. O puedes esconderte detrás de las abstracciones. QBIT te abre para asomarse en lo que está sucediendo, y te permite sacar algunas palancas sin vender tu alma.
QBIT es una biblioteca, no un marco. Puede mezclar y combinar QBIT con Spring, Guice, etc.
QBIT ahora admite promesas invocables de Reakt para proxies de clientes locales y remotos. Esto le da una buena API fluida para la programación de asíncrono.
employeeService . lookupEmployee ( "123" )
. then (( employee )-> {...}). catchError (...). invoke ();
Las devoluciones de llamada de QBIT ahora también son devoluciones de llamada Reakt sin romper el contrato de QBIT para devoluciones de llamada.
Vea las promesas invocables de Reakt para obtener más detalles.
QBIT se publica para el repositorio público Maven.
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-admin</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency >
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-vertx</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency >
compile 'io.advantageous.qbit:qbit-admin:1.10.0.RELEASE'
compile 'io.advantageous.qbit:qbit-vertx:1.10.0.RELEASE'
Desplegado en varias grandes empresas Fortune 100. QBIT ahora funciona con VertX (independiente o incrustado). También puede usar QBIT en proyectos que no son de Qbit, es solo una lib.
Apache 2
QBIT tiene servicios INPROC, microservicios REST y microservicios de WebSocket, así como un bus de eventos de servicio en PROC (que puede ser por módulo o por aplicación). Apoya a los trabajadores y servicios en memoria.
Antes de describir más, aquí hay dos servicios de muestra:
@ RequestMapping ( "/todo-service" )
public class TodoService {
@ RequestMapping ( "/todo/count" )
public int size () {...
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {...
@ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {...
}
Al final del día, QBIT es una biblioteca simple, no un marco. Su aplicación no es una aplicación QBIT, sino una aplicación Java que usa el QBIT lib. QBIT le permite trabajar con Java utilizado y no se esfuerza por ocultarlo. Solo trato de sacar el aguijón.
Hemos utilizado técnicas en Boon y QBIT con gran éxito en aplicaciones de alta gama, de alto rendimiento y alta escalable. Ayudamos a los clientes a manejar 10 veces la carga con 1/10 los servidores de sus competidores utilizando técnicas en QBIT. QBIT estamos hartos de que el acceso a la cola de ajuste a mano y los hilos.
Las ideas para Boon y QBIT a menudo provienen de toda la web. Cometemos errores. Apuntarlos. Como desarrollador de Boon y QBIT, somos compañeros de viaje. Si tiene una idea o técnica que desea compartir, escuchamos.
Una gran inspiración para Boon/Qbit fue VertX, Akka, canales GO, objetos activos, roscado del modelo de apartamentos, actor y los documentos de simpatía mecánica.
QBIT tiene ideas similares a muchos marcos. Todos estamos leyendo los mismos documentos. QBIT se inspiró en los documentos de disruptor LMAX y esta publicación de blog sobre la cola de transferencia de enlaces versus disruptor. Teníamos algunas teorías sobre las colas que la publicación de blog nos inspiró a probarlas. Algunas de estas teorías se implementan en algunos de los mayores backends de middleware y cuyas marcas de nombre son conocidas en todo el mundo. Y así nació Qbit.
QBIT también se inspiró mucho por el gran trabajo realizado por Tim Fox en Vertx. El primer proyecto que usa algo que en realidad podría llamarse QBIT (aunque QBIT temprano) fue usar VertX en un microservicio web/móvil para una aplicación que podría tener 80 millones de usuarios. Fue esta experiencia con VertX y QBIT temprano lo que condujo al desarrollo y la evolución de QBIT. QBIT se basa en los hombros de los gigantes (Netty/VertX).
Spring Disruptor: No. Puede usar QBIT para escribir complementos para Spring Disruptor. Supongo, pero QBIT no compite con Spring Disruptor. Spring Boot/Spring MVC: No. Usamos las mismas anotaciones pero QBIT está orientado a microservicios en memoria de alta velocidad. Es más como Akka que Spring Boot. QBIT tiene un subconjunto de las características de Spring MVC engranadas solo para microservicios, es decir, WebSocket RPC, REST, JSON Marshaling, etc. Akka: No. Bueno, tal vez. Akka tiene conceptos similares, pero adoptan un enfoque diferente. QBIT está más enfocado en Java y microservicios (REST, JSON, WebSocket) que Akka. LMAX Disruptor: No. De hecho, podemos usar el disruptor como en las colas que QBIT usa debajo de las cubiertas.
(Se han eliminado los primeros puntos de referencia. Estaron aquí. QBIT se volvió mucho más rápido. El Benchmarking QBIT es un objetivo móvil en este momento. Se crearán enlaces e informes).
Ejemplos de código
====
BasicQueue < Integer > queue = BasicQueue . create ( Integer . class , 1000 );
//Sending threads
SendQueue < Integer > sendQueue = queue . sendQueue ();
for ( int index = 0 ; index < amount ; index ++) {
sendQueue . send ( index );
}
sendQueue . flushSends ();
...
sendQueue . sendAndFlush ( code );
//other methods for sendQueue, writeBatch, writeMany
//Receiving Threads
ReceiveQueue < Integer > receiveQueue = queue . receiveQueue ();
Integer item = receiveQueue . take ();
//other methods poll(), pollWait(), readBatch(), readBatch(count)
QBIT es una biblioteca de colas para microservicios. Es similar a muchos otros proyectos como Akka, Spring Reactor, etc. QBIT es solo una biblioteca, no una plataforma. QBIT tiene bibliotecas para poner un servicio detrás de una cola. Puede usar las colas de QBIT directamente o puede crear un servicio. Los servicios de QBIT pueden ser expuestos por WebSocket, HTTP, HTTP Pipeline y otros tipos de remotos. Un servicio en QBIT es una clase Java cuyos métodos se ejecutan detrás de las colas de servicio. QBIT implementa el enhebrado del modelo de apartamento y es similar al modelo de actor o una mejor descripción sería objetos activos. QBIT no usa un disruptor (pero podría). Utiliza colas regulares de Java. QBIT puede hacer al norte de 100 millones de llamadas de ping pong por segundo, lo que es una velocidad increíble (vista tan alta como 200 m). QBIT también admite servicios de llamadas a través de REST y WebSocket. QBIT es microservicios en el sentido web puro: JSON, HTTP, WebSocket, etc. QBIT utiliza micro lotes para empujar mensajes a través de la tubería (cola, io, etc.) más rápido para reducir la transferencia de roscas.
QBIT es un microservicio Java Lib Supporting REST, JSON y WebSocket. Está escrito en Java, pero algún día podríamos escribir una versión en Rust o Go o C# (pero eso requeriría un gran día de pago).
Service POJO (objeto Java Old Old) detrás de una cola que puede recibir llamadas de método a través de llamadas o eventos proxy (puede tener un hilo de administrar eventos, llamadas de método y dos o dos para llamadas y eventos de método y el otro para respuestas para que los manejadores de respuesta No bloquee el servicio. Los servicios pueden usar anotaciones de descanso de estilo MVC de primavera para exponerse al mundo exterior a través de REST y WebSocket.
ServiceBundle muchos pojos detrás de una cola de respuesta y muchos reciben colas. Puede haber un hilo para todas las respuestas o no. También pueden ser una cola de recibir.
Cola un hilo que administra una cola. Admite un lote. Tiene eventos para vacío, alcanzado, inicioBatch, inactivo. Puede escuchar estos eventos de servicios que se encuentran detrás de una cola. No tiene que usar servicios. Puedes usar la cola directa. En QBIT, tiene colas de remitente y colas de receptores. Están separados para admitir micro-lotes.
ServiceEndPointServer ServiceBundle que está expuesto a la comunicación REST y WebSocket.
EventBus EventBus es una forma de enviar muchos mensajes a los servicios que pueden estar libremente acoplados.
ClientProxy ClientProxy es una forma de invocar el servicio a través de la interfaz Async, el servicio puede ser inProc (mismo proceso) o remover a través de WebSocket.
QBIT sin bloqueo es una lib. Usas devoluciones de llamada a través de Java 8 Lambdas. También puede enviar mensajes de eventos y obtener respuestas. La mensajería está integrada en el sistema para que pueda coordinar fácilmente tareas complejas. QBIT adopta un enfoque orientado a objetos para el desarrollo de servicios para que los servicios parezcan servicios Java normales que ya escribe, pero los servicios viven detrás de una cola/hilo. Este no es un concepto nuevo. Microsoft hizo esto con DCOM/COM y lo llamó objetos activos. Akka lo hace con los actores y los llamó actores fuertemente escritos. Los conceptos importantes es que obtienes la velocidad de los mensajes reactivos y de estilo actor, pero se desarrolla en un enfoque de OOP natural. QBIT no es el primero. QBIT no es el único.
Speed QBIT es muy rápido. Por supuesto, hay mucho margen de mejora. Pero ya 200 m+ tps inproc ping pong, bus de eventos de 10m-20m+ tps, 500k tps rpc llamadas a través de websocket/json, etc. Se necesita hacer más trabajo para mejorar la velocidad, pero ahora es lo suficientemente rápido donde nos centramos más en la usabilidad. El soporte JSON utiliza boon de forma predeterminada, que es hasta 4 veces más rápido que otros analizadores JSON para el caso de uso REST/JSON, WebSocket/JSON.
Reactive Programming QBIT proporciona un reactor para administrar las llamadas de async. Esto permite que las devoluciones de llamada se manejen en el mismo hilo que las llamó y proporciona tiempo de espera y manejo de errores. Lea el tutorial del reactor para crear programación de servicios de micro reactivos
Descubrimiento de servicio integrado Soporte para el descubrimiento de servicios. Esto incluye la integración con el cónsul.
STATService incorporado en soporte para estadísticas. El servicio STATS se puede integrar con STATSD (Graphite, Grafana, Datadog, etc.) para publicar estadísticas pasivas. O puede consultar el motor de estadísticas y reaccionar a las estadísticas (recuentos, tiempos y niveles). El STATSService es un sistema de estadísticas reactivas que se puede agrupar. StatService es reactivo en el sentido de que sus servicios pueden publicarlo y consultarlo y reaccionar según los resultados. Puede implementar cosas como la limitación de la velocidad y reaccionar a una mayor velocidad de algo. El sistema de descubrimiento de servicios se integra con el sistema de salud y el cónsul para enrollar cada uno de sus servicios internos que componen su servicio micro y publiquen el compuesto a la disposición de su micro servicio a un solo punto final HTTP o un interruptor de hombre muerto en Consul (TTL).
Hablar es barato. Veamos algún código. Puedes dar un paseo detallado en la wiki. Ya tenemos mucha documentación.
Crearemos un servicio expuesto a través de REST/JSON.
Para consultar el tamaño de la lista de TODO:
curl localhost:8080/services/todo-service/todo/count
Para agregar un nuevo artículo de TODO.
curl -X POST -H " Content-Type: application/json " -d
' {"name":"xyz","description":"xyz"} '
http://localhost:8080/services/todo-service/todo
Para obtener una lista de elementos de TODO
curl http://localhost:8080/services/todo-service/todo/
El ejemplo de TODO usará y rastreará los elementos de TODO.
package io . advantageous . qbit . examples ;
import java . util . Date ;
public class TodoItem {
private final String description ;
private final String name ;
private final Date due ;
El Todoservice utiliza anotaciones de estilo MVC Spring.
@ RequestMapping ( "/todo-service" )
public class TodoService {
private List < TodoItem > todoItemList = new ArrayList <>();
@ RequestMapping ( "/todo/count" )
public int size () {
return todoItemList . size ();
}
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {
return todoItemList ;
}
@ RequestMapping ( value = "/todo" , method = RequestMethod . POST )
public void add ( TodoItem item ) {
todoItemList . add ( item );
}
}
Puede publicar/poner no json y puede capturar el cuerpo como una String
o como un byte[]
. Si el tipo de contenido se establece en algo más que application/json
y su cuerpo se define una cadena o byte []. Esto funciona automáticamente. (El tipo de contenido debe estar configurado).
@ RequestMapping ( value = "/body/bytes" , method = RequestMethod . POST )
public boolean bodyPostBytes ( byte [] body ) {
String string = new String ( body , StandardCharsets . UTF_8 );
return string . equals ( "foo" );
}
@ RequestMapping ( value = "/body/string" , method = RequestMethod . POST )
public boolean bodyPostString ( String body ) {
return body . equals ( "foo" );
}
Por defecto, QBIT envía un 200
(OK) para una llamada no Void (una llamada que tiene una devolución o una devolución de llamada). Si la operación de descanso no tiene devolución o ninguna devolución de llamada, QBIT envía un 202
(aceptado). Puede haber momentos en que desee enviar un 201 (creado) o algún otro código que no sea una excepción. Puede hacerlo configurando code
en @RequestMapping
. Por defecto, el código es -1, lo que significa usar el comportamiento predeterminado (200 para el éxito, 202 para un mensaje unidireccional y 500 para errores).
@ RequestMapping ( value = "/helloj7" , code = 221 )
public void helloJSend7 ( Callback < JSendResponse < List < String >>> callback ) {
callback . returnThis ( JSendResponseBuilder . jSendResponseBuilder ( Lists . list (
"hello " + System . currentTimeMillis ())). build ());
}
Callbacks
también se pueden utilizar para servicios internos. A menudo es el caso de que use una devolución de llamada o un reactor QBIT para administrar las llamadas de servicio.
No tiene que devolver las llamadas de reposo de JSON. Puede devolver cualquier texto binario o cualquier texto utilizando HttpBinaryResponse
y HttpTextResponse
.
@ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpTextResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildTextResponse ());
}
@ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpBinaryResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildBinaryResponse ());
}
¿Por qué elegimos anotaciones de estilo de primavera?
Ahora solo comienza.
public static void main ( String ... args ) {
ServiceEndpointServer server = new EndpointServerBuilder (). build ();
server . initServices ( new TodoService ());
server . start ();
}
Eso es todo. También hay soporte WebSocket fuera de la caja con la generación del proxy del lado del cliente para que pueda llamar a los servicios a una tasa de millones de llamadas por segundo.
@ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {
return a + b ;
}
}
Siempre puede invocar los servicios de QBIT a través de un proxy WebSocket. La ventaja de un proxy de WebSocket es que le permite ejecutar 1M RPC+ un segundo (1 millón de llamadas remotas cada segundo).
/* Start QBit client for WebSocket calls. */
final Client client = clientBuilder ()
. setPort ( 7000 ). setRequestBatchSize ( 1 ). build ();
/* Create a proxy to the service. */
final AdderServiceClientInterface adderService =
client . createProxy ( AdderServiceClientInterface . class ,
"adder-service" );
client . start ();
/* Call the service */
adderService . add ( System . out :: println , 1 , 2 );
La salida es 3.
3
Lo anterior utiliza una interfaz WebSocket proxy para llamar al servicio Async.
interface AdderServiceClientInterface {
void add ( Callback < Integer > callback , int a , int b );
}
Cree un cliente de servicio WebSocket que sea consciente de los servicios de servicio.
final Client client = clientBuilder . setServiceDiscovery ( serviceDiscovery , "echo" )
. setUri ( "/echo" ). setProtocolBatchSize ( 20 ). build (). startClient ();
final EchoAsync echoClient = client . createProxy ( EchoAsync . class , "echo" );
Actualmente, el clientBuilder
cargará todos los puntos finales del servicio que están registrados bajo el nombre del servicio y elegirán al azar uno.
ServicedScovery incluye cónsul, ver archivos JSON en disco y DNS. También es fácil escribir su propio descubrimiento de servicios y enchufarlo a QBIT.
En el futuro podemos ver llamadas de Roundrobin o llamadas de fragmentos al servicio WebSocket y/o proporcionar un fracaso automático si la conexión está cerrada. Hacemos esto para el bus de eventos que utiliza el descubrimiento de servicios, pero todavía no se hornea en los trozos de clientes basados en WebSocket.
El último ejemplo de cliente usa WebSocket. También puede usar REST y realmente usar los parámetros URI que configuramos. El descanso es bueno, pero será más lento que el soporte de WebSocket.
QBIT se envía con un pequeño cliente HTTP agradable. Podemos usarlo.
Puede usarlo para enviar llamadas Async y mensajes de WebSocket con el cliente HTTP.
Aquí usaremos el cliente HTTP para invocar nuestro método remoto:
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" )
. setPort ( 7000 ). build ();
httpClient . start ();
String results = httpClient
. get ( "/services/adder-service/add/2/2" ). body ();
System . out . println ( results );
La salida es 4.
4
También puede acceder al servicio desde Curl.
$ curl http://localhost:7000/services/adder-service/add/2/2
Vea este ejemplo completo aquí: Tutorial de Microservice de QBIT.
QBIT URI Params y WebSocket proxy Client
QBIT tiene una biblioteca para trabajar y escribir microservicios de async que sea liviano y divertido de usar.
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build ();
/* Setup WebSocket Server support. */
httpServer . setWebSocketOnOpenConsumer ( webSocket -> {
webSocket . setTextMessageConsumer ( message -> {
webSocket . sendText ( "ECHO " + message );
});
});
/* Start the server. */
httpServer . start ();
/** CLIENT. */
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start ();
/* Setup the client websocket. */
WebSocket webSocket = httpClient
. createWebSocket ( "/websocket/rocket" );
/* Setup the text consumer. */
webSocket . setTextMessageConsumer ( message -> {
System . out . println ( message );
});
webSocket . openAndWait ();
/* Send some messages. */
webSocket . sendText ( "Hi mom" );
webSocket . sendText ( "Hello World!" );
ECHO Hi mom
ECHO Hello World!
Ahora detenga el servidor y el cliente. Bastante fácil, ¿eh?
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build ();
/* Setting up a request Consumer with Java 8 Lambda expression. */
httpServer . setHttpRequestConsumer ( httpRequest -> {
Map < String , Object > results = new HashMap <>();
results . put ( "method" , httpRequest . getMethod ());
results . put ( "uri" , httpRequest . getUri ());
results . put ( "body" , httpRequest . getBodyAsString ());
results . put ( "headers" , httpRequest . getHeaders ());
results . put ( "params" , httpRequest . getParams ());
httpRequest . getReceiver ()
. response ( 200 , "application/json" , Boon . toJson ( results ));
});
/* Start the server. */
httpServer . start ();
La atención se centra en la facilidad de uso y en el uso de Lambdas Java 8 para devoluciones de llamada para que el código sea apretado y pequeño.
Obtenga más información sobre el soporte de WebSocket de Microservice Style de QBIT aquí
Ahora, probemos nuestro cliente HTTP.
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start ();
Simplemente pase la URL, el puerto y luego llame al inicio.
Ahora puede comenzar a enviar solicitudes HTTP.
/* Send no param get. */
HttpResponse httpResponse = httpClient . get ( "/hello/mom" );
puts ( httpResponse );
Una respuesta HTTP solo contiene los resultados del servidor.
public interface HttpResponse {
MultiMap < String , String > headers ();
int code ();
String contentType ();
String body ();
}
Existen métodos auxiliares para sincronizar http get llamadas.
/* Send one param get. */
httpResponse = httpClient . getWith1Param ( "/hello/singleParam" ,
"hi" , "mom" );
puts ( "single param" , httpResponse );
/* Send two param get. */
httpResponse = httpClient . getWith2Params ( "/hello/twoParams" ,
"hi" , "mom" , "hello" , "dad" );
puts ( "two params" , httpResponse );
...
/* Send five param get. */
httpResponse = httpClient . getWith5Params ( "/hello/5params" ,
"hi" , "mom" ,
"hello" , "dad" ,
"greetings" , "kids" ,
"yo" , "pets" ,
"hola" , "neighbors" );
puts ( "5 params" , httpResponse );
El método PUTS es un método auxiliar que hace System.out.println más o menos por cierto.
Los primeros cinco parámetros están cubiertos. Más allá de las cinco, debes usar el httpbuilder.
/* Send six params with get. */
final HttpRequest httpRequest = httpRequestBuilder ()
. addParam ( "hi" , "mom" )
. addParam ( "hello" , "dad" )
. addParam ( "greetings" , "kids" )
. addParam ( "yo" , "pets" )
. addParam ( "hola" , "pets" )
. addParam ( "salutations" , "all" ). build ();
httpResponse = httpClient . sendRequestAndWait ( httpRequest );
puts ( "6 params" , httpResponse );
También hay llamadas de asíncrono para obtener.
/* Using Async support with lambda. */
httpClient . getAsync ( "/hi/async" , ( code , contentType , body ) -> {
puts ( "Async text with lambda" , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith1Param ( "/hi/async" , "hi" , "mom" , ( code , contentType , body ) -> {
puts ( "Async text with lambda 1 param n " , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith2Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 2 params n " , body );
});
Sys . sleep ( 100 );
...
/* Using Async support with lambda. */
httpClient . getAsyncWith5Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
"p3" , "v3" ,
"p4" , "v4" ,
"p5" , "v5" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 5 params n " , body );
});
Sys . sleep ( 100 );
[Encuentre más sobre el cliente HTTP de microservicio rápido fácil de usar aquí] (https://github.com/advantageus/qbit/wiki/%5bdoc%5D-Using-qbit-microservice-lib's-httpclient-post ,-post ,-post, -et-al, -json, -java-8-lambda).
QBIT permite que los servicios detrás de las colas también se ejecuten en el proceso.
/* POJO service. */
final TodoManager todoManagerImpl = new TodoManager ();
/*
Create the service which manages async calls to todoManagerImpl.
*/
final Service service = serviceBuilder ()
. setServiceObject ( todoManagerImpl )
. build (). startServiceQueue ();
/* Create Asynchronous proxy over Synchronous service. */
final TodoManagerClientInterface todoManager =
service . createProxy ( TodoManagerClientInterface . class );
service . startCallBackHandler ();
System . out . println ( "This is an async call" );
/* Asynchronous method call. */
todoManager . add ( new Todo ( "Call Mom" , "Give Mom a call" ));
AtomicInteger countTracker = new AtomicInteger ();
//Hold count from async call to service... for testing and showing it is an async callback
System . out . println ( "This is an async call to count" );
todoManager . count ( count -> {
System . out . println ( "This lambda expression is the callback " + count );
countTracker . set ( count );
});
todoManager . clientProxyFlush (); //Flush all methods. It batches calls.
Sys . sleep ( 100 );
System . out . printf ( "This is the count back from the server %d n " , countTracker . get ());
Se está escribiendo un tutorial detallado sobre los servicios in-proc.
QBIT Event Bus más detallado Ejemplo
QBIT también tiene un autobús de evento de servicio. Este ejemplo es un ejemplo de servicios de beneficios para empleados.
Tenemos dos canales.
public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";
public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";
Un objeto de empleado se ve así:
public static class Employee {
final String firstName ;
final int employeeId ;
Este ejemplo tiene tres servicios: empleado de servicio, servicio de beneficios y servicio de nómina.
Estos servicios son servicios INPROC. QBIT también admite los servicios Remote WebSocket, HTTP y REST, pero por ahora, centrémonos en los servicios INPROC. Si comprende InProc, comprenderá remoto.
El empleado HiringService en realidad dispara los eventos a otros dos servicios.
public class EmployeeHiringService {
public void hireEmployee ( final Employee employee ) {
int salary = 100 ;
System . out . printf ( "Hired employee %s n " , employee );
//Does stuff to hire employee
//Sends events
final EventManager eventManager =
serviceContext (). eventManager ();
eventManager . send ( NEW_HIRE_CHANNEL , employee );
eventManager . sendArray ( PAYROLL_ADJUSTMENT_CHANNEL ,
employee , salary );
}
}
Tenga en cuenta que llamamos a SendArray para que podamos enviar al empleado y su salario. El oyente de Payroll_adjustment_Channel tendrá que manejar tanto a un empleado como un INT que represente el salario de los nuevos empleados. También puede usar los proxies de bus de eventos para que no tenga que llamar al bus de eventos.
El servicio de beneficios escucha para los nuevos empleados que se contratan para que pueda inscribirlos en el sistema de beneficios.
public static class BenefitsService {
@ OnEvent ( NEW_HIRE_CHANNEL )
public void enroll ( final Employee employee ) {
System . out . printf ( "Employee enrolled into benefits system employee %s %d n " ,
employee . getFirstName (), employee . getEmployeeId ());
}
Papá necesita que le paguen.
public static class PayrollService {
@ OnEvent ( PAYROLL_ADJUSTMENT_CHANNEL )
public void addEmployeeToPayroll ( final Employee employee , int salary ) {
System . out . printf ( "Employee added to payroll %s %d %d n " ,
employee . getFirstName (), employee . getEmployeeId (), salary );
}
}
El empleado es el objeto del empleado del empleado de servicio.
¡Para que pueda obtener sus beneficios y pagar!
Encuentra más detalles aquí:
QBIT Event Bus más detallado Ejemplo
Puede definir su propia interfaz al bus de eventos y puede usar sus propios autobuses de eventos con QBIT. Cada módulo de su servicio puede tener su propio bus de eventos internos.
Para obtener más información, lea: Microservicio de QBIT trabajando con un bus de eventos privado y Lib de microservicio de Java JBIT utilizando su propia interfaz para el bus de eventos.
Para comprender realmente QBIT, uno debe comprender los conceptos de una devolución de llamada.
Una devolución de llamada es una forma de obtener una respuesta asíncrata en QBIT.
Llamas a un método de servicio y te devuelve la llamada.
Los proxies del cliente pueden tener devoluciones de llamada:
public interface RecommendationServiceClient {
void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName );
}
Las devoluciones de llamada son consumidores de Java 8 con un manejo de errores adicional opcional.
public interface Callback < T > extends java . util . function . Consumer < T > {
default void onError ( java . lang . Throwable error ) { /* compiled code */ }
}
Los servicios que pueden bloquear deben usar devoluciones de llamada. Por lo tanto, si LoadUser bloqueó en el siguiente ejemplo, realmente debería usar una devolución de llamada en lugar de devolver un valor.
Class Public Recomendationservice {
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
public List < Recommendation > recommend ( final String userName ) {
User user = users . get ( userName );
if ( user == null ) {
user = loadUser ( userName );
}
return runRulesEngineAgainstUser ( user );
}
Permitemos que loadUser
tiene que mirar en un caché local, y si no se encuentra el usuario, busque en un caché fuera de tiempo y, si no se encuentra, debe solicitar al usuario del servicio de usuarios que debe verificar sus cachés y quizás devolios para cargar el Datos de usuario de una base de datos o de otros servicios. En otras palabras, loadUser
puede bloquear en IO.
Nuestro cliente no bloquea, pero nuestro servicio sí. Volviendo a nuestro RecommendationService
. Si obtenemos muchos éxitos de caché para las cargas de los usuarios, tal vez el bloque no será tan largo, pero estará allí y cada vez que tengamos que criticar en un usuario, todo el sistema está bien. Lo que queremos poder hacer es si no podemos manejar la solicitud de recomendación, seguimos adelante y hacemos una llamada de async para el UserDataService
. Cuando regresa esa devolución de llamada Async, entonces manejamos esa solicitud. Mientras tanto, manejamos las solicitudes de las listas de recomendaciones lo más rápido que podemos. Nunca bloqueamos.
Así que volvamos a visitar el servicio. Lo primero que vamos a hacer es hacer que el método de servicio tome una devolución de llamada. Antes de hacer eso, establezcamos algunas reglas.
public class RecommendationService {
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
Ahora estamos tomando una devolución de llamada y podemos decidir cuándo queremos manejar esta solicitud de generación de recomendaciones. Podemos hacerlo de inmediato si los datos del usuario que necesitamos están en memoria o podemos retrasarlos.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in user cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
...
} else {
/* Call the callback now because we can handle the callback now. */
recommendationsCallback . accept ( runRulesEngineAgainstUser ( user ));
}
}
Observe, si el usuario se encuentra en el caché, ejecutamos nuestras reglas de recomendación en memoria y llamamos a las recommendationsCallback.accept(runRulesEngineAgainstUser(user))
de devolución de llamada.
La parte interesante es qué hacemos si no tenemos al usuario cargado.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using Callback. */
userDataService . loadUser ( new Callback < User >() {
@ Override
public void accept ( final User loadedUser ) {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}
}, userName );
}
...
Aquí usamos una devolución de llamada para cargar al usuario, y cuando se carga el usuario, llamamos a handleLoadFromUserDataService
, lo que agrega cierta administración sobre el manejo de la devolución de llamada para que aún podamos manejar esta llamada, simplemente no ahora.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using lambda expression. */
userDataService . loadUser (
loadedUser -> {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}, userName );
}
...
El uso de lambdas como este hace que el código sea más legible y breve, pero recuerde que no aniden las expresiones de lambda profundamente o creará una pesadilla de mantenimiento del código. Úselos juiciosamente.
Lo que queremos es manejar la solicitud de recomendaciones después de que el sistema de servicio de usuario cargue al usuario desde su tienda.
public class RecommendationService {
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
private UserDataServiceClient userDataService ;
private BlockingQueue < Runnable > callbacks =
new ArrayBlockingQueue < Runnable >( 10_000 );
...
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
...
}
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks queue. */
callbacks . add ( new Runnable () {
@ Override
public void run () {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
}
});
}
public class RecommendationService {
...
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks list. */
callbacks . add (() -> {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
});
}
La parte importante allí es que cada vez que recibimos una llamada de devolución de llamada de UserDataService
, luego realizamos nuestras reglas de recomendación intensiva de CPU y devolución de llamada a nuestra persona que llama. Bueno, no exactamente, lo que hacemos es Enqueue un Runnable en nuestra cola de devoluciones de llamada, y luego iteraremos a través de ellos, pero ¿cuándo?
El RecommendationService
se puede notificar cuando su cola está vacía, ha comenzado un nuevo lote y cuando ha alcanzado un límite de lotes. Todos estos son buenos momentos para manejar las devoluciones de llamada del UserDataService
.
@ QueueCallback ({
QueueCallbackType . EMPTY ,
QueueCallbackType . START_BATCH ,
QueueCallbackType . LIMIT })
private void handleCallbacks () {
flushServiceProxy ( userDataService );
Runnable runnable = callbacks . poll ();
while ( runnable != null ) {
runnable . run ();
runnable = callbacks . poll ();
}
}
Es importante recordar al manejar las devoluciones de llamada desde otro microservicio que desea manejar las devoluciones de llamada del otro servicio antes de manejar más solicitudes de sus clientes. Esencialmente, tiene clientes que han estado esperando (async esperando pero aún así), y estos clientes podrían representar una conexión TCP/IP abierta como una llamada HTTP, por lo que es mejor cerrarlos antes de manejar más solicitudes y, como dijimos. alrededor con una conexión abierta para que los usuarios carguen forman el servicio de usuario.
Para obtener más información sobre las devoluciones de llamada, Plesae Read [QBIT Java Microservice LIB Callback Fundaments] ([Rough Cut] QBIT microservice LIB trabajando con devoluciones de llamada).
public class ServiceWorkers {
public static RoundRobinServiceDispatcher workers () {...
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {...
Puede componer trabajadores fragmentados (en memoria, seguro de hilo, servicios intensivos de CPU) o trabajadores para IO o que hablen con servicios extranjeros o autobuses extranjeros.
Aquí hay un ejemplo que utiliza un grupo de trabajadores con tres trabajadores de servicio:
Digamos que tiene un servicio que hace algo:
//Your POJO
public class MultiWorker {
void doSomeWork (...) {
...
}
}
Ahora, esto hace algún tipo de IO y quieres tener un banco de estos ejecutando no solo uno para que puedas hacer IO en paralelo. Después de algunas pruebas de rendimiento, descubriste que tres es el número mágico.
Desea usar su API para acceder a este servicio:
public interface MultiWorkerClient {
void doSomeWork (...);
}
Ahora creemos un banco de estos y lo usemos.
Primero cree los servicios QBIT que agregan el hilo/cola/microbatch.
/* Create a service builder. */
final ServiceBuilder serviceBuilder = serviceBuilder ();
/* Create some qbit services. */
final Service service1 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service2 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service3 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
Ahora agrégalos a un objeto ServiceWorkers.
ServiceWorkers dispatcher ;
dispatcher = workers (); //Create a round robin service dispatcher
dispatcher . addServices ( service1 , service2 , service3 );
dispatcher . start (); // start up the workers
Puede agregar servicios, Pojos y consumidores de métodos, despachadores de métodos a un paquete de servicio. El paquete de servicio es un punto de integración en QBIT.
Agreguemos a nuestros nuevos trabajadores de servicio. ServiceWorkers es un ServiceMethoddispatcher.
/* Add the dispatcher to a service bundle. */
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();
Probablemente vamos a agregar un método auxiliar al paquete de servicio para que la mayor parte de esto pueda suceder en una sola llamada.
Ahora puede comenzar a usar a sus trabajadores.
/* Start using the workers. */
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );
Ahora puede usar Spring o Guice para configurar los constructores y el paquete de servicio. Pero puede hacerlo como lo anterior, que es bueno para probar y comprender los internales QBIT.
QBIT también admite el concepto de servicios fragmentados, que es bueno para fragmentar recursos como CPU (ejecute un motor de reglas en cada núcleo de CPU para un motor de recomendación del usuario).
QBIT no sabe cómo fragmentar sus servicios, debe darle una pista. Haces esto a través de una regla de fragmentos.
public interface ShardRule {
int shard ( String methodName , Object [] args , int numWorkers );
}
Trabajamos en una aplicación donde el primer argumento a los Servicios era el nombre de usuario, y luego lo usamos para las llamadas de fragmentos a un motor de reglas de memoria intensiva de CPU. Esta técnica funciona. :)
La clase ServiceWorkers tiene un método para crear un grupo de trabajadores fragmentados.
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {
...
}
Para usar, simplemente pasa una llave de fragmentos cuando crea los trabajadores de servicio.
dispatcher = shardedWorkers (( methodName , methodArgs , numWorkers ) -> {
String userName = methodArgs [ 0 ]. toString ();
int shardKey = userName . hashCode () % numWorkers ;
return shardKey ;
});
Luego agregue sus servicios a la composición de los trabajadores del servicio.
int workerCount = Runtime . getRuntime (). availableProcessors ();
for ( int index = 0 ; index < workerCount ; index ++) {
final Service service = serviceBuilder
. setServiceObject ( new ContentRulesEngine ()). build ();
dispatcher . addServices ( service );
}
Luego agréguelo al paquete de servicio como antes.
dispatcher . start ();
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();
Entonces solo úsalo:
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );
for ( int index = 0 ; index < 100 ; index ++) {
String userName = "rickhigh" + index ;
worker . pickSuggestions ( userName );
}
public class ServiceWorkers {
...
public static ShardedMethodDispatcher shardOnFirstArgumentWorkers () {
...
}
...
public static ShardedMethodDispatcher shardOnFifthArgumentWorkers () {
...
}
public static ShardedMethodDispatcher shardOnBeanPath ( final String beanPath ) {
...
}
ShardonBeanPath le permite crear una llamada de navegación de ruta de frijol compleja y usar su propiedad para fragmentar.
/* shard on 2nd arg which is an employee
Use the employees department's id property. */
dispatcher = shardOnBeanPath ( "[1].department.id" );
/* Same as above. */
dispatcher = shardOnBeanPath ( "1/department/id" );
Lea más sobre los trabajadores de fragmentos de servicio y servicios aquí
Puedes encontrar mucho más en la wiki. También siga los compromisos. Hemos sido castores ocupados. QBIT El microservicio lib para Java - JSON, REST, WebSocket.