TensorFlow.org-এ দেখুন | Google Colab-এ চালান | GitHub-এ উৎস দেখুন | নোটবুক ডাউনলোড করুন |
ওভারভিউ
এই টিউটোরিয়ালটি একটি থেকে তথ্য স্ট্রিমিং উপর দৃষ্টি নিবদ্ধ করে কাফকা একটি মধ্যে ক্লাস্টার tf.data.Dataset
যা পরে সাথে ব্যবহার করা হয় tf.keras
প্রশিক্ষণ ও অনুমান জন্য।
কাফকা প্রাথমিকভাবে একটি বিতরণ করা ইভেন্ট-স্ট্রিমিং প্ল্যাটফর্ম যা ডেটা পাইপলাইন জুড়ে স্কেলযোগ্য এবং ত্রুটি-সহনশীল স্ট্রিমিং ডেটা সরবরাহ করে। এটি প্রধান উদ্যোগগুলির আধিক্যের একটি অপরিহার্য প্রযুক্তিগত উপাদান যেখানে মিশন-সমালোচনামূলক ডেটা সরবরাহ একটি প্রাথমিক প্রয়োজন।
সেটআপ
প্রয়োজনীয় tensorflow-io এবং kafka প্যাকেজ ইনস্টল করুন
pip install tensorflow-io
pip install kafka-python
প্যাকেজ আমদানি করুন
import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio
tf এবং tfio আমদানি বৈধ করুন
print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.23.1 tensorflow version: 2.8.0-rc0
Kafka এবং Zookeeper দৃষ্টান্ত ডাউনলোড এবং সেটআপ করুন
ডেমো উদ্দেশ্যে, নিম্নলিখিত উদাহরণগুলি স্থানীয়ভাবে সেটআপ করা হয়েছে:
- কাফকা (দালাল: 127.0.0.1:9092)
- চিড়িয়াখানা (নোড: 127.0.0.1:2181)
curl -sSOL https://downloads.apache.org/kafka/2.7.2/kafka_2.13-2.7.2.tgz
tar -xzf kafka_2.13-2.7.2.tgz
দৃষ্টান্তগুলি ঘোরানোর জন্য ডিফল্ট কনফিগারেশন (অ্যাপাচি কাফকা দ্বারা সরবরাহ করা) ব্যবহার করা।
./kafka_2.13-2.7.2/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-2.7.2/config/zookeeper.properties
./kafka_2.13-2.7.2/bin/kafka-server-start.sh -daemon ./kafka_2.13-2.7.2/config/server.properties
echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
sleep 10
Waiting for 10 secs until kafka and zookeeper services are up and running
একবার দৃষ্টান্ত ডেমন প্রক্রিয়া, জন্য grep হিসাবে শুরু হয় kafka
প্রসেস তালিকায়। দুটি জাভা প্রক্রিয়া চিড়িয়াখানা এবং কাফকা দৃষ্টান্তের সাথে মিলে যায়।
ps -ef | grep kafka
kbuilder 27856 20044 4 20:28 ? 00:00:00 python /tmpfs/src/gfile/executor.py --input_notebook=/tmpfs/src/temp/docs/tutorials/kafka.ipynb --timeout=15000 kbuilder 28271 1 16 20:28 ? 00:00:01 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.2/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-api-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-basic-auth-extension-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-file-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-json-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-client-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-runtime-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-transforms-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-client-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-common-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-core-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-hk2-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-server-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-client-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-http-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-io-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-security-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-server-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-clients-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-log4j-appender-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-raft-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-examples-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-scala_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-test-utils-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-tools-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/maven-artifact-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-codec-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-handler-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-jute-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zstd-jni-1.4.5-6.jar org.apache.zookeeper.server.quorum.QuorumPeerMain ./kafka_2.13-2.7.2/config/zookeeper.properties kbuilder 28635 1 57 20:28 ? 00:00:05 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.2/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-api-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-basic-auth-extension-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-file-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-json-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-client-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-runtime-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-transforms-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-client-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-common-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-core-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-hk2-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-server-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-client-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-http-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-io-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-security-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-server-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-clients-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-log4j-appender-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-raft-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-examples-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-scala_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-test-utils-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-tools-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/maven-artifact-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-codec-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-handler-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-jute-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zstd-jni-1.4.5-6.jar kafka.Kafka ./kafka_2.13-2.7.2/config/server.properties kbuilder 28821 27860 0 20:28 pts/0 00:00:00 /bin/bash -c ps -ef | grep kafka kbuilder 28823 28821 0 20:28 pts/0 00:00:00 grep kafka
নিম্নলিখিত চশমাগুলির সাথে কাফকা বিষয়গুলি তৈরি করুন:
- susy-train: partitions=1, replication-factor=1
- susy-পরীক্ষা: পার্টিশন=2, প্রতিলিপি-ফ্যাক্টর=1
./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic susy-train
./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic susy-test
Created topic susy-train. Created topic susy-test.
কনফিগারেশনের বিশদ বিবরণের জন্য বিষয়টি বর্ণনা করুন
./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-train
./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-test
Topic: susy-train PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: susy-train Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: susy-test PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: susy-test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: susy-test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
প্রতিলিপি ফ্যাক্টর 1 নির্দেশ করে যে ডেটা প্রতিলিপি করা হচ্ছে না। এটি আমাদের কাফকা সেটআপে একক ব্রোকারের উপস্থিতির কারণে। উৎপাদন ব্যবস্থায়, বুটস্ট্র্যাপ সার্ভারের সংখ্যা 100 এর নোডের মধ্যে হতে পারে। সেখানেই প্রতিলিপি ব্যবহার করে দোষ-সহনশীলতা ছবিতে আসে।
পড়ুন দয়া করে দস্তাবেজ আরো বিস্তারিত জানার জন্য।
SUSY ডেটাসেট
কাফকা একটি ইভেন্ট স্ট্রিমিং প্ল্যাটফর্ম, এটিতে বিভিন্ন উত্স থেকে ডেটা লিখতে সক্ষম করে। এই ক্ষেত্রে:
- ওয়েব ট্রাফিক লগ
- জ্যোতির্বিজ্ঞানের পরিমাপ
- আইওটি সেন্সর ডেটা
- পণ্য পর্যালোচনা এবং আরো অনেক.
এই টিউটোরিয়ালের উদ্দেশ্যে, ডাউনলোড করতে দেয় SUSY ডেটা সেটটি এবং ম্যানুয়ালি কাফকা ডেটা ভোজন। এই শ্রেণিবিন্যাস সমস্যার লক্ষ্য হল একটি সংকেত প্রক্রিয়ার মধ্যে পার্থক্য করা যা সুপারসিমেট্রিক কণা তৈরি করে এবং একটি পটভূমি প্রক্রিয়া যা করে না।
curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz
ডেটাসেট অন্বেষণ করুন
প্রথম কলামটি হল ক্লাস লেবেল (সংকেতের জন্য 1, পটভূমির জন্য 0), তারপরে 18টি বৈশিষ্ট্য (8 নিম্ন-স্তরের বৈশিষ্ট্য তারপর 10টি উচ্চ-স্তরের বৈশিষ্ট্য)। প্রথম 8টি বৈশিষ্ট্য হল গতিগত বৈশিষ্ট্য যা অ্যাক্সিলারেটরের কণা ডিটেক্টর দ্বারা পরিমাপ করা হয়। শেষ 10টি বৈশিষ্ট্য হল প্রথম 8টি বৈশিষ্ট্যের ফাংশন। এই দুটি শ্রেণীর মধ্যে বৈষম্য করতে সাহায্য করার জন্য পদার্থবিদদের দ্বারা প্রাপ্ত উচ্চ-স্তরের বৈশিষ্ট্য।
COLUMNS = [
# labels
'class',
# low-level features
'lepton_1_pT',
'lepton_1_eta',
'lepton_1_phi',
'lepton_2_pT',
'lepton_2_eta',
'lepton_2_phi',
'missing_energy_magnitude',
'missing_energy_phi',
# high-level derived features
'MET_rel',
'axial_MET',
'M_R',
'M_TR_2',
'R',
'MT2',
'S_R',
'M_Delta_R',
'dPhi_r_b',
'cos(theta_r1)'
]
সম্পূর্ণ ডেটাসেট 5 মিলিয়ন সারি নিয়ে গঠিত। যাইহোক, এই টিউটোরিয়ালের উদ্দেশ্যে, আসুন শুধুমাত্র ডেটাসেটের একটি ভগ্নাংশ (100,000 সারি) বিবেচনা করি যাতে ডেটা সরাতে কম সময় ব্যয় হয় এবং api-এর কার্যকারিতা বোঝার জন্য আরও বেশি সময় ব্যয় হয়।
susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)
susy_df.head()
# Number of datapoints and columns
len(susy_df), len(susy_df.columns)
(100000, 19)
# Number of datapoints belonging to each class (0: background noise, 1: signal)
len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])
(54025, 45975)
ডেটাসেট বিভক্ত করুন
train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))
x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]
x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]
# The labels are set as the kafka message keys so as to store data
# in multiple-partitions. Thus, enabling efficient data retrieval
# using the consumer groups.
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))
x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
Number of training samples: 60000 Number of testing sample: 40000
NUM_COLUMNS = len(x_train_df.columns)
len(x_train), len(y_train), len(x_test), len(y_test)
(60000, 60000, 40000, 40000)
কাফকাতে ট্রেন এবং পরীক্ষার ডেটা সংরক্ষণ করুন
কাফকাতে ডেটা সংরক্ষণ করা প্রশিক্ষণ এবং অনুমানের উদ্দেশ্যে ক্রমাগত দূরবর্তী ডেটা পুনরুদ্ধারের জন্য একটি পরিবেশকে অনুকরণ করে।
def error_callback(exc):
raise Exception('Error while sendig data to kafka: {0}'.format(str(exc)))
def write_to_kafka(topic_name, items):
count=0
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
for message, key in items:
producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)
count+=1
producer.flush()
print("Wrote {0} messages into topic: {1}".format(count, topic_name))
write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))
Wrote 60000 messages into topic: susy-train Wrote 40000 messages into topic: susy-test
tfio ট্রেন ডেটাসেট সংজ্ঞায়িত করুন
IODataset
বর্গ tensorflow মধ্যে কাফকা থেকে তথ্য স্ট্রিমিং করার জন্য ব্যবহৃত হয়। থেকে বর্গ উত্তরাধিকারী tf.data.Dataset
এবং এইভাবে সব দরকারী বৈশিষ্ট্য রয়েছে tf.data.Dataset
বাক্সের বাইরে।
def decode_kafka_item(item):
message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
key = tf.strings.to_number(item.key)
return (message, key)
BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64
train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)
2022-01-07 20:29:21.602817: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
মডেল তৈরি এবং প্রশিক্ষণ
# Set the parameters
OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS=['accuracy']
EPOCHS=10
# design/build the model
model = tf.keras.Sequential([
tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Dense(256, activation='relu'),
tf.keras.layers.Dropout(0.4),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dropout(0.4),
tf.keras.layers.Dense(1, activation='sigmoid')
])
print(model.summary())
Model: "sequential" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= dense (Dense) (None, 128) 2432 dropout (Dropout) (None, 128) 0 dense_1 (Dense) (None, 256) 33024 dropout_1 (Dropout) (None, 256) 0 dense_2 (Dense) (None, 128) 32896 dropout_2 (Dropout) (None, 128) 0 dense_3 (Dense) (None, 1) 129 ================================================================= Total params: 68,481 Trainable params: 68,481 Non-trainable params: 0 _________________________________________________________________ None
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model
model.fit(train_ds, epochs=EPOCHS)
Epoch 1/10 /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/util/dispatch.py:1082: UserWarning: "`binary_crossentropy` received `from_logits=True`, but the `output` argument was produced by a sigmoid or softmax activation and thus does not represent logits. Was this intended?" return dispatch_target(*args, **kwargs) 938/938 [==============================] - 31s 33ms/step - loss: 0.4817 - accuracy: 0.7691 Epoch 2/10 938/938 [==============================] - 30s 32ms/step - loss: 0.4550 - accuracy: 0.7875 Epoch 3/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4512 - accuracy: 0.7911 Epoch 4/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4487 - accuracy: 0.7940 Epoch 5/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4466 - accuracy: 0.7934 Epoch 6/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4459 - accuracy: 0.7933 Epoch 7/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4448 - accuracy: 0.7935 Epoch 8/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4439 - accuracy: 0.7950 Epoch 9/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4421 - accuracy: 0.7956 Epoch 10/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4425 - accuracy: 0.7962 <keras.callbacks.History at 0x7fb364fd2a90>
যেহেতু ডেটাসেটের শুধুমাত্র একটি ভগ্নাংশ ব্যবহার করা হচ্ছে, তাই প্রশিক্ষণ পর্বের সময় আমাদের নির্ভুলতা ~78% পর্যন্ত সীমাবদ্ধ। যাইহোক, একটি ভাল মডেল পারফরম্যান্সের জন্য অনুগ্রহ করে কাফকাতে অতিরিক্ত ডেটা সঞ্চয় করুন। এছাড়াও, যেহেতু লক্ষ্য টিফিও কাফকা ডেটাসেটের কার্যকারিতা প্রদর্শন করা ছিল, তাই একটি ছোট এবং কম-জটিল নিউরাল নেটওয়ার্ক ব্যবহার করা হয়েছিল। যাইহোক, কেউ মডেলের জটিলতা বাড়াতে পারে, শেখার কৌশল পরিবর্তন করতে পারে, অন্বেষণের উদ্দেশ্যে হাইপার-প্যারামিটার ইত্যাদি টিউন করতে পারে। একটি বেসলাইন পদ্ধতির জন্য, এই পড়ুন দয়া নিবন্ধ ।
পরীক্ষার তথ্য অনুমান করুন
দোষ সহনশীলতা সহ 'ঠিক-একবার' শব্দার্থবিদ্যা লগ্ন দ্বারা পরীক্ষা ডেটার উপর অনুমান করার জন্য, streaming.KafkaGroupIODataset
কাজে লাগানো যায়।
tfio পরীক্ষার ডেটাসেট সংজ্ঞায়িত করুন
stream_timeout
নতুন ডাটা পয়েন্টের জন্য দেওয়া চলাকালীন প্যারামিটার ব্লক বিষয় মধ্যে প্রবাহিত হবে। এটি নতুন ডেটাসেট তৈরির প্রয়োজনীয়তাকে সরিয়ে দেয় যদি তথ্যটি একটি অন্তর্বর্তী ফ্যাশনে বিষয়টিতে প্রবাহিত হয়।
test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
topics=["susy-test"],
group_id="testcg",
servers="127.0.0.1:9092",
stream_timeout=10000,
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest"
],
)
def decode_kafka_test_item(raw_message, raw_key):
message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
key = tf.strings.to_number(raw_key)
return (message, key)
test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(BATCH_SIZE)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_io/python/experimental/kafka_group_io_dataset_ops.py:188: take_while (from tensorflow.python.data.experimental.ops.take_while_ops) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.data.Dataset.take_while(...)
যদিও এই শ্রেণীটি প্রশিক্ষণের উদ্দেশ্যে ব্যবহার করা যেতে পারে, তবে কিছু সতর্কতা রয়েছে যা সমাধান করা প্রয়োজন। একবার সব বার্তা কাফকা থেকে পড়া হয় এবং সর্বশেষ অফসেট ব্যবহার প্রতিশ্রুতিবদ্ধ হয় streaming.KafkaGroupIODataset
, ভোক্তা শুরু থেকে বার্তা পড়া পুনরায় আরম্ভ করা হয় না। এইভাবে, প্রশিক্ষণের সময়, অবিচ্ছিন্নভাবে ডেটা প্রবাহিত করে শুধুমাত্র একটি একক যুগের জন্য প্রশিক্ষণ দেওয়া সম্ভব। প্রশিক্ষণ পর্বে এই ধরনের কার্যকারিতার সীমিত ব্যবহারের ক্ষেত্রে থাকে যেখানে, একবার একটি ডেটাপয়েন্ট মডেল দ্বারা গ্রাস করা হলে তা আর থাকে না। প্রয়োজন এবং বাতিল করা যেতে পারে।
যাইহোক, এই কার্যকারিতা উজ্জ্বল হয় যখন এটি ঠিক-একবার শব্দার্থবিদ্যার সাথে শক্তিশালী অনুমানের ক্ষেত্রে আসে।
পরীক্ষার ডেটাতে কর্মক্ষমতা মূল্যায়ন করুন
res = model.evaluate(test_ds)
print("test loss, test acc:", res)
34/Unknown - 0s 2ms/step - loss: 0.4434 - accuracy: 0.8194 2022-01-07 20:34:29.402707: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions 2022-01-07 20:34:29.406789: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0 625/625 [==============================] - 11s 17ms/step - loss: 0.4437 - accuracy: 0.7915 test loss, test acc: [0.4436523914337158, 0.7915250062942505] 2022-01-07 20:34:40.051954: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out
যেহেতু অনুমানটি 'ঠিক-একবার' শব্দার্থবিদ্যার উপর ভিত্তি করে, তাই পরীক্ষার সেটে মূল্যায়ন শুধুমাত্র একবার চালানো যেতে পারে। পরীক্ষার ডেটাতে আবার অনুমান চালানোর জন্য, একটি নতুন ভোক্তা গোষ্ঠী ব্যবহার করা উচিত।
অফসেট ল্যাগ ট্র্যাক testcg
ভোক্তা গ্রুপ
./kafka_2.13-2.7.2/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID testcg susy-test 0 21626 21626 0 rdkafka-534f63d0-b91e-4976-a3ca-832b6c91210e /10.142.0.103 rdkafka testcg susy-test 1 18374 18374 0 rdkafka-534f63d0-b91e-4976-a3ca-832b6c91210e /10.142.0.103 rdkafka
একবার current-offset
ম্যাচ log-end-offset
সমস্ত পার্টিশন জন্য, এটি নির্দেশ করে যে ভোক্তা (গুলি) সব কাফকা বিষয় থেকে বার্তা আনয়ন সম্পন্ন করেছেন।
অনলাইন শিক্ষা
অনলাইন মেশিন লার্নিং দৃষ্টান্তটি মেশিন লার্নিং মডেল প্রশিক্ষণের প্রথাগত/প্রচলিত পদ্ধতি থেকে একটু ভিন্ন। প্রাক্তন ক্ষেত্রে, নতুন ডেটা পয়েন্ট উপলব্ধ হওয়ার সাথে সাথে মডেলটি ক্রমবর্ধমানভাবে এর পরামিতিগুলি শিখতে/আপডেট করতে থাকে এবং এই প্রক্রিয়াটি অনির্দিষ্টকালের জন্য অব্যাহত থাকবে বলে আশা করা হচ্ছে। এই আধুনিক পন্থা যেখানে ডেটা সেটটি সংশোধন করা হয়েছে অসদৃশ হয় এবং মডেল এটি উপর iterates n
যতবার। অনলাইন শেখার ক্ষেত্রে, মডেল দ্বারা একবার ব্যবহার করা ডেটা আবার প্রশিক্ষণের জন্য উপলব্ধ নাও হতে পারে।
ব্যবহার দ্বারা streaming.KafkaBatchIODataset
, এটি এখন এই ফ্যাশন মডেলগুলির প্রশিক্ষণ করা সম্ভব। আসুন এই কার্যকারিতা প্রদর্শনের জন্য আমাদের SUSY ডেটাসেট ব্যবহার করা চালিয়ে যাই।
অনলাইন শিক্ষার জন্য tfio প্রশিক্ষণ ডেটাসেট
streaming.KafkaBatchIODataset
অনুরূপ streaming.KafkaGroupIODataset
এটা API এ। উপরন্তু, এটা ব্যবহার করতে সুপারিশ করা হয় stream_timeout
সময়কাল, যার জন্য ডেটা সেটটি বাইরে সময়জ্ঞান সামনে নতুন বার্তার জন্য অবরুদ্ধ করবে কনফিগার করতে প্যারামিটার। নিচে ইনস্ট্যান্সের মধ্যে, ডেটা সেটটি একটি মাধ্যমে কনফিগার করা হয়েছে stream_timeout
এর 10000
মিলিসেকেন্ড। এটি বোঝায় যে, বিষয়ের সমস্ত বার্তা গ্রহণ করার পরে, ডেটাসেটটি কাফকা ক্লাস্টার থেকে টাইম আউট এবং সংযোগ বিচ্ছিন্ন করার আগে অতিরিক্ত 10 সেকেন্ডের জন্য অপেক্ষা করবে। সময় শেষ হওয়ার আগে যদি নতুন বার্তাগুলি বিষয়ের মধ্যে স্ট্রীম করা হয়, তবে সেই নতুনভাবে ব্যবহৃত ডেটা পয়েন্টগুলির জন্য ডেটা খরচ এবং মডেল প্রশিক্ষণ পুনরায় শুরু হয়। অনির্দিষ্টকালের জন্য অবরুদ্ধ করতে চাইলে, এটি সেট -1
।
online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
topics=["susy-train"],
group_id="cgonline",
servers="127.0.0.1:9092",
stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1.
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest"
],
)
প্রতিটি আইটেম যে online_train_ds
উত্পন্ন একটি হল tf.data.Dataset
নিজেই। এইভাবে, সমস্ত আদর্শ রূপান্তর স্বাভাবিক হিসাবে প্রয়োগ করা যেতে পারে।
def decode_kafka_online_item(raw_message, raw_key):
message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
key = tf.strings.to_number(raw_key)
return (message, key)
for mini_ds in online_train_ds:
mini_ds = mini_ds.shuffle(buffer_size=32)
mini_ds = mini_ds.map(decode_kafka_online_item)
mini_ds = mini_ds.batch(32)
if len(mini_ds) > 0:
model.fit(mini_ds, epochs=3)
2022-01-07 20:34:42.024915: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions 2022-01-07 20:34:42.025797: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4561 - accuracy: 0.7909 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4538 - accuracy: 0.7909 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4499 - accuracy: 0.7947 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4347 - accuracy: 0.8018 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4314 - accuracy: 0.8048 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4286 - accuracy: 0.8063 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4480 - accuracy: 0.7910 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4425 - accuracy: 0.7945 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4390 - accuracy: 0.7970 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4434 - accuracy: 0.7965 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4380 - accuracy: 0.7974 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4354 - accuracy: 0.7992 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4522 - accuracy: 0.7909 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4475 - accuracy: 0.7910 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4435 - accuracy: 0.7947 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4464 - accuracy: 0.7906 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4467 - accuracy: 0.7922 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4424 - accuracy: 0.7933 2022-01-07 20:35:04.916208: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out
ক্রমবর্ধমানভাবে প্রশিক্ষিত মডেলটি পর্যায়ক্রমিক ফ্যাশনে সংরক্ষণ করা যেতে পারে (ব্যবহারের ক্ষেত্রের উপর ভিত্তি করে) এবং অনলাইন বা অফলাইন মোডে পরীক্ষার ডেটা অনুমান করতে ব্যবহার করা যেতে পারে।
তথ্যসূত্র:
বাল্ডি, পি., পি. সাডোস্কি এবং ডি. হোয়াইটসন। "গভীর শিক্ষার সাথে উচ্চ-শক্তি পদার্থবিদ্যায় বহিরাগত কণাগুলির জন্য অনুসন্ধান করা।" প্রকৃতি যোগাযোগ 5 (জুলাই 2, 2014)
SUSY ডেটা সেটটি: https://archive.ics.uci.edu/ml/datasets/SUSY#