Джейд Картер

Библиотеки Python Часть 2. Практическое применение


Скачать книгу

Преобразуем сообщение из Kafka в Python-объект

      event = json.loads(msg.value().decode('utf-8'))

      # Фильтруем события с URL, содержащими "product"

      if 'product' in event['url']:

      print(f"Фильтруем событие: {event}")

      produce_filtered_event(event)

      except KeyboardInterrupt:

      print("Завершение работы.")

      finally:

      consumer.close()

      ```

      Объяснение:

      – Консьюмер читает события из топика `clickstream`.

      – Каждое сообщение проверяется на наличие слова "product" в поле `url`.

      – Отфильтрованные события отправляются в новый топик `filtered_clicks` через продюсера.

      Задача 2: Подсчет количества событий в реальном времени

      Описание:

      Топик `log_events` содержит логи системы. Каждое сообщение содержит:

      – `log_level` (например, "INFO", "ERROR", "DEBUG").

      – `message` (текст лога).

      Ваша задача: написать программу, которая считает количество событий уровня "ERROR" в реальном времени и каждые 10 секунд выводит их общее количество.

      Решение:

      ```python

      from confluent_kafka import Consumer

      import time

      # Настройки Kafka

      broker = 'localhost:9092'

      # Создание консьюмера

      consumer = Consumer({

      'bootstrap.servers': broker,

      'group.id': 'log-group',

      'auto.offset.reset': 'earliest'

      })

      consumer.subscribe(['log_events'])

      error_count = 0

      start_time = time.time()

      try:

      while True:

      msg = consumer.poll(1.0)

      if msg is None:

      continue

      if msg.error():

      continue

      # Преобразуем сообщение в Python-объект

      log_event = json.loads(msg.value().decode('utf-8'))

      # Увеличиваем счетчик, если уровень лога "ERROR"

      if log_event['log_level'] == 'ERROR':

      error_count += 1

      # Каждые 10 секунд выводим текущий счетчик

      if time.time() – start_time >= 10:

      print(f"Количество ошибок за последние 10 секунд: {error_count}")

      error_count = 0

      start_time = time.time()

      except KeyboardInterrupt:

      print("Завершение работы.")

      finally:

      consumer.close()

      ```

      Объяснение:

      – Консьюмер читает события из топика `log_events`.

      – Если уровень лога "ERROR", увеличивается счетчик `error_count`.

      – Каждые 10 секунд программа выводит количество событий "ERROR" и сбрасывает счетчик.

      Задача 3: Агрегация данных по группам

      Описание:

      Топик `transactions` содержит данные о финансовых транзакциях:

      – `user_id` – идентификатор пользователя.

      – `amount` – сумма транзакции.

      Ваша задача: написать программу, которая подсчитывает общую сумму транзакций для каждого пользователя и выводит результаты в реальном времени.

      Решение:

      ```python

      from confluent_kafka import Consumer

      import json

      from collections import defaultdict

      # Настройки Kafka

      broker = 'localhost:9092'

      # Создание консьюмера

      consumer = Consumer({

      'bootstrap.servers': broker,

      'group.id': 'transaction-group',

      'auto.offset.reset': 'earliest'

      })

      consumer.subscribe(['transactions'])

      # Словарь для хранения сумм по пользователям

      user_totals = defaultdict(float)

      try:

      while True:

      msg = consumer.poll(1.0)

      if msg is None:

      continue

      if msg.error():

      continue

      # Преобразуем сообщение в Python-объект

      transaction = json.loads(msg.value().decode('utf-8'))

      # Обновляем сумму для пользователя

      user_id = transaction['user_id']

      user_totals[user_id] += transaction['amount']

      # Вывод текущих сумм

      print(f"Текущая сумма транзакций по пользователям: {dict(user_totals)}")

      except