Kafka Consumer & Producer

Kafka 0.11.0.0 and kafka-manager running under docker-compose (Python clients & Docker Compose YAML)

6

Votes

Consumer.py

import os
from kafka import KafkaConsumer
from settings import BOOTSTRAP_SERVERS, TOPIC

# Subscribe to TOPIC on the configured brokers.  auto_offset_reset='earliest'
# asks kafka-python to begin at the oldest available record when there is
# no committed offset to resume from.
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=BOOTSTRAP_SERVERS,
    auto_offset_reset='earliest',
)

print("connected to: {}".format(BOOTSTRAP_SERVERS))

# Print every record delivered by the consumer.
# Record keys and values are raw bytes -- decode if necessary,
# e.g. for unicode: `record.value.decode('utf-8')`.
for record in consumer:
    fields = (
        record.topic,
        record.partition,
        record.offset,
        record.key,
        record.value,
    )
    print("%s:%d:%d: key=%s value=%s" % fields)

Producer.py

#!/usr/bin/env python
"""Publish 100 demo messages with random keys to the configured Kafka topic.

Requires Python 3.6+ (``random.choices``) and the ``kafka-python`` package.
"""

import random
import string

from kafka import KafkaProducer

from settings import BOOTSTRAP_SERVERS, TOPIC

producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)

for i in range(100):
    msg = "message {}".format(i)
    # Random 10-character key drawn from uppercase letters and digits.
    key = ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
    print(msg)
    # send() returns a future; calling .get() on it blocks until the send
    # has completed (or raises if it failed), so messages go out one by one.
    producer.send(
        TOPIC,
        key=bytes(key, encoding='utf-8'),
        value=bytes(msg, encoding='utf-8'),
    ).get()

# settings.py
# Shared connection settings imported by Consumer.py and Producer.py.

# Broker address the Python clients bootstrap from.
BOOTSTRAP_SERVERS = "192.168.39.1:9093"

# Application topic used by both scripts.  This was "__consumer_offsets",
# which is Kafka's internal topic for committed consumer offsets; the
# producer must not write arbitrary records there.  "topic_1" is one of the
# topics created via KAFKA_CREATE_TOPICS in docker-compose.yml.
TOPIC = "topic_1"

# docker-compose.yml
# One ZooKeeper node, three Kafka 0.11.0.0 brokers and kafka-manager.
# Port mappings are written "HOST:CONTAINER".
version: "3"
services:
    zookeeper:
        image: wurstmeister/zookeeper
        ports:
            - "2181:2181"

    # Each broker listens on 9092 inside its container and is published on a
    # distinct host port (9092/9093/9094); KAFKA_ADVERTISED_PORT matches that
    # host port.  The second mapping exposes container port 9999
    # (presumably JMX -- confirm against the image configuration).
    kafka1:
        image: wurstmeister/kafka:0.11.0.0
        environment:
            KAFKA_BROKER_ID: 901
            KAFKA_CREATE_TOPICS: "topic_1:1:1:compact,topic_2:1:1:compact"
            KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
            KAFKA_ADVERTISED_HOST_NAME: 192.168.39.1
            KAFKA_ADVERTISED_PORT: 9092
            KAFKA_NUM_PARTITIONS: 2
            # "zk" is the link alias for the zookeeper service (see links).
            KAFKA_ZOOKEEPER_CONNECT: zk:2181
            KAFKA_DELETE_TOPIC_ENABLE: 'true'
        ports:
            - "9092:9092"
            - "9992:9999"
        links:
            - zookeeper:zk

    kafka2:
        image: wurstmeister/kafka:0.11.0.0
        environment:
            KAFKA_BROKER_ID: 902
            KAFKA_CREATE_TOPICS: "topic_1:1:1:compact,topic_2:1:1:compact"
            KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
            KAFKA_ADVERTISED_HOST_NAME: 192.168.39.1
            KAFKA_ADVERTISED_PORT: 9093
            KAFKA_NUM_PARTITIONS: 2
            KAFKA_ZOOKEEPER_CONNECT: zk:2181
            KAFKA_DELETE_TOPIC_ENABLE: 'true'
        ports:
            - "9093:9092"
            - "9993:9999"
        links:
            - zookeeper:zk

    kafka3:
        image: wurstmeister/kafka:0.11.0.0
        environment:
            KAFKA_BROKER_ID: 903
            KAFKA_CREATE_TOPICS: "topic_1:1:1:compact,topic_2:1:1:compact"
            KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
            KAFKA_ADVERTISED_HOST_NAME: 192.168.39.1
            KAFKA_ADVERTISED_PORT: 9094
            KAFKA_NUM_PARTITIONS: 2
            KAFKA_ZOOKEEPER_CONNECT: zk:2181
            KAFKA_DELETE_TOPIC_ENABLE: 'true'
        ports:
            - "9094:9092"
            - "9994:9999"
        links:
            - zookeeper:zk

    # Web UI published on host port 9000; it reaches ZooKeeper through the
    # "zookeeper" link.
    kafka-manager:
        image: sheepkiller/kafka-manager:latest
        ports:
            - "9000:9000"
        links:
            - zookeeper
            - kafka1
            - kafka2
            - kafka3
        environment:
            ZK_HOSTS: zookeeper:2181
            APPLICATION_SECRET: letmein
            KM_ARGS: -Djava.net.preferIPv4Stack=true

Vote Here

You must earn at least 1 vote on your snippets to be allowed to vote

Terms Of Use

Privacy Policy

Featured snippets are MIT license

Gears & Masters

Advertise

DevOpsnipp.com © 2020

medium.png