Apache Kafka Uygulaması

Selamlar, 3 oturumluk bir yazı dizisi tasarlamıştık. Bu zamana kadar mimarisel giriş ve apache kafka bilgisi üzerine yazdık. Şimdi ise kafka server uygulaması yapıp mesaj okuma/yazma gibi işlemler yapacağız.

Kafka örneklerini incelediğimde çok yüzeysel geçilmiş yazılar gördüm ve fazla detaya yer verilen türkçe derlemelere denk gelmedim. Şimdi bu konuyu detaylı ele alıyor olacağız.

Kısa bir özetle, soldan sağa doğru kavramlar:

  • Producer, Topic’lere veriyi yazanlara denir.
  • Topic, Mesajların tutulduğu bir collection yani örneğin bir kategori ismidir diyebiliriz, veri tabanlarındaki tablolar gibi.
  • Partition, topiclerin bir araya gelmesi ile partition’ lar oluşur,
  • Broker, partitionlar bir araya gelerek broker’ i oluşturur; broker’ lar bir araya gelince kafka cluster’ i oluştururlar,
  • Consumer, mesajları okuyanlara denir; birçok consumer bir araya gelerek belirli bir düzen içinde okuma yapınca bu duruma consumer group denir.

Detaylı bilgi için ayrıntılı yazımıza yönlendirelim.

Gereksiz Bilgi

Uygulamayı MacOS üzerinde yapacağım. İşlemleri Terminal kullanarak yapacağım. Mesela ben config vb. bilgileri terminal üzerinden değiştirirken siz windows’ ta iseniz path içerisine girip configleri notepad gibi uygulamalarla da değiştirebilirsiniz veya sizde cmd kullanabilirsiniz.

O zaman başlıyoruz.

Kurulum ve Gerekli Ortamlar

1. Zookeeper

https://zookeeper.apache.org/releases.html#download

2. Apache Kafka

https://kafka.apache.org/downloads

3. Java*

https://java.com/en/download/help/windows_manual_download.xml#download

*Kafka server, scala gibi jar’ lar kullandığı için (bkz. libs klasörü) bilgisayarınızda JVM yok ise java başlığında belirttiğim linkten indirmeniz gerekmekte.

Ayrıntı Bilgi

Zookeeper çalışmadan; Kafka çalışmaz!

Yapılandırmalar

1. Windows için

system variable’ lere zookeeper_home parametresi eklenmeli ve path bilgisi gösterilmelidir.
path içerisine de eklediğimiz var’ ı /bin klasörü ile gösterelim.

2. Mac için

İndirdiğimiz tar kafka_2.12-2.5.0.tgz dosyasınını Applications folder’ ine atalım unzip yaparak paketi dışarı çıkaralım.

/Applications içerisine girelim terminal ekranından.

			tar -xvf kafka_2.12-2.5.0.tgz

ile terminal’ den unzip yapabilirsiniz.

Bilgi

Kafka içerisinde de zookeeper server bulunmakta, biz bu yazıda onu kullanacağız.

Uygulamalar

1. Zookeeper

Aşağıdaki kod ile zookeeper server’ imizi config içerisindeki zookeeper.properties config bilgisi ile ayağa kaldıralım.

			./bin/zookeeper-server-start.sh config/zookeeper.properties

Şimdi 2.sekmeyi açalım terminal üzerinde, kafka’ nın server ayarlarının tutulduğu server.properties dosyasını bi açalım görelim bakalım. (1.sekmede zookeeper çalışıyor ona hiç dokunmadık.)

			cat config/server.properties

Server basics kısmında serverle ilgili temel ayarlar socket server settings kısmında serverin socket ayarları gibi ayarlar mevcutta ben önemli kısımları vurgulamak istiyorum, siz kalanına yine bir göz atın bro’ canlar.

Önemli*

broker.id ile unique broker numarası verilir. 2.bir kafka server yani bir broker oluşturduğunuzda bu id i değiştirmeniz gerekir.

#listeners=PLAINTEXT://:9092 (farklı porta çalışmasını isterseniz) commenti açıp düzeltin. eğer commentli olarak bırakırsanız default olan 9092 ile ayağa kalkacaktır. 2.bir kafka broker çalıştırmak istediğinizde yeni bir config dosyası oluşturmanız gerekecek çünkü aynı port ile ayağa kalkamayacak sunucu.

log.dirs=/tmp/kafka-logs kafka verilerini mevcut bulunduğu folder’ da bu path’ e atar. 2.bir kafka server oluşturduğunuzda bu ismi değiştirmeniz gerekir.

