Megbízható üzenetküldés JGroups-szal

1. Áttekintés

A JGroups egy Java API a megbízható üzenetek cseréjéhez. Egy egyszerű kezelőfelülettel rendelkezik, amely a következőket biztosítja:

  • rugalmas protokollköteg, beleértve a TCP-t és az UDP-t
  • a nagy üzenetek szétaprózódása és összerakása
  • megbízható unicast és multicast
  • hiba észlelése
  • áramlásszabályozás

Valamint sok más funkció.

Ebben az oktatóanyagban létrehozunk egy egyszerű alkalmazást a cserére Húr üzenetek az alkalmazások között, és megosztott állapot biztosítása az új alkalmazások számára, amikor csatlakoznak a hálózathoz.

2. Beállítás

2.1. Maven-függőség

Egyetlen függőséget kell hozzáadnunk a sajátunkhoz pom.xml:

 org.jgroups jgroups 4.0.10.Döntő 

A könyvtár legújabb verziója a Maven Central oldalon ellenőrizhető.

2.2. Hálózatépítés

A JGroups alapértelmezés szerint megpróbálja használni az IPV6-ot. Rendszerkonfigurációnktól függően ez azt eredményezheti, hogy az alkalmazások nem tudnak kommunikálni.

Ennek elkerülése érdekében beállítjuk a java.net.preferIPv4Stack nak nek igaz tulajdonság, amikor itt futtatjuk alkalmazásainkat:

java -Djava.net.preferIPv4Stack = true com.baeldung.jgroups.JGroupsMessenger 

3. JChannels

Kapcsolatunk a JGroups hálózathoz a JChannel. A csatorna csatlakozik egy fürthöz, és üzeneteket, valamint információkat küld a hálózat állapotáról.

3.1. Csatorna létrehozása

Létrehozunk egy JChannel egy konfigurációs fájl elérési útjával. Ha kihagyjuk a fájl nevét, akkor meg fogja keresni udp.xml az aktuális munkakönyvtárban.

Létrehozunk egy csatornát egy kifejezetten elnevezett konfigurációs fájllal:

JChannel channel = new JChannel ("src / main / resources / udp.xml"); 

A JGroups konfigurálása nagyon bonyolult lehet, de az alapértelmezett UDP és TCP konfiguráció elegendő a legtöbb alkalmazáshoz. Felvettük az UDP fájlt a kódunkba, és ezt az oktatóanyagot fogjuk használni.

A szállítás konfigurálásával kapcsolatos további információkért lásd a JGroups kézikönyvet itt.

3.2. Csatorna csatlakoztatása

Miután létrehoztuk csatornánkat, csatlakoznunk kell egy fürthöz. A fürt egy csomópontok csoportja, amelyek üzeneteket cserélnek.

A fürthöz való csatlakozáshoz fürtnévre van szükség:

channel.connect ("Baeldung"); 

Az első csomópont, amely megpróbál csatlakozni egy fürthöz, létrehozza azt, ha nem létezik. Az alábbiakban ezt a folyamatot látjuk működés közben.

3.3. Csatorna elnevezése

A csomópontokat név alapján azonosítják, hogy a társak irányított üzeneteket küldhessenek és értesítéseket kapjanak arról, hogy ki lép be és távozik a fürtbe. A JGroups automatikusan hozzárendel egy nevet, vagy beállíthatjuk a sajátunkat:

channel.name ("user1");

Az alábbiakban ezeket a neveket használjuk annak nyomon követésére, amikor a csomópontok belépnek és kilépnek a fürtből.

3.4. Csatorna bezárása

A csatornatisztítás elengedhetetlen, ha azt akarjuk, hogy a társak időben értesítést kapjanak arról, hogy kiléptünk.

Bezárjuk a JChannel szoros módszerével:

channel.close ()

4. Fürt nézet változásai

A létrehozott JChannel segítségével készen állunk arra, hogy megnézzük a társak állapotát a fürtben, és üzeneteket cseréljünk velük.

A JGroups fenntartja a fürt állapotát a Kilátás osztály. Minden csatornának egyetlen Kilátás a hálózat. Amikor a nézet megváltozik, a viewAccepted () visszahív.

Ehhez az oktatóanyaghoz kibővítjük a ReceiverAdaptor API-osztály, amely az alkalmazáshoz szükséges összes interfész-módszert megvalósítja.

Ez a visszahívások végrehajtásának ajánlott módja.

Tegyük hozzá viewAccepted alkalmazásunkhoz:

public void viewAccepted (View newView) {private View lastView; if (lastView == null) {System.out.println ("Fogadott kezdeti nézet:"); newView.forEach (System.out :: println); } else {System.out.println ("Új nézet érkezett."); List newMembers = View.newMembers (lastView, newView); System.out.println ("Új tagok:"); newMembers.forEach (System.out :: println); List exMembers = View.leftMembers (lastView, newView); System.out.println ("Kilépett tagok:"); exMembers.forEach (System.out :: println); } lastView = newView; } 

Minden egyes Kilátás tartalmaz egy Lista nak,-nek Cím objektumok, amelyek a fürt minden tagját képviselik. A JGroups kényelmi módszereket kínál az egyik nézet összehasonlításához a másikkal, amelyeket a fürt új vagy kilépett tagjainak felderítésére használunk.

5. Üzenetek küldése

Az üzenetkezelés a JGroups-ban egyszerű. A Üzenet tartalmaz egy byte tömb és Cím a küldőnek és a vevőnek megfelelő tárgyakat.

Ehhez az oktatóanyaghoz használjuk Húrok olvasható a parancssorból, de könnyen belátható, hogy egy alkalmazás hogyan cserélhet más adattípusokat.

5.1. Broadcast Messages

