Qbit Java Micorservices Lib Tutorial | Situs web qbit | Qbit menggunakan REAKT | Qbit bekerja dengan vert.x | REAKT VERTX
Java Microservice Lib. Qbit adalah lib pemrograman reaktif untuk membangun layanan mikro - JSON, HTTP, WebSocket, dan istirahat. Qbit menggunakan pemrograman reaktif untuk membangun istirahat elastis, dan layanan web cloud berbasis websockets. SOA berevolusi untuk ponsel dan cloud. Servicediscovery, Kesehatan, Layanan Statistik Reaktif, Peristiwa, Pemrograman Reaktif Idiomatik Java untuk Layanan Microservices.
Punya pertanyaan? Tanyakan di sini: QBIT Google Group.
Semuanya adalah antrian. Anda punya pilihan. Anda dapat merangkulnya dan mengendalikannya. Anda dapat mengoptimalkannya. Atau Anda dapat bersembunyi di balik abstraksi. Qbit membuka Anda untuk mengintip apa yang sedang terjadi, dan memungkinkan Anda untuk menarik beberapa tuas tanpa menjual jiwa Anda.
Qbit adalah perpustakaan bukan kerangka kerja. Anda dapat mencampur dan mencocokkan qbit dengan musim semi, guice, dll.
QBIT sekarang mendukung janji yang dapat dibangun untuk proxy klien lokal dan jarak jauh. Ini memberikan API fasih yang bagus untuk pemrograman async.
employeeService . lookupEmployee ( "123" )
. then (( employee )-> {...}). catchError (...). invoke ();
Callback qbit sekarang juga callback reakt tanpa melanggar kontrak qbit untuk panggilan balik.
Lihat Janji yang Dapat Dibangun Reakt untuk detail lebih lanjut.
Qbit diterbitkan ke repo publik Maven.
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-admin</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency >
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-vertx</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency >
compile 'io.advantageous.qbit:qbit-admin:1.10.0.RELEASE'
compile 'io.advantageous.qbit:qbit-vertx:1.10.0.RELEASE'
Dikerahkan di beberapa perusahaan Fortune 100 besar. Qbit sekarang berfungsi dengan vertx (mandiri atau tertanam). Anda juga dapat menggunakan QBIT pada proyek non-QBIT, itu hanya lib.
Apache 2
QBIT memiliki layanan inproc, layanan microservices dan Websocket Microservices serta bus acara layanan in-proc (yang dapat per modul atau per aplikasi). Ini mendukung pekerja dan layanan dalam memori.
Sebelum kami menjelaskan lebih lanjut, berikut adalah dua layanan sampel:
@ RequestMapping ( "/todo-service" )
public class TodoService {
@ RequestMapping ( "/todo/count" )
public int size () {...
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {...
@ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {...
}
Pada akhirnya qbit adalah perpustakaan sederhana, bukan kerangka kerja. Aplikasi Anda bukan aplikasi qbit tetapi aplikasi Java yang menggunakan qbit lib. Qbit memungkinkan Anda untuk bekerja dengan Java Util secara bersamaan, dan tidak berusaha untuk menyembunyikannya dari Anda. Hanya mencoba menghilangkan sengatannya.
Kami telah menggunakan teknik dalam anugerah dan qbit dengan keberhasilan besar dalam aplikasi high-end, berkinerja tinggi, berskala tinggi. Kami membantu klien menangani 10x beban dengan 1/10 server pesaing mereka menggunakan teknik di qbit. Qbit adalah kita muak dengan akses dan utas antrian tuning tangan.
Gagasan untuk anugerah dan qbit sering datang dari seluruh web. Kami membuat kesalahan. Tunjukkan mereka. Sebagai pengembang Boon dan Qbit, kami adalah sesama pelancong. Jika Anda memiliki ide atau teknik yang ingin Anda bagikan, kami mendengarkan.
Inspirasi besar untuk Boon/Qbit adalah Vertx, Akka, saluran GO, objek aktif, threading model apartemen, aktor, dan makalah simpati mekanis.
Qbit memiliki ide yang mirip dengan banyak kerangka kerja. Kita semua membaca makalah yang sama. Qbit mendapat inspirasi dari LMAX Disruptor Papers dan posting blog ini tentang tautan transfer antrian versus pengganggu. Kami memiliki beberapa teori tentang antrian posting blog yang menginspirasi kami untuk mencobanya. Beberapa teori ini digunakan di beberapa backend middleware terbesar dan yang namanya merek dikenal di seluruh dunia. Dan dengan demikian Qbit lahir.
Qbit juga mengambil banyak inspirasi oleh pekerjaan hebat yang dilakukan oleh Tim Fox di Vertx. Proyek pertama menggunakan sesuatu yang sebenarnya bisa disebut qbit (meskipun qbit awal) menggunakan VERTX di Web/Mobile Microservice untuk aplikasi yang berpotensi memiliki 80 juta pengguna. Pengalaman ini dengan VERTX dan qbit awal yang mengarah pada pengembangan dan evolusi qbit. Qbit dibangun di atas bahu raksasa (Netty/Vertx).
Pengganggu Musim Semi: Tidak. Anda bisa menggunakan QBIT untuk menulis plugin untuk pengganggu musim semi, saya kira, tetapi qbit tidak bersaing dengan pengganggu musim semi. Spring Boot/Spring MVC: Tidak. Kami menggunakan anotasi yang sama tetapi qbit diarahkan untuk microservices dalam memori berkecepatan tinggi. Ini lebih seperti akka daripada boot musim semi. Qbit memiliki subset fitur Spring MVC yang ditujukan hanya untuk layanan mikro, IE, Websocket RPC, REST, JSON MARSHALING, dll. Akka: Tidak. No. Well mungkin. Akka memiliki konsep yang sama tetapi mereka mengambil pendekatan yang berbeda. Qbit lebih fokus pada Java, dan Microservices (REST, JSON, WebSocket) daripada Akka. LMAX Disruptor: Tidak. Faktanya, kita dapat menggunakan pengganggu seperti pada antrian yang digunakan Qbit di bawah penutup.
(Tolok ukur awal telah dihapus. Mereka ada di sini. Qbit mendapat jauh lebih cepat. Benchmarking qbit adalah target yang bergerak saat ini. Tautan dan laporan akan dibuat.)
Contoh Kode
====
BasicQueue < Integer > queue = BasicQueue . create ( Integer . class , 1000 );
//Sending threads
SendQueue < Integer > sendQueue = queue . sendQueue ();
for ( int index = 0 ; index < amount ; index ++) {
sendQueue . send ( index );
}
sendQueue . flushSends ();
...
sendQueue . sendAndFlush ( code );
//other methods for sendQueue, writeBatch, writeMany
//Receiving Threads
ReceiveQueue < Integer > receiveQueue = queue . receiveQueue ();
Integer item = receiveQueue . take ();
//other methods poll(), pollWait(), readBatch(), readBatch(count)
Qbit adalah perpustakaan antrian untuk layanan mikro. Ini mirip dengan banyak proyek lain seperti Akka, Spring Reactor, dll. QBIT hanyalah perpustakaan bukan platform. Qbit memiliki perpustakaan untuk menempatkan layanan di belakang antrian. Anda dapat menggunakan antrian qbit secara langsung atau Anda dapat membuat layanan. Layanan QBIT dapat diekspos oleh WebSocket, HTTP, HTTP Pipeline, dan jenis remoting lainnya. Layanan di QBIT adalah kelas Java yang metodenya dieksekusi di balik antrian layanan. Qbit mengimplementasikan threading model apartemen dan mirip dengan model aktor atau deskripsi yang lebih baik akan menjadi objek aktif. Qbit tidak menggunakan pengganggu (tetapi bisa). Ini menggunakan antrian Java biasa. Qbit dapat melakukan utara 100 juta panggilan ping pong per detik yang merupakan kecepatan luar biasa (dilihat setinggi 200m). Qbit juga mendukung layanan panggilan melalui istirahat, dan Websocket. Qbit adalah layanan mikro dalam arti web murni: JSON, HTTP, WebSocket, dll. Qbit menggunakan batching mikro untuk mendorong pesan melalui pipa (antrian, IO, dll.) Lebih cepat mengurangi hand-off benang.
Qbit adalah Java Microservice Lib Lib REST, JSON dan WEBSOCKET. Itu ditulis dalam Java tetapi suatu hari kita bisa menulis versi dalam karat atau pergi atau C# (tapi itu akan membutuhkan hari gajian yang besar).
Layanan pojo (objek java tua biasa) di balik antrian yang dapat menerima panggilan metode melalui panggilan atau peristiwa proxy (mungkin memiliki satu utas mengelola peristiwa, panggilan metode, dan respons atau dua untuk panggilan dan peristiwa metode dan yang lainnya untuk respons sehingga penangan respons respons Jangan memblokir layanan. Layanan dapat menggunakan anotasi REST gaya MVC Spring untuk mengekspos diri ke dunia luar melalui istirahat dan websocket.
ServiceBundle banyak pojos di belakang satu antrian respons dan banyak yang menerima antrian. Mungkin ada satu utas untuk semua tanggapan atau tidak. Mereka juga bisa menjadi satu antrian menerima.
Antri utas yang mengelola antrian. Itu mendukung batching. Ini memiliki peristiwa untuk kosong, dijangkau, dimulai, IDLE. Anda dapat mendengarkan acara ini dari layanan yang duduk di belakang antrian. Anda tidak perlu menggunakan layanan. Anda dapat menggunakan Direct Queue. Di QBIT, Anda memiliki antrian pengirim dan antrian penerima. Mereka terpisah untuk mendukung pemasangan mikro.
ServiceEndointServer ServiceBundle yang terpapar komunikasi istirahat dan websocket.
Eventbus Eventbus adalah cara untuk mengirim banyak pesan ke layanan yang mungkin digabungkan secara longgar.
ClientProxy ClientProxy adalah cara untuk memohon layanan melalui antarmuka async, layanan dapat berupa inproc (proses yang sama) atau diulang kembali melalui WebSocket.
Qbit non-blocking adalah lib non-blocking. Anda menggunakan callback melalui java 8 lambdas. Anda juga dapat mengirim pesan acara dan mendapatkan balasan. Pesan dibangun ke dalam sistem sehingga Anda dapat dengan mudah mengoordinasikan tugas -tugas kompleks. Qbit mengambil pendekatan berorientasi objek untuk pengembangan layanan sehingga layanan terlihat seperti layanan Java normal yang sudah Anda tulis, tetapi layanan hidup di balik antrian/utas. Ini bukan konsep baru. Microsoft melakukan ini dengan DCOM/COM dan menyebutnya objek aktif. Akka melakukannya dengan aktor dan memanggil mereka aktor yang sangat diketik. Konsep -konsep pentingnya adalah Anda mendapatkan kecepatan pesan gaya reaktif dan aktor tetapi Anda berkembang dalam pendekatan OOP alami. Qbit bukan yang pertama. Qbit bukan satu -satunya.
Speed Qbit sangat cepat. Tentu saja ada banyak ruang untuk perbaikan. Tetapi sudah 200m+ TPS Inproc Ping Pong, 10m-20m+ TPS Event Bus, 500K TPS RPC panggilan melalui WebSocket/JSON, dll. Lebih banyak pekerjaan yang perlu dilakukan untuk meningkatkan kecepatan, tetapi sekarang cukup cepat di mana kami lebih fokus pada kegunaan. Dukungan JSON menggunakan anugerah secara default yang mencapai 4X lebih cepat dari parsers JSON lainnya untuk kasus REST/JSON, WebSocket/JSON menggunakan kasus.
Qbit pemrograman reaktif menyediakan reaktor untuk mengelola panggilan async. Ini memungkinkan panggilan balik untuk ditangani pada utas yang sama yang memanggil mereka dan menyediakan waktu tunggu dan penanganan kesalahan. Baca Tutorial Reaktor untuk Membuat Pemrograman Layanan Mikro Reaktif
Penemuan layanan dibangun dalam dukungan untuk penemuan layanan. Ini termasuk integrasi dengan konsul.
Statservice membangun dukungan untuk statistik. Statservice dapat diintegrasikan dengan StatSD (Graphite, Grafana, Datadog, dll.) Untuk menerbitkan statistik pasif. Atau Anda dapat meminta mesin statistik dan bereaksi terhadap statistik (hitungan, waktu dan level). STATSSERVICE adalah sistem statistik reaktif yang dapat dikelompokkan. Statservice reaktif karena layanan Anda dapat menerbitkannya dan menanyakannya dan bereaksi berdasarkan hasilnya. Anda dapat menerapkan hal -hal seperti membatasi tingkat dan bereaksi terhadap peningkatan laju sesuatu. Sistem servicediscovery terintegrasi dengan Salthsystem dan Konsul untuk menggulung setiap layanan internal Anda yang membentuk layanan mikro Anda dan menerbitkan komposit yang tersedia layanan mikro Anda ke titik akhir HTTP tunggal atau Sakelar Orang Mati di Konsul (TTL).
Bicara itu murah. Mari kita lihat beberapa kode. Anda bisa mendapatkan jalan -jalan rinci di wiki. Kami sudah memiliki banyak dokumentasi.
Kami akan membuat layanan yang diekspos melalui REST/JSON.
Untuk menanyakan ukuran daftar TODO:
curl localhost:8080/services/todo-service/todo/count
Untuk menambahkan item TODO baru.
curl -X POST -H " Content-Type: application/json " -d
' {"name":"xyz","description":"xyz"} '
http://localhost:8080/services/todo-service/todo
Untuk mendapatkan daftar item todo
curl http://localhost:8080/services/todo-service/todo/
Contoh TODO akan menggunakan dan melacak item TODO.
package io . advantageous . qbit . examples ;
import java . util . Date ;
public class TodoItem {
private final String description ;
private final String name ;
private final Date due ;
Todoservice menggunakan anotasi gaya MVC Spring.
@ RequestMapping ( "/todo-service" )
public class TodoService {
private List < TodoItem > todoItemList = new ArrayList <>();
@ RequestMapping ( "/todo/count" )
public int size () {
return todoItemList . size ();
}
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {
return todoItemList ;
}
@ RequestMapping ( value = "/todo" , method = RequestMethod . POST )
public void add ( TodoItem item ) {
todoItemList . add ( item );
}
}
Anda dapat memposting/menempatkan non-json dan Anda dapat menangkap tubuh sebagai String
atau sebagai byte[]
. Jika tipe konten diatur ke apa pun kecuali application/json
dan tubuh Anda didefinisikan string atau byte []. Ini berfungsi secara otomatis. (Tipe konten harus diatur.)
@ RequestMapping ( value = "/body/bytes" , method = RequestMethod . POST )
public boolean bodyPostBytes ( byte [] body ) {
String string = new String ( body , StandardCharsets . UTF_8 );
return string . equals ( "foo" );
}
@ RequestMapping ( value = "/body/string" , method = RequestMethod . POST )
public boolean bodyPostString ( String body ) {
return body . equals ( "foo" );
}
Secara default Qbit mengirimkan 200
(OK) untuk panggilan non-void (panggilan yang memiliki pengembalian atau panggilan balik). Jika operasi istirahat tidak memiliki pengembalian atau tidak ada panggilan balik maka qbit mengirimkan 202
(diterima). Mungkin ada saat -saat ketika Anda ingin mengirim 201 (dibuat) atau kode lain yang bukan pengecualian. Anda dapat melakukannya dengan mengatur code
di @RequestMapping
. Secara default kode adalah -1 yang berarti menggunakan perilaku default (200 untuk sukses, 202 untuk pesan satu arah, dan 500 untuk kesalahan).
@ RequestMapping ( value = "/helloj7" , code = 221 )
public void helloJSend7 ( Callback < JSendResponse < List < String >>> callback ) {
callback . returnThis ( JSendResponseBuilder . jSendResponseBuilder ( Lists . list (
"hello " + System . currentTimeMillis ())). build ());
}
Callbacks
dapat digunakan untuk layanan internal juga. Seringkali Anda menggunakan callbackbuilder atau reaktor qbit untuk mengelola panggilan layanan.
Anda tidak perlu mengembalikan panggilan istirahat JSON Form. Anda dapat mengembalikan biner atau teks apa pun dengan menggunakan HttpBinaryResponse
dan HttpTextResponse
.
@ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpTextResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildTextResponse ());
}
@ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpBinaryResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildBinaryResponse ());
}
Mengapa kami memilih anotasi gaya musim semi?
Sekarang mulai saja.
public static void main ( String ... args ) {
ServiceEndpointServer server = new EndpointServerBuilder (). build ();
server . initServices ( new TodoService ());
server . start ();
}
Hanya itu saja. Ada juga dukungan Websocket di luar kotak dengan pembuatan proxy sisi klien sehingga Anda dapat meminta layanan dengan tingkat jutaan panggilan per detik.
@ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {
return a + b ;
}
}
Anda selalu dapat memohon layanan qbit melalui proxy Websocket. Keuntungan dari proxy Websocket adalah memungkinkan Anda menjalankan 1M RPC+ per kedua (1 juta panggilan jarak jauh setiap detik).
/* Start QBit client for WebSocket calls. */
final Client client = clientBuilder ()
. setPort ( 7000 ). setRequestBatchSize ( 1 ). build ();
/* Create a proxy to the service. */
final AdderServiceClientInterface adderService =
client . createProxy ( AdderServiceClientInterface . class ,
"adder-service" );
client . start ();
/* Call the service */
adderService . add ( System . out :: println , 1 , 2 );
Outputnya adalah 3.
3
Di atas menggunakan antarmuka proxy WebSocket untuk menghubungi layanan async.
interface AdderServiceClientInterface {
void add ( Callback < Integer > callback , int a , int b );
}
Buat Klien Layanan Websocket yang disadari oleh ServiceDiscovery.
final Client client = clientBuilder . setServiceDiscovery ( serviceDiscovery , "echo" )
. setUri ( "/echo" ). setProtocolBatchSize ( 20 ). build (). startClient ();
final EchoAsync echoClient = client . createProxy ( EchoAsync . class , "echo" );
Saat ini clientBuilder
akan memuat semua titik akhir layanan yang terdaftar di bawah nama layanan, dan memilih satu secara acak.
ServicedIscovery termasuk konsul berbasis, menonton file JSON di disk, dan DNS. Sangat mudah untuk menulis penemuan layanan Anda sendiri juga dan mencolokkannya ke qbit.
Di masa depan kita dapat melakukan panggilan roundrobin atau panggilan shard ke layanan WebSocket dan/atau memberikan kegagalan otomatis jika koneksi ditutup. Kami melakukan ini untuk bus acara yang menggunakan penemuan layanan tetapi belum dimasukkan ke dalam rintisan klien berbasis websocket.
Contoh klien terakhir menggunakan WebSocket. Anda juga bisa menggunakan istirahat, dan benar -benar menggunakan URI Params yang kami atur. Istirahat itu bagus tapi akan lebih lambat dari dukungan WebSocket.
Qbit mengirim dengan klien http kecil yang bagus. Kita bisa menggunakannya.
Anda dapat menggunakannya untuk mengirim panggilan async dan pesan websocket dengan klien HTTP.
Di sini kami akan menggunakan klien HTTP untuk memohon metode jarak jauh kami:
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" )
. setPort ( 7000 ). build ();
httpClient . start ();
String results = httpClient
. get ( "/services/adder-service/add/2/2" ). body ();
System . out . println ( results );
Outputnya adalah 4.
4
Anda juga dapat mengakses layanan dari Curl.
$ curl http://localhost:7000/services/adder-service/add/2/2
Lihat contoh lengkap ini di sini: qbit microservice memulai tutorial.
Qbit URI Params dan Websocket Proxy Client
Qbit memiliki perpustakaan untuk bekerja dengan dan menulis layanan microsync yang ringan dan menyenangkan untuk digunakan.
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build ();
/* Setup WebSocket Server support. */
httpServer . setWebSocketOnOpenConsumer ( webSocket -> {
webSocket . setTextMessageConsumer ( message -> {
webSocket . sendText ( "ECHO " + message );
});
});
/* Start the server. */
httpServer . start ();
/** CLIENT. */
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start ();
/* Setup the client websocket. */
WebSocket webSocket = httpClient
. createWebSocket ( "/websocket/rocket" );
/* Setup the text consumer. */
webSocket . setTextMessageConsumer ( message -> {
System . out . println ( message );
});
webSocket . openAndWait ();
/* Send some messages. */
webSocket . sendText ( "Hi mom" );
webSocket . sendText ( "Hello World!" );
ECHO Hi mom
ECHO Hello World!
Sekarang hentikan server dan klien. Cukup mudah ya?
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build ();
/* Setting up a request Consumer with Java 8 Lambda expression. */
httpServer . setHttpRequestConsumer ( httpRequest -> {
Map < String , Object > results = new HashMap <>();
results . put ( "method" , httpRequest . getMethod ());
results . put ( "uri" , httpRequest . getUri ());
results . put ( "body" , httpRequest . getBodyAsString ());
results . put ( "headers" , httpRequest . getHeaders ());
results . put ( "params" , httpRequest . getParams ());
httpRequest . getReceiver ()
. response ( 200 , "application/json" , Boon . toJson ( results ));
});
/* Start the server. */
httpServer . start ();
Fokusnya adalah pada kemudahan penggunaan dan menggunakan java 8 lambdas untuk panggilan balik sehingga kodenya ketat dan kecil.
Cari tahu lebih lanjut tentang Dukungan Websocket Gaya Microservice Qbit di sini
Sekarang, mari kita coba klien HTTP kami.
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start ();
Anda hanya melewati URL, port dan kemudian hubungi mulai.
Sekarang Anda dapat mulai mengirim permintaan HTTP.
/* Send no param get. */
HttpResponse httpResponse = httpClient . get ( "/hello/mom" );
puts ( httpResponse );
Respons HTTP hanya berisi hasil dari server.
public interface HttpResponse {
MultiMap < String , String > headers ();
int code ();
String contentType ();
String body ();
}
Ada metode pembantu untuk sinkronisasi http mendapatkan panggilan.
/* Send one param get. */
httpResponse = httpClient . getWith1Param ( "/hello/singleParam" ,
"hi" , "mom" );
puts ( "single param" , httpResponse );
/* Send two param get. */
httpResponse = httpClient . getWith2Params ( "/hello/twoParams" ,
"hi" , "mom" , "hello" , "dad" );
puts ( "two params" , httpResponse );
...
/* Send five param get. */
httpResponse = httpClient . getWith5Params ( "/hello/5params" ,
"hi" , "mom" ,
"hello" , "dad" ,
"greetings" , "kids" ,
"yo" , "pets" ,
"hola" , "neighbors" );
puts ( "5 params" , httpResponse );
Metode puts adalah metode helper yang dilakukan System.out.println lebih atau kurang.
Lima param pertama dicakup. Lebih dari lima, Anda harus menggunakan httpBuilder.
/* Send six params with get. */
final HttpRequest httpRequest = httpRequestBuilder ()
. addParam ( "hi" , "mom" )
. addParam ( "hello" , "dad" )
. addParam ( "greetings" , "kids" )
. addParam ( "yo" , "pets" )
. addParam ( "hola" , "pets" )
. addParam ( "salutations" , "all" ). build ();
httpResponse = httpClient . sendRequestAndWait ( httpRequest );
puts ( "6 params" , httpResponse );
Ada panggilan async untuk mendapatkan juga.
/* Using Async support with lambda. */
httpClient . getAsync ( "/hi/async" , ( code , contentType , body ) -> {
puts ( "Async text with lambda" , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith1Param ( "/hi/async" , "hi" , "mom" , ( code , contentType , body ) -> {
puts ( "Async text with lambda 1 param n " , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith2Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 2 params n " , body );
});
Sys . sleep ( 100 );
...
/* Using Async support with lambda. */
httpClient . getAsyncWith5Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
"p3" , "v3" ,
"p4" , "v4" ,
"p5" , "v5" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 5 params n " , body );
});
Sys . sleep ( 100 );
[Temukan lebih lanjut tentang klien Microservice http yang mudah digunakan, cepat di sini] (https://github.com/advantageous/qbit/wiki/%5bdoc%5d-using-qbit-microservice-lib's-httpClient-get.-Post, -et-al, -json, -java-8-lambda).
QBIT memungkinkan layanan di balik antrian juga dijalankan di Proc.
/* POJO service. */
final TodoManager todoManagerImpl = new TodoManager ();
/*
Create the service which manages async calls to todoManagerImpl.
*/
final Service service = serviceBuilder ()
. setServiceObject ( todoManagerImpl )
. build (). startServiceQueue ();
/* Create Asynchronous proxy over Synchronous service. */
final TodoManagerClientInterface todoManager =
service . createProxy ( TodoManagerClientInterface . class );
service . startCallBackHandler ();
System . out . println ( "This is an async call" );
/* Asynchronous method call. */
todoManager . add ( new Todo ( "Call Mom" , "Give Mom a call" ));
AtomicInteger countTracker = new AtomicInteger ();
//Hold count from async call to service... for testing and showing it is an async callback
System . out . println ( "This is an async call to count" );
todoManager . count ( count -> {
System . out . println ( "This lambda expression is the callback " + count );
countTracker . set ( count );
});
todoManager . clientProxyFlush (); //Flush all methods. It batches calls.
Sys . sleep ( 100 );
System . out . printf ( "This is the count back from the server %d n " , countTracker . get ());
Tutorial terperinci tentang layanan in-proc sedang ditulis.
Bus acara QBIT Contoh lebih rinci
Qbit juga memiliki bus acara layanan. Contoh ini adalah contoh layanan manfaat karyawan.
Kami memiliki dua saluran.
public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";
public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";
Objek karyawan terlihat seperti ini:
public static class Employee {
final String firstName ;
final int employeeId ;
Contoh ini memiliki tiga layanan: Pegawai Layanan, Layanan Layanan Manfaat, dan Payrollservice.
Layanan ini adalah layanan inproc. Qbit mendukung layanan jarak jauh Websocket, HTTP, dan REST, tetapi untuk saat ini, mari kita fokus pada layanan inproc. Jika Anda memahami INPROC maka Anda akan memahami remote.
The Pegawai Asservice sebenarnya menembakkan acara ke dua layanan lainnya.
public class EmployeeHiringService {
public void hireEmployee ( final Employee employee ) {
int salary = 100 ;
System . out . printf ( "Hired employee %s n " , employee );
//Does stuff to hire employee
//Sends events
final EventManager eventManager =
serviceContext (). eventManager ();
eventManager . send ( NEW_HIRE_CHANNEL , employee );
eventManager . sendArray ( PAYROLL_ADJUSTMENT_CHANNEL ,
employee , salary );
}
}
Perhatikan bahwa kami menelepon SendArray sehingga kami dapat mengirim karyawan dan gaji mereka. Pendengar untuk Payroll_Adjustment_Channel harus menangani baik karyawan dan int yang mewakili gaji karyawan baru. Anda juga dapat menggunakan proxy bus acara sehingga Anda tidak harus memanggil bus acara sama sekali.
Layanan manfaat mendengarkan karyawan baru yang dipekerjakan sehingga dapat mendaftarkan mereka ke dalam sistem tunjangan.
public static class BenefitsService {
@ OnEvent ( NEW_HIRE_CHANNEL )
public void enroll ( final Employee employee ) {
System . out . printf ( "Employee enrolled into benefits system employee %s %d n " ,
employee . getFirstName (), employee . getEmployeeId ());
}
Ayah perlu dibayar.
public static class PayrollService {
@ OnEvent ( PAYROLL_ADJUSTMENT_CHANNEL )
public void addEmployeeToPayroll ( final Employee employee , int salary ) {
System . out . printf ( "Employee added to payroll %s %d %d n " ,
employee . getFirstName (), employee . getEmployeeId (), salary );
}
}
Karyawan adalah objek karyawan dari Pegawai Layanan.
Jadi, Anda bisa mendapatkan tunjangan, dan dibayar!
Temukan detail lebih lanjut di sini:
Bus acara QBIT Contoh lebih rinci
Anda dapat mendefinisikan antarmuka Anda sendiri ke bus acara dan Anda dapat menggunakan bus acara Anda sendiri dengan QBIT. Setiap modul dalam layanan Anda dapat memiliki bus acara internal sendiri.
Untuk mempelajari lebih lanjut baca: qbit microservice bekerja dengan bus acara pribadi dan qbit java microservice lib menggunakan antarmuka Anda sendiri ke bus acara.
Untuk benar -benar memahami qbit, seseorang harus memahami konsep panggilan balik.
Panggilan balik adalah cara untuk mendapatkan respons async di qbit.
Anda menghubungi metode layanan dan menghubungi Anda kembali.
Proksi klien dapat memiliki panggilan balik:
public interface RecommendationServiceClient {
void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName );
}
Callbacks adalah konsumen Java 8 dengan beberapa penanganan kesalahan ekstra opsional.
public interface Callback < T > extends java . util . function . Consumer < T > {
default void onError ( java . lang . Throwable error ) { /* compiled code */ }
}
Layanan yang dapat diblokir harus menggunakan panggilan balik. Jadi jika loadUser diblokir dalam contoh berikut, itu harus benar -benar menggunakan panggilan balik alih -alih mengembalikan nilai.
Rekomendasi Kelas Publik {
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
public List < Recommendation > recommend ( final String userName ) {
User user = users . get ( userName );
if ( user == null ) {
user = loadUser ( userName );
}
return runRulesEngineAgainstUser ( user );
}
Mari kita berpura-pura loadUser
harus melihat dalam cache lokal, dan jika pengguna tidak ditemukan, lihat di cache yang tidak tertutup dan jika tidak ditemukan itu harus meminta pengguna dari layanan pengguna yang harus memeriksa cache-nya dan mungkin fallback untuk memuat data pengguna dari database atau dari layanan lain. Dengan kata lain, loadUser
berpotensi memblokir pada IO.
Klien kami tidak memblokir, tetapi layanan kami melakukannya. Kembali ke RecommendationService
kami. Jika kita mendapatkan banyak hit cache untuk beban pengguna, mungkin blok tidak akan selama itu, tetapi akan ada di sana dan setiap kali kita harus menyalahkan pengguna, seluruh sistem ditembakkan. Apa yang ingin kami lakukan adalah jika kami tidak dapat menangani permintaan rekomendasi, kami melanjutkan dan melakukan panggilan async ke UserDataService
. Ketika panggilan balik async itu kembali, maka kami menangani permintaan itu. Sementara itu, kami menangani daftar rekomendasi permintaan secepat yang kami bisa. Kami tidak pernah memblokir.
Jadi mari kita kunjungi kembali layanannya. Hal pertama yang akan kami lakukan adalah membuat metode layanan menerima panggilan balik. Sebelum kita melakukan itu, mari kita atur beberapa aturan.
public class RecommendationService {
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
Sekarang kami menerima panggilan balik dan kami dapat memutuskan kapan kami ingin menangani permintaan pembuatan rekomendasi ini. Kami dapat melakukannya segera jika ada data pengguna yang kami butuhkan adalah dalam memori atau kami dapat menunda.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in user cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
...
} else {
/* Call the callback now because we can handle the callback now. */
recommendationsCallback . accept ( runRulesEngineAgainstUser ( user ));
}
}
Perhatikan, jika pengguna ditemukan di cache, kami menjalankan aturan rekomendasi kami dalam memori dan hubungi callback segera recommendationsCallback.accept(runRulesEngineAgainstUser(user))
.
Bagian yang menarik adalah apa yang kami lakukan jika tidak memiliki pengguna dimuat.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using Callback. */
userDataService . loadUser ( new Callback < User >() {
@ Override
public void accept ( final User loadedUser ) {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}
}, userName );
}
...
Di sini kami menggunakan panggilan balik untuk memuat pengguna, dan ketika pengguna dimuat, kami memanggil handleLoadFromUserDataService
yang menambahkan beberapa manajemen tentang menangani panggilan balik sehingga kami masih dapat menangani panggilan ini, hanya saja tidak sekarang.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using lambda expression. */
userDataService . loadUser (
loadedUser -> {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}, userName );
}
...
Menggunakan lambdas seperti ini membuat kode lebih mudah dibaca dan singkat, tetapi ingat jangan terlalu sarang ekspresi lambda atau Anda akan membuat mimpi buruk pemeliharaan kode. Gunakan mereka dengan bijaksana.
Yang kami inginkan adalah menangani permintaan rekomendasi setelah sistem layanan pengguna memuat pengguna dari toko -Nya.
public class RecommendationService {
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
private UserDataServiceClient userDataService ;
private BlockingQueue < Runnable > callbacks =
new ArrayBlockingQueue < Runnable >( 10_000 );
...
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
...
}
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks queue. */
callbacks . add ( new Runnable () {
@ Override
public void run () {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
}
});
}
public class RecommendationService {
...
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks list. */
callbacks . add (() -> {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
});
}
Bagian penting di sana adalah bahwa setiap kali kami mendapat panggilan panggilan balik dari UserDataService
, kami kemudian melakukan aturan rekomendasi intensif CPU kami dan panggilan balik penelepon kami. Yah bukan persis, apa yang kita lakukan adalah menganut antrian panggilan balik kita, dan kemudian kita akan mengulanginya melalui itu tetapi kapan?
RecommendationService
dapat diberitahu ketika antriannya kosong, ia telah memulai batch baru dan ketika telah mencapai batas batch. Ini semua adalah saat yang baik untuk menangani panggilan balik dari UserDataService
.
@ QueueCallback ({
QueueCallbackType . EMPTY ,
QueueCallbackType . START_BATCH ,
QueueCallbackType . LIMIT })
private void handleCallbacks () {
flushServiceProxy ( userDataService );
Runnable runnable = callbacks . poll ();
while ( runnable != null ) {
runnable . run ();
runnable = callbacks . poll ();
}
}
Penting untuk diingat ketika menangani panggilan balik dari layanan microser lain yang ingin Anda tangani callback dari layanan lain sebelum Anda menangani lebih banyak permintaan yang tidak terkena dari klien Anda. Pada dasarnya Anda memiliki klien yang telah menunggu (async menunggu tetapi tetap), dan klien ini mungkin mewakili koneksi TCP/IP terbuka seperti panggilan HTTP sehingga yang terbaik adalah menutupnya sebelum menangani lebih banyak permintaan dan seperti yang kami katakan sudah menunggu sekitar dengan koneksi terbuka untuk pengguna untuk memuat dari layanan pengguna.
Untuk mempelajari lebih lanjut tentang panggilan balik, Plesae membaca [Qbit Java Microservice Lib Callback Fundamentals] ([CUT] QBIT Microservice Lib bekerja dengan callbacks).
public class ServiceWorkers {
public static RoundRobinServiceDispatcher workers () {...
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {...
Anda dapat menyusun pekerja bercukur (untuk layanan in-memori, thread aman, CPU intensif), atau pekerja untuk IO atau berbicara dengan layanan asing atau bus asing.
Berikut adalah contoh yang menggunakan kumpulan pekerja dengan tiga pekerja layanan di dalamnya:
Katakanlah Anda memiliki layanan yang melakukan sesuatu:
//Your POJO
public class MultiWorker {
void doSomeWork (...) {
...
}
}
Sekarang ini melakukan semacam IO dan Anda ingin memiliki bank yang berjalan bukan hanya satu sehingga Anda dapat melakukan IO secara paralel. Setelah beberapa pengujian kinerja, Anda mengetahui bahwa tiga adalah angka ajaib.
Anda ingin menggunakan API Anda untuk mengakses layanan ini:
public interface MultiWorkerClient {
void doSomeWork (...);
}
Sekarang mari kita buat bank ini dan gunakan.
Pertama -tama buat layanan qbit yang menambahkan utas/antrian/microbatch.
/* Create a service builder. */
final ServiceBuilder serviceBuilder = serviceBuilder ();
/* Create some qbit services. */
final Service service1 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service2 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service3 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
Sekarang tambahkan mereka ke objek Serviceworkers.
ServiceWorkers dispatcher ;
dispatcher = workers (); //Create a round robin service dispatcher
dispatcher . addServices ( service1 , service2 , service3 );
dispatcher . start (); // start up the workers
Anda dapat menambahkan layanan, POJOS, dan Metode Konsumen, Metode Dispatcher ke Bundel Layanan. Bundel layanan adalah titik integrasi ke qbit.
Mari kita tambahkan pekerja layanan baru kami. Peworker adalah servicemethoddispatcher.
/* Add the dispatcher to a service bundle. */
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();
Kami mungkin akan menambahkan metode pembantu ke bundel layanan sehingga sebagian besar dari ini dapat terjadi dalam satu panggilan.
Sekarang Anda dapat mulai menggunakan pekerja Anda.
/* Start using the workers. */
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );
Sekarang Anda bisa menggunakan Spring atau Guice untuk mengonfigurasi pembangun dan bundel layanan. Tetapi Anda bisa melakukannya seperti hal di atas yang baik untuk menguji dan memahami internal qbit.
QBIT juga mendukung konsep layanan berbiaya yang baik untuk sumber daya sharding seperti CPU (jalankan mesin aturan pada setiap inti CPU untuk mesin rekomendasi pengguna).
Qbit tidak tahu cara membanjiri layanan Anda, Anda harus memberikan petunjuk. Anda melakukan ini melalui aturan shard.
public interface ShardRule {
int shard ( String methodName , Object [] args , int numWorkers );
}
Kami bekerja di aplikasi di mana argumen pertama untuk layanan adalah nama pengguna, dan kemudian kami menggunakannya untuk panggilan shard ke mesin aturan in-memori intensif CPU. Teknik ini berfungsi. :)
Kelas Serviceworkers memiliki metode untuk membuat kumpulan pekerja yang dicek.
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {
...
}
Untuk menggunakan Anda, lulus tombol shard saat Anda membuat pekerja layanan.
dispatcher = shardedWorkers (( methodName , methodArgs , numWorkers ) -> {
String userName = methodArgs [ 0 ]. toString ();
int shardKey = userName . hashCode () % numWorkers ;
return shardKey ;
});
Kemudian tambahkan layanan Anda ke komposisi Serviceworkers.
int workerCount = Runtime . getRuntime (). availableProcessors ();
for ( int index = 0 ; index < workerCount ; index ++) {
final Service service = serviceBuilder
. setServiceObject ( new ContentRulesEngine ()). build ();
dispatcher . addServices ( service );
}
Kemudian tambahkan ke bundel layanan seperti sebelumnya.
dispatcher . start ();
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();
Kemudian gunakan saja:
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );
for ( int index = 0 ; index < 100 ; index ++) {
String userName = "rickhigh" + index ;
worker . pickSuggestions ( userName );
}
public class ServiceWorkers {
...
public static ShardedMethodDispatcher shardOnFirstArgumentWorkers () {
...
}
...
public static ShardedMethodDispatcher shardOnFifthArgumentWorkers () {
...
}
public static ShardedMethodDispatcher shardOnBeanPath ( final String beanPath ) {
...
}
ShardonBeanPath memungkinkan Anda untuk membuat panggilan navigasi Path Kompleks dan menggunakan propertinya untuk berbaring.
/* shard on 2nd arg which is an employee
Use the employees department's id property. */
dispatcher = shardOnBeanPath ( "[1].department.id" );
/* Same as above. */
dispatcher = shardOnBeanPath ( "1/department/id" );
Baca lebih lanjut tentang pekerja sharding dan pekerja layanan di sini
Anda dapat menemukan lebih banyak di wiki. Ikuti juga komitmen. Kami telah berang -berang sibuk. Qbit lib microservice untuk java - json, istirahat, websocket.