Az RSocket bemutatása

1. Bemutatkozás

Ebben az oktatóanyagban először megvizsgáljuk az RSocket-et és azt, hogy miként teszi lehetővé az ügyfél-szerver kommunikációt.

2. Mi az RSocket?

Az RSocket egy bináris, pont-pont kommunikációs protokoll szétosztott alkalmazásokban történő felhasználásra. Ebben az értelemben alternatívát kínál más protokollokkal, például a HTTP-vel.

Az RSocket és más protokollok teljes összehasonlítása meghaladja a cikk kereteit. Ehelyett az RSocket egyik legfontosabb jellemzőjére fogunk összpontosítani: az interakciós modellekre.

Az RSocket négy interakciós modellt kínál. Ezt szem előtt tartva, mindegyiket feltárjuk egy példával.

3. Maven-függőségek

Az RSocket-nek csak két közvetlen függőségre van szüksége példáinkhoz:

 io.rsocket rsocket-core 0.11.13 io.ocketocket rsocket-transport-netty 0.11.13 

Az rsocket-core és az rsocket-transport-netty függőségek a Maven Central oldalon érhetők el.

Fontos megjegyzés, hogy az RSocket könyvtár gyakran használja a reaktív folyamokat. A Fényáram és Monó osztályokat használunk ebben a cikkben, ezért hasznos lesz az alapvető ismeretek.

4. A kiszolgáló beállítása

Először hozzuk létre a szerver osztály:

public class Server {private final Eldobható szerver; public Server () {this.server = RSocketFactory.receive () .acceptor ((setupPayload, reactiveSocket) -> Mono.just (new RSocketImpl ())) .transport (TcpServerTransport.create ("localhost", TCP_PORT)) .start () .Iratkozz fel(); } public void dispose () {this.szerver.dispose (); } az RSocketImpl privát osztály kiterjeszti az AbstractRSocket alkalmazást {}}

Itt használjuk a RSocketFactory hogy beállítson és meghallgasson egy TCP foglalatot. A szokásunk szerint adjuk át RSocketImpl az ügyfelek kéréseinek kezelésére. Hozzáadunk módszereket a RSocketImpl ahogy megyünk.

Ezután a szerver elindításához csak azonnal be kell állítanunk:

Kiszolgálószerver = új kiszolgáló ();

Egyetlen szerverpéldány több kapcsolatot is képes kezelni. Ennek eredményeként csak egy szerverpéldány fogja támogatni az összes példánkat.

Amikor befejeztük, a eldob metódus leállítja a szervert és felszabadítja a TCP portot.

4. Interakciós modellek

4.1. Kérés / válasz

Az RSocket egy kérés / válasz modellt biztosít - minden kérelem egyetlen választ kap.

Ehhez a modellhez létrehozunk egy egyszerű szolgáltatást, amely visszaküldi az üzenetet az ügyfélnek.

Kezdjük azzal, hogy hozzáadunk egy metódust a AbstractRSocket, RSocketImpl:

