Xem trên TensorFlow.org | Chạy trong Google Colab | Xem nguồn trên GitHub | Tải xuống sổ ghi chép |
Tổng quat
Hướng dẫn này tập trung vào việc truyền dữ liệu từ một Kafka cụm thành một tf.data.Dataset
mà sau đó được sử dụng kết hợp với tf.keras
cho đào tạo và suy luận.
Kafka chủ yếu là một nền tảng phát trực tuyến sự kiện phân tán, cung cấp dữ liệu phát trực tuyến có thể mở rộng và chịu được lỗi trên các đường ống dữ liệu. Đây là một thành phần kỹ thuật thiết yếu của rất nhiều doanh nghiệp lớn nơi mà việc phân phối dữ liệu quan trọng là yêu cầu chính.
Thành lập
Cài đặt các gói tensorflow-io và kafka cần thiết
pip install tensorflow-io
pip install kafka-python
Nhập gói
import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio
Xác thực nhập tf và tfio
print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.23.1 tensorflow version: 2.8.0-rc0
Tải xuống và thiết lập các phiên bản Kafka và Zookeeper
Đối với mục đích demo, các phiên bản sau được thiết lập cục bộ:
- Kafka (Môi giới: 127.0.0.1:9092)
- Zookeeper (Nút: 127.0.0.1:2181)
curl -sSOL https://downloads.apache.org/kafka/2.7.2/kafka_2.13-2.7.2.tgz
tar -xzf kafka_2.13-2.7.2.tgz
Sử dụng các cấu hình mặc định (do Apache Kafka cung cấp) để quay các phiên bản.
./kafka_2.13-2.7.2/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-2.7.2/config/zookeeper.properties
./kafka_2.13-2.7.2/bin/kafka-server-start.sh -daemon ./kafka_2.13-2.7.2/config/server.properties
echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
sleep 10
Waiting for 10 secs until kafka and zookeeper services are up and running
Khi các trường hợp được bắt đầu như quy trình daemon, grep cho kafka
trong danh sách các tiến trình. Hai quy trình java tương ứng với các phiên bản Zookeeper và kafka.
ps -ef | grep kafka
kbuilder 27856 20044 4 20:28 ? 00:00:00 python /tmpfs/src/gfile/executor.py --input_notebook=/tmpfs/src/temp/docs/tutorials/kafka.ipynb --timeout=15000 kbuilder 28271 1 16 20:28 ? 00:00:01 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.2/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-api-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-basic-auth-extension-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-file-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-json-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-client-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-runtime-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-transforms-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-client-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-common-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-core-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-hk2-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-server-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-client-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-http-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-io-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-security-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-server-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-clients-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-log4j-appender-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-raft-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-examples-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-scala_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-test-utils-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-tools-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/maven-artifact-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-codec-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-handler-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-jute-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zstd-jni-1.4.5-6.jar org.apache.zookeeper.server.quorum.QuorumPeerMain ./kafka_2.13-2.7.2/config/zookeeper.properties kbuilder 28635 1 57 20:28 ? 00:00:05 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.2/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-api-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-basic-auth-extension-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-file-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-json-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-client-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-runtime-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-transforms-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-client-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-common-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-core-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-hk2-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-server-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-client-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-http-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-io-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-security-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-server-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-clients-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-log4j-appender-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-raft-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-examples-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-scala_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-test-utils-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-tools-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/maven-artifact-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-codec-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-handler-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-jute-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zstd-jni-1.4.5-6.jar kafka.Kafka ./kafka_2.13-2.7.2/config/server.properties kbuilder 28821 27860 0 20:28 pts/0 00:00:00 /bin/bash -c ps -ef | grep kafka kbuilder 28823 28821 0 20:28 pts/0 00:00:00 grep kafka
Tạo các chủ đề kafka với các thông số kỹ thuật sau:
- susy-train: partition = 1, replication-factor = 1
- susy-test: partition = 2, replication-factor = 1
./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic susy-train
./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic susy-test
Created topic susy-train. Created topic susy-test.
Mô tả chủ đề để biết chi tiết về cấu hình
./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-train
./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-test
Topic: susy-train PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: susy-train Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: susy-test PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: susy-test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: susy-test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Yếu tố sao chép 1 chỉ ra rằng dữ liệu không được sao chép. Điều này là do sự hiện diện của một nhà môi giới duy nhất trong thiết lập kafka của chúng tôi. Trong hệ thống sản xuất, số lượng máy chủ bootstrap có thể nằm trong khoảng 100 nút. Đó là nơi mà khả năng chịu lỗi bằng cách sử dụng sao chép được đưa vào hình ảnh.
Vui lòng tham khảo tài liệu để biết thêm chi tiết.
Tập dữ liệu SUSY
Kafka là một nền tảng phát trực tuyến sự kiện, cho phép ghi dữ liệu từ nhiều nguồn khác nhau vào đó. Ví dụ:
- Nhật ký lưu lượng truy cập web
- Các phép đo thiên văn
- Dữ liệu cảm biến IoT
- Đánh giá sản phẩm và nhiều hơn nữa.
Với mục đích của hướng dẫn này, cho phép tải về SUSY bộ dữ liệu và thức ăn dữ liệu vào Kafka bằng tay. Mục tiêu của bài toán phân loại này là để phân biệt giữa quá trình tín hiệu tạo ra các hạt siêu đối xứng và quá trình nền không tạo ra.
curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz
Khám phá tập dữ liệu
Cột đầu tiên là nhãn lớp (1 cho tín hiệu, 0 cho nền), tiếp theo là 18 đối tượng (8 đối tượng cấp thấp sau đó là 10 đối tượng cấp cao). 8 đặc điểm đầu tiên là các đặc tính động học được đo bởi các máy dò hạt trong máy gia tốc. 10 tính năng cuối cùng là chức năng của 8 tính năng đầu tiên. Đây là những đặc điểm cấp cao do các nhà vật lý suy ra để giúp phân biệt giữa hai lớp.
COLUMNS = [
# labels
'class',
# low-level features
'lepton_1_pT',
'lepton_1_eta',
'lepton_1_phi',
'lepton_2_pT',
'lepton_2_eta',
'lepton_2_phi',
'missing_energy_magnitude',
'missing_energy_phi',
# high-level derived features
'MET_rel',
'axial_MET',
'M_R',
'M_TR_2',
'R',
'MT2',
'S_R',
'M_Delta_R',
'dPhi_r_b',
'cos(theta_r1)'
]
Toàn bộ tập dữ liệu bao gồm 5 triệu hàng. Tuy nhiên, với mục đích của hướng dẫn này, chúng ta hãy chỉ xem xét một phần nhỏ của tập dữ liệu (100.000 hàng) để dành ít thời gian hơn cho việc di chuyển dữ liệu và nhiều thời gian hơn để hiểu chức năng của api.
susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)
susy_df.head()
# Number of datapoints and columns
len(susy_df), len(susy_df.columns)
(100000, 19)
# Number of datapoints belonging to each class (0: background noise, 1: signal)
len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])
(54025, 45975)
Tách tập dữ liệu
train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))
x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]
x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]
# The labels are set as the kafka message keys so as to store data
# in multiple-partitions. Thus, enabling efficient data retrieval
# using the consumer groups.
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))
x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
Number of training samples: 60000 Number of testing sample: 40000
NUM_COLUMNS = len(x_train_df.columns)
len(x_train), len(y_train), len(x_test), len(y_test)
(60000, 60000, 40000, 40000)
Lưu trữ dữ liệu về tàu và kiểm tra trong kafka
Lưu trữ dữ liệu trong kafka mô phỏng một môi trường để truy xuất dữ liệu từ xa liên tục cho các mục đích đào tạo và suy luận.
def error_callback(exc):
raise Exception('Error while sendig data to kafka: {0}'.format(str(exc)))
def write_to_kafka(topic_name, items):
count=0
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
for message, key in items:
producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)
count+=1
producer.flush()
print("Wrote {0} messages into topic: {1}".format(count, topic_name))
write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))
Wrote 60000 messages into topic: susy-train Wrote 40000 messages into topic: susy-test
Xác định tập dữ liệu tfio train
Các IODataset
lớp được sử dụng cho việc truyền dữ liệu từ Kafka vào tensorflow. Các kế thừa lớp từ tf.data.Dataset
và do đó có tất cả các chức năng hữu ích của tf.data.Dataset
ra khỏi hộp.
def decode_kafka_item(item):
message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
key = tf.strings.to_number(item.key)
return (message, key)
BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64
train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)
2022-01-07 20:29:21.602817: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Xây dựng và đào tạo mô hình
# Set the parameters
OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS=['accuracy']
EPOCHS=10
# design/build the model
model = tf.keras.Sequential([
tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Dense(256, activation='relu'),
tf.keras.layers.Dropout(0.4),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dropout(0.4),
tf.keras.layers.Dense(1, activation='sigmoid')
])
print(model.summary())
Model: "sequential" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= dense (Dense) (None, 128) 2432 dropout (Dropout) (None, 128) 0 dense_1 (Dense) (None, 256) 33024 dropout_1 (Dropout) (None, 256) 0 dense_2 (Dense) (None, 128) 32896 dropout_2 (Dropout) (None, 128) 0 dense_3 (Dense) (None, 1) 129 ================================================================= Total params: 68,481 Trainable params: 68,481 Non-trainable params: 0 _________________________________________________________________ None
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model
model.fit(train_ds, epochs=EPOCHS)
Epoch 1/10 /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/util/dispatch.py:1082: UserWarning: "`binary_crossentropy` received `from_logits=True`, but the `output` argument was produced by a sigmoid or softmax activation and thus does not represent logits. Was this intended?" return dispatch_target(*args, **kwargs) 938/938 [==============================] - 31s 33ms/step - loss: 0.4817 - accuracy: 0.7691 Epoch 2/10 938/938 [==============================] - 30s 32ms/step - loss: 0.4550 - accuracy: 0.7875 Epoch 3/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4512 - accuracy: 0.7911 Epoch 4/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4487 - accuracy: 0.7940 Epoch 5/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4466 - accuracy: 0.7934 Epoch 6/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4459 - accuracy: 0.7933 Epoch 7/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4448 - accuracy: 0.7935 Epoch 8/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4439 - accuracy: 0.7950 Epoch 9/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4421 - accuracy: 0.7956 Epoch 10/10 938/938 [==============================] - 31s 32ms/step - loss: 0.4425 - accuracy: 0.7962 <keras.callbacks.History at 0x7fb364fd2a90>
Vì chỉ một phần nhỏ của tập dữ liệu đang được sử dụng, độ chính xác của chúng tôi bị giới hạn ở ~ 78% trong giai đoạn đào tạo. Tuy nhiên, vui lòng lưu trữ thêm dữ liệu trong kafka để có hiệu suất mô hình tốt hơn. Ngoài ra, vì mục tiêu chỉ là chứng minh chức năng của bộ dữ liệu tfio kafka, nên một mạng nơ-ron nhỏ hơn và ít phức tạp hơn đã được sử dụng. Tuy nhiên, người ta có thể tăng độ phức tạp của mô hình, sửa đổi chiến lược học tập, điều chỉnh các siêu tham số, v.v. cho các mục đích thăm dò. Đối với một cách tiếp cận cơ bản, vui lòng tham khảo này bài viết .
Suy luận trên dữ liệu thử nghiệm
Để suy ra trên các dữ liệu thử nghiệm bằng cách tôn trọng những 'chính xác-once' ngữ nghĩa cùng với lỗi khoan dung, các streaming.KafkaGroupIODataset
thể được sử dụng.
Xác định tập dữ liệu kiểm tra tfio
Các stream_timeout
khối tham số trong suốt thời gian đưa ra cho các điểm dữ liệu mới được xem trực tiếp vào chủ đề này. Điều này loại bỏ nhu cầu tạo tập dữ liệu mới nếu dữ liệu đang được truyền trực tuyến vào chủ đề một cách không liên tục.
test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
topics=["susy-test"],
group_id="testcg",
servers="127.0.0.1:9092",
stream_timeout=10000,
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest"
],
)
def decode_kafka_test_item(raw_message, raw_key):
message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
key = tf.strings.to_number(raw_key)
return (message, key)
test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(BATCH_SIZE)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_io/python/experimental/kafka_group_io_dataset_ops.py:188: take_while (from tensorflow.python.data.experimental.ops.take_while_ops) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.data.Dataset.take_while(...)
Mặc dù lớp này có thể được sử dụng cho các mục đích đào tạo, nhưng có những điều cần lưu ý cần được giải quyết. Một khi tất cả các tin nhắn được đọc từ Kafka và offsets mới nhất được cam kết sử dụng streaming.KafkaGroupIODataset
, người tiêu dùng không khởi động lại đọc những thông điệp từ đầu. Do đó, trong khi đào tạo, chỉ có thể đào tạo cho một kỷ nguyên duy nhất với dữ liệu liên tục chảy vào. Loại chức năng này có các trường hợp sử dụng hạn chế trong giai đoạn đào tạo, trong đó, khi một điểm dữ liệu đã được mô hình sử dụng thì nó không còn nữa bắt buộc và có thể bị loại bỏ.
Tuy nhiên, chức năng này tỏa sáng khi nói đến khả năng suy luận mạnh mẽ với ngữ nghĩa chính xác một lần.
đánh giá hiệu suất trên dữ liệu thử nghiệm
res = model.evaluate(test_ds)
print("test loss, test acc:", res)
34/Unknown - 0s 2ms/step - loss: 0.4434 - accuracy: 0.8194 2022-01-07 20:34:29.402707: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions 2022-01-07 20:34:29.406789: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0 625/625 [==============================] - 11s 17ms/step - loss: 0.4437 - accuracy: 0.7915 test loss, test acc: [0.4436523914337158, 0.7915250062942505] 2022-01-07 20:34:40.051954: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out
Vì suy luận dựa trên ngữ nghĩa 'chính xác một lần', đánh giá trên tập kiểm tra chỉ có thể được chạy một lần. Để chạy lại suy luận trên dữ liệu thử nghiệm, nên sử dụng một nhóm người tiêu dùng mới.
Theo dõi lag bù đắp của testcg
nhóm người tiêu dùng
./kafka_2.13-2.7.2/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID testcg susy-test 0 21626 21626 0 rdkafka-534f63d0-b91e-4976-a3ca-832b6c91210e /10.142.0.103 rdkafka testcg susy-test 1 18374 18374 0 rdkafka-534f63d0-b91e-4976-a3ca-832b6c91210e /10.142.0.103 rdkafka
Khi current-offset
các trận đấu các log-end-offset
cho tất cả các phân vùng, nó chỉ ra rằng người tiêu dùng (s) đã hoàn thành lấy tất cả các tin nhắn từ Kafka chủ đề.
Học trực tuyến
Mô hình học máy trực tuyến hơi khác so với cách đào tạo mô hình học máy truyền thống / thông thường. Trong trường hợp trước đây, mô hình tiếp tục tìm hiểu / cập nhật từng bước các tham số của nó ngay khi có các điểm dữ liệu mới và quá trình này dự kiến sẽ tiếp tục vô thời hạn. Đây là không giống như các phương pháp tiếp cận thứ hai nơi tập dữ liệu là cố định và mô hình lặp trên nó n
số lần. Trong học tập trực tuyến, dữ liệu đã được mô hình sử dụng một lần có thể không có sẵn để đào tạo lại.
Bằng cách sử dụng streaming.KafkaBatchIODataset
, bây giờ có thể để đào tạo các mô hình theo cách này. Hãy tiếp tục sử dụng tập dữ liệu SUSY của chúng tôi để chứng minh chức năng này.
Tập dữ liệu đào tạo tfio cho việc học trực tuyến
Các streaming.KafkaBatchIODataset
cũng tương tự như streaming.KafkaGroupIODataset
trong đó của API. Thêm vào đó, nó được khuyến khích sử dụng các stream_timeout
tham số cấu hình trong suốt thời gian mà bộ dữ liệu sẽ chặn thư mới trước khi thời gian ra ngoài. Trong ví dụ dưới đây, các dữ liệu được cấu hình với một stream_timeout
của 10000
mili giây. Điều này ngụ ý rằng, sau khi tất cả các thông báo từ chủ đề đã được tiêu thụ, tập dữ liệu sẽ đợi thêm 10 giây trước khi hết thời gian và ngắt kết nối khỏi cụm kafka. Nếu các thông báo mới được truyền vào chủ đề trước khi hết thời gian, quá trình tiêu thụ dữ liệu và đào tạo mô hình sẽ tiếp tục cho các điểm dữ liệu mới được sử dụng đó. Để chặn vô thời hạn, đặt nó là -1
.
online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
topics=["susy-train"],
group_id="cgonline",
servers="127.0.0.1:9092",
stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1.
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest"
],
)
Mỗi mặt hàng mà người online_train_ds
tạo là một tf.data.Dataset
của riêng mình. Vì vậy, tất cả các phép biến đổi tiêu chuẩn có thể được áp dụng như bình thường.
def decode_kafka_online_item(raw_message, raw_key):
message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
key = tf.strings.to_number(raw_key)
return (message, key)
for mini_ds in online_train_ds:
mini_ds = mini_ds.shuffle(buffer_size=32)
mini_ds = mini_ds.map(decode_kafka_online_item)
mini_ds = mini_ds.batch(32)
if len(mini_ds) > 0:
model.fit(mini_ds, epochs=3)
2022-01-07 20:34:42.024915: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions 2022-01-07 20:34:42.025797: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4561 - accuracy: 0.7909 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4538 - accuracy: 0.7909 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4499 - accuracy: 0.7947 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4347 - accuracy: 0.8018 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4314 - accuracy: 0.8048 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4286 - accuracy: 0.8063 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4480 - accuracy: 0.7910 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4425 - accuracy: 0.7945 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4390 - accuracy: 0.7970 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4434 - accuracy: 0.7965 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4380 - accuracy: 0.7974 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4354 - accuracy: 0.7992 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4522 - accuracy: 0.7909 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4475 - accuracy: 0.7910 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4435 - accuracy: 0.7947 Epoch 1/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4464 - accuracy: 0.7906 Epoch 2/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4467 - accuracy: 0.7922 Epoch 3/3 313/313 [==============================] - 1s 2ms/step - loss: 0.4424 - accuracy: 0.7933 2022-01-07 20:35:04.916208: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out
Mô hình được đào tạo tăng dần có thể được lưu theo kiểu định kỳ (dựa trên các trường hợp sử dụng) và có thể được sử dụng để suy luận trên dữ liệu thử nghiệm ở chế độ trực tuyến hoặc ngoại tuyến.
Người giới thiệu:
Baldi, P., P. Sadowski và D. Whiteson. “Tìm kiếm các hạt kỳ lạ trong Vật lý năng lượng cao với Học sâu”. Nature Communications 5 (ngày 2 tháng 7 năm 2014)
SUSY Dataset: https://archive.ics.uci.edu/ml/datasets/SUSY#