Чтение данных из топика orders
order_msg = order_consumer.poll(0.1)
if order_msg and not order_msg.error():
order = json.loads(order_msg.value().decode('utf-8'))
product_id = order['product_id']
# Объединение данных о заказе и товаре
if product_id in product_catalog:
product = product_catalog[product_id]
total_price = order['quantity'] * product['price']
print(f"Заказ {order['order_id']}: {product['name']} x {order['quantity']} = {total_price} $")
else:
print(f"Информация о товаре {product_id} отсутствует.")
except KeyboardInterrupt:
print("Завершение работы.")
finally:
order_consumer.close()
product_consumer.close()
```
Объяснение:
– Данные из топика `products` кэшируются в словаре `product_catalog`.
– При чтении заказа из топика `orders` программа объединяет данные и вычисляет итоговую стоимость.
Задача 7: Потоковая обработка с вычислением скользящего среднего
Описание:
В топик `stock_prices` поступают данные о ценах акций:
– `symbol` – тикер акции.
– `price` – текущая цена.
– `timestamp` – время.
Ваша задача: вычислять скользящее среднее цены акции за последние 5 сообщений для каждого тикера.
Решение:
```python
from confluent_kafka import Consumer
import json
from collections import defaultdict, deque
# Настройки Kafka
broker = 'localhost:9092'
# Создание консьюмера
consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'stocks-group',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['stock_prices'])
# Дек для хранения последних цен по тикерам
price_window = defaultdict(lambda: deque(maxlen=5))
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
# Преобразуем сообщение в Python-объект
stock_data = json.loads(msg.value().decode('utf-8'))
# Добавляем цену в окно
symbol = stock_data['symbol']
price_window[symbol].append(stock_data['price'])
# Вычисляем скользящее среднее
moving_average = sum(price_window[symbol]) / len(price_window[symbol])
print(f"Скользящее среднее для {symbol}: {moving_average:.2f}")
except KeyboardInterrupt:
print("Завершение работы.")
finally:
consumer.close()
```
Объяснение:
– Используется `deque` для хранения последних 5 цен.
– Скользящее среднее вычисляется как сумма значений, делённая на их количество.
Задача 8: Генерация уведомлений
Описание:
В топик `user_actions` поступают данные о действиях пользователей:
– `user_id` – идентификатор пользователя.
– `action` – выполненное действие (например, "login", "purchase").
Напишите программу, которая отслеживает пользователей, выполнивших вход (`login`), но не совершивших покупку (`purchase`) в течение 10 минут, и отправляет уведомление в топик `notifications`.
Решение:
```python
from confluent_kafka import Consumer, Producer
import json
from datetime import datetime, timedelta
# Настройки Kafka
broker = 'localhost:9092'
# Создание консьюмера
consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'user-actions-group',
'auto.offset.reset': 'earliest'
})
producer = Producer({'bootstrap.servers': broker})
consumer.subscribe(['user_actions'])
# Словарь для отслеживания пользователей
user_login_time = {}
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
# Преобразуем сообщение в Python-объект
action = json.loads(msg.value().decode('utf-8'))
user_id = action['user_id']
action_type