Setting up Kafka stream

Description

I have Kafka and ZooKeeper running via docker-compose, using the setup from the official Docker Hub page, along with a Memgraph 1.6.0 instance. Before doing anything with Memgraph, I created a dummy topic named test by entering the Kafka container and running:

kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
and confirmed it with:
kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
which shows that the topic was created successfully.

After that, I used a simple transformation script, just to run things end to end, and the script was loaded successfully. But when I try to create a stream using:
CREATE STREAM testStream TOPICS test TRANSFORM transformations.transform_test;
I get the following error:
Client received exception: Failed to initialize Kafka consumer testStream : Local: Broker transport failure
and I am not sure why that is.
I am also not sure where to start debugging: is the problem in the communication between Memgraph and Kafka, or something else?
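
One way to narrow down a "Broker transport failure" is to check plain TCP reachability of the broker address from the same environment Memgraph runs in. The following is a minimal sketch using only the Python standard library; `can_connect` is a hypothetical helper name, not part of Kafka or Memgraph.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host name and opens a TCP socket.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False
```

For example, calling `can_connect("localhost", 9092)` and `can_connect("kafka", 9092)` from inside the Memgraph container would show which host name actually reaches the broker. A successful connection only shows that the port is reachable; the broker may still advertise a different address to its clients.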

Steps to reproduce

Run the following docker-compose file:

    version: "3"

    services:
      zookeeper:
        image: bitnami/zookeeper:3.7
        ports:
          - "2181:2181"
        volumes:
          - "zookeeper_data:/bitnami"
        environment:
          - ALLOW_ANONYMOUS_LOGIN=yes
        networks:
          - app-tier

      kafka:
        image: bitnami/kafka:2
        ports:
          - "9092:9092"
        volumes:
          - "kafka_data:/bitnami"
        environment:
          - KAFKA_BROKER_ID=1
          - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
          - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
          - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
          - ALLOW_PLAINTEXT_LISTENER=yes
        depends_on:
          - zookeeper
        networks:
          - app-tier

      memgraph:
        image: memgraph/memgraph:latest
        volumes:
          - "./query_modules:/usr/lib/memgraph/query_modules"
        ports:
          - 7687:7687
        networks:
          - app-tier

    volumes:
      zookeeper_data:
        driver: local
      kafka_data:
        driver: local

    networks:
      app-tier:

Then create a dummy transformation script from the Memgraph tutorials, create the topic test, and create the stream with the command given above.
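
For readers who have not seen a transformation script: in Memgraph, a transformation turns each incoming Kafka message into a Cypher query plus its parameters. The sketch below shows only that per-message logic as plain Python, so it can be tried outside Memgraph. The function name `payload_to_query` and the `Message` label are assumptions made for illustration; wiring this into Memgraph's `mgp` module, as the tutorials do, is not shown here.

```python
from typing import Any, Dict


def payload_to_query(payload: bytes) -> Dict[str, Any]:
    """Map one raw Kafka message payload to a Cypher query and its parameters."""
    # Assumes the producer sends UTF-8 encoded text.
    text = payload.decode("utf-8")
    return {
        # Hypothetical query: store every message as a node labelled Message.
        "query": "CREATE (m:Message {payload: $payload})",
        "parameters": {"payload": text},
    }
```

Passing the payload as a query parameter, rather than formatting it into the query string, avoids quoting problems when messages contain quotes or braces.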

Expected behavior

I just want the stream to be created so that I can continue with the rest of the process.

Your environment

Linux manjaro 5.9.16-1-MANJARO #1 SMP PREEMPT Mon Dec 21 22:00:46 UTC 2020 x86_64 GNU/Linux
Memgraph 1.6.0

Whoops, my bad: when running in Docker, I had to change the bootstrap server to kafka:9092.
So the only difference in my working configuration is the addition of:
entrypoint: ["/usr/lib/memgraph/memgraph", "--kafka-bootstrap-servers=kafka:9092"]
in the docker-compose.yml file, under the memgraph service.

Now that I have created a stream on the test topic, everything is up and a producer is publishing to my topic, but Memgraph is not receiving anything. I have tested with Kafka's console consumer script and I can see the data on the topic.

When I try to check the stream using:
CHECK STREAM steam BATCH_LIMIT 3 TIMEOUT 60000;
I get: Client received exception: Kafka consumer steam check failed: timeout reached
When I run SHOW STREAMS; I can clearly see that my stream has been created. There is nothing in the logs. Does anyone know how I might find more information about this?

When you run SHOW STREAMS, does it show that your stream is running or not?

I get this row:

  • name: “stream”
  • topics: [“steam”]
  • consumer_group: “mg_consumer”
  • batch_interval: Null
  • batch_size: Null
  • transformation_name: transformations.transform_test
  • is running: False

But that’s because I didn’t want to start it yet. I was running:
CHECK STREAM steam BATCH_LIMIT 3 TIMEOUT 60000;
and as far as I understand from the documentation, I don’t have to start the stream for the check.

Are you pushing messages to the kafka topics while the CHECK STREAM query is running? Based on the documentation:

When a stream is started, it should resume from the last committed offset. If there is no committed offset for the consumer group, then the largest offset will be used, therefore only the new messages will be consumed.
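
The rule quoted above can be illustrated with a small simulation. This is a simplified model written for this thread, not Kafka or Memgraph code; `starting_offset` and `messages_consumed` are hypothetical names, and a topic is modelled as a plain list.

```python
from typing import List, Optional


def starting_offset(committed: Optional[int], log_end: int) -> int:
    """Resume from the committed offset, or from the end of the log if none exists."""
    return committed if committed is not None else log_end


def messages_consumed(existing: List[str], new: List[str],
                      committed: Optional[int] = None) -> List[str]:
    """Return what a consumer sees: `existing` was in the topic before it started, `new` arrived after."""
    start = starting_offset(committed, len(existing))
    return (existing + new)[start:]
```

With no committed offset, `messages_consumed(["a", "b"], [])` returns an empty list, which matches a check that times out when nothing new is published while it runs.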

I am constantly pushing messages, and I have:
kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
running in the background to check whether the messages are arriving. I can see them in the console, but Memgraph does not receive anything. Although I can see those messages under the test topic, I can’t seem to connect the stream with Memgraph.

So it seems I needed to study the Kafka configuration more.
I have changed the Kafka environment setup in the docker-compose file to this:

    environment:
      - KAFKA_BROKER_ID=1
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT

and now it’s working, as described on Docker Hub in the chapter Accessing Kafka with internal and external clients.
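
Why this helps: after the first connection, a Kafka client is told to use the advertised address of the listener it connected through. The sketch below is a simplified model of that lookup, written for this thread and not taken from Kafka itself; `parse_listeners` and `advertised_for_port` are hypothetical helper names.

```python
from typing import Dict


def parse_listeners(value: str) -> Dict[str, str]:
    """Parse 'NAME://host:port,NAME2://host:port' into {NAME: 'host:port'}."""
    result = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        result[name] = address
    return result


def advertised_for_port(listeners: str, advertised: str, port: int) -> str:
    """Return the address advertised to a client that connected on `port`."""
    for name, address in parse_listeners(listeners).items():
        # The listener address may have an empty host, e.g. ':9092'.
        if int(address.rsplit(":", 1)[1]) == port:
            return parse_listeners(advertised)[name]
    raise ValueError(f"no listener bound to port {port}")


# Original single-listener setup: every client is sent to 127.0.0.1:9092,
# which inside the Memgraph container is the container itself.
old = advertised_for_port("PLAINTEXT://:9092", "PLAINTEXT://127.0.0.1:9092", 9092)

# Two-listener setup: containers use 9092, clients on the host use 9093.
new_listeners = "CLIENT://:9092,EXTERNAL://:9093"
new_advertised = "CLIENT://kafka:9092,EXTERNAL://localhost:9093"
internal = advertised_for_port(new_listeners, new_advertised, 9092)
external = advertised_for_port(new_listeners, new_advertised, 9093)
```

In this model, `old` is `127.0.0.1:9092`, `internal` is `kafka:9092`, and `external` is `localhost:9093`, so Memgraph on the Docker network and a console consumer on the host each get an address they can actually reach.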

Hello! Wondering if y’all can put up a GitHub repo with the above work already included in it? Would love to get a head start on the Kafka stuff

Hi, of course, you can see our whole setup here: GitHub - memgraph/boolean-pundits: Memgraph Hackaton Boolean-Pundits team. We have four services up and running: producer, kafka, zookeeper and memgraph. There are a few shortcomings for now, but as always we will remember them and resolve them later on :smiley:

Cool! This seemed to all work! Going to try to get a dummy producer feeding the graph unless the Spotify stream above is intended to actually have data flowing from it?

You can try the spotify stream for the duration of the week, otherwise you can check out the commit history for a simpler producer.

Thanks! Feels like I’m really close but I’m not getting to the point where Memgraph is executing the transforms based on the stream.

If the hosted kafka instance at the AWS address is working, shouldn’t I be able to spin up a memgraph instance that listens to that and builds a graph accordingly? If so, can you describe how I would get to that point?

You can check that the stream is working with the following command:

kafka-console-consumer.sh --bootstrap-server ec2-34-245-33-31.eu-west-1.compute.amazonaws.com:9092 --topic spotify --from-beginning

Regarding Memgraph, you need to run it while specifying the host and port of the kafka stream.
For our application, instead of giving the following argument:

--kafka-bootstrap-servers=kafka:9092

you need to specify the aws address:

--kafka-bootstrap-servers=ec2-34-245-33-31.eu-west-1.compute.amazonaws.com:9092

I believe that’s the solution you’re looking for.

You can check out all the code needed to use the Spotify stream here: Boolean Pundits. All you need is a transformation that has been loaded into Memgraph; then create the stream with that transformation.

Ah got it! Woot, thanks y’all. More later.

Since there have been a lot of changes to the mentioned repo, here is a gist, Apache Kafka & Memgraph · GitHub, to preserve the initial setup :slight_smile:
