Apache Kafka

Cum să citiți datele de la Kafka cu Python

Cum să citiți datele de la Kafka cu Python
Kafka este un sistem de mesagerie distribuită open-source pentru a trimite mesajul în subiecte partiționate și diferite. Fluxul de date în timp real poate fi implementat folosind Kafka pentru a primi date între aplicații. Are trei părți majore. Acestea sunt producător, consumator și subiecte. Producătorul este folosit pentru a trimite un mesaj către un anumit subiect și fiecare mesaj este atașat cu o cheie. Consumatorul este folosit pentru a citi un mesaj pe un anumit subiect din setul de partiții. Datele primite de la producător și stocate pe partiții pe baza unui anumit subiect. Multe biblioteci există în python pentru a crea producător și consumator pentru a construi un sistem de mesagerie folosind Kafka. Cum pot fi citite datele de la Kafka folosind python este prezentat în acest tutorial.

Condiție prealabilă

Trebuie să instalați biblioteca python necesară pentru a citi datele de la Kafka. Python3 este utilizat în acest tutorial pentru a scrie scriptul consumatorului și al producătorului. Dacă pachetul pip nu este instalat înainte în sistemul dvs. de operare Linux, atunci trebuie să instalați pip înainte de a instala biblioteca Kafka pentru python. python3-kafka este utilizat în acest tutorial pentru a citi date de la Kafka. Rulați următoarea comandă pentru a instala biblioteca.

$ pip instala python3-kafka

Citirea datelor de text simple de la Kafka

Diferite tipuri de date pot fi trimise de la producător pe un anumit subiect care poate fi citit de consumator. Modul în care un text simplu poate fi trimis și primit de la Kafka folosind producătorul și consumatorul este prezentat în această parte a acestui tutorial.

Creați un fișier numit producător1.py cu următorul script python. KafkaProducer modulul este importat din biblioteca Kafka. Lista brokerilor trebuie definită în momentul inițializării obiectului producătorului pentru a vă conecta la serverul Kafka. Portul implicit al Kafka este „9092'. Argumentul bootstrap_servers este utilizat pentru a defini numele gazdei cu portul. 'First_Topic'este setat ca un nume de subiect prin care mesajul text va fi trimis de la producător. Apoi, un mesaj text simplu, „Bună ziua de la Kafka'este trimis folosind trimite() Metodă de KafkaProducer la subiect,First_Topic'.

producător1.py:

# Importați KafkaProducer din biblioteca Kafka
de la kafka import KafkaProducer
# Definiți serverul cu portul
bootstrap_servers = ['localhost: 9092']
# Definiți numele subiectului în care va fi publicat mesajul
topicName = 'First_Topic'
# Inițializați variabila producător
producer = KafkaProducer (bootstrap_servers = bootstrap_servers)
# Publică text într-un subiect definit
producător.trimite (topicName, b'Hello din kafka ... ')
# Imprimați mesajul
print („Mesaj trimis”)

Creați un fișier numit consumator1.py cu următorul script python. KafkaConsumer modulul este importat din biblioteca Kafka pentru a citi datele din Kafka. sys modulul este utilizat aici pentru a termina scriptul. Același nume de gazdă și același număr de port al producătorului sunt utilizate în scriptul consumatorului pentru a citi datele de la Kafka. Numele subiectului consumatorului și al producătorului trebuie să fie același care este „First_topic'.  Apoi, obiectul consumator este inițializat cu cele trei argumente. Numele subiectului, ID-ul grupului și informații despre server. pentru bucla este utilizată aici pentru a citi textul trimis de la producătorul Kafka.

consumator1.py:

# Importați KafkaConsumer din biblioteca Kafka
din importul kafka KafkaConsumer
# Importați modulul sys
import sisteme
# Definiți serverul cu portul
bootstrap_servers = ['localhost: 9092']
# Definiți numele subiectului de unde va primi mesajul
topicName = 'First_Topic'
# Inițializați variabila de consum
consumer = KafkaConsumer (topicName, group_id = 'group1', bootstrap_servers =
bootstrap_servers)
# Citiți și tipăriți mesajul de la consumator
pentru msg în consumator:
print ("Nume subiect =% s, Mesaj =% s"% (msg.subiect, msg.valoare))
# Încheiați scriptul
sys.Ieșire()

Ieșire:

Rulați următoarea comandă de la un terminal pentru a executa scriptul producător.

$ python3 producer1.py