A Üzenet létrejön egy rendeltetési hely és egy bájt tömb; JChannel beállítja nekünk a feladót. Ha a cél az nulla, az egész fürt megkapja az üzenetet.

Elfogadjuk a szöveget a parancssorból, és elküldjük a fürtnek:

System.out.print ("Írjon be egy üzenetet:"); Karaktersor = in.readLine (). ToLowerCase (); Üzenetüzenet = új Üzenet (null, line.getBytes ()); channel.send (üzenet); 

Ha a programunk több példányát futtatjuk és elküldjük ezt az üzenetet (a kap() módszer), mindegyikük megkapja, beleértve a feladót is.

5.2. Üzeneteink blokkolása

Ha nem akarjuk látni az üzeneteinket, akkor beállíthatunk egy tulajdonságot ehhez:

channel.setDiscardOwnMessages (true); 

Amikor lefuttatjuk az előző tesztet, az üzenetküldő nem kapja meg a sugárzott üzenetét.

5.3. Közvetlen üzenet

Közvetlen üzenet küldéséhez érvényes Cím. Ha név szerint hivatkozunk a csomópontokra, szükségünk van egy módra a Cím. Szerencsére megvan a Kilátás azért.

A jelenlegi Kilátás mindig elérhető a JChannel:

privát Opcionális getAddress (karakterlánc neve) {Nézet megtekintése = channel.view (); return view.getMembers (). stream () .filter (cím -> név.egyenlő (cím.String ())) .findAny (); } 

Cím nevek elérhetőek az osztályon keresztül toString () módszerrel, ezért csupán a Lista a kívánt klasztertagok közül.

Így elfogadhatunk egy nevet a konzolról, megkereshetjük a társított rendeltetési helyet és közvetlen üzenetet küldhetünk:

Cím cél = null; System.out.print ("Adjon meg egy rendeltetési helyet:"); String destinationName = in.readLine (). ToLowerCase (); rendeltetési hely = getAddress (célNév) .vagyElseThrow (() -> új kivétel ("A cél nem található"); Üzenetüzenet = új Üzenet (cél, "Sziasztok!"); channel.send (üzenet); 

6. Üzenetek fogadása

Küldhetünk üzeneteket, most tegyük hozzá, próbáljuk meg megkapni őket most.

Írjuk felül ReceiverAdaptor's üres fogadási módszer:

public void fogad (Üzenet) {String line = Üzenet kapott: "+ message.getSrc () +" ide: "+ message.getDest () +" -> "+ message.getObject (); System.out.println (vonal); } 

Mivel tudjuk, hogy az üzenet a Húr, nyugodtan elhaladhatunk getObject () nak nek System.out.

7. Államcsere

Amikor egy csomópont belép a hálózatba, lehet, hogy be kell szereznie az állapotinformációkat a fürtről. A JGroups ehhez állami átviteli mechanizmust biztosít.

Amikor egy csomópont csatlakozik a fürthöz, egyszerűen hív getState (). A fürt általában lekéri az állapotot a csoport legidősebb tagjától - a koordinátortól.

Adjunk hozzá egy közvetítési üzenetszámot alkalmazásunkhoz. Hozzáadunk egy új tagváltozót, és növeljük benne kap():

private Integer messageCount = 0; public void fogad (Üzenet) {String line = "Üzenet kapott:" + message.getSrc () + "ide:" + message.getDest () + "->" + message.getObject (); System.out.println (vonal); if (message.getDest () == null) {messageCount ++; System.out.println ("Üzenetek száma:" + messageCount); }} 

Ellenőrizzük a nulla cél, mert ha közvetlen üzeneteket számolunk, akkor minden csomópontnak más lesz a száma.

Ezután felülírunk még két módszert ReceiverAdaptor:

public void setState (InputStream input) {try {messageCount = Util.objectFromStream (new DataInputStream (input)); } catch (e kivétel) {System.out.println ("Hiba a deserialing állapotban!"); } System.out.println (messageCount + "az aktuális üzenetszám."); } public void getState (OutputStream output) kiveti a {Util.objectToStream (messageCount, new DataOutputStream (output)) kivételt; } 

Az üzenetekhez hasonlóan a JGroups az állapotot tömbként továbbítja bájtokat.

A JGroups szállít egy InputStream a koordinátornak, hogy írja le az állapotot, és egy OutputStream hogy az új csomópont olvasható legyen. Az API kényelmi osztályokat biztosít az adatok sorosításához és dezerializálásához.

Vegye figyelembe, hogy a gyártási kódban az állapotinformációkhoz való hozzáférésnek szálbiztosnak kell lennie.

Végül hozzáadjuk a hívást getState () az indításkor, miután csatlakoztunk a fürthöz:

channel.connect (clusterName); channel.getState (null, 0); 

getState () elfogad egy célállomást, ahonnan milliszekundumban kérheti az állapotot és az időtúllépést. A nulla a rendeltetési hely jelzi a koordinátort, a 0 pedig nem időtúllépés.

Amikor ezt az alkalmazást pár csomóponttal futtatjuk és közvetített üzeneteket cserélünk, látjuk az üzenetszám növekedését.

Majd ha hozzáadunk egy harmadik klienst, vagy leállítjuk és elindítjuk az egyiket, látni fogjuk, hogy az újonnan csatlakoztatott csomópont a megfelelő üzenetszámot nyomtatja ki.

8. Következtetés

Ebben az oktatóanyagban a JGroups segítségével létrehoztunk egy alkalmazást az üzenetek cseréjére. Az API segítségével figyeltük, mely csomópontok csatlakoztak a fürthöz és elhagyták azt, valamint a fürt állapotát egy új csomópontra vittük át, amikor csatlakozott.

A kódminták, mint mindig, a GitHubon találhatók.


$config[zx-auto] not found$config[zx-overlay] not found