@Orride public Mono requestResponse (Hasznos teher) {try {return Mono.just (hasznos teher); // a hasznos teher visszatükrözése a feladónak} fogás (x kivétel) {return Mono.error (x); }}

A requestResponse metódus minden kéréshez egyetlen eredményt ad vissza, amint azt a Monó válasz típusa.

Hasznos teher az osztály, amely az üzenet tartalmát és metaadatait tartalmazza. Az összes interakciós modell használja. A hasznos teher tartalma bináris, de vannak kényelmi módszerek, amelyek támogatják Húr-alapú tartalom.

Ezután létrehozhatjuk ügyfélosztályunkat:

public class ReqResClient {private final RSocket socket; public ReqResClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } public String callBlocking (String string) {return socket .requestResponse (DefaultPayload.create (string)) .map (Hasznos teher :: getDataUtf8) .block (); } public void dispose () {this.socket.dispose (); }}

Az ügyfél használja a RSocketFactory.connect () módszer a socket kapcsolat kezdeményezésére a szerverrel. Használjuk a requestResponse metódus a socketen, hogy hasznos terhet küldjön a szerverre.

Hasznos teherünk tartalmazza a Húr átadta az ügyfélnek. Amikor az Monó válasz érkezik, használhatjuk a getDataUtf8 () módszer a Húr a válasz tartalma.

Végül futtathatjuk az integrációs tesztet, hogy lássuk a kérést / választ. Küldünk egy Húr a szerverre, és ellenőrizze, hogy ugyanaz Húr visszatér:

@Test public void whenSendingAString_thenRevceiveTheSameString () {ReqResClient client = new ReqResClient (); Karaktersorozat = "Hello RSocket"; assertEquals (string, client.callBlocking (string)); kliens.dispose (); }

4.2. Tűz és felejtsd el

A tűz és felejtsd el modellel az ügyfél nem kap választ a szervertől.

Ebben a példában az ügyfél szimulált méréseket küld 50 ms-os időközönként a szervernek. A szerver közzéteszi a méréseket.

Adjunk hozzá egy tűz és felejts kezelőt a szerverünkhöz a RSocketImpl osztály:

@Orride public Mono fireAndForget (Payload payload) {try {dataPublisher.publish (hasznos teher); // továbbítja a hasznos teher visszatérését Mono.empty (); } catch (x kivétel) {return Mono.error (x); }}

Ez a kezelő nagyon hasonlít a kérés / válasz kezelőhöz. Azonban, fireAndForget visszatér Monó ahelyett Monó.

A dataPublisher példánya org.reactivestreams.Publisher. Így elérhetővé teszi az előfizetők számára a hasznos terhet. Ezt felhasználjuk a kérés / adatfolyam példában.

Ezután létrehozzuk a tűz és felejts klienst:

nyilvános osztályú FireNForgetClient {private final RSocket socket; privát végleges lista adatok; public FireNForgetClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } / ** bináris sebesség (lebegés) küldése 50 ms-onként * / public void sendData () {data = Collections.unmodifiableList (generatorData ()); Flux.interval (Duration.ofMillis (50)). Take (data.size ()) .map (this :: createFloatPayload) .flatMap (socket :: fireAndForget) .blockLast (); } // ...}

A foglalat beállítása teljesen megegyezik az előzőekkel.

A sendData () módszer a Fényáram stream több üzenet küldéséhez. Minden üzenetnél meghívjuk socket :: fireAndForget.

Fel kell iratkoznunk a Monó válasz minden üzenetre. Ha elfelejtünk feliratkozni akkor socket :: fireAndForget nem hajtja végre.

A flatMap operátor gondoskodik arról, hogy Üres a válaszokat továbbítják az előfizetőnek, míg a blockLast üzemeltető jár el előfizetőként.

Várni fogunk a következő szakaszig, amíg lefuttatjuk a tűz és felejtés tesztet. Ezen a ponton létrehozunk egy kérés / adatfolyam klienst a tűz és felejts ügyfél által letolt adatok fogadására.

4.3. Kérés / közvetítés

A kérelem / adatfolyam modellben egyetlen kérelemre több válasz is érkezhet. Ahhoz, hogy ezt működés közben lássuk, a tűz és felejtsd példára építhetünk. Ehhez kérjünk egy adatfolyamot az előző szakaszban küldött mérések lekéréséhez.

Kezdjük úgy, mint korábban, új hallgató hozzáadásával a RSocketImpl a szerveren:

@Orride public Flux requestStream (Payload payload) {return Flux.from (dataPublisher); }

A requestStream a kezelő visszaadja a Fényáram folyam. Mint az előző szakaszból felidézzük, a fireAndForget kezelő a bejövő adatokat közzétette a dataPublisher. Most létrehozunk egy Fényáram streamelheted ugyanezt dataPublisher mint az esemény forrása. Ezzel a mérési adatok aszinkron módon áramlanak a tűz és felejts el kliensünktől a kérelem / adatfolyam kliensünkig.

Hozd létre a következő kérelem / adatfolyam klienst:

public class ReqStreamClient {private final RSocket socket; public ReqStreamClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } public Flux getDataStream () {return socket .requestStream (DefaultPayload.create (DATA_STREAM_NAME)) .map (Payload :: getData) .map (buf -> buf.getFloat ()) .onErrorReturn (null); } public void dispose () {this.socket.dispose (); }}

A szerverhez ugyanúgy kapcsolódunk, mint korábbi ügyfeleinkhez.

Ban ben getDataStream ()használunk socket.requestStream () hogy Flux folyamot fogadjon a szervertől. Ebből a folyamból kivonjuk a Úszó értékeket a bináris adatokból. Végül az adatfolyam visszakerül a hívóhoz, így a hívó feliratkozhat rá és feldolgozhatja az eredményeket.

Most teszteljük. Ellenőrizni fogjuk az oda-vissza útvonalat a tűz és felejtsd el a kérés / streamelés irányába.

Azt állíthatjuk, hogy az egyes értékeket ugyanabban a sorrendben kapjuk meg, mint amit elküldtünk. Ezután azt állíthatjuk, hogy ugyanannyi értéket kapunk, amelyet elküldtünk:

@Test public void whenSendingStream_thenReceiveTheSameStream () {FireNForgetClient fnfClient = new FireNForgetClient (); ReqStreamClient streamClient = új ReqStreamClient (); Lista adatok = fnfClient.getData (); List dataReceived = new ArrayList (); Eldobható előfizetés = streamClient.getDataStream () .index () .subscribe (tuple -> {assertEquals ("Rossz érték", data.get (tuple.getT1 (). IntValue ()), tuple.getT2 ()); dataReceived. add (tuple.getT2 ());}, tévedés -> LOG.error (err.getMessage ())); fnfClient.sendData (); // ... dispose client & subscription assertEquals ("Helytelen adatszám érkezett", data.size (), dataReceived.size ()); }

4.4. Csatorna

A csatornamodell kétirányú kommunikációt biztosít. Ebben a modellben az üzenetfolyamok aszinkron módon áramlanak mindkét irányban.

Hozzunk létre egy egyszerű játékszimulációt ennek tesztelésére. Ebben a játékban a csatorna mindkét oldala játékos lesz. A játék lefutása során ezek a játékosok véletlenszerű időközönként üzeneteket küldenek a másik oldalra. A másik oldal reagál az üzenetekre.

Először létrehozzuk a kezelőt a szerveren. Az előzőekhez hasonlóan hozzáadjuk a RSocketImpl:

@Orride public Flux requestChannel (Publisher payloads) {Flux.from (payloads) .subscribe (gameController :: processPayload); return Flux.from (gameController); }

A requestChannel kezelő rendelkezik Hasznos teher adatfolyamok mind bemenetre, mind kimenetre. A Kiadó az input paraméter az ügyféltől kapott hasznos terhelések folyama. Amint megérkeznek, ezeket a hasznos terheket átadják a gameController :: processPayload funkció.

Válaszként egy másikat adunk vissza Fényáram streameljen vissza az ügyfélhez. Ez az adatfolyam a mi gameController, amely szintén a Kiadó.

Itt található a GameController osztály:

public class A GameController megvalósítja a Publisher {@Orride public void subscribe (Subscriber előfizető) {// véletlenszerű időközönként Payload üzeneteket küld az előfizetőnek} public void processPayload (Payload payload) {// reagál a másik játékos üzeneteire}}

Amikor az GameController fogad egy előfizetőt, akkor üzeneteket kezd küldeni annak az előfizetőnek.

Ezután hozzuk létre az ügyfelet:

public class ChannelClient {private final RSocket socket; privát végső GameController gameController; public ChannelClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); this.gameController = new GameController ("Client Player"); } public void playGame () {socket.requestChannel (Flux.from (gameController)) .doOnNext (gameController :: processPayload) .blockLast (); } public void dispose () {this.socket.dispose (); }}

Amint azt korábbi példáinkban láthattuk, az ügyfél ugyanúgy csatlakozik a szerverhez, mint a többi kliens.

Az ügyfél létrehozza a saját példányát GameController.

Használunk socket.requestChannel () hogy elküldje a mi Hasznos teher stream a szerverre. A szerver saját Payload folyamattal válaszol.

A szervertől kapott hasznos terhekként továbbítjuk őket a sajátunkhoz gameController :: processPayload kezelő.

A játék szimulációnkban az ügyfél és a szerver egymás tükörképe. Vagyis mindkét oldal áramlatot küld Hasznos teher és kap egy Hasznos teher a másik végéből.

Az adatfolyamok függetlenül, szinkronizálás nélkül futnak.

Végül futtassuk a szimulációt egy tesztben:

@Test public void whenRunningChannelGame_thenLogTheResults () {ChannelClient kliens = new ChannelClient (); client.playGame (); kliens.dispose (); }

5. Következtetés

Ebben a bevezető cikkben az RSocket által biztosított interakciós modelleket tártuk fel. A példák teljes forráskódja megtalálható a Github adattárunkban.

Feltétlenül nézze meg az RSocket webhelyét a mélyebb vita érdekében. Különösen a GYIK és a Motivációk dokumentumok nyújtanak jó hátteret.


$config[zx-auto] not found$config[zx-overlay] not found