KafkaとTensorflow-IOを使用したスト​​リーミングデータでの堅牢な機械学習

TensorFlow.orgで表示GoogleColabで実行GitHubでソースを表示 ノートブックをダウンロード

概要

このチュートリアルでは、からデータをストリーミングに焦点を当てカフカのにクラスタtf.data.Dataset 、その後に関連して使用されtf.kerasトレーニングと推論のために。

Kafkaは、主に分散型イベントストリーミングプラットフォームであり、データパイプライン全体でスケーラブルでフォールトトレラントなストリーミングデータを提供します。これは、ミッションクリティカルなデータ配信が主要な要件である多数の大企業にとって不可欠な技術コンポーネントです。

設定

必要なtensorflow-ioおよびkafkaパッケージをインストールします

pip install tensorflow-io
pip install kafka-python

パッケージをインポートする

import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

tfおよびtfioのインポートを検証します

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.23.1
tensorflow version: 2.8.0-rc0

KafkaおよびZookeeperインスタンスをダウンロードしてセットアップします

デモの目的で、次のインスタンスがローカルにセットアップされます。

  • カフカ(ブローカー:127.0.0.1:9092)
  • Zookeeper(ノード:127.0.0.1:2181)
curl -sSOL https://downloads.apache.org/kafka/2.7.2/kafka_2.13-2.7.2.tgz
tar -xzf kafka_2.13-2.7.2.tgz

インスタンスを起動するためにデフォルト構成(Apache Kafkaによって提供される)を使用します。

./kafka_2.13-2.7.2/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-2.7.2/config/zookeeper.properties
./kafka_2.13-2.7.2/bin/kafka-server-start.sh -daemon ./kafka_2.13-2.7.2/config/server.properties
echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
sleep 10
Waiting for 10 secs until kafka and zookeeper services are up and running

インスタンスは、デーモンプロセスのためのgrepとして開始されるとkafkaのプロセスのリストインチ2つのJavaプロセスは、zookeeperとkafkaインスタンスに対応しています。

ps -ef | grep kafka
kbuilder 27856 20044  4 20:28 ?        00:00:00 python /tmpfs/src/gfile/executor.py --input_notebook=/tmpfs/src/temp/docs/tutorials/kafka.ipynb --timeout=15000
kbuilder 28271     1 16 20:28 ?        00:00:01 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.2/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-api-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-basic-auth-extension-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-file-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-json-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-client-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-runtime-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-transforms-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-client-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-common-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-core-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-hk2-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-server-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-client-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-http-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-io-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-security-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-server-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-clients-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-log4j-appender-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-raft-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-examples-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-scala_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-test-utils-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-tools-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/maven-artifact-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-codec-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-handler-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-jute-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zstd-jni-1.4.5-6.jar org.apache.zookeeper.server.quorum.QuorumPeerMain ./kafka_2.13-2.7.2/config/zookeeper.properties
kbuilder 28635     1 57 20:28 ?        00:00:05 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.2/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-api-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-basic-auth-extension-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-file-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-json-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-client-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-runtime-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-transforms-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-client-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-common-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-core-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-hk2-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-server-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-client-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-http-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-io-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-security-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-server-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-clients-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-log4j-appender-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-raft-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-examples-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-scala_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-test-utils-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-tools-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/maven-artifact-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-codec-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-handler-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-jute-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zstd-jni-1.4.5-6.jar kafka.Kafka ./kafka_2.13-2.7.2/config/server.properties
kbuilder 28821 27860  0 20:28 pts/0    00:00:00 /bin/bash -c ps -ef | grep kafka
kbuilder 28823 28821  0 20:28 pts/0    00:00:00 grep kafka

次の仕様でkafkaトピックを作成します。

  • susy-train:partitions = 1、replication-factor = 1
  • susy-test:partitions = 2、replication-factor = 1
./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic susy-train
./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic susy-test
Created topic susy-train.
Created topic susy-test.

構成の詳細については、トピックを説明してください

./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-train
./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-test
Topic: susy-train PartitionCount: 1 ReplicationFactor: 1  Configs: segment.bytes=1073741824
    Topic: susy-train Partition: 0  Leader: 0 Replicas: 0   Isr: 0
