Hur man använder Redis för strömbehandling i realtid

Roshan Kumar är senior produktchef på Redis Labs.

Realtidsuppspelning av direktuppspelning av data är ett vanligt krav för många stora dataanvändningsfall. Inom områden som IoT, e-handel, säkerhet, kommunikation, underhållning, ekonomi och detaljhandel, där så mycket beror på snabb och korrekt datadriven beslutsfattande, är realtids datainsamling och analys faktiskt kärnan i verksamheten.

Samling, lagring och bearbetning av strömmande data i stora volymer och med hög hastighet innebär dock arkitektoniska utmaningar. Ett viktigt första steg i att leverera dataanalys i realtid är att säkerställa att tillräckliga nätverks-, beräknings-, lagrings- och minnesresurser är tillgängliga för att fånga snabba dataströmmar. Men ett företags mjukvarustack måste matcha prestanda för dess fysiska infrastruktur. Annars kommer företag att möta en enorm eftersläpning av data, eller värre, saknade eller ofullständiga uppgifter.

Redis har blivit ett populärt val för sådana snabba datainmatningsscenarier. En lätt databasplattform i minnet, Redis uppnår genomströmning i miljontals operationer per sekund med fördröjningar under millisekunder, samtidigt som man drar på minimala resurser. Det erbjuder också enkla implementeringar, aktiverade av dess flera datastrukturer och funktioner.

I den här artikeln kommer jag att visa hur Redis Enterprise kan lösa vanliga utmaningar i samband med intag och bearbetning av stora volymer data med hög hastighet. Vi går igenom tre olika tillvägagångssätt (inklusive kod) för att bearbeta ett Twitter-flöde i realtid med Redis Pub / Sub, Redis Lists respektive Redis Sorted Sets. Som vi kommer att se har alla tre metoderna en roll att spela vid snabb datainmatning, beroende på användningsfallet.

Utmaningar i utformningen av snabba lösningar för datainmatning

Höghastighetsdataintag involverar ofta flera olika typer av komplexitet:

  • Stora datamängder som ibland kommer i skurar. Bursty-data kräver en lösning som kan bearbeta stora datamängder med minimal latens. Helst ska den kunna utföra miljontals skrivningar per sekund med fördröjning under millisekund med minimala resurser.
  • Data från flera källor. Datainmatningslösningar måste vara tillräckligt flexibla för att hantera data i många olika format, behålla källidentitet vid behov och omvandla eller normalisera i realtid.
  • Data som behöver filtreras, analyseras eller vidarebefordras. De flesta datainmatningslösningar har en eller flera prenumeranter som konsumerar data. Dessa är ofta olika applikationer som fungerar på samma eller olika platser med en varierad uppsättning antaganden. I sådana fall behöver databasen inte bara transformera data utan också filtrera eller aggregera beroende på kraven på de konsumerande applikationerna.
  • Data kommer från geografiskt distribuerade källor. I det här scenariot är det ofta praktiskt att distribuera datainsamlingsnoderna och placera dem nära källorna. Själva noderna blir en del av lösningen för snabb datainmatning, för att samla in, bearbeta, vidarebefordra eller omdirigera inmatningsdata.

Hantering av snabb datainmatning i Redis

Många lösningar som stöder snabb dataintag idag är komplexa, funktionsrika och överkonstruerade för enkla krav. Redis är å andra sidan extremt lätt, snabb och enkel att använda. Med kunder tillgängliga på mer än 60 språk kan Redis enkelt integreras med de populära mjukvarustackarna.

Redis erbjuder datastrukturer som listor, uppsättningar, sorterade uppsättningar och kontakter som erbjuder enkel och mångsidig databehandling. Redis levererar mer än en miljon läs- / skrivoperationer per sekund, med fördröjning under millisekunder på en blygsam råvarumolninstans, vilket gör den extremt resurseffektiv för stora datamängder. Redis stöder också meddelandetjänster och klientbibliotek på alla populära programmeringsspråk, vilket gör den väl lämpad för att kombinera höghastighetsdatainmatning och realtidsanalys. Redis Pub / Sub-kommandon gör att den kan spela rollen som en meddelandemäklare mellan utgivare och prenumeranter, en funktion som ofta används för att skicka meddelanden eller meddelanden mellan distribuerade datainmatningsnoder.

Redis Enterprise förbättrar Redis med sömlös skalning, alltid tillgänglighet, automatiserad distribution och möjligheten att använda kostnadseffektivt flashminne som RAM-förlängare så att bearbetning av stora datamängder kan genomföras kostnadseffektivt.

