Джейд Картер

Библиотеки Python Часть 2. Практическое применение


Скачать книгу

Чтение данных из топика orders

      order_msg = order_consumer.poll(0.1)

      if order_msg and not order_msg.error():

      order = json.loads(order_msg.value().decode('utf-8'))

      product_id = order['product_id']

      # Объединение данных о заказе и товаре

      if product_id in product_catalog:

      product = product_catalog[product_id]

      total_price = order['quantity'] * product['price']

      print(f"Заказ {order['order_id']}: {product['name']} x {order['quantity']} = {total_price} $")

      else:

      print(f"Информация о товаре {product_id} отсутствует.")

      except KeyboardInterrupt:

      print("Завершение работы.")

      finally:

      order_consumer.close()

      product_consumer.close()

      ```

      Объяснение:

      – Данные из топика `products` кэшируются в словаре `product_catalog`.

      – При чтении заказа из топика `orders` программа объединяет данные и вычисляет итоговую стоимость.

      Задача 7: Потоковая обработка с вычислением скользящего среднего

      Описание:

      В топик `stock_prices` поступают данные о ценах акций:

      – `symbol` – тикер акции.

      – `price` – текущая цена.

      – `timestamp` – время.

      Ваша задача: вычислять скользящее среднее цены акции за последние 5 сообщений для каждого тикера.

      Решение:

      ```python

      from confluent_kafka import Consumer

      import json

      from collections import defaultdict, deque

      # Настройки Kafka

      broker = 'localhost:9092'

      # Создание консьюмера

      consumer = Consumer({

      'bootstrap.servers': broker,

      'group.id': 'stocks-group',

      'auto.offset.reset': 'earliest'

      })

      consumer.subscribe(['stock_prices'])

      # Дек для хранения последних цен по тикерам

      price_window = defaultdict(lambda: deque(maxlen=5))

      try:

      while True:

      msg = consumer.poll(1.0)

      if msg is None:

      continue

      if msg.error():

      continue

      # Преобразуем сообщение в Python-объект

      stock_data = json.loads(msg.value().decode('utf-8'))

      # Добавляем цену в окно

      symbol = stock_data['symbol']

      price_window[symbol].append(stock_data['price'])

      # Вычисляем скользящее среднее

      moving_average = sum(price_window[symbol]) / len(price_window[symbol])

      print(f"Скользящее среднее для {symbol}: {moving_average:.2f}")

      except KeyboardInterrupt:

      print("Завершение работы.")

      finally:

      consumer.close()

      ```

      Объяснение:

      – Используется `deque` для хранения последних 5 цен.

      – Скользящее среднее вычисляется как сумма значений, делённая на их количество.

      Задача 8: Генерация уведомлений

      Описание:

      В топик `user_actions` поступают данные о действиях пользователей:

      – `user_id` – идентификатор пользователя.

      – `action` – выполненное действие (например, "login", "purchase").

      Напишите программу, которая отслеживает пользователей, выполнивших вход (`login`), но не совершивших покупку (`purchase`) в течение 10 минут, и отправляет уведомление в топик `notifications`.

      Решение:

      ```python

      from confluent_kafka import Consumer, Producer

      import json

      from datetime import datetime, timedelta

      # Настройки Kafka

      broker = 'localhost:9092'

      # Создание консьюмера

      consumer = Consumer({

      'bootstrap.servers': broker,

      'group.id': 'user-actions-group',

      'auto.offset.reset': 'earliest'

      })

      producer = Producer({'bootstrap.servers': broker})

      consumer.subscribe(['user_actions'])

      # Словарь для отслеживания пользователей

      user_login_time = {}

      try:

      while True:

      msg = consumer.poll(1.0)

      if msg is None:

      continue

      if msg.error():

      continue

      # Преобразуем сообщение в Python-объект

      action = json.loads(msg.value().decode('utf-8'))

      user_id = action['user_id']

      action_type