Topic: susy-test  PartitionCount: 2 ReplicationFactor: 1  Configs: segment.bytes=1073741824
    Topic: susy-test  Partition: 0  Leader: 0 Replicas: 0   Isr: 0
    Topic: susy-test  Partition: 1  Leader: 0 Replicas: 0   Isr: 0

複製係数1は、データが複製されていないことを示します。これは、kafkaセットアップに単一のブローカーが存在するためです。本番システムでは、ブートストラップサーバーの数は数百ノードの範囲になります。ここで、レプリケーションを使用したフォールトトレランスが明らかになります。

参照してくださいドキュメント詳細については。

SUSYデータセット

Kafkaはイベントストリーミングプラットフォームであり、さまざまなソースからのデータを書き込むことができます。例えば:

  • Webトラフィックログ
  • 天文測定
  • IoTセンサーデータ
  • 製品レビューなど。

このチュートリアルの目的のために、ダウンロードすることができますSUSYのデータセットを、手動でカフカへデータを送ります。この分類問題の目的は、超対称粒子を生成する信号プロセスと生成しないバックグラウンドプロセスを区別することです。

curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz

データセットを探索する

最初の列はクラスラベル(信号用に1、背景用に0)で、その後に18個の機能(8個の低レベル機能、次に10個の高レベル機能)が続きます。最初の8つの機能は、加速器の粒子検出器によって測定された運動学的特性です。最後の10個の機能は、最初の8個の機能の機能です。これらは、2つのクラスを区別するのに役立つ、物理学者によって導出された高レベルの機能です。

COLUMNS = [
          #  labels
           'class',
          #  low-level features
           'lepton_1_pT',
           'lepton_1_eta',
           'lepton_1_phi',
           'lepton_2_pT',
           'lepton_2_eta',
           'lepton_2_phi',
           'missing_energy_magnitude',
           'missing_energy_phi',
          #  high-level derived features
           'MET_rel',
           'axial_MET',
           'M_R',
           'M_TR_2',
           'R',
           'MT2',
           'S_R',
           'M_Delta_R',
           'dPhi_r_b',
           'cos(theta_r1)'
           ]

データセット全体は500万行で構成されています。ただし、このチュートリアルでは、データセットのごく一部(100,000行)のみを検討して、データの移動に費やす時間を減らし、APIの機能を理解するために費やす時間を増やしましょう。

susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)
susy_df.head()
# Number of datapoints and columns
len(susy_df), len(susy_df.columns)
(100000, 19)
# Number of datapoints belonging to each class (0: background noise, 1: signal)
len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])
(54025, 45975)

データセットを分割する

train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))

x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]

x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]

# The labels are set as the kafka message keys so as to store data
# in multiple-partitions. Thus, enabling efficient data retrieval
# using the consumer groups.
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))

x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
Number of training samples:  60000
Number of testing sample:  40000
NUM_COLUMNS = len(x_train_df.columns)
len(x_train), len(y_train), len(x_test), len(y_test)
(60000, 60000, 40000, 40000)

列車とテストデータをkafkaに保存します

kafkaにデータを保存すると、トレーニングと推論の目的で継続的にリモートデータを取得するための環境がシミュレートされます。

def error_callback(exc):
    raise Exception('Error while sendig data to kafka: {0}'.format(str(exc)))

def write_to_kafka(topic_name, items):
  count=0
  producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
  for message, key in items:
    producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)
    count+=1
  producer.flush()
  print("Wrote {0} messages into topic: {1}".format(count, topic_name))

write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))
Wrote 60000 messages into topic: susy-train
Wrote 40000 messages into topic: susy-test

tfioトレインデータセットを定義する

IODatasetクラスはtensorflowにカフカからデータをストリーミングするために利用されています。クラスから継承tf.data.Datasetので、すべての便利な機能があるtf.data.Dataset箱から出して。

def decode_kafka_item(item):
  message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(item.key)
  return (message, key)

BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64
train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)
2022-01-07 20:29:21.602817: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

モデルを構築してトレーニングする

# Set the parameters

OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS=['accuracy']
EPOCHS=10
# design/build the model
model = tf.keras.Sequential([
  tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(256, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(1, activation='sigmoid')
])

print(model.summary())
Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 dense (Dense)               (None, 128)               2432      
                                                                 
 dropout (Dropout)           (None, 128)               0         
                                                                 
 dense_1 (Dense)             (None, 256)               33024     
                                                                 
 dropout_1 (Dropout)         (None, 256)               0         
                                                                 
 dense_2 (Dense)             (None, 128)               32896     
                                                                 
 dropout_2 (Dropout)         (None, 128)               0         
                                                                 
 dense_3 (Dense)             (None, 1)                 129       
                                                                 
=================================================================
Total params: 68,481
Trainable params: 68,481
Non-trainable params: 0
_________________________________________________________________
None
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model
model.fit(train_ds, epochs=EPOCHS)
Epoch 1/10
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/util/dispatch.py:1082: UserWarning: "`binary_crossentropy` received `from_logits=True`, but the `output` argument was produced by a sigmoid or softmax activation and thus does not represent logits. Was this intended?"
  return dispatch_target(*args, **kwargs)
938/938 [==============================] - 31s 33ms/step - loss: 0.4817 - accuracy: 0.7691
Epoch 2/10
938/938 [==============================] - 30s 32ms/step - loss: 0.4550 - accuracy: 0.7875
Epoch 3/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4512 - accuracy: 0.7911
Epoch 4/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4487 - accuracy: 0.7940
Epoch 5/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4466 - accuracy: 0.7934
Epoch 6/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4459 - accuracy: 0.7933
Epoch 7/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4448 - accuracy: 0.7935
Epoch 8/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4439 - accuracy: 0.7950
Epoch 9/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4421 - accuracy: 0.7956
Epoch 10/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4425 - accuracy: 0.7962
<keras.callbacks.History at 0x7fb364fd2a90>

データセットのごく一部しか使用されていないため、トレーニングフェーズ中の精度は約78%に制限されます。ただし、モデルのパフォーマンスを向上させるために、kafkaに追加のデータを自由に保存してください。また、目標はtfio kafkaデータセットの機能を示すことだけだったため、より小さく、より複雑でないニューラルネットワークが使用されました。ただし、モデルの複雑さを増したり、学習戦略を変更したり、探索目的でハイパーパラメーターを調整したりすることができます。ベースラインアプローチについては、これを参照してください記事

テストデータを推測する

フォールトトレランスと一緒に「まさに、かつて」セマンティクスに付着することにより、試験データに推測するに、 streaming.KafkaGroupIODataset利用することができます。

tfioテストデータセットを定義する

stream_timeoutトピックにストリーミングするための新しいデータポイントのために与えられた期間のためのパラメータブロック。これにより、データが断続的にトピックにストリーミングされる場合に、新しいデータセットを作成する必要がなくなります。

test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["susy-test"],
    group_id="testcg",
    servers="127.0.0.1:9092",
    stream_timeout=10000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

def decode_kafka_test_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(BATCH_SIZE)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_io/python/experimental/kafka_group_io_dataset_ops.py:188: take_while (from tensorflow.python.data.experimental.ops.take_while_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Dataset.take_while(...)

このクラスはトレーニング目的で使用できますが、対処する必要のある注意事項があります。すべてのメッセージがカフカから読み込まれ、最新のオフセットを使用してコミットされたらstreaming.KafkaGroupIODataset 、消費者が最初からのメッセージを読んで再起動しません。したがって、トレーニング中は、データが継続的に流入する単一のエポックでのみトレーニングできます。この種の機能は、トレーニングフェーズでのユースケースが限られており、データポイントがモデルによって消費されると、データポイントは使用されなくなります。必須であり、破棄できます。

ただし、この機能は、1回限りのセマンティクスによる堅牢な推論に関しては優れています。

テストデータのパフォーマンスを評価する

res = model.evaluate(test_ds)
print("test loss, test acc:", res)
34/Unknown - 0s 2ms/step - loss: 0.4434 - accuracy: 0.8194
2022-01-07 20:34:29.402707: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions
2022-01-07 20:34:29.406789: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0
625/625 [==============================] - 11s 17ms/step - loss: 0.4437 - accuracy: 0.7915
test loss, test acc: [0.4436523914337158, 0.7915250062942505]
2022-01-07 20:34:40.051954: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out

推論は「exactly-once」セマンティクスに基づいているため、テストセットの評価は1回だけ実行できます。テストデータに対して推論を再度実行するには、新しいコンシューマーグループを使用する必要があります。

オフセットラグ追跡testcgコンシューマ・グループを

./kafka_2.13-2.7.2/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
testcg          susy-test       0          21626           21626           0               rdkafka-534f63d0-b91e-4976-a3ca-832b6c91210e /10.142.0.103   rdkafka
testcg          susy-test       1          18374           18374           0               rdkafka-534f63d0-b91e-4976-a3ca-832b6c91210e /10.142.0.103   rdkafka

いったんcurrent-offsetマッチはlog-end-offsetすべてのパーティションのために、それは消費者(s)はカフカのトピックからすべてのメッセージを取得完了していることを示しています。

オンライン学習

オンラインの機械学習パラダイムは、機械学習モデルをトレーニングする従来の方法とは少し異なります。前者の場合、モデルは、新しいデータポイントが利用可能になるとすぐに、そのパラメーターを段階的に学習/更新し続け、このプロセスは無期限に続くと予想されます。これは、データセットが固定され、後者のアプローチとは異なるモデルは、その上に反復処理n倍の数。オンライン学習では、モデルによって一度消費されたデータは、再びトレーニングに利用できない場合があります。

利用することでstreaming.KafkaBatchIODataset 、この方法でモデルを訓練することが可能になりました。この機能を示すために、SUSYデータセットを引き続き使用してみましょう。

オンライン学習のためのtfioトレーニングデータセット

streaming.KafkaBatchIODatasetに似てstreaming.KafkaGroupIODatasetそれのAPIインチさらに、利用することをお勧めしstream_timeoutデータセットがタイムアウトする前に新しいメッセージをブロックしますする期間を設定するためのパラメータを。以下の例では、データセットを用いて構成されてstream_timeout10000ミリ秒。これは、トピックからのすべてのメッセージが消費された後、データセットがタイムアウトしてkafkaクラスターから切断される前にさらに10秒間待機することを意味します。タイムアウトする前に新しいメッセージがトピックにストリーミングされると、それらの新しく消費されたデータポイントのデータ消費とモデルトレーニングが再開されます。無期限にブロックするには、それを設定-1

online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["susy-train"],
    group_id="cgonline",
    servers="127.0.0.1:9092",
    stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1.
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

ことをすべての項目online_train_ds発生ですtf.data.Dataset自体インチしたがって、すべての標準変換を通常どおりに適用できます。

def decode_kafka_online_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

for mini_ds in online_train_ds:
  mini_ds = mini_ds.shuffle(buffer_size=32)
  mini_ds = mini_ds.map(decode_kafka_online_item)
  mini_ds = mini_ds.batch(32)
  if len(mini_ds) > 0:
    model.fit(mini_ds, epochs=3)
2022-01-07 20:34:42.024915: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions
2022-01-07 20:34:42.025797: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4561 - accuracy: 0.7909
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4538 - accuracy: 0.7909
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4499 - accuracy: 0.7947
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4347 - accuracy: 0.8018
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4314 - accuracy: 0.8048
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4286 - accuracy: 0.8063
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4480 - accuracy: 0.7910
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4425 - accuracy: 0.7945
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4390 - accuracy: 0.7970
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4434 - accuracy: 0.7965
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4380 - accuracy: 0.7974
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4354 - accuracy: 0.7992
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4522 - accuracy: 0.7909
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4475 - accuracy: 0.7910
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4435 - accuracy: 0.7947
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4464 - accuracy: 0.7906
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4467 - accuracy: 0.7922
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4424 - accuracy: 0.7933
2022-01-07 20:35:04.916208: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out

増分トレーニングされたモデルは、(ユースケースに基づいて)定期的に保存でき、オンラインモードまたはオフラインモードのいずれかでテストデータを推測するために利用できます。

参照:

  • Baldi、P.、P。Sadowski、およびD.Whiteson。 「深層学習による高エネルギー物理学でのエキゾチック粒子の検索」。ネイチャーコミュニケーションズ5(2014年7月2日)

  • SUSYデータセット: https://archive.ics.uci.edu/ml/datasets/SUSY#