I avsnitten nedan kommer jag att beskriva hur man använder Redis Enterprise för att hantera vanliga datainmatningsutmaningar.

Redis med hastigheten på Twitter

För att illustrera Redis enkelhet kommer vi att undersöka ett exempel på snabb lösning för datainmatning som samlar meddelanden från ett Twitter-flöde. Målet med denna lösning är att bearbeta tweets i realtid och trycka ner dem i röret när de bearbetas.

Twitter-data som intas av lösningen konsumeras sedan av flera processorer längs linjen. Som visas i figur 1 handlar detta exempel om två processorer - den engelska Tweet Processor och Influencer Processor. Varje processor filtrerar tweets och skickar dem genom sina respektive kanaler till andra konsumenter. Denna kedja kan gå så långt som lösningen kräver. I vårt exempel stannar vi dock på tredje nivån, där vi samlar populära diskussioner bland engelsktalande och främsta influenser.

Redis Labs

Observera att vi använder exemplet för bearbetning av Twitter-flöden på grund av hastigheten på datainkomst och enkelhet. Observera också att Twitter-data når vårt snabba dataintag via en enda kanal. I många fall, till exempel IoT, kan det finnas flera datakällor som skickar data till huvudmottagaren.

Det finns tre möjliga sätt att implementera den här lösningen med Redis: intag med Redis Pub / Sub, intag med listdatastrukturen eller intag med datastrukturen för sorterad uppsättning. Låt oss undersöka vart och ett av dessa alternativ.

Förtäring med Redis Pub / Sub

Detta är den enklaste implementeringen av snabb datainmatning. Denna lösning använder Redis's Pub / Sub-funktion, som gör det möjligt för applikationer att publicera och prenumerera på meddelanden. Som visas i figur 2 behandlar varje steg data och publicerar det till en kanal. Det efterföljande steget prenumererar på kanalen och tar emot meddelandena för vidare bearbetning eller filtrering.

Redis Labs

Fördelar

  • Lätt att implementera.
  • Fungerar bra när datakällorna och processorerna distribueras geografiskt.

Nackdelar 

  • Lösningen kräver att förläggare och prenumeranter är uppe hela tiden. Prenumeranter tappar data när de stoppas eller när anslutningen går förlorad.
  • Det kräver fler anslutningar. Ett program kan inte publicera och prenumerera på samma anslutning, så varje mellanliggande databehandlare kräver två anslutningar - en för att prenumerera och en för att publicera. Om du kör Redis på en DBaaS-plattform är det viktigt att kontrollera om ditt paket eller din servicenivå har några gränser för antalet anslutningar.

En anteckning om anslutningar

Om mer än en klient prenumererar på en kanal, skickar Redis data till varje klient linjärt, en efter en. Stora datanyttolaster och många anslutningar kan införa latens mellan en publicist och dess abonnenter. Även om standardgränsen för maximalt antal anslutningar är 10 000 måste du testa och jämföra hur många anslutningar som passar din nyttolast.

Redis upprätthåller en klientutmatningsbuffert för varje klient. Standardgränserna för klientutmatningsbufferten för Pub / Sub ställs in som:

client-output-buffer-limit pubsub 32mb 8mb 60

Med den här inställningen kommer Redis att tvinga klienter att koppla ifrån sig under två förhållanden: om utgångsbufferten växer över 32 MB, eller om utgångsbufferten håller 8 MB data konsekvent i 60 sekunder.

Detta är indikationer på att kunder konsumerar data långsammare än de publiceras. Om en sådan situation skulle uppstå, försök först att optimera konsumenterna så att de inte lägger till latens när de konsumerar data. Om du märker att dina kunder fortfarande kopplas bort kan du öka gränserna för client-output-buffer-limit pubsubfastigheten i redis.conf. Tänk på att eventuella ändringar av inställningarna kan öka latensen mellan utgivaren och abonnenten. Eventuella ändringar måste testas och verifieras noggrant.

Koddesign för Redis Pub / Sub-lösningen

Redis Labs

Detta är den enklaste av de tre lösningarna som beskrivs i detta dokument. Här är de viktiga Java-klasserna som implementerats för denna lösning. Ladda ner källkoden med fullständig implementering här: //github.com/redislabsdemo/IngestPubSub.

Den Subscriberklass är kärnan klass av denna design. Varje Subscriberobjekt upprätthåller en ny anslutning till Redis.

class Abonnent utökar JedisPubSub implementerar Runnable {

       privat strängnamn;

       privat RedisConnection conn = null;

       privat Jedis jedis = null;

