Az RSocket bemutatása
1. Bemutatkozás
Ebben az oktatóanyagban először megvizsgáljuk az RSocket-et és azt, hogy miként teszi lehetővé az ügyfél-szerver kommunikációt.
2. Mi az RSocket?
Az RSocket egy bináris, pont-pont kommunikációs protokoll szétosztott alkalmazásokban történő felhasználásra. Ebben az értelemben alternatívát kínál más protokollokkal, például a HTTP-vel.
Az RSocket és más protokollok teljes összehasonlítása meghaladja a cikk kereteit. Ehelyett az RSocket egyik legfontosabb jellemzőjére fogunk összpontosítani: az interakciós modellekre.
Az RSocket négy interakciós modellt kínál. Ezt szem előtt tartva, mindegyiket feltárjuk egy példával.
3. Maven-függőségek
Az RSocket-nek csak két közvetlen függőségre van szüksége példáinkhoz:
io.rsocket rsocket-core 0.11.13 io.ocketocket rsocket-transport-netty 0.11.13
Az rsocket-core és az rsocket-transport-netty függőségek a Maven Central oldalon érhetők el.
Fontos megjegyzés, hogy az RSocket könyvtár gyakran használja a reaktív folyamokat. A Fényáram és Monó osztályokat használunk ebben a cikkben, ezért hasznos lesz az alapvető ismeretek.
4. A kiszolgáló beállítása
Először hozzuk létre a szerver osztály:
public class Server {private final Eldobható szerver; public Server () {this.server = RSocketFactory.receive () .acceptor ((setupPayload, reactiveSocket) -> Mono.just (new RSocketImpl ())) .transport (TcpServerTransport.create ("localhost", TCP_PORT)) .start () .Iratkozz fel(); } public void dispose () {this.szerver.dispose (); } az RSocketImpl privát osztály kiterjeszti az AbstractRSocket alkalmazást {}}
Itt használjuk a RSocketFactory hogy beállítson és meghallgasson egy TCP foglalatot. A szokásunk szerint adjuk át RSocketImpl az ügyfelek kéréseinek kezelésére. Hozzáadunk módszereket a RSocketImpl ahogy megyünk.
Ezután a szerver elindításához csak azonnal be kell állítanunk:
Kiszolgálószerver = új kiszolgáló ();
Egyetlen szerverpéldány több kapcsolatot is képes kezelni. Ennek eredményeként csak egy szerverpéldány fogja támogatni az összes példánkat.
Amikor befejeztük, a eldob metódus leállítja a szervert és felszabadítja a TCP portot.
4. Interakciós modellek
4.1. Kérés / válasz
Az RSocket egy kérés / válasz modellt biztosít - minden kérelem egyetlen választ kap.
Ehhez a modellhez létrehozunk egy egyszerű szolgáltatást, amely visszaküldi az üzenetet az ügyfélnek.
Kezdjük azzal, hogy hozzáadunk egy metódust a AbstractRSocket, RSocketImpl:
@Orride public Mono requestResponse (Hasznos teher) {try {return Mono.just (hasznos teher); // a hasznos teher visszatükrözése a feladónak} fogás (x kivétel) {return Mono.error (x); }}
A requestResponse metódus minden kéréshez egyetlen eredményt ad vissza, amint azt a Monó válasz típusa.
Hasznos teher az osztály, amely az üzenet tartalmát és metaadatait tartalmazza. Az összes interakciós modell használja. A hasznos teher tartalma bináris, de vannak kényelmi módszerek, amelyek támogatják Húr-alapú tartalom.
Ezután létrehozhatjuk ügyfélosztályunkat:
public class ReqResClient {private final RSocket socket; public ReqResClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } public String callBlocking (String string) {return socket .requestResponse (DefaultPayload.create (string)) .map (Hasznos teher :: getDataUtf8) .block (); } public void dispose () {this.socket.dispose (); }}
Az ügyfél használja a RSocketFactory.connect () módszer a socket kapcsolat kezdeményezésére a szerverrel. Használjuk a requestResponse metódus a socketen, hogy hasznos terhet küldjön a szerverre.
Hasznos teherünk tartalmazza a Húr átadta az ügyfélnek. Amikor az Monó válasz érkezik, használhatjuk a getDataUtf8 () módszer a Húr a válasz tartalma.
Végül futtathatjuk az integrációs tesztet, hogy lássuk a kérést / választ. Küldünk egy Húr a szerverre, és ellenőrizze, hogy ugyanaz Húr visszatér:
@Test public void whenSendingAString_thenRevceiveTheSameString () {ReqResClient client = new ReqResClient (); Karaktersorozat = "Hello RSocket"; assertEquals (string, client.callBlocking (string)); kliens.dispose (); }
4.2. Tűz és felejtsd el
A tűz és felejtsd el modellel az ügyfél nem kap választ a szervertől.
Ebben a példában az ügyfél szimulált méréseket küld 50 ms-os időközönként a szervernek. A szerver közzéteszi a méréseket.
Adjunk hozzá egy tűz és felejts kezelőt a szerverünkhöz a RSocketImpl osztály:
@Orride public Mono fireAndForget (Payload payload) {try {dataPublisher.publish (hasznos teher); // továbbítja a hasznos teher visszatérését Mono.empty (); } catch (x kivétel) {return Mono.error (x); }}
Ez a kezelő nagyon hasonlít a kérés / válasz kezelőhöz. Azonban, fireAndForget visszatér Monó ahelyett Monó.
A dataPublisher példánya org.reactivestreams.Publisher. Így elérhetővé teszi az előfizetők számára a hasznos terhet. Ezt felhasználjuk a kérés / adatfolyam példában.
Ezután létrehozzuk a tűz és felejts klienst:
nyilvános osztályú FireNForgetClient {private final RSocket socket; privát végleges lista adatok; public FireNForgetClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } / ** bináris sebesség (lebegés) küldése 50 ms-onként * / public void sendData () {data = Collections.unmodifiableList (generatorData ()); Flux.interval (Duration.ofMillis (50)). Take (data.size ()) .map (this :: createFloatPayload) .flatMap (socket :: fireAndForget) .blockLast (); } // ...}
A foglalat beállítása teljesen megegyezik az előzőekkel.
A sendData () módszer a Fényáram stream több üzenet küldéséhez. Minden üzenetnél meghívjuk socket :: fireAndForget.
Fel kell iratkoznunk a Monó válasz minden üzenetre. Ha elfelejtünk feliratkozni akkor socket :: fireAndForget nem hajtja végre.
A flatMap operátor gondoskodik arról, hogy Üres a válaszokat továbbítják az előfizetőnek, míg a blockLast üzemeltető jár el előfizetőként.
Várni fogunk a következő szakaszig, amíg lefuttatjuk a tűz és felejtés tesztet. Ezen a ponton létrehozunk egy kérés / adatfolyam klienst a tűz és felejts ügyfél által letolt adatok fogadására.
4.3. Kérés / közvetítés
A kérelem / adatfolyam modellben egyetlen kérelemre több válasz is érkezhet. Ahhoz, hogy ezt működés közben lássuk, a tűz és felejtsd példára építhetünk. Ehhez kérjünk egy adatfolyamot az előző szakaszban küldött mérések lekéréséhez.
Kezdjük úgy, mint korábban, új hallgató hozzáadásával a RSocketImpl a szerveren:
@Orride public Flux requestStream (Payload payload) {return Flux.from (dataPublisher); }
A requestStream a kezelő visszaadja a Fényáram folyam. Mint az előző szakaszból felidézzük, a fireAndForget kezelő a bejövő adatokat közzétette a dataPublisher. Most létrehozunk egy Fényáram streamelheted ugyanezt dataPublisher mint az esemény forrása. Ezzel a mérési adatok aszinkron módon áramlanak a tűz és felejts el kliensünktől a kérelem / adatfolyam kliensünkig.
Hozd létre a következő kérelem / adatfolyam klienst:
public class ReqStreamClient {private final RSocket socket; public ReqStreamClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } public Flux getDataStream () {return socket .requestStream (DefaultPayload.create (DATA_STREAM_NAME)) .map (Payload :: getData) .map (buf -> buf.getFloat ()) .onErrorReturn (null); } public void dispose () {this.socket.dispose (); }}
A szerverhez ugyanúgy kapcsolódunk, mint korábbi ügyfeleinkhez.
Ban ben getDataStream ()használunk socket.requestStream () hogy Flux folyamot fogadjon a szervertől. Ebből a folyamból kivonjuk a Úszó értékeket a bináris adatokból. Végül az adatfolyam visszakerül a hívóhoz, így a hívó feliratkozhat rá és feldolgozhatja az eredményeket.
Most teszteljük. Ellenőrizni fogjuk az oda-vissza útvonalat a tűz és felejtsd el a kérés / streamelés irányába.
Azt állíthatjuk, hogy az egyes értékeket ugyanabban a sorrendben kapjuk meg, mint amit elküldtünk. Ezután azt állíthatjuk, hogy ugyanannyi értéket kapunk, amelyet elküldtünk:
@Test public void whenSendingStream_thenReceiveTheSameStream () {FireNForgetClient fnfClient = new FireNForgetClient (); ReqStreamClient streamClient = új ReqStreamClient (); Lista adatok = fnfClient.getData (); List dataReceived = new ArrayList (); Eldobható előfizetés = streamClient.getDataStream () .index () .subscribe (tuple -> {assertEquals ("Rossz érték", data.get (tuple.getT1 (). IntValue ()), tuple.getT2 ()); dataReceived. add (tuple.getT2 ());}, tévedés -> LOG.error (err.getMessage ())); fnfClient.sendData (); // ... dispose client & subscription assertEquals ("Helytelen adatszám érkezett", data.size (), dataReceived.size ()); }
4.4. Csatorna
A csatornamodell kétirányú kommunikációt biztosít. Ebben a modellben az üzenetfolyamok aszinkron módon áramlanak mindkét irányban.
Hozzunk létre egy egyszerű játékszimulációt ennek tesztelésére. Ebben a játékban a csatorna mindkét oldala játékos lesz. A játék lefutása során ezek a játékosok véletlenszerű időközönként üzeneteket küldenek a másik oldalra. A másik oldal reagál az üzenetekre.
Először létrehozzuk a kezelőt a szerveren. Az előzőekhez hasonlóan hozzáadjuk a RSocketImpl:
@Orride public Flux requestChannel (Publisher payloads) {Flux.from (payloads) .subscribe (gameController :: processPayload); return Flux.from (gameController); }
A requestChannel kezelő rendelkezik Hasznos teher adatfolyamok mind bemenetre, mind kimenetre. A Kiadó az input paraméter az ügyféltől kapott hasznos terhelések folyama. Amint megérkeznek, ezeket a hasznos terheket átadják a gameController :: processPayload funkció.
Válaszként egy másikat adunk vissza Fényáram streameljen vissza az ügyfélhez. Ez az adatfolyam a mi gameController, amely szintén a Kiadó.
Itt található a GameController osztály:
public class A GameController megvalósítja a Publisher {@Orride public void subscribe (Subscriber előfizető) {// véletlenszerű időközönként Payload üzeneteket küld az előfizetőnek} public void processPayload (Payload payload) {// reagál a másik játékos üzeneteire}}
Amikor az GameController fogad egy előfizetőt, akkor üzeneteket kezd küldeni annak az előfizetőnek.
Ezután hozzuk létre az ügyfelet:
public class ChannelClient {private final RSocket socket; privát végső GameController gameController; public ChannelClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); this.gameController = new GameController ("Client Player"); } public void playGame () {socket.requestChannel (Flux.from (gameController)) .doOnNext (gameController :: processPayload) .blockLast (); } public void dispose () {this.socket.dispose (); }}
Amint azt korábbi példáinkban láthattuk, az ügyfél ugyanúgy csatlakozik a szerverhez, mint a többi kliens.
Az ügyfél létrehozza a saját példányát GameController.
Használunk socket.requestChannel () hogy elküldje a mi Hasznos teher stream a szerverre. A szerver saját Payload folyamattal válaszol.
A szervertől kapott hasznos terhekként továbbítjuk őket a sajátunkhoz gameController :: processPayload kezelő.
A játék szimulációnkban az ügyfél és a szerver egymás tükörképe. Vagyis mindkét oldal áramlatot küld Hasznos teher és kap egy Hasznos teher a másik végéből.
Az adatfolyamok függetlenül, szinkronizálás nélkül futnak.
Végül futtassuk a szimulációt egy tesztben:
@Test public void whenRunningChannelGame_thenLogTheResults () {ChannelClient kliens = new ChannelClient (); client.playGame (); kliens.dispose (); }
5. Következtetés
Ebben a bevezető cikkben az RSocket által biztosított interakciós modelleket tártuk fel. A példák teljes forráskódja megtalálható a Github adattárunkban.
Feltétlenül nézze meg az RSocket webhelyét a mélyebb vita érdekében. Különösen a GYIK és a Motivációk dokumentumok nyújtanak jó hátteret.