Următoarea ieșire va apărea după trimiterea mesajului.

Rulați următoarea comandă de la un alt terminal pentru a executa scriptul de consum.

$ python3 consumer1.py

Ieșirea arată numele subiectului și mesajul text trimis de la producător.

Citirea datelor formatate JSON de la Kafka

Datele formatate JSON pot fi trimise de către producătorul Kafka și citite de către consumatorul Kafka folosind json modul de python. Modul în care datele JSON pot fi serializate și de-serializate înainte de trimiterea și primirea datelor folosind modulul python-kafka este prezentat în această parte a acestui tutorial.

Creați un script Python numit producător2.py cu următorul script. Un alt modul numit JSON este importat cu KafkaProducer modul aici. valoare_serializator argumentul este folosit cu bootstrap_servers argument aici pentru a inițializa obiectul producătorului Kafka. Acest argument indică faptul că datele JSON vor fi codificate folosind „utf-8'set de caractere în momentul trimiterii. Apoi, datele formatate JSON sunt trimise la subiectul numit JSONtopic.

producător2.py:

# Importați KafkaProducer din biblioteca Kafka
de la kafka import KafkaProducer
# Importați modulul JSON pentru a serializa datele
import json
# Inițializați variabila producător și setați parametrul pentru codificarea JSON
producer = KafkaProducer (bootstrap_servers =
['localhost: 9092'], value_serializer = lambda v: json.halde (v).codifica ('utf-8'))
# Trimiteți date în format JSON
producător.send ('JSONtopic', 'name': 'fahmida', 'email': '[email protected]')
 
# Imprimați mesajul
print („Mesaj trimis către JSONtopic”)

Creați un script Python numit consumator2.py cu următorul script. KafkaConsumer, sys și modulele JSON sunt importate în acest script. KafkaConsumer modulul este utilizat pentru a citi datele formatate JSON din Kafka. Modulul JSON este utilizat pentru decodarea datelor JSON codate trimise de la producătorul Kafka. Sys modulul este utilizat pentru a termina scriptul. valoare_deserializator argumentul este folosit cu bootstrap_servers pentru a defini cum vor fi decodate datele JSON. Următorul, pentru bucla este utilizată pentru a imprima toate înregistrările consumatorilor și datele JSON recuperate de la Kafka.

consumator2.py:

# Importați KafkaConsumer din biblioteca Kafka
din importul kafka KafkaConsumer
# Importați modulul sys
import sisteme
# Importați modulul json pentru a serializa datele
import json
# Inițializați variabila de consum și setați proprietatea pentru decodarea JSON
consumer = KafkaConsumer ('JSONtopic', bootstrap_servers = ['localhost: 9092'],
value_deserializer = lambda m: json.sarcini (m.decodifica ('utf-8'))))
# Citiți datele de la kafka
pentru mesaj în consumator:
print ("Înregistrări consumator: \ n")
print (mesaj)
print ("\ nCitirea din date JSON \ n")
print ("Nume:", mesaj [6] ['nume'])
print ("Email:", mesaj [6] ['email'])
# Încheiați scriptul
sys.Ieșire()

Ieșire:

Rulați următoarea comandă de la un terminal pentru a executa scriptul producător.

$ python3 producer2.py

Scriptul va imprima următorul mesaj după trimiterea datelor JSON.

Rulați următoarea comandă de la un alt terminal pentru a executa scriptul de consum.

$ python3 consumer2.py

Următoarea ieșire va apărea după rularea scriptului.

Concluzie:

Datele pot fi trimise și primite în diferite formate de la Kafka folosind python. Datele pot fi, de asemenea, stocate în baza de date și preluate din baza de date folosind Kafka și python. Acasă, acest tutorial îl va ajuta pe utilizatorul python să înceapă să lucreze cu Kafka.

Top 10 jocuri de jucat pe Ubuntu
Platforma Windows a fost una dintre platformele dominante pentru jocuri din cauza procentului imens de jocuri care se dezvoltă astăzi pentru a sprijin...
Cele mai bune 5 jocuri arcade pentru Linux
În zilele noastre, computerele sunt mașini serioase folosite pentru jocuri. Dacă nu puteți obține noul scor mare, veți ști la ce mă refer. În această ...
Battle For Wesnoth 1.13.6 Development Released
Battle For Wesnoth 1.13.6 released last month, is the sixth development release in the 1.13.x series and it delivers a number of improvements, most no...