Джейд Картер

Библиотеки Python Часть 2. Практическое применение


Скачать книгу

задача: написать программу, которая сохраняет все данные о температуре выше 30°C в файл `high_temp.json`.

      Решение:

      ```python

      from confluent_kafka import Consumer

      import json

      # Настройки Kafka

      broker = 'localhost:9092'

      # Создание консьюмера

      consumer = Consumer({

      'bootstrap.servers': broker,

      'group.id': 'sensor-group',

      'auto.offset.reset': 'earliest'

      })

      consumer.subscribe(['sensor_data'])

      # Открываем файл для записи

      with open('high_temp.json', 'w') as outfile:

      try:

      while True:

      msg = consumer.poll(1.0)

      if msg is None:

      continue

      if msg.error():

      continue

      # Преобразуем сообщение в Python-объект

      sensor_data = json.loads(msg.value().decode('utf-8'))

      # Сохраняем данные, если температура выше 30°C

      if sensor_data['temperature'] > 30:

      json.dump(sensor_data, outfile)

      outfile.write('\n') # Новый ряд для каждого объекта

      except KeyboardInterrupt:

      print("Завершение работы.")

      finally:

      consumer.close()

      ```

      Объяснение:

      – Консьюмер читает данные из топика `sensor_data`.

      – Данные с температурой выше 30°C записываются в файл `high_temp.json`.

      Задача 5: Обнаружение аномалий в данных

      Описание:

      В топик `temperature_readings` поступают данные о температуре из различных городов:

      – `city` – название города.

      – `temperature` – измеренная температура.

      – `timestamp` – время измерения.

      Ваша задача: написать программу, которая будет находить и выводить аномалии – случаи, когда температура превышает 40°C или опускается ниже -10°C.

      Решение:

      ```python

      from confluent_kafka import Consumer

      import json

      # Настройки Kafka

      broker = 'localhost:9092'

      # Создание консьюмера

      consumer = Consumer({

      'bootstrap.servers': broker,

      'group.id': 'temperature-group',

      'auto.offset.reset': 'earliest'

      })

      consumer.subscribe(['temperature_readings'])

      try:

      while True:

      msg = consumer.poll(1.0)

      if msg is None:

      continue

      if msg.error():

      continue

      # Преобразуем сообщение в Python-объект

      reading = json.loads(msg.value().decode('utf-8'))

      # Проверяем на аномалии

      if reading['temperature'] > 40 or reading['temperature'] < -10:

      print(f"Аномалия! Город: {reading['city']}, Температура: {reading['temperature']}°C")

      except KeyboardInterrupt:

      print("Завершение работы.")

      finally:

      consumer.close()

      ```

      Объяснение:

      – Консьюмер читает данные о температуре из топика.

      – Если температура выходит за пределы нормального диапазона, программа выводит сообщение об аномалии.

      Задача 6: Потоковое объединение данных

      Описание:

      Есть два топика:

      1. `orders` – содержит данные о заказах: `order_id`, `product_id`, `quantity`.

      2. `products` – содержит данные о товарах: `product_id`, `product_name`, `price`.

      Ваша задача: написать программу, которая объединяет данные из этих двух топиков и выводит итоговую информацию о каждом заказе, включая название продукта и общую стоимость.

      Решение:

      ```python

      from confluent_kafka import Consumer

      import json

      # Настройки Kafka

      broker = 'localhost:9092'

      # Создание консьюмеров для обоих топиков

      order_consumer = Consumer({

      'bootstrap.servers': broker,

      'group.id': 'order-group',

      'auto.offset.reset': 'earliest'

      })

      product_consumer = Consumer({

      'bootstrap.servers': broker,

      'group.id': 'product-group',

      'auto.offset.reset':