       privat String subscriberChannel;

       public Subscriber (String subscriberName, String channelName) kastar undantag {

              name = subscriberName;

              subscriberChannel = channelName;

              Tråd t = ny tråd (detta);

              t.start ();

       }

       @Åsidosätta

       offentlig ogiltig körning () {

              Prova{

                      conn = RedisConnection.getRedisConnection ();

                      jedis = conn.getJedis ();

                      medan (sant) {

                             jedis.subscribe (detta, this.subscriberChannel);

                      }

              } fånga (Undantag e) {

                      e.printStackTrace ();

              }

       }

       @Åsidosätta

       public void onMessage (Strängkanal, Strängmeddelande) {

              super.onMessage (kanal, meddelande);

       }

}

Den Publisherklass upprätthåller en separat anslutning till Redis för att publicera meddelanden till en kanal.

public class Publisher {

       RedisConnection conn = null;

       Jedis jedis = null;

       privat strängkanal;

       public Publisher (String channelName) kastar undantag {

              kanal = kanalnamn;

              conn = RedisConnection.getRedisConnection ();

              jedis = conn.getJedis ();

       }

       public void publish (String msg) kastar Undantag {

              jedis.publish (kanal, msg);

       }

}

De EnglishTweetFilter, InfluencerTweetFilter, HashTagCollectoroch InfluencerCollectorfilter sträcker Subscriber, som ger dem möjlighet att lyssna på inkommande kanaler. Eftersom du behöver separata Redis-anslutningar för att prenumerera och publicera har varje filterklass sitt eget RedisConnectionobjekt. Filter lyssnar på de nya meddelandena i sina kanaler i en loop. Här är EnglishTweetFilterklassens exempelkod :

public class EnglishTweetFilter utökar prenumeranten

{

       privat RedisConnection conn = null;

       privat Jedis jedis = null; 

       privat sträng publisherChannel = null;

public EnglishTweetFilter (String name, String subscriberChannel, String publisherChannel) kastar undantag {

              super (namn, subscriberChannel);

              this.publisherChannel = publisherChannel;

              conn = RedisConnection.getRedisConnection ();

              jedis = conn.getJedis ();           

       }

       @Åsidosätta

       public void onMessage (String subscriberChannel, String message) {

              JsonParser jsonParser = ny JsonParser ();

              JsonElement jsonElement = jsonParser.parse (meddelande);

              JsonObject jsonObject = jsonElement.getAsJsonObject ();

              // filtermeddelanden: publicera endast engelska tweets           

if (jsonObject.get (“lang”)! = null &&

       jsonObject.get (“lang”). getAsString (). är lika med (“en”)) {

                      jedis.publish (publisherChannel, meddelande);

              }

       }

}

Den Publisherklass har en publicerar metod som publicerar meddelanden till önskad kanal.

public class Publisher {

.

.     

       public void publish (String msg) kastar Undantag {

              jedis.publish (kanal, msg);

       }

.

}

Huvudklassen läser data från inmatningsströmmen och skickar den till AllDatakanalen. Huvudmetoden i denna klass startar alla filterobjekten.

allmän klass IngestPubSub

