Джейд Картер

Библиотеки Python Часть 2. Практическое применение


Скачать книгу

сортирует данные по указанному столбцу.

      – `write.csv` сохраняет результат в новом файле.

      Эти задачи демонстрируют, как использовать Dask и PySpark для работы с большими объемами данных.

      – Dask подходит для локальных задач и интеграции с Python-библиотеками.

      – PySpark эффективен для кластерной обработки данных и интеграции с экосистемой Hadoop.

      Обе библиотеки упрощают решение задач, которые сложно выполнить традиционными методами из-за ограничений памяти или мощности процессора.

1.2 Потоковая обработка данных с Apache Kafka

      Apache Kafka – это мощная платформа для обработки потоков данных в реальном времени. Она широко используется для обработки и анализа событий, поступающих из различных источников, таких как веб-серверы, базы данных, датчики IoT, системы мониторинга и многое другое. Kafka обеспечивает высокую производительность, надежность и масштабируемость, что делает её одним из лучших инструментов для потоковой обработки данных.

      В основе Apache Kafka лежат несколько ключевых компонентов:

      1. Брокеры – серверы, которые принимают, хранят и доставляют данные.

      2. Топики – логические каналы, через которые данные передаются.

      3. Продюсеры – приложения или устройства, которые отправляют данные в Kafka.

      4. Консьюмеры – приложения, которые получают данные из Kafka.

      Kafka организует поток данных в виде последовательностей сообщений. Сообщения записываются в топики и разделяются на партиции, что позволяет обрабатывать данные параллельно.

      Пример потока данных

      Представим, что у нас есть система интернет-магазина, где Kafka используется для обработки событий, таких как заказы, клики на странице, добавление товаров в корзину и платежи. Каждое из этих событий записывается в топик Kafka. Например, топик `orders` может содержать события, описывающие новые заказы.

      Установка и настройка Apache Kafka

      Перед началом работы убедитесь, что Kafka установлена. Для локальной работы используйте официальные сборки Kafka с сайта [Apache Kafka](https://kafka.apache.org/).

      1. Установите Kafka и запустите ZooKeeper, необходимый для управления брокерами.

      2. Запустите Kafka-брокер.

      3. Создайте топик с помощью команды:

      ```bash

      bin/kafka-topics.sh –create –topic orders –bootstrap-server localhost:9092 –partitions 3 –replication-factor 1

      ```

      Отправка данных в Kafka

      Теперь создадим простого продюсера на Python, который будет отправлять данные в топик `orders`. Для работы с Kafka на Python используется библиотека `confluent-kafka`. Установите её с помощью команды:

      ```bash

      pip install confluent-kafka

      ```

      Пример кода, который отправляет сообщения в топик:

      ```python

      from confluent_kafka import Producer

      import json

      import time

      # Настройки продюсера

      producer_config = {

      'bootstrap.servers': 'localhost:9092' # Адрес Kafka-брокера

      }

      # Создание продюсера

      producer = Producer(producer_config)

      # Функция для обратного вызова при успешной отправке сообщения

      def delivery_report(err, msg):

      if err is not None:

      print(f'Ошибка доставки сообщения: {err}')

      else:

      print(f'Сообщение отправлено: {msg.topic()} [{msg.partition()}]')

      # Отправка данных в Kafka

      orders = [

      {'order_id': 1, 'product': 'Laptop', 'price': 1000},

      {'order_id': 2, 'product': 'Phone', 'price': 500},

      {'order_id': 3, 'product': 'Headphones', 'price': 150}

      ]

      for order in orders:

      producer.produce(

      'orders',

      key=str(order['order_id']),

      value=json.dumps(order),

      callback=delivery_report

      )

      producer.flush()