
TensorFlow Extended(TFX)の特徴エンジニアリングコンポーネント

この例コラボノートがどのの幾分より高度な例を提供TensorFlowが変換tf.Transform )モデルを訓練し、生産に推論をサービングの両方のための正確に同じコードを使用して、前処理データを用いることができます。

TensorFlow Transformは、トレーニングデータセットのフルパスを必要とする機能の作成など、TensorFlowの入力データを前処理するためのライブラリです。たとえば、TensorFlow Transformを使用すると、次のことができます。

  • 平均と標準偏差を使用して入力値を正規化します
  • すべての入力値に対して語彙を生成することにより、文字列を整数に変換します
  • 観測されたデータ分布に基づいて、フロートをバケットに割り当てることにより、フロートを整数に変換します

TensorFlowには、単一の例または例のバッチに対する操作のサポートが組み込まれています。 tf.Transform全体のトレーニングデータセットを完全にパスをサポートするために、これらの機能を拡張します。






  import colab
  !pip install --upgrade pip


pip install tensorflow-transform


まず、Python 3を使用していることを確認してから、必要なものをインストールしてインポートします。

import sys

# Confirm that we're using Python 3
assert sys.version_info.major == 3, 'Oops, not running Python 3. Use Runtime > Change runtime type'
import math
import os
import pprint

import tensorflow as tf
print('TF: {}'.format(tf.__version__))

import apache_beam as beam
print('Beam: {}'.format(beam.__version__))

import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
print('Transform: {}'.format(tft.__version__))

from tfx_bsl.public import tfxio
from tfx_bsl.coders.example_coder import RecordBatchToExamples

!wget https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/census/adult.data
!wget https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/census/adult.test

train = './adult.data'
test = './adult.test'
TF: 2.4.4
Beam: 2.34.0
Transform: 0.29.0
    'age', 'workclass', 'fnlwgt', 'education', 'education-num',
    'marital-status', 'occupation', 'relationship', 'race', 'sex',
    'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'label'