log.retention.hours=168 bu ise logların kaç gün saklanacacağı bilgisidir saat cinsindendir bu değeri 24h böler isek 7 gün varsayılan olarak logları tutacağını görüyor oluruz,

O zaman bu property’ ler ile kafka’ ya startı verelim.

2. Kafka

Kafka server i çalıştırmak için aşağıdaki .sh komutunu çalıştırabilirz. Bu .sh komutları daha önce söylediğim gibi /bin klasörü içerisinde yer alır.

			./bin/kafka-server-start.sh config/server.properties

Zookeeper çalıştığı için kafka server’ imiz de sağlıklı bir şekilde ayağa kalkacaktır.

Ek Bilgi

Bu arada zookeeper server ayağa kalkarken hatırlarsanız zookeeper.properties dosyasını okuyarak ayağa kalkmıştı. Zookeeper’ in varsayılan portu= varsayılan Kafka’ nın zookeeper’ i dinleyeceği port. Bu ayarları hiç değiştirmedik ama ihtiyaç olursa ek bilgi olarak belirtelim.

An itibari ile 1.terminal sekmesinde 1 Zookeeper; 2.terminal sekmesinde 1 Kafka Broker çalıştırdık. Şimdi yeni bir 3.terminal sekmesi açalım.

Yeni bir kafka broker / server çalıştırıyor olacağız yani 2 tane broker’ imiz olsun.

Kafka’ nın okuyarak çalıştığı config bilgilerini kopyalayıp 2.bir config dosyası oluşturalım. Bu işlemi aşağıdaki gibi yaparız.

			cp server.properties server2.properties

Önemli* olarak vurguladığım kısımlardaki config parametrelerini aşağıdaki gibi değiştirdim:

yani naptık: broker.id=1, listeners port 9093 (ilki default 9092ydi), log bilgimiz kafka2.logs olarak setlendi.

Bu işlemi notepad üzerinden yapabilecğiniz gibi benim gibi terminal üzerinden yapmak isterseniz:

			vi server2.properties

komutu ile edit modunda properties dosyasını açarak klavyenin yukarı aşağı yön tuşları ile editleyip

			wq!

ile edit ekranını save ederek çıkabilirsiniz.

Neyse naptık config2 yi düzenledik. Ee o zaman 3.terminal sekmesinde yeni broker’ imizi ayağa kaldıralım:

			./bin/kafka-server-start.sh config/server2.properties

Şu anda;

1.terminalde Zookeeper, 2.terminalde kafka broker.id=0, 3.terminalde kafka broker.id=1 server’ ları çalışıyor…

Temel İşlemler

/bin komut içerisinde birçok komut var

Bu komutlara parametre vermeden çalıştırırsanız document olarak kullanım klavuzu çıkarır. Mesela topic oluşturmak için kullanılan kafka-topic.sh çalıştırıp görelim.

			cd /bin
			./kafka-topic.sh

Bakın bootstrap server required. Her .sh komuutnda required bir alandır. Mutlaka verilmelidir. Daha öncede söylediğim gibi consumer ve producer hangi server da çalıştığını bilir. Bu bazda işlemler devam eder. Ekran görseli büyk olduğu için her parametreyi paylaşmıyorum, siz bakarsınız.

4.terminal sekmemizde(new) yeni terminal açarak bir topic oluşturalım.

Hatırlarsınız ilk kafka server im listeners server ayarlarnı değiştirmediğim için 9092 de ayağa kalktı. 2. Kafka serverim 9093 te ayağa kalktı.

1. Topic Oluşturma

O Halde kafka server 1 de bir topic oluşturalım.

			./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic1

.sh komutu verdik parametre olarak sırayla bootstrap server bilgimiz, create ile yeni bir şey oluşturacağız, bu şey bir topic olacak, adı da topic1 olsun dedik.

Ama bir şey dikkatinizi çekti mi? Partition ve Replication Factor bilgilerini girmedik. Peki default olarak ne setlendi? Sizin kafka config bilgilerinizde (server.properties) default ayarlarınız ne ise o ayarlar setlendi. Biz o ayarlara dokunmadık dolayısıyla 1 partition ve 1 replication factor setlendi.

Şimdi bu oluşturduğumuz topic1 in detayını görelim. Kullanıcığımız parametre describe.

Bakınız. 1 lider den oluşan 1 replikesi olan 1 sync olan bir topic oldu topic1.

