Джейд Картер

Библиотеки Python Часть 2. Практическое применение


Скачать книгу

на топик

      consumer.subscribe(['orders'])

      # Чтение сообщений из Kafka

      try:

      while True:

      msg = consumer.poll(1.0) # Ожидание сообщения (1 секунда)

      if msg is None:

      continue

      if msg.error():

      if msg.error().code() == KafkaException._PARTITION_EOF:

      # Конец партиции

      continue

      else:

      print(f"Ошибка: {msg.error()}")

      break

      # Обработка сообщения

      print(f"Получено сообщение: {msg.value().decode('utf-8')}")

      except KeyboardInterrupt:

      print("Завершение работы…")

      finally:

      # Закрытие консьюмера

      consumer.close()

      ```

      В этом примере консьюмер подключается к Kafka, читает сообщения из топика `orders` и выводит их на экран.

      Потоковая обработка данных

      Kafka часто используется совместно с платформами потоковой обработки, такими как Apache Spark или Apache Flink, для анализа данных в реальном времени. Однако вы также можете обрабатывать данные прямо в Python.

      Например, предположим, что мы хотим обработать события из топика `orders` и рассчитать суммарную стоимость всех заказов:

      ```python

      from confluent_kafka import Consumer

      import json

      # Настройки консьюмера

      consumer_config = {

      'bootstrap.servers': 'localhost:9092',

      'group.id': 'order-sum-group',

      'auto.offset.reset': 'earliest'

      }

      # Создание консьюмера

      consumer = Consumer(consumer_config)

      consumer.subscribe(['orders'])

      # Суммарная стоимость заказов

      total_sales = 0

      try:

      while True:

      msg = consumer.poll(1.0)

      if msg is None:

      continue

      if msg.error():

      continue

      # Обработка сообщения

      order = json.loads(msg.value().decode('utf-8'))

      total_sales += order['price']

      print(f"Обработан заказ: {order['order_id']}, текущая сумма: {total_sales}")

      except KeyboardInterrupt:

      print(f"Общая сумма всех заказов: {total_sales}")

      finally:

      consumer.close()

      ```

      Преимущества использования Kafka

      1. Высокая производительность. Kafka поддерживает миллионы событий в секунду благодаря своей архитектуре и использованию партиций.

      2. Надежность. Данные хранятся в Kafka до тех пор, пока их не обработают все подписчики.

      3. Масштабируемость. Kafka легко масштабируется путем добавления новых брокеров.

      4. Универсальность. Kafka поддерживает интеграцию с большинством современных инструментов обработки данных.

      Apache Kafka предоставляет мощный набор инструментов для потоковой обработки данных. Используя Python, вы можете легко настроить передачу данных, их обработку и анализ в реальном времени. Это особенно полезно для систем, где требуется высокая производительность и минимальная задержка при обработке больших потоков данных.

Задачи для практики

      Задача 1: Фильтрация событий по условию

      Описание:

      У вас есть топик `clickstream`, содержащий события о кликах на веб-сайте. Каждое событие содержит следующие поля:

      – `user_id` – идентификатор пользователя.

      – `url` – URL-адрес, на который был клик.

      – `timestamp` – время клика.

      Ваша задача: создать консьюмера, который будет читать события из Kafka, фильтровать только события с URL-адресами, содержащими слово "product", и сохранять их в новый топик `filtered_clicks`.

      Решение:

      ```python

      from confluent_kafka import Producer, Consumer

      import json

      # Настройки Kafka

      broker = 'localhost:9092'

      # Создание продюсера для записи в новый топик

      producer = Producer({'bootstrap.servers':