LABEL_KEY = 'label'



    [(name, tf.io.FixedLenFeature([], tf.string))
     for name in CATEGORICAL_FEATURE_KEYS] +
    [(name, tf.io.FixedLenFeature([], tf.float32))
     for name in NUMERIC_FEATURE_KEYS] +
    [(name, tf.io.VarLenFeature(tf.float32))
    [(LABEL_KEY, tf.io.FixedLenFeature([], tf.string))]

SCHEMA = tft.tf_metadata.dataset_metadata.DatasetMetadata(



testing = os.getenv("WEB_TEST_BROWSER", False)
if testing:

# Names of temp files
EXPORTED_MODEL_DIR = 'exported_model_dir'


作成tf.Transform preprocessing_fnを

前処理機能はtf.Transformの最も重要な概念です。前処理関数は、データセットの変換が実際に行われる場所です。それは受け入れ、テンソルは意味テンソルの辞書、返しTensorまたはSparseTensor 。通常、前処理関数の中心を形成するAPI呼び出しには2つの主要なグループがあります。

  1. TensorFlowオプス:テンソルを受け入れ、返す任意の関数で、通常TensorFlowオプスを意味しています。これらは、生データを一度に1つの特徴ベクトルで変換されたデータに変換するグラフにTensorFlow操作を追加します。これらは、トレーニングとサービングの両方で、すべての例で実行されます。
  2. アナライザを変換TensorFlow:tf.Transformが提供する分析器のいずれか。アナライザーもテンソルを受け入れて返しますが、TensorFlow opsとは異なり、トレーニング中に1回だけ実行され、通常はトレーニングデータセット全体を完全に通過します。彼らは、作成テンソル定数あなたのグラフに追加されています、。例えば、 tft.minトレーニングデータセットを超えるテンソルの最小値を計算します。 tf.Transformは固定されたアナライザーのセットを提供しますが、これは将来のバージョンで拡張される予定です。
def preprocessing_fn(inputs):
  """Preprocess input columns into transformed columns."""
  # Since we are modifying some features and leaving others unchanged, we
  # start by setting `outputs` to a copy of `inputs.
  outputs = inputs.copy()

  # Scale numeric columns to have range [0, 1].
    outputs[key] = tft.scale_to_0_1(inputs[key])

    # This is a SparseTensor because it is optional. Here we fill in a default
    # value when it is missing.
    sparse = tf.sparse.SparseTensor(inputs[key].indices, inputs[key].values,
                                    [inputs[key].dense_shape[0], 1])
    dense = tf.sparse.to_dense(sp_input=sparse, default_value=0.)
    # Reshaping from a batch of vectors of size 1 to a batch to scalars.
    dense = tf.squeeze(dense, axis=1)
    outputs[key] = tft.scale_to_0_1(dense)

  # For all categorical columns except the label column, we generate a
  # vocabulary but do not modify the feature.  This vocabulary is instead
  # used in the trainer, by means of a feature column, to convert the feature
  # from a string to an integer id.
    outputs[key] = tft.compute_and_apply_vocabulary(

  # For the label column we provide the mapping from string to index.
  table_keys = ['>50K', '<=50K']
  with tf.init_scope():
    initializer = tf.lookup.KeyValueTensorInitializer(
        values=tf.cast(tf.range(len(table_keys)), tf.int64),
    table = tf.lookup.StaticHashTable(initializer, default_value=-1)
  # Remove trailing periods for test data when the data is read with tf.data.
  label_str = tf.strings.regex_replace(inputs[LABEL_KEY], r'\.', '')
  label_str = tf.strings.strip(label_str)
  data_labels = table.lookup(label_str)
  transformed_label = tf.one_hot(
      indices=data_labels, depth=len(table_keys), on_value=1.0, off_value=0.0)
  outputs[LABEL_KEY] = tf.reshape(transformed_label, [-1, len(table_keys)])

  return outputs



  1. CSVリーダーを使用してデータを読み込みます
  2. 各カテゴリの語彙を作成することにより、数値データをスケーリングし、カテゴリデータを文字列からint64値インデックスに変換する前処理パイプラインを使用して変換します
  3. その結果を書き出すTFRecordExample我々は、後でモデルのトレーニングに使用するプロト、
def transform_data(train_data_file, test_data_file, working_dir):
  """Transform the data and write out as a TFRecord of Example protos.

  Read in the data using the CSV reader, and transform it using a
  preprocessing pipeline that scales numeric data and converts categorical data
  from strings to int64 values indices, by creating a vocabulary for each

    train_data_file: File containing training data
    test_data_file: File containing test data
    working_dir: Directory to write transformed data and metadata to

  # The "with" block will create a pipeline, and run that pipeline at the exit
  # of the block.
  with beam.Pipeline() as pipeline:
    with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
      # Create a TFXIO to read the census data with the schema. To do this we
      # need to list all columns in order since the schema doesn't specify the
      # order of columns in the csv.
      # We first read CSV files and use BeamRecordCsvTFXIO whose .BeamSource()
      # accepts a PCollection[bytes] because we need to patch the records first
      # (see "FixCommasTrainData" below). Otherwise, tfxio.CsvTFXIO can be used
      # to both read the CSV files and parse them to TFT inputs:
      # csv_tfxio = tfxio.CsvTFXIO(...)
      # raw_data = (pipeline | 'ToRecordBatches' >> csv_tfxio.BeamSource())
      csv_tfxio = tfxio.BeamRecordCsvTFXIO(

      # Read in raw data and convert using CSV TFXIO.  Note that we apply
      # some Beam transformations here, which will not be encoded in the TF
      # graph since we don't do the from within tf.Transform's methods
      # (AnalyzeDataset, TransformDataset etc.).  These transformations are just
      # to get data into a format that the CSV TFXIO can read, in particular
      # removing spaces after commas.
      raw_data = (
          | 'ReadTrainData' >> beam.io.ReadFromText(
              train_data_file, coder=beam.coders.BytesCoder())
          | 'FixCommasTrainData' >> beam.Map(
              lambda line: line.replace(b', ', b','))
          | 'DecodeTrainData' >> csv_tfxio.BeamSource())

      # Combine data and schema into a dataset tuple.  Note that we already used
      # the schema to read the CSV data, but we also need it to interpret
      # raw_data.
      raw_dataset = (raw_data, csv_tfxio.TensorAdapterConfig())

      # The TFXIO output format is chosen for improved performance.
      transformed_dataset, transform_fn = (
          raw_dataset | tft_beam.AnalyzeAndTransformDataset(
              preprocessing_fn, output_record_batches=True))

      # Transformed metadata is not necessary for encoding.
      transformed_data, _ = transformed_dataset

      # Extract transformed RecordBatches, encode and write them to the given
      # directory.
      _ = (
          | 'EncodeTrainData' >>
          beam.FlatMapTuple(lambda batch, _: RecordBatchToExamples(batch))
          | 'WriteTrainData' >> beam.io.WriteToTFRecord(
              os.path.join(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE)))

      # Now apply transform function to test data.  In this case we remove the
      # trailing period at the end of each line, and also ignore the header line
      # that is present in the test data file.
      raw_test_data = (
          | 'ReadTestData' >> beam.io.ReadFromText(
              test_data_file, skip_header_lines=1,
          | 'FixCommasTestData' >> beam.Map(
              lambda line: line.replace(b', ', b','))
          | 'RemoveTrailingPeriodsTestData' >> beam.Map(lambda line: line[:-1])
          | 'DecodeTestData' >> csv_tfxio.BeamSource())

      raw_test_dataset = (raw_test_data, csv_tfxio.TensorAdapterConfig())

      # The TFXIO output format is chosen for improved performance.
      transformed_test_dataset = (
          (raw_test_dataset, transform_fn)
          | tft_beam.TransformDataset(output_record_batches=True))

      # Transformed metadata is not necessary for encoding.
      transformed_test_data, _ = transformed_test_dataset

      # Extract transformed RecordBatches, encode and write them to the given
      # directory.
      _ = (
          | 'EncodeTestData' >>
          beam.FlatMapTuple(lambda batch, _: RecordBatchToExamples(batch))
          | 'WriteTestData' >> beam.io.WriteToTFRecord(
              os.path.join(working_dir, TRANSFORMED_TEST_DATA_FILEBASE)))

      # Will write a SavedModel and metadata to working_dir, which can then
      # be read by the tft.TFTransformOutput class.
      _ = (
          | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))




def _make_training_input_fn(tf_transform_output, transformed_examples,
  """An input function reading from transformed data, converting to model input.

    tf_transform_output: Wrapper around output of tf.Transform.
    transformed_examples: Base filename of examples.
    batch_size: Batch size.

    The input data for training or eval, in the form of k.
  def input_fn():
    return tf.data.experimental.make_batched_features_dataset(

  return input_fn



def _make_serving_input_fn(tf_transform_output, raw_examples, batch_size):
  """An input function reading from raw data, converting to model input.

    tf_transform_output: Wrapper around output of tf.Transform.
    raw_examples: Base filename of examples.
    batch_size: Batch size.

    The input data for training or eval, in the form of k.

  def get_ordered_raw_data_dtypes():
    result = []
    for col in ORDERED_CSV_COLUMNS:
      if col not in RAW_DATA_FEATURE_SPEC:
      spec = RAW_DATA_FEATURE_SPEC[col]
      if isinstance(spec, tf.io.FixedLenFeature):
    return result

  def input_fn():
    dataset = tf.data.experimental.make_csv_dataset(

    tft_layer = tf_transform_output.transform_features_layer()

    def transform_dataset(data):
      raw_features = {}
      for key, val in data.items():
        if key not in RAW_DATA_FEATURE_SPEC:
        if isinstance(RAW_DATA_FEATURE_SPEC[key], tf.io.VarLenFeature):
          raw_features[key] = tf.RaggedTensor.from_tensor(
              tf.expand_dims(val, -1)).to_sparse()
        raw_features[key] = val
      transformed_features = tft_layer(raw_features)
      data_labels = transformed_features.pop(LABEL_KEY)
      return (transformed_features, data_labels)

    return dataset.map(

  return input_fn


def export_serving_model(tf_transform_output, model, output_dir):
  """Exports a keras model for serving.

    tf_transform_output: Wrapper around output of tf.Transform.
    model: A keras model to export for serving.
    output_dir: A directory where the model will be exported to.
  # The layer has to be saved to the model for keras tracking purpases.
  model.tft_layer = tf_transform_output.transform_features_layer()

  def serve_tf_examples_fn(serialized_tf_examples):
    """Serving tf.function model wrapper."""
    feature_spec = RAW_DATA_FEATURE_SPEC.copy()
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
    transformed_features = model.tft_layer(parsed_features)
    outputs = model(transformed_features)
    classes_names = tf.constant([['0', '1']])
    classes = tf.tile(classes_names, [tf.shape(outputs)[0], 1])
    return {'classes': classes, 'scores': outputs}

  concrete_serving_fn = serve_tf_examples_fn.get_concrete_function(
      tf.TensorSpec(shape=[None], dtype=tf.string, name='inputs'))
  signatures = {'serving_default': concrete_serving_fn}

  # This is required in order to make this model servable with model_server.
  versioned_output_dir = os.path.join(output_dir, '1')
  model.save(versioned_output_dir, save_format='tf', signatures=signatures)
def train_and_evaluate(working_dir,
  """Train the model on training data and evaluate on test data.

    working_dir: The location of the Transform output.
    num_train_instances: Number of instances in train set
    num_test_instances: Number of instances in test set

    The results from the estimator's 'evaluate' method
  train_data_path_pattern = os.path.join(working_dir,
                                 TRANSFORMED_TRAIN_DATA_FILEBASE + '*')
  eval_data_path_pattern = os.path.join(working_dir,
                            TRANSFORMED_TEST_DATA_FILEBASE + '*')
  tf_transform_output = tft.TFTransformOutput(working_dir)

  train_input_fn = _make_training_input_fn(
      tf_transform_output, train_data_path_pattern, batch_size=TRAIN_BATCH_SIZE)
  train_dataset = train_input_fn()

  # Evaluate model on test dataset.
  eval_input_fn = _make_training_input_fn(
      tf_transform_output, eval_data_path_pattern, batch_size=TRAIN_BATCH_SIZE)
  validation_dataset = eval_input_fn()

  feature_spec = tf_transform_output.transformed_feature_spec().copy()

  inputs = {}
  for key, spec in feature_spec.items():
    if isinstance(spec, tf.io.VarLenFeature):
      inputs[key] = tf.keras.layers.Input(
          shape=[None], name=key, dtype=spec.dtype, sparse=True)
    elif isinstance(spec, tf.io.FixedLenFeature):
      inputs[key] = tf.keras.layers.Input(
          shape=spec.shape, name=key, dtype=spec.dtype)
      raise ValueError('Spec type is not supported: ', key, spec)

  encoded_inputs = {}
  for key in inputs:
    feature = tf.expand_dims(inputs[key], -1)
      num_buckets = tf_transform_output.num_buckets_for_transformed_feature(key)
      encoding_layer = (
              max_tokens=num_buckets, output_mode='binary', sparse=False))
      encoded_inputs[key] = encoding_layer(feature)
      encoded_inputs[key] = feature

  stacked_inputs = tf.concat(tf.nest.flatten(encoded_inputs), axis=1)
  output = tf.keras.layers.Dense(100, activation='relu')(stacked_inputs)
  output = tf.keras.layers.Dense(70, activation='relu')(output)
  output = tf.keras.layers.Dense(50, activation='relu')(output)
  output = tf.keras.layers.Dense(20, activation='relu')(output)
  output = tf.keras.layers.Dense(2, activation='sigmoid')(output)
  model = tf.keras.Model(inputs=inputs, outputs=output)


  model.fit(train_dataset, validation_data=validation_dataset,
            steps_per_epoch=math.ceil(num_train_instances / TRAIN_BATCH_SIZE),
            validation_steps=math.ceil(num_test_instances / TRAIN_BATCH_SIZE))

  # Export the model.
  exported_model_dir = os.path.join(working_dir, EXPORTED_MODEL_DIR)
  export_serving_model(tf_transform_output, model, exported_model_dir)

  metrics_values = model.evaluate(validation_dataset, steps=num_test_instances)
  metrics_labels = model.metrics_names
  return {l: v for l, v in zip(metrics_labels, metrics_values)}



import tempfile
temp = os.path.join(tempfile.gettempdir(), 'keras')

transform_data(train, test, temp)
results = train_and_evaluate(temp)
この例では、使用tf.Transform国勢調査データのデータセットを前処理すると、洗浄され、変換されたデータとモデルを訓練します。また、トレーニング済みモデルを本番環境にデプロイして推論を実行するときに使用できる入力関数も作成しました。トレーニングと推論の両方に同じコードを使用することで、データの偏りに関する問題を回避します。その過程で、データのクリーンアップに必要な変換を実行するためのApacheBeam変換の作成について学習しました。また、いずれかを使用してモデルを訓練するために、この変換されたデータを使用する方法を説明しましたtf.kerasまたはtf.estimator 。これは、TensorFlowTransformでできることのほんの一部です。我々はに飛び込むことをお勧めしtf.Transform 、それはあなたのために何ができるかを発見します。