на топик
consumer.subscribe(['orders'])
# Чтение сообщений из Kafka
try:
while True:
msg = consumer.poll(1.0) # Ожидание сообщения (1 секунда)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaException._PARTITION_EOF:
# Конец партиции
continue
else:
print(f"Ошибка: {msg.error()}")
break
# Обработка сообщения
print(f"Получено сообщение: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
print("Завершение работы…")
finally:
# Закрытие консьюмера
consumer.close()
```
В этом примере консьюмер подключается к Kafka, читает сообщения из топика `orders` и выводит их на экран.
Потоковая обработка данных
Kafka часто используется совместно с платформами потоковой обработки, такими как Apache Spark или Apache Flink, для анализа данных в реальном времени. Однако вы также можете обрабатывать данные прямо в Python.
Например, предположим, что мы хотим обработать события из топика `orders` и рассчитать суммарную стоимость всех заказов:
```python
from confluent_kafka import Consumer
import json
# Настройки консьюмера
consumer_config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-sum-group',
'auto.offset.reset': 'earliest'
}
# Создание консьюмера
consumer = Consumer(consumer_config)
consumer.subscribe(['orders'])
# Суммарная стоимость заказов
total_sales = 0
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
# Обработка сообщения
order = json.loads(msg.value().decode('utf-8'))
total_sales += order['price']
print(f"Обработан заказ: {order['order_id']}, текущая сумма: {total_sales}")
except KeyboardInterrupt:
print(f"Общая сумма всех заказов: {total_sales}")
finally:
consumer.close()
```
Преимущества использования Kafka
1. Высокая производительность. Kafka поддерживает миллионы событий в секунду благодаря своей архитектуре и использованию партиций.
2. Надежность. Данные хранятся в Kafka до тех пор, пока их не обработают все подписчики.
3. Масштабируемость. Kafka легко масштабируется путем добавления новых брокеров.
4. Универсальность. Kafka поддерживает интеграцию с большинством современных инструментов обработки данных.
Apache Kafka предоставляет мощный набор инструментов для потоковой обработки данных. Используя Python, вы можете легко настроить передачу данных, их обработку и анализ в реальном времени. Это особенно полезно для систем, где требуется высокая производительность и минимальная задержка при обработке больших потоков данных.
Задача 1: Фильтрация событий по условию
Описание:
У вас есть топик `clickstream`, содержащий события о кликах на веб-сайте. Каждое событие содержит следующие поля:
– `user_id` – идентификатор пользователя.
– `url` – URL-адрес, на который был клик.
– `timestamp` – время клика.
Ваша задача: создать консьюмера, который будет читать события из Kafka, фильтровать только события с URL-адресами, содержащими слово "product", и сохранять их в новый топик `filtered_clicks`.
Решение:
```python
from confluent_kafka import Producer, Consumer
import json
# Настройки Kafka
broker = 'localhost:9092'
# Создание продюсера для записи в новый топик
producer = Producer({'bootstrap.servers':