задача: написать программу, которая сохраняет все данные о температуре выше 30°C в файл `high_temp.json`.
Решение:
```python
from confluent_kafka import Consumer
import json
# Настройки Kafka
broker = 'localhost:9092'
# Создание консьюмера
consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'sensor-group',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['sensor_data'])
# Открываем файл для записи
with open('high_temp.json', 'w') as outfile:
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
# Преобразуем сообщение в Python-объект
sensor_data = json.loads(msg.value().decode('utf-8'))
# Сохраняем данные, если температура выше 30°C
if sensor_data['temperature'] > 30:
json.dump(sensor_data, outfile)
outfile.write('\n') # Новый ряд для каждого объекта
except KeyboardInterrupt:
print("Завершение работы.")
finally:
consumer.close()
```
Объяснение:
– Консьюмер читает данные из топика `sensor_data`.
– Данные с температурой выше 30°C записываются в файл `high_temp.json`.
Задача 5: Обнаружение аномалий в данных
Описание:
В топик `temperature_readings` поступают данные о температуре из различных городов:
– `city` – название города.
– `temperature` – измеренная температура.
– `timestamp` – время измерения.
Ваша задача: написать программу, которая будет находить и выводить аномалии – случаи, когда температура превышает 40°C или опускается ниже -10°C.
Решение:
```python
from confluent_kafka import Consumer
import json
# Настройки Kafka
broker = 'localhost:9092'
# Создание консьюмера
consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'temperature-group',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['temperature_readings'])
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
# Преобразуем сообщение в Python-объект
reading = json.loads(msg.value().decode('utf-8'))
# Проверяем на аномалии
if reading['temperature'] > 40 or reading['temperature'] < -10:
print(f"Аномалия! Город: {reading['city']}, Температура: {reading['temperature']}°C")
except KeyboardInterrupt:
print("Завершение работы.")
finally:
consumer.close()
```
Объяснение:
– Консьюмер читает данные о температуре из топика.
– Если температура выходит за пределы нормального диапазона, программа выводит сообщение об аномалии.
Задача 6: Потоковое объединение данных
Описание:
Есть два топика:
1. `orders` – содержит данные о заказах: `order_id`, `product_id`, `quantity`.
2. `products` – содержит данные о товарах: `product_id`, `product_name`, `price`.
Ваша задача: написать программу, которая объединяет данные из этих двух топиков и выводит итоговую информацию о каждом заказе, включая название продукта и общую стоимость.
Решение:
```python
from confluent_kafka import Consumer
import json
# Настройки Kafka
broker = 'localhost:9092'
# Создание консьюмеров для обоих топиков
order_consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'order-group',
'auto.offset.reset': 'earliest'
})
product_consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'product-group',
'auto.offset.reset':