{

.

       offentlig ogiltig start () kastar undantag {

       .

       .

              publisher = new Publisher (“AllData”);

              englishFilter = new EnglishTweetFilter (“English Filter”, ”AllData”,

                                           ”EnglishTweets”);

              influencerFilter = new InfluencerTweetFilter (“Influencer Filter”,

                                           “AllData”, “InfluencerTweets”);

              hashtagCollector = ny HashTagCollector (“Hashtag Collector”, 

                                           ”EnglishTweets”);

              influencerCollector = ny InfluencerCollector (“Influencer Collector”,

                                           ”InfluencerTweets”);

       .

       .

}

Inges med Redis-listor

Listdatastrukturen i Redis gör implementeringen av en kölösning enkel och okomplicerad. I den här lösningen skjuter producenten varje meddelande på baksidan av kön, och abonnenten undersöker kön och drar nya meddelanden från andra änden.

Redis Labs

Fördelar

  • Denna metod är tillförlitlig vid anslutningsförlust. När data har skjutits in i listorna, bevaras de där tills abonnenterna läser det. Detta gäller även om prenumeranterna stoppas eller förlorar sin anslutning till Redis-servern.
  • Producenter och konsumenter behöver inget samband mellan dem.

Nackdelar

  • När data har hämtats från listan tas de bort och kan inte hämtas igen. Såvida inte konsumenterna kvarhåller uppgifterna går de förlorade så fort de konsumeras.
  • Varje konsument kräver en separat kö, vilket kräver lagring av flera kopior av data.

Koddesign för Redis Lists-lösningen

Redis Labs

Du kan ladda ner källkoden för Redis Lists-lösningen här: //github.com/redislabsdemo/IngestList. Lösningens huvudklasser förklaras nedan.

MessageListbäddar in Redis List-datastrukturen. Den push()metod skjuter det nya meddelandet till vänster om kön och pop()väntar på ett nytt meddelande från höger om kön är tom.

offentlig klass MessageList {

       skyddad strängnamn = “MyList”; // Namn

.

.     

       public void push (String msg) kastar Undantag {

              jedis.lpush (namn, msg); // Vänster tryck

       }

       offentlig strängpop () kastar undantag {

              returnera jedis.brpop (0, namn) .toString ();

       }

.

.

}

MessageListenerär en abstrakt klass som implementerar lyssnare och förlagslogik. Ett MessageListenerobjekt lyssnar bara på en lista, men kan publiceras i flera kanaler ( MessageFilterobjekt). Denna lösning kräver ett separat MessageFilterobjekt för varje abonnent i röret.

klass MessageListener implementerar Runnable {

       privat Strängnamn = null;

       privat MessageList inboundList = null;

       Map outBoundMsgFilters = ny HashMap ();

.

.     

       public void registerOutBoundMessageList (MessageFilter msgFilter) {

              om (msgFilter! = null) {

                      om (outBoundMsgFilters.get (msgFilter.name) == null) {

                             outBoundMsgFilters.put (msgFilter.name, msgFilter);

                      }

              }

       }

.

.

       @Åsidosätta

       offentlig ogiltig körning () {

.

                      medan (sant) {

                             Sträng msg = inboundList.pop ();

                             processMessage (msg);

                      }                                  

.

       }

.

       skyddad ogiltig pushMessage (String msg) kastar undantag {

              Set outBoundMsgNames = outBoundMsgFilters.keySet ();

              för (Strängnamn: outBoundMsgNames) {

                      MessageFilter msgList = outBoundMsgFilters.get (namn);

                      msgList.filterAndPush (msg);

              }

       }

}

MessageFilterär en föräldraklass som underlättar filterAndPush()metoden. När data flödar genom intagsystemet filtreras det ofta eller transformeras innan det skickas till nästa steg. Klasser som utökar MessageFilterklassen åsidosätter filterAndPush()metoden och implementerar sin egen logik för att driva det filtrerade meddelandet till nästa lista.

public class MessageFilter {

       MessageList messageList = null;

.

.

       public void filterAndPush (String msg) kastar undantag {

              messageList.push (msg);

       }

.

.     

}

AllTweetsListenerär ett exempel på implementering av en MessageListenerklass. Detta lyssnar på alla tweets på AllDatakanalen och publicerar data till EnglishTweetsFilteroch InfluencerFilter.

public class AllTweetsListener utökar MessageListener {

.

.     

       public static void main (String [] args) kastar undantag {

              MessageListener allTweetsProcessor = AllTweetsListener.getInstance ();

allTweetsProcessor.registerOutBoundMessageList (ny

              EnglishTweetsFilter (“EnglishTweetsFilter”, “EnglishTweets”));

              allTweetsProcessor.registerOutBoundMessageList (ny

                             InfluencerFilter (“InfluencerFilter”, “Influencers”));

              allTweetsProcessor.start ();

       }

.

.

}

EnglishTweetsFiltersträcker sig MessageFilter. Denna klass implementerar logik för att bara välja de tweets som är markerade som engelska tweets. Filtret kasserar icke-engelska tweets och skjuter engelska tweets till nästa lista.

public class EnglishTweetsFilter utökar MessageFilter {

       public EnglishTweetsFilter (String name, String listName) kastar undantag {

              super (namn, listnamn);

       }

       @Åsidosätta

       public void filterAndPush (Strängmeddelande) kastar undantag {

              JsonParser jsonParser = ny JsonParser ();

              JsonElement jsonElement = jsonParser.parse (meddelande);

              JsonArray jsonArray = jsonElement.getAsJsonArray ();

              JsonObject jsonObject = jsonArray.get (1) .getAsJsonObject ();

              if (jsonObject.get (“lang”)! = null &&

jsonObject.get (“lang”). getAsString (). är lika med (“en”)) {

                             Jedis jedis = super.getJedisInstance ();

                             om (jedis! = null) {

                                    jedis.lpush (super.name, jsonObject.toString ());

                             }

              }

       }

}