TensorFlow.org पर देखें | Google Colab में चलाएं | GitHub पर स्रोत देखें | नोटबुक डाउनलोड करें |
अवलोकन
इस ट्यूटोरियल एक से डेटा स्ट्रीमिंग पर केंद्रित है काफ्का एक में क्लस्टर tf.data.Dataset
जो तब के साथ संयोजन के रूप में प्रयोग किया जाता है tf.keras
प्रशिक्षण और अनुमान के लिए।
काफ्का मुख्य रूप से एक वितरित इवेंट-स्ट्रीमिंग प्लेटफॉर्म है जो डेटा पाइपलाइनों में स्केलेबल और दोष-सहिष्णु स्ट्रीमिंग डेटा प्रदान करता है। यह प्रमुख उद्यमों के ढेरों का एक आवश्यक तकनीकी घटक है जहां मिशन-महत्वपूर्ण डेटा वितरण एक प्राथमिक आवश्यकता है।
सेट अप
आवश्यक टेंसरफ़्लो-आईओ और काफ्का पैकेज स्थापित करें
pip install tensorflow-io
pip install kafka-python
पैकेज आयात करें
import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio
मान्य tf और tfio आयात
print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.23.1 tensorflow version: 2.8.0-rc0
काफ्का और ज़ुकीपर इंस्टेंस को डाउनलोड और सेटअप करें
डेमो उद्देश्यों के लिए, निम्नलिखित उदाहरण स्थानीय रूप से सेट किए गए हैं:
- काफ्का (दलाल: 127.0.0.1:9092)
- ज़ुकीपर (नोड: 127.0.0.1:2181)
curl -sSOL https://downloads.apache.org/kafka/2.7.2/kafka_2.13-2.7.2.tgz
tar -xzf kafka_2.13-2.7.2.tgz
उदाहरणों को स्पिन करने के लिए डिफ़ॉल्ट कॉन्फ़िगरेशन (अपाचे काफ्का द्वारा प्रदान किया गया) का उपयोग करना।
./kafka_2.13-2.7.2/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-2.7.2/config/zookeeper.properties
./kafka_2.13-2.7.2/bin/kafka-server-start.sh -daemon ./kafka_2.13-2.7.2/config/server.properties
echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
sleep 10
Waiting for 10 secs until kafka and zookeeper services are up and running
एक बार जब उदाहरणों डेमॉन प्रक्रियाओं, के लिए ग्रेप के रूप में शुरू कर रहे हैं kafka
प्रक्रियाओं की सूची में। दो जावा प्रक्रियाएं ज़ूकीपर और काफ्का उदाहरणों के अनुरूप हैं।
ps -ef | grep kafka
kbuilder 27856 20044 4 20:28 ? 00:00:00 python /tmpfs/src/gfile/executor.py --input_notebook=/tmpfs/src/temp/docs/tutorials/kafka.ipynb --timeout=15000 kbuilder 28271 1 16 20:28 ? 00:00:01 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.2/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-api-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-basic-auth-extension-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-file-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-json-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-client-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-runtime-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-transforms-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-client-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-common-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-core-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-hk2-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-server-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-client-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-http-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-io-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-security-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-server-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-clients-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-log4j-appender-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-raft-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-examples-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-scala_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-test-utils-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-tools-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/maven-artifact-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-codec-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-handler-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-jute-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zstd-jni-1.4.5-6.jar org.apache.zookeeper.server.quorum.QuorumPeerMain ./kafka_2.13-2.7.2/config/zookeeper.properties kbuilder 28635 1 57 20:28 ? 00:00:05 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.2/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-api-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-basic-auth-extension-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-file-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-json-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-client-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-runtime-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-transforms-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-client-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-common-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-core-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-hk2-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-server-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-client-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-http-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-io-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-security-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-server-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-clients-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-log4j-appender-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-raft-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-examples-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-scala_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-test-utils-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-tools-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/maven-artifact-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-codec-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-handler-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-jute-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zstd-jni-1.4.5-6.jar kafka.Kafka ./kafka_2.13-2.7.2/config/server.properties kbuilder 28821 27860 0 20:28 pts/0 00:00:00 /bin/bash -c ps -ef | grep kafka kbuilder 28823 28821 0 20:28 pts/0 00:00:00 grep kafka
निम्नलिखित विशिष्टताओं के साथ काफ्का विषय बनाएँ:
- सूसी-ट्रेन: विभाजन = 1, प्रतिकृति-कारक = 1
- सूसी-परीक्षण: विभाजन = 2, प्रतिकृति-कारक = 1
./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic susy-train
./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic susy-test
Created topic susy-train. Created topic susy-test.
कॉन्फ़िगरेशन पर विवरण के लिए विषय का वर्णन करें
./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-train
./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-test
Topic: susy-train PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: susy-train Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: susy-test PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: susy-test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: susy-test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
प्रतिकृति कारक 1 इंगित करता है कि डेटा को दोहराया नहीं जा रहा है। यह हमारे काफ्का सेटअप में एकल ब्रोकर की उपस्थिति के कारण है। उत्पादन प्रणालियों में, बूटस्ट्रैप सर्वरों की संख्या 100 नोड्स की सीमा में हो सकती है। यहीं से प्रतिकृति का उपयोग करते हुए दोष-सहिष्णुता चित्र में आती है।
देखें डॉक्स अधिक जानकारी के लिए।
सूसी डेटासेट
काफ्का एक इवेंट स्ट्रीमिंग प्लेटफॉर्म होने के कारण, इसमें विभिन्न स्रोतों से डेटा लिखा जा सकता है। उदाहरण के लिए:
- वेब ट्रैफिक लॉग
- खगोलीय माप
- IoT सेंसर डेटा
- उत्पाद समीक्षाएँ और बहुत कुछ।
इस ट्यूटोरियल के प्रयोजन के लिए, डाउनलोड करने देता है SUSY डाटासेट और मैन्युअल काफ्का में डेटा फ़ीड। इस वर्गीकरण समस्या का लक्ष्य एक सिग्नल प्रक्रिया के बीच अंतर करना है जो सुपरसिमेट्रिक कण उत्पन्न करता है और एक पृष्ठभूमि प्रक्रिया जो नहीं करता है।
curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz
डेटासेट का अन्वेषण करें
पहला कॉलम क्लास लेबल (सिग्नल के लिए 1, बैकग्राउंड के लिए 0) है, इसके बाद 18 फीचर्स (8 लो-लेवल फीचर्स फिर 10 हाई-लेवल फीचर्स) हैं। पहली 8 विशेषताएं त्वरक में कण डिटेक्टरों द्वारा मापी गई गतिज गुण हैं। अंतिम 10 विशेषताएं पहले 8 सुविधाओं के कार्य हैं। ये दो वर्गों के बीच भेदभाव करने में मदद करने के लिए भौतिकविदों द्वारा प्राप्त उच्च-स्तरीय विशेषताएं हैं।
COLUMNS = [
# labels
'class',
# low-level features
'lepton_1_pT',
'lepton_1_eta',
'lepton_1_phi',
'lepton_2_pT',
'lepton_2_eta',
'lepton_2_phi',
'missing_energy_magnitude',
'missing_energy_phi',
# high-level derived features
'MET_rel',
'axial_MET',
'M_R',
'M_TR_2',
'R',
'MT2',
'S_R',
'M_Delta_R',
'dPhi_r_b',
'cos(theta_r1)'
]
संपूर्ण डेटासेट में 5 मिलियन पंक्तियाँ होती हैं। हालांकि, इस ट्यूटोरियल के उद्देश्य के लिए, आइए डेटासेट (100,000 पंक्तियों) के केवल एक अंश पर विचार करें ताकि डेटा को स्थानांतरित करने में कम समय और एपीआई की कार्यक्षमता को समझने में अधिक समय लगे।
susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)
susy_df.head()
# Number of datapoints and columns
len(susy_df), len(susy_df.columns)
(100000, 19)
# Number of datapoints belonging to each class (0: background noise, 1: signal)
len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])
(54025, 45975)
डेटासेट विभाजित करें
train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))
x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]
x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]
# The labels are set as the kafka message keys so as to store data
# in multiple-partitions. Thus, enabling efficient data retrieval
# using the consumer groups.
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))
x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
Number of training samples: 60000 Number of testing sample: 40000
NUM_COLUMNS = len(x_train_df.columns)
len(x_train), len(y_train), len(x_test), len(y_test)
(60000, 60000, 40000, 40000)
काफ्का में ट्रेन और परीक्षण डेटा स्टोर करें
काफ्का में डेटा संग्रहीत करना प्रशिक्षण और अनुमान उद्देश्यों के लिए निरंतर दूरस्थ डेटा पुनर्प्राप्ति के लिए एक वातावरण का अनुकरण करता है।
def error_callback(exc):
raise Exception('Error while sendig data to kafka: {0}'.format(str(exc)))
def write_to_kafka(topic_name, items):
count=0
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
for message, key in items:
producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)
count+=1
producer.flush()
print("Wrote {0} messages into topic: {1}".format(count, topic_name))
write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))
Wrote 60000 messages into topic: susy-train Wrote 40000 messages into topic: susy-test
tfio ट्रेन डेटासेट को परिभाषित करें
IODataset
वर्ग tensorflow में काफ्का से डेटा स्ट्रीमिंग के लिए उपयोग किया जाता है। से वर्ग inherits tf.data.Dataset
है और इस तरह के सभी उपयोगी कार्यक्षमताओं है tf.data.Dataset
बॉक्स से बाहर।
def decode_kafka_item(item):
message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
key = tf.strings.to_number(item.key)
return (message, key)
BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64
train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)
2022-01-07 20:29:21.602817: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
मॉडल बनाएं और प्रशिक्षित करें
# Set the parameters
OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS=['accuracy']
EPOCHS=10
# design/build the model
model = tf.keras.Sequential([
tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Dense(256, activation='relu'),
tf.keras.layers.Dropout(0.4),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dropout(0.4),
tf.keras.layers.Dense(1, activation='sigmoid')
])
print(model.summary())
Model: "sequential" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= dense (Dense) (None, 128) 2432 dropout (Dropout) (None, 128) 0 dense_1 (Dense) (None, 256) 33024 dropout_1 (Dropout) (None, 256) 0 dense_2 (Dense) (None, 128) 32896 dropout_2 (Dropout) (None, 128) 0 dense_3 (Dense) (None, 1) 129 ================================================================= Total params: 68,481 Trainable params: 68,481 Non-trainable params: 0 _________________________________________________________________ None
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model
model.fit(train_ds, epochs=EPOCHS)
Epoch 1/10 /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/util/dispatch.py:1082: UserWarning: "`binary_crossentropy` received `from_logits=True`, but the `output` argument was produced by a sigmoid or softmax activation and thus does not represent logits. Was this intended?" return dispatch_target(*args, **kwargs) 938/938 [==============================] - 31s 33ms/step - loss: 0.4817 - accuracy: 0.7691 Epoch 2/10 938/938 [==============================] - 30s 32ms/step - loss: 0.4550 - accuracy: 0.7875 Epoch 3/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4512 - accuracy: 0.7911 Epoch 4/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4487 - accuracy: 0.7940 Epoch 5/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4466 - accuracy: 0.7934 Epoch 6/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4459 - accuracy: 0.7933 Epoch 7/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4448 - accuracy: 0.7935 Epoch 8/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4439 - accuracy: 0.7950 Epoch 9/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4421 - accuracy: 0.7956 Epoch 10/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4425 - accuracy: 0.7962 <keras.callbacks.History at 0x7fb364fd2a90>
चूंकि डेटासेट के केवल एक अंश का उपयोग किया जा रहा है, इसलिए प्रशिक्षण चरण के दौरान हमारी सटीकता ~78% तक सीमित है। हालांकि, कृपया बेहतर मॉडल प्रदर्शन के लिए काफ्का में अतिरिक्त डेटा स्टोर करने के लिए स्वतंत्र महसूस करें। इसके अलावा, चूंकि लक्ष्य केवल tfio kafka डेटासेट की कार्यक्षमता को प्रदर्शित करना था, इसलिए एक छोटे और कम जटिल तंत्रिका नेटवर्क का उपयोग किया गया था। हालांकि, कोई भी मॉडल की जटिलता को बढ़ा सकता है, सीखने की रणनीति को संशोधित कर सकता है, खोज उद्देश्यों के लिए हाइपर-पैरामीटर आदि को ट्यून कर सकता है। एक आधारभूत दृष्टिकोण के लिए, यह देखें लेख ।
परीक्षण डेटा पर अनुमान लगाएं
गलती सहिष्णुता के साथ 'वास्तव में एक बार' अर्थ विज्ञान का पालन करके परीक्षण डेटा पर अनुमान लगाने के लिए, streaming.KafkaGroupIODataset
उपयोग किया जा सकता।
tfio परीक्षण डेटासेट को परिभाषित करें
stream_timeout
नए डेटा बिंदुओं के लिए दिया अवधि के लिए पैरामीटर ब्लॉक विषय में स्ट्रीम किया है। यह नए डेटासेट बनाने की आवश्यकता को हटा देता है यदि डेटा को रुक-रुक कर विषय में स्ट्रीम किया जा रहा है।
test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
topics=["susy-test"],
group_id="testcg",
servers="127.0.0.1:9092",
stream_timeout=10000,
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest"
],
)
def decode_kafka_test_item(raw_message, raw_key):
message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
key = tf.strings.to_number(raw_key)
return (message, key)
test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(BATCH_SIZE)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_io/python/experimental/kafka_group_io_dataset_ops.py:188: take_while (from tensorflow.python.data.experimental.ops.take_while_ops) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.data.Dataset.take_while(...)
यद्यपि इस वर्ग का उपयोग प्रशिक्षण उद्देश्यों के लिए किया जा सकता है, फिर भी कुछ चेतावनी हैं जिन्हें संबोधित करने की आवश्यकता है। एक बार सभी संदेशों काफ्का से पढ़ा जाता है और नवीनतम ऑफसेट का उपयोग कर के लिए प्रतिबद्ध हैं streaming.KafkaGroupIODataset
, उपभोक्ता शुरू से संदेशों को पढ़ते समय पुनः आरंभ नहीं करता है। इस प्रकार, प्रशिक्षण के दौरान, केवल एक ही युग के लिए प्रशिक्षित करना संभव है जिसमें डेटा लगातार प्रवाहित होता है। इस प्रकार की कार्यक्षमता में प्रशिक्षण चरण के दौरान सीमित उपयोग के मामले होते हैं, जिसमें एक बार मॉडल द्वारा डेटापॉइंट का उपभोग करने के बाद यह अब नहीं रह जाता है आवश्यक है और त्यागा जा सकता है।
हालाँकि, यह कार्यक्षमता तब चमकती है जब यह एक बार के शब्दार्थ के साथ मजबूत अनुमान की बात आती है।
परीक्षण डेटा पर प्रदर्शन का मूल्यांकन करें
res = model.evaluate(test_ds)
print("test loss, test acc:", res)
34/Unknown - 0s 2ms/step - loss: 0.4434 - accuracy: 0.8194 2022-01-07 20:34:29.402707: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions 2022-01-07 20:34:29.406789: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0 625/625 [==============================] - 11s 17ms/step - loss: 0.4437 - accuracy: 0.7915 test loss, test acc: [0.4436523914337158, 0.7915250062942505] 2022-01-07 20:34:40.051954: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out
चूंकि अनुमान 'बिल्कुल-एक बार' शब्दार्थ पर आधारित है, परीक्षण सेट पर मूल्यांकन केवल एक बार चलाया जा सकता है। परीक्षण डेटा पर निष्कर्ष को फिर से चलाने के लिए, एक नए उपभोक्ता समूह का उपयोग किया जाना चाहिए।
की भरपाई अंतराल ट्रैक testcg
उपभोक्ता समूह
./kafka_2.13-2.7.2/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID testcg susy-test 0 21626 21626 0 rdkafka-534f63d0-b91e-4976-a3ca-832b6c91210e /10.142.0.103 rdkafka testcg susy-test 1 18374 18374 0 rdkafka-534f63d0-b91e-4976-a3ca-832b6c91210e /10.142.0.103 rdkafka
एक बार जब current-offset
मैचों log-end-offset
सभी विभाजनों के लिए, यह इंगित करता है कि उपभोक्ता (रों) सभी काफ्का विषय से संदेश प्राप्त करते समय पूरा कर लिया है।
ऑनलाइन सीखने
ऑनलाइन मशीन लर्निंग प्रतिमान प्रशिक्षण मशीन लर्निंग मॉडल के पारंपरिक/पारंपरिक तरीके से थोड़ा अलग है। पूर्व मामले में, जैसे ही नए डेटा बिंदु उपलब्ध होते हैं, मॉडल अपने मापदंडों को क्रमिक रूप से सीखना/अपडेट करना जारी रखता है और इस प्रक्रिया के अनिश्चित काल तक जारी रहने की उम्मीद है। यह बाद दृष्टिकोण जहां डाटासेट तय हो गई है के विपरीत है और मॉडल इस पर iterates n
समय की संख्या। ऑनलाइन सीखने में, मॉडल द्वारा एक बार उपभोग किया गया डेटा फिर से प्रशिक्षण के लिए उपलब्ध नहीं हो सकता है।
का उपयोग करके streaming.KafkaBatchIODataset
, यह अब इस फैशन में मॉडल प्रशिक्षित करने के लिए संभव है। आइए इस कार्यक्षमता को प्रदर्शित करने के लिए हमारे SUSY डेटासेट का उपयोग जारी रखें।
ऑनलाइन सीखने के लिए tfio प्रशिक्षण डेटासेट
streaming.KafkaBatchIODataset
के समान है streaming.KafkaGroupIODataset
यह के एपीआई में। इसके अतिरिक्त, यह उपयोग करने के लिए सिफारिश की है stream_timeout
अवधि जिसके लिए डाटासेट बाहर समय से पहले नए संदेशों के लिए अवरुद्ध कर देगा कॉन्फ़िगर करने के लिए पैरामीटर। नीचे दिए गए उदाहरण में, डाटासेट एक साथ कॉन्फ़िगर किया गया है stream_timeout
की 10000
मिलीसेकंड। इसका तात्पर्य यह है कि, विषय के सभी संदेशों के उपभोग के बाद, डेटासेट समय समाप्त होने और काफ्का क्लस्टर से डिस्कनेक्ट होने से पहले अतिरिक्त 10 सेकंड तक प्रतीक्षा करेगा। यदि समय समाप्त होने से पहले विषय में नए संदेश स्ट्रीम किए जाते हैं, तो उन नए उपभोग किए गए डेटा बिंदुओं के लिए डेटा खपत और मॉडल प्रशिक्षण फिर से शुरू हो जाता है। अनिश्चित काल के लिए ब्लॉक करने के लिए, यह करने के लिए सेट -1
।
online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
topics=["susy-train"],
group_id="cgonline",
servers="127.0.0.1:9092",
stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1.
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest"
],
)
हर आइटम है कि online_train_ds
उत्पन्न करता है एक है tf.data.Dataset
अपने आप में। इस प्रकार, सभी मानक परिवर्तन हमेशा की तरह लागू किए जा सकते हैं।
def decode_kafka_online_item(raw_message, raw_key):
message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
key = tf.strings.to_number(raw_key)
return (message, key)
for mini_ds in online_train_ds:
mini_ds = mini_ds.shuffle(buffer_size=32)
mini_ds = mini_ds.map(decode_kafka_online_item)
mini_ds = mini_ds.batch(32)
if len(mini_ds) > 0:
model.fit(mini_ds, epochs=3)
2022-01-07 20:34:42.024915: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions 2022-01-07 20:34:42.025797: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4561 - accuracy: 0.7909 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4538 - accuracy: 0.7909 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4499 - accuracy: 0.7947 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4347 - accuracy: 0.8018 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4314 - accuracy: 0.8048 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4286 - accuracy: 0.8063 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4480 - accuracy: 0.7910 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4425 - accuracy: 0.7945 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4390 - accuracy: 0.7970 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4434 - accuracy: 0.7965 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4380 - accuracy: 0.7974 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4354 - accuracy: 0.7992 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4522 - accuracy: 0.7909 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4475 - accuracy: 0.7910 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4435 - accuracy: 0.7947 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4464 - accuracy: 0.7906 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4467 - accuracy: 0.7922 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4424 - accuracy: 0.7933 2022-01-07 20:35:04.916208: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out
वृद्धिशील रूप से प्रशिक्षित मॉडल को समय-समय पर (उपयोग-मामलों के आधार पर) सहेजा जा सकता है और इसका उपयोग ऑनलाइन या ऑफलाइन मोड में परीक्षण डेटा पर अनुमान लगाने के लिए किया जा सकता है।
सन्दर्भ:
बाल्दी, पी., पी. सैडोव्स्की, और डी. व्हाइटसन। "डीप लर्निंग के साथ उच्च-ऊर्जा भौतिकी में विदेशी कणों की खोज।" नेचर कम्युनिकेशंस 5 (2 जुलाई 2014)
SUSY डेटासेट: https://archive.ics.uci.edu/ml/datasets/SUSY#