Şimdi manuel setlemeyelim mi bu parametreleri?

			./kafka-topics.sh --bootstrap-server localhost:9092 --partitions 5 --replication-factor 2 --create --topic topic5

Kafka server 1 imde 5 partition dan oluşan ve replication factor u 2 olan bir topic oluşturduk. Detayını görelim.

Bilgi

Replication factor 2 verdik çünkü 2 tane broker imiz var. Eğer 3 verseydik hata alırdık.

2. Topic Silme

Şimdi oluşturulan bir topic i silelim.,

			./kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic topic1

Parametreler görüldüğü gibi bootstrap server belirtildikten sonra delete, bu bir topic, adı da topic1.

3. Topic Listeleme

Şimdi tüm topiclerimizi listeleyelim.

			./kafka-topics.sh --bootstrap-server localhost:9092 --list

4. Topic’ den Mesaj Okumak

Mevcut 4. Terminal sekmemde (new) devam ediyorum.

Console üzerinden okuma ve yazma yapacağız şimdilik.

Bin içeriisinde /kafka-console-consumer.sh adında bir komut dizini var. Bunu parametre vermeden bi çalıştırıp önemli parametrelerine bakalım hadi.,

Önemli parametreler

From-beginning ile en baştan okumak istediğinizi parametre vereblirsiniz.

Offset ile hangi offset değerinden okumaya başlayacağınız bilgisini verebilirsiniz.,

Partition ile başlamak istediğiniz partition bilgisini vereblirisiiniz.

Hem offset hem partitioın parametreleri ile hangi partition ın hangi offset inden başlamak istediğiniz bilgisini girebilirsiniz.

Başka gözüme çarpan ne var diye bakıyorum… Formatter kısmı var mesela. Konunun en başında dediğimiz gibi hangi formatta okumak istiyorsanız bunu parametre olarak geçebilirsiniz (json, xml vb.)

Neyse konumuza geri dönelim. Mesaj okuma başlatalım en baştan başlasın.

			./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic5

Evet şimdi Akıt gelsin modunda consumer mesaj bekliyor..(bkz imleç)

Bilgi

Yeni bir terminal sekmesi açalım 5.olacak. Bu da producer olsun. Mesaj göndersin.

5. Topic’ e Mesaj Yazmak

			./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic5
mesajımızı yazıyoruz…

Topic5 te yeni bir producer başlattık ve mesajlarımızı yazdık.

Terminal 4 e geri dönüp consumer a bakalım mesajlarımız en baştan başlayarak gelmiş mi?

Topic’ den Mesaj Okumak: Consumer’ a Gelen Mesajlar

Evet consumer sorunsuz bir şekilde okuma işlemini yerine getirmiş.

Son olarak consumer groups olayına da değienelim.

6. Consumer Group

Consumer grupları listeleyelim.

			./kafka-consumer-group.sh --bootstrap-server localhost:9092 --list

Görüldüğü gibi 1 tane topic imiz var. 5 partition lu. Bunun id no su 46761 miş.

Detaylı bilgi almak için napıyorduk. Describe kullanıyorduk. O halde,

			./kafka-consumer-group.sh --bootstrap-server localhost:9092 --describe --group console-consumer-46761

Current offset: consumer tarafından okunmus ve commit edilmis en sonki offset numarasıdır. Hiçbir şey yazmadıgı içibn tüm mesajlar okunmuş ve commitlenmiş.

Log end offset: partition içeriisibndekş en son ki offset numarasıdır.

Lag: partition a yazılmış mesajların sayısı – okunma sayısı. Yani farkı gösterir. Eğer lag değeri gittikçe artıyorsa real time durumu için sorun oluşabilir. Sn de 1000 mesaj yazılsın. Sn de 998 mesaj okunsun. Her saniye lag 2 şer olarak artar. Böylekikle okunmamış mesaj yazısı yükselir. Lag  e dikkat etmek lazım.

Sonuç

Yazı dizimizin sonuna geldik arkadaşlar. Kafka ile ilgili bahsedeceklerim bu kadar. Sorun yaşarsanız yazıların altına yorum yapabilirsiniz. Okuyor olacağım.

Eğer talep olursa da bir spring boot uygulaması hazırlarız? güzel olmaz mı 🙂

Sevgiyle, sağlıkla, güzellikle kalın…

Bilgi

Bu yazı 3 aşamadan oluşmaktadır.

1.yazı genel mimari yapıları anlatır.

2.yazı Apache Kafka Hakkında her detaya yer verilir.

3.yazı ise bu yazıdır, uygulama aşamasıdır.

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir