یادگیری ماشین قوی در جریان داده ها با استفاده از Kafka و Tensorflow-IO

مشاهده در TensorFlow.org در Google Colab اجرا شود مشاهده منبع در GitHub دانلود دفترچه یادداشت

بررسی اجمالی

این آموزش در جریان داده ها از یک تمرکز کافکا خوشه به یک tf.data.Dataset است که پس از آن در رابطه با استفاده tf.keras برای آموزش و استنتاج است.

کافکا در درجه اول یک پلتفرم پخش رویداد توزیع شده است که داده‌های جریانی مقیاس‌پذیر و مقاوم در برابر خطا را در خطوط لوله داده ارائه می‌دهد. این یک جزء فنی ضروری از مجموعه‌ای از شرکت‌های بزرگ است که در آن تحویل داده‌های حیاتی یک نیاز اولیه است.

برپایی

بسته های tensorflow-io و kafka مورد نیاز را نصب کنید

pip install tensorflow-io
pip install kafka-python

بسته های وارداتی

import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

واردات tf و tfio را تأیید کنید

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.23.1
tensorflow version: 2.8.0-rc0

نمونه های Kafka و Zookeeper را دانلود و تنظیم کنید

برای اهداف نمایشی، نمونه های زیر به صورت محلی تنظیم می شوند:

  • کافکا (کارگزاران: 127.0.0.1: 9092)
  • Zookeeper (Node: 127.0.0.1:2181)
curl -sSOL https://downloads.apache.org/kafka/2.7.2/kafka_2.13-2.7.2.tgz
tar -xzf kafka_2.13-2.7.2.tgz

استفاده از تنظیمات پیش فرض (ارائه شده توسط آپاچی کافکا) برای چرخش نمونه ها.

./kafka_2.13-2.7.2/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-2.7.2/config/zookeeper.properties
./kafka_2.13-2.7.2/bin/kafka-server-start.sh -daemon ./kafka_2.13-2.7.2/config/server.properties
echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
sleep 10
Waiting for 10 secs until kafka and zookeeper services are up and running

هنگامی که موارد به عنوان فرآیندهای شبح، grep برای آغاز kafka در لیست Processes. دو فرآیند جاوا با نمونه‌های zookeeper و kafka مطابقت دارند.

ps -ef | grep kafka
kbuilder 27856 20044  4 20:28 ?        00:00:00 python /tmpfs/src/gfile/executor.py --input_notebook=/tmpfs/src/temp/docs/tutorials/kafka.ipynb --timeout=15000
kbuilder 28271     1 16 20:28 ?        00:00:01 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.2/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-api-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-basic-auth-extension-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-file-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-json-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-client-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-runtime-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-transforms-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-client-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-common-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-core-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-hk2-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-server-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-client-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-http-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-io-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-security-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-server-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-clients-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-log4j-appender-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-raft-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-examples-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-scala_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-test-utils-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-tools-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/maven-artifact-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-codec-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-handler-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-jute-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zstd-jni-1.4.5-6.jar org.apache.zookeeper.server.quorum.QuorumPeerMain ./kafka_2.13-2.7.2/config/zookeeper.properties
kbuilder 28635     1 57 20:28 ?        00:00:05 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.2/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-api-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-basic-auth-extension-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-file-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-json-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-client-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-runtime-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-transforms-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-client-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-common-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-core-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-hk2-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-server-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-client-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-http-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-io-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-security-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-server-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-clients-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-log4j-appender-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-raft-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-examples-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-scala_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-test-utils-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-tools-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/maven-artifact-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-codec-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-handler-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-jute-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zstd-jni-1.4.5-6.jar kafka.Kafka ./kafka_2.13-2.7.2/config/server.properties
kbuilder 28821 27860  0 20:28 pts/0    00:00:00 /bin/bash -c ps -ef | grep kafka
kbuilder 28823 28821  0 20:28 pts/0    00:00:00 grep kafka

موضوعات کافکا را با مشخصات زیر ایجاد کنید:

  • susy-train: partitions=1، replication-factor=1
  • susy-test: partitions=2، replication-factor=1
./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic susy-train
./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic susy-test
Created topic susy-train.
Created topic susy-test.

موضوع را برای جزئیات بیشتر در مورد پیکربندی توضیح دهید

./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-train
./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-test
Topic: susy-train PartitionCount: 1 ReplicationFactor: 1  Configs: segment.bytes=1073741824
    Topic: susy-train Partition: 0  Leader: 0 Replicas: 0   Isr: 0
Topic: susy-test  PartitionCount: 2 ReplicationFactor: 1  Configs: segment.bytes=1073741824
    Topic: susy-test  Partition: 0  Leader: 0 Replicas: 0   Isr: 0
    Topic: susy-test  Partition: 1  Leader: 0 Replicas: 0   Isr: 0

ضریب تکرار 1 نشان می دهد که داده ها تکرار نمی شوند. این به دلیل وجود یک کارگزار واحد در مجموعه کافکا ما است. در سیستم های تولید، تعداد سرورهای بوت استرپ می تواند در محدوده 100 نود باشد. اینجاست که تحمل خطا با استفاده از تکرار نمایان می شود.

لطفا به مراجعه اسناد برای جزئیات بیشتر.

مجموعه داده SUSY

کافکا به عنوان یک پلتفرم جریان رویداد، امکان نوشتن اطلاعات از منابع مختلف را در آن فراهم می کند. برای مثال:

  • سیاهههای مربوط به ترافیک وب
  • اندازه گیری های نجومی
  • داده های حسگر اینترنت اشیا
  • بررسی محصول و بسیاری موارد دیگر.

برای هدف از این آموزش، اجازه می دهد تا دانلود SUSY مجموعه داده و تغذیه داده ها را به کافکا دستی. هدف این مشکل طبقه بندی تمایز بین یک فرآیند سیگنالی است که ذرات فوق متقارن تولید می کند و یک فرآیند پس زمینه که تولید نمی کند.

curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz

مجموعه داده را کاوش کنید

اولین ستون برچسب کلاس (1 برای سیگنال، 0 برای پس‌زمینه)، و به دنبال آن 18 ویژگی (8 ویژگی سطح پایین و سپس 10 ویژگی سطح بالا) است. 8 ویژگی اول ویژگی های سینماتیکی هستند که توسط آشکارسازهای ذرات در شتاب دهنده اندازه گیری می شوند. 10 ویژگی آخر عملکردهای 8 ویژگی اول هستند. اینها ویژگی های سطح بالایی هستند که توسط فیزیکدانان برای کمک به تمایز بین این دو طبقه به دست آمده اند.

COLUMNS = [
          #  labels
           'class',
          #  low-level features
           'lepton_1_pT',
           'lepton_1_eta',
           'lepton_1_phi',
           'lepton_2_pT',
           'lepton_2_eta',
           'lepton_2_phi',
           'missing_energy_magnitude',
           'missing_energy_phi',
          #  high-level derived features
           'MET_rel',
           'axial_MET',
           'M_R',
           'M_TR_2',
           'R',
           'MT2',
           'S_R',
           'M_Delta_R',
           'dPhi_r_b',
           'cos(theta_r1)'
           ]

کل مجموعه داده شامل 5 میلیون ردیف است. با این حال، برای هدف این آموزش، اجازه دهید تنها کسری از مجموعه داده (100000 ردیف) را در نظر بگیریم تا زمان کمتری برای جابجایی داده ها و زمان بیشتری برای درک عملکرد api صرف شود.

susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)
susy_df.head()
# Number of datapoints and columns
len(susy_df), len(susy_df.columns)
(100000, 19)
# Number of datapoints belonging to each class (0: background noise, 1: signal)
len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])
(54025, 45975)

مجموعه داده را تقسیم کنید

train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))

x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]

x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]

# The labels are set as the kafka message keys so as to store data
# in multiple-partitions. Thus, enabling efficient data retrieval
# using the consumer groups.
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))

x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
Number of training samples:  60000
Number of testing sample:  40000
NUM_COLUMNS = len(x_train_df.columns)
len(x_train), len(y_train), len(x_test), len(y_test)
(60000, 60000, 40000, 40000)

اطلاعات قطار و تست را در کافکا ذخیره کنید

ذخیره داده ها در کافکا محیطی را برای بازیابی مداوم داده از راه دور برای اهداف آموزشی و استنتاج شبیه سازی می کند.

def error_callback(exc):
    raise Exception('Error while sendig data to kafka: {0}'.format(str(exc)))

def write_to_kafka(topic_name, items):
  count=0
  producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
  for message, key in items:
    producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)
    count+=1
  producer.flush()
  print("Wrote {0} messages into topic: {1}".format(count, topic_name))

write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))
Wrote 60000 messages into topic: susy-train
Wrote 40000 messages into topic: susy-test

مجموعه داده قطار tfio را تعریف کنید

IODataset کلاس برای جریان داده ها را از کافکا به tensorflow استفاده شده است. به ارث برده کلاس از tf.data.Dataset و در نتیجه دارای تمام ویژگی مفید از tf.data.Dataset خارج از جعبه.

def decode_kafka_item(item):
  message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(item.key)
  return (message, key)

BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64
train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)
2022-01-07 20:29:21.602817: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

مدل را بسازید و آموزش دهید

# Set the parameters

OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS=['accuracy']
EPOCHS=10
# design/build the model
model = tf.keras.Sequential([
  tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(256, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(1, activation='sigmoid')
])

print(model.summary())
Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 dense (Dense)               (None, 128)               2432      
                                                                 
 dropout (Dropout)           (None, 128)               0         
                                                                 
 dense_1 (Dense)             (None, 256)               33024     
                                                                 
 dropout_1 (Dropout)         (None, 256)               0         
                                                                 
 dense_2 (Dense)             (None, 128)               32896     
                                                                 
 dropout_2 (Dropout)         (None, 128)               0         
                                                                 
 dense_3 (Dense)             (None, 1)                 129       
                                                                 
=================================================================
Total params: 68,481
Trainable params: 68,481
Non-trainable params: 0
_________________________________________________________________
None
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model
model.fit(train_ds, epochs=EPOCHS)
Epoch 1/10
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/util/dispatch.py:1082: UserWarning: "`binary_crossentropy` received `from_logits=True`, but the `output` argument was produced by a sigmoid or softmax activation and thus does not represent logits. Was this intended?"
  return dispatch_target(*args, **kwargs)
938/938 [==============================] - 31s 33ms/step - loss: 0.4817 - accuracy: 0.7691
Epoch 2/10
938/938 [==============================] - 30s 32ms/step - loss: 0.4550 - accuracy: 0.7875
Epoch 3/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4512 - accuracy: 0.7911
Epoch 4/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4487 - accuracy: 0.7940
Epoch 5/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4466 - accuracy: 0.7934
Epoch 6/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4459 - accuracy: 0.7933
Epoch 7/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4448 - accuracy: 0.7935
Epoch 8/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4439 - accuracy: 0.7950
Epoch 9/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4421 - accuracy: 0.7956
Epoch 10/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4425 - accuracy: 0.7962
<keras.callbacks.History at 0x7fb364fd2a90>

از آنجایی که تنها بخشی از مجموعه داده در حال استفاده است، دقت ما در مرحله آموزش به 78٪ محدود می شود. با این حال، لطفاً برای عملکرد بهتر مدل، داده های اضافی را در کافکا ذخیره کنید. همچنین، از آنجایی که هدف فقط نشان دادن عملکرد مجموعه داده‌های tfio کافکا بود، از یک شبکه عصبی کوچک‌تر و کم‌پیچیدگی استفاده شد. با این حال، می توان پیچیدگی مدل را افزایش داد، استراتژی یادگیری را اصلاح کرد، پارامترهای فوق را تنظیم کرد و غیره برای اهداف اکتشافی. برای یک رویکرد پایه، لطفا به این اشاره مقاله .

استنتاج از داده های آزمایش

برای پی بردن به در داده ها از آزمون با پیوستن به معناشناسی، دقیقا یک بار 'همراه با تحمل خطا از streaming.KafkaGroupIODataset می تواند مورد استفاده.

مجموعه داده تست tfio را تعریف کنید

stream_timeout بلوک پارامتر برای مدت زمان داده شده برای نقاط داده های جدید به موضوع پخش می شود. در صورتی که داده ها به صورت متناوب به موضوع منتقل شوند، این نیاز به ایجاد مجموعه داده های جدید را از بین می برد.

test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["susy-test"],
    group_id="testcg",
    servers="127.0.0.1:9092",
    stream_timeout=10000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

def decode_kafka_test_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(BATCH_SIZE)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_io/python/experimental/kafka_group_io_dataset_ops.py:188: take_while (from tensorflow.python.data.experimental.ops.take_while_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Dataset.take_while(...)

اگرچه می توان از این کلاس برای اهداف آموزشی استفاده کرد، اما نکاتی وجود دارد که باید مورد توجه قرار گیرد. هنگامی که تمام پیام ها از کافکا به عنوان خوانده شده و آخرین شیپور خاموشی با استفاده از متعهد streaming.KafkaGroupIODataset ، مصرف کننده می کند راه اندازی مجدد نیست خواندن پیام از ابتدا. بنابراین، در حین آموزش، تنها امکان آموزش برای یک دوره واحد با داده‌ها به طور مداوم وجود دارد. این نوع عملکرد، موارد استفاده محدودی در طول مرحله آموزش دارد که در آن، زمانی که یک نقطه داده توسط مدل مصرف شد، دیگر قابل استفاده نیست. مورد نیاز است و می توان آن را دور انداخت.

با این حال، زمانی که صحبت از استنتاج قوی با معنایی دقیقاً یک بار به میان می آید، این عملکرد می درخشد.

ارزیابی عملکرد بر روی داده های آزمون

res = model.evaluate(test_ds)
print("test loss, test acc:", res)
34/Unknown - 0s 2ms/step - loss: 0.4434 - accuracy: 0.8194
2022-01-07 20:34:29.402707: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions
2022-01-07 20:34:29.406789: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0
625/625 [==============================] - 11s 17ms/step - loss: 0.4437 - accuracy: 0.7915
test loss, test acc: [0.4436523914337158, 0.7915250062942505]
2022-01-07 20:34:40.051954: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out

از آنجایی که استنتاج بر اساس معنایی «دقیقاً یک بار» است، ارزیابی روی مجموعه آزمایشی تنها یک بار می‌تواند اجرا شود. برای اجرای مجدد استنتاج روی داده های آزمایشی، باید از یک گروه مصرف کننده جدید استفاده شود.

پیگیری تاخیر افست testcg گروه مصرف کننده

./kafka_2.13-2.7.2/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
testcg          susy-test       0          21626           21626           0               rdkafka-534f63d0-b91e-4976-a3ca-832b6c91210e /10.142.0.103   rdkafka
testcg          susy-test       1          18374           18374           0               rdkafka-534f63d0-b91e-4976-a3ca-832b6c91210e /10.142.0.103   rdkafka

هنگامی که current-offset مسابقات log-end-offset برای همه پارتیشن، آن را نشان می دهد که مصرف کننده (بازدید کنندگان) را تکمیل کرده اند واکشی تمام پیام ها را از موضوع کافکا.

یادگیری آنلاین

پارادایم یادگیری ماشین آنلاین کمی با روش سنتی/معمولی آموزش مدل‌های یادگیری ماشین متفاوت است. در حالت اول، به محض اینکه نقاط داده جدید در دسترس هستند، مدل به یادگیری/به روز رسانی تدریجی پارامترهای خود ادامه می دهد و انتظار می رود این فرآیند به طور نامحدود ادامه یابد. این است بر خلاف روش دوم که در آن مجموعه داده ثابت است و مدل آن را تکرار می کند n تعداد بار. در یادگیری آنلاین، داده‌هایی که یک بار توسط مدل مصرف می‌شوند ممکن است دوباره برای آموزش در دسترس نباشند.

با استفاده از streaming.KafkaBatchIODataset ، آن است که در حال حاضر امکان آموزش مدل در این مد. بیایید به استفاده از مجموعه داده SUSY خود برای نشان دادن این عملکرد ادامه دهیم.

مجموعه داده آموزشی tfio برای یادگیری آنلاین

streaming.KafkaBatchIODataset شبیه به است streaming.KafkaGroupIODataset در API آن است. علاوه بر این، توصیه می شود به استفاده از stream_timeout پارامتر پیکربندی مدت زمانی که مجموعه داده خواهد برای پیام های جدید قبل از زمان بندی کردن بلوک. در مثال زیر، مجموعه داده های با پیکربندی stream_timeout از 10000 میلی ثانیه است. این بدان معناست که پس از مصرف همه پیام‌های موضوع، مجموعه داده‌ها قبل از اتمام زمان و قطع ارتباط با خوشه کافکا، 10 ثانیه دیگر منتظر می‌ماند. اگر پیام‌های جدید قبل از اتمام زمان به موضوع ارسال شود، مصرف داده و آموزش مدل برای آن نقاط داده تازه مصرف‌شده از سر گرفته می‌شود. به صورت نامحدود، آن را به مجموعه -1 .

online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["susy-train"],
    group_id="cgonline",
    servers="127.0.0.1:9092",
    stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1.
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

هر آیتم که online_train_ds تولید است tf.data.Dataset در خود دارد. بنابراین، تمام تبدیل های استاندارد را می توان به طور معمول اعمال کرد.

def decode_kafka_online_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

for mini_ds in online_train_ds:
  mini_ds = mini_ds.shuffle(buffer_size=32)
  mini_ds = mini_ds.map(decode_kafka_online_item)
  mini_ds = mini_ds.batch(32)
  if len(mini_ds) > 0:
    model.fit(mini_ds, epochs=3)
2022-01-07 20:34:42.024915: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions
2022-01-07 20:34:42.025797: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4561 - accuracy: 0.7909
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4538 - accuracy: 0.7909
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4499 - accuracy: 0.7947
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4347 - accuracy: 0.8018
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4314 - accuracy: 0.8048
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4286 - accuracy: 0.8063
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4480 - accuracy: 0.7910
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4425 - accuracy: 0.7945
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4390 - accuracy: 0.7970
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4434 - accuracy: 0.7965
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4380 - accuracy: 0.7974
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4354 - accuracy: 0.7992
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4522 - accuracy: 0.7909
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4475 - accuracy: 0.7910
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4435 - accuracy: 0.7947
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4464 - accuracy: 0.7906
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4467 - accuracy: 0.7922
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4424 - accuracy: 0.7933
2022-01-07 20:35:04.916208: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out

مدل آموزش تدریجی را می توان به صورت دوره ای (بر اساس موارد استفاده) ذخیره کرد و می توان از آن برای استنتاج داده های آزمایشی در حالت آنلاین یا آفلاین استفاده کرد.

منابع:

  • بالدی، پی، پی سادوسکی و دی وایتسون. "جستجوی ذرات عجیب و غریب در فیزیک پر انرژی با یادگیری عمیق." Nature Communications 5 (2 ژوئیه 2014)

  • SUSY مجموعه داده: https://archive.ics.uci.edu/ml/datasets/SUSY#