Commit e53a349

Adding projects created for content about How to ingest data to Elasticsearch through Apache Kafka (#353)
1 parent 9f9d099 commit e53a349

7 files changed: +256 -0 lines changed
@@ -0,0 +1,2 @@
/docker/elasticsearch/
/elasticsearch/
@@ -0,0 +1,43 @@
# Data Ingestion with Apache Kafka and Elasticsearch

This project demonstrates a data ingestion pipeline using **Apache Kafka** and **Elasticsearch** with **Python**. Messages are produced and consumed through Kafka, indexed in Elasticsearch, and visualized in Kibana.

## Project Structure

The infrastructure is managed with **Docker Compose**, which starts the following services:

- **Zookeeper**: Manages and coordinates the Kafka brokers.
- **Kafka**: Responsible for distributing and storing messages.
- **Elasticsearch**: Stores and indexes the messages for analysis.
- **Kibana**: Visualization interface for data stored in Elasticsearch.

The **Producer** code sends messages to Kafka, while the **Consumer** reads and indexes these messages in Elasticsearch.

---

## Prerequisites

- **Docker and Docker Compose**: Ensure you have Docker and Docker Compose installed on your machine.
- **Python 3.x**: To run the Producer and Consumer scripts.

---
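## Start the Infrastructure

A minimal way to bring up the stack, assuming the `docker-compose.yml` shown below sits at the project root and that the Python dependencies come from the included `requirements.txt`:

````
# Start Zookeeper, Kafka, Kafka Connect, Elasticsearch, and Kibana
docker compose up -d

# Install the Python client libraries used by the producer and consumer scripts
pip install -r requirements.txt
````

Once the containers are up, Kafka is reachable on `localhost:9092`, Elasticsearch on `http://localhost:9200`, and Kibana on `http://localhost:5601`, matching the port mappings in the compose file.

---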
## Configure the Producer and Consumer

### Producer
The `producer.py` script sends messages to the `logs` topic in Kafka in batches.
It uses the `batch_size` and `linger_ms` settings to optimize message sending.

````
python producer.py
````

### Consumer
The `consumer.py` script reads messages from the `logs` topic and indexes them in Elasticsearch. It consumes messages in batches and automatically commits offsets after processing.

````
python consumer.py
````

## Data Verification in Kibana
After running the `producer.py` and `consumer.py` scripts, open Kibana at http://localhost:5601 to explore the indexed data. Messages sent by the producer and processed by the consumer are stored in the `logs` index in Elasticsearch.
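As a quick check outside Kibana, the index can also be queried directly over the Elasticsearch REST API — a small sketch, assuming Elasticsearch is exposed on `localhost:9200` with security disabled as in the compose file, and that the consumer writes to the `logs` index:

````
# Count the documents indexed by the consumer
curl "http://localhost:9200/logs/_count?pretty"

# Fetch a few sample documents
curl "http://localhost:9200/logs/_search?pretty&size=3"
````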
@@ -0,0 +1,89 @@
version: "3"

services:

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:6.0.0
    container_name: kafka-connect
    platform: linux/amd64
    depends_on:
      - zookeeper
      - kafka
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    volumes:
      - $PWD/data:/data
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:10.0.1
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.15.1
    container_name: elasticsearch-8.15.1
    environment:
      - node.name=elasticsearch
      - xpack.security.enabled=false
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - ./elasticsearch:/usr/share/elasticsearch/data
    ports:
      - 9200:9200

  kibana:
    image: docker.elastic.co/kibana/kibana:8.15.1
    container_name: kibana-8.15.1
    ports:
      - 5601:5601
    environment:
      ELASTICSEARCH_URL: http://elasticsearch:9200
      ELASTICSEARCH_HOSTS: '["http://elasticsearch:9200"]'
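Once the `kafka-connect` container has finished installing the Elasticsearch sink plugin (the `confluent-hub install` step in the `command` above), a quick sanity check is to ask the Connect REST API which plugins it knows about — a sketch, assuming the worker is exposed on `localhost:8083` as in the compose file:

````
# List the connector plugins available to the Connect worker;
# the Elasticsearch sink should appear once installation completes
curl "http://localhost:8083/connector-plugins"
````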
@@ -0,0 +1,9 @@
name=elasticsearch-sink-connector
topics=logs
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
connection.url=http://localhost:9200
type.name=_doc
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
schema.ignore=true
key.ignore=true
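The properties above are in the flat Java-properties form used by `connect-standalone`. Since this setup runs a distributed Connect worker in Docker, one way to deploy the connector is to POST the equivalent JSON to the Connect REST API — a hedged sketch, assuming the worker is reachable on `localhost:8083`; note that from inside the Docker network the worker would typically reach Elasticsearch as `http://elasticsearch:9200` rather than `localhost:9200`:

````
curl -X POST "http://localhost:8083/connectors" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "elasticsearch-sink-connector",
    "config": {
      "topics": "logs",
      "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "connection.url": "http://elasticsearch:9200",
      "type.name": "_doc",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "schema.ignore": "true",
      "key.ignore": "true"
    }
  }'
````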
@@ -0,0 +1,49 @@
import json

from elasticsearch import Elasticsearch, helpers
from kafka import KafkaConsumer

es = Elasticsearch(["http://localhost:9200"])

consumer = KafkaConsumer(
    "logs",  # Topic name
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="latest",  # Read from the latest offset if the group has no stored offset
    enable_auto_commit=True,  # Automatically commit offsets after processing
    group_id="log_consumer_group",  # Consumer group used for offset tracking
    max_poll_records=10,  # Maximum number of messages per batch
    fetch_max_wait_ms=2000,  # Maximum wait time to form a batch (in ms)
)


def create_bulk_actions(logs):
    # Turn each log into a bulk index action targeting the "logs" index
    for log in logs:
        yield {
            "_index": "logs",
            "_source": {
                "level": log["level"],
                "message": log["message"],
                "timestamp": log["timestamp"],
            },
        }


if __name__ == "__main__":
    try:
        print("Starting message consumption...")
        while True:
            messages = consumer.poll(timeout_ms=1000)

            # Process each polled batch, keyed by topic partition
            for _, records in messages.items():
                logs = [json.loads(record.value) for record in records]
                bulk_actions = create_bulk_actions(logs)
                response = helpers.bulk(es, bulk_actions)
                print(f"Indexed {response[0]} logs.")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        consumer.close()
        print("Finished")
@@ -0,0 +1,62 @@
import json
import logging
import random
import time

from kafka import KafkaProducer

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("log_producer")

producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],  # Kafka broker to connect to
    value_serializer=lambda x: json.dumps(x).encode(
        "utf-8"
    ),  # Serializes data as JSON and encodes it to UTF-8 before sending
    batch_size=16384,  # Maximum batch size in bytes (here, 16 KB) buffered before sending
    linger_ms=10,  # Maximum delay (in milliseconds) before sending a batch
    acks="all",  # Wait for all in-sync replicas to acknowledge, ensuring durability
)


def generate_log_message():
    # Place the log timestamp 5-10 minutes in the past (epoch seconds)
    diff_seconds = random.uniform(300, 600)
    timestamp = time.time() - diff_seconds

    log_messages = {
        "INFO": [
            "User login successful",
            "Database connection established",
            "Service started",
            "Payment processed",
        ],
        "WARNING": ["Service stopped", "Payment may not have been processed"],
        "ERROR": ["User login failed", "Database connection failed", "Payment failed"],
        "DEBUG": ["Debugging user login flow", "Debugging database connection"],
    }

    # Pick a random level, then a random message for that level
    level = random.choice(list(log_messages.keys()))
    message = random.choice(log_messages[level])

    log_entry = {"level": level, "message": message, "timestamp": timestamp}

    return log_entry


def send_log_batches(topic, num_batches=5, batch_size=10):
    for i in range(num_batches):
        logger.info(f"Sending batch {i + 1}/{num_batches}")
        for _ in range(batch_size):
            log_message = generate_log_message()
            producer.send(topic, value=log_message)
        producer.flush()  # Force delivery of the buffered batch
        time.sleep(1)


if __name__ == "__main__":
    topic = "logs"
    send_log_batches(topic)
    producer.close()
@@ -0,0 +1,2 @@
kafka-python==2.0.2
elasticsearch==7.10.0
