יצירת מערכי נתונים גדולים עם Apache Beam

חלק ממערכי הנתונים גדולים מכדי לעבד אותם במכונה אחת. tfds תומך ביצירת נתונים במכונות רבות באמצעות Apache Beam .

למסמך זה שני חלקים:

  • למשתמש המעוניין ליצור מערך נתונים קיים של Beam
  • למפתחים שרוצים ליצור מערך נתונים חדש של Beam

יצירת מערך נתונים של Beam

להלן דוגמאות שונות ליצירת מערך נתונים של Beam, הן בענן והן באופן מקומי.

ב-Google Cloud Dataflow

כדי להפעיל את הצינור באמצעות Google Cloud Dataflow ולנצל את היתרונות של חישוב מבוזר, תחילה עקוב אחר הוראות ההתחלה המהירה .

לאחר הגדרת הסביבה שלך, אתה יכול להפעיל את tfds build CLI באמצעות ספריית נתונים ב- GCS ולציין את האפשרויות הנדרשות עבור הדגל --beam_pipeline_options .

כדי להקל על הפעלת הסקריפט, כדאי להגדיר את המשתנים הבאים באמצעות הערכים בפועל עבור הגדרת ה-GCP/GCS שלך ומערך הנתונים שברצונך ליצור:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

לאחר מכן תצטרך ליצור קובץ כדי לומר ל-Dataflow להתקין tfds על העובדים:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

אם אתה משתמש ב- tfds-nightly , הקפד לבצע הד מ- tfds-nightly למקרה שמערך הנתונים עודכן מאז המהדורה האחרונה.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

אם אתה משתמש בתלות נוספות שאינן כלולות בספריית TFDS, עקוב אחר ההוראות לניהול תלות בצנרת של Python .

לבסוף, אתה יכול להפעיל את העבודה באמצעות הפקודה למטה:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

באופן מקומי

כדי להריץ את הסקריפט שלך באופן מקומי באמצעות רץ ברירת המחדל של Apache Beam (הוא חייב להתאים לכל הנתונים בזיכרון), הפקודה זהה למערכי נתונים אחרים:

tfds build my_dataset

כדי להפעיל את הצינור באמצעות Apache Flink אתה יכול לקרוא את התיעוד הרשמי . ודא שה-Beam שלך תואם לתאימות גרסת Flink

כדי להקל על הפעלת הסקריפט, כדאי להגדיר את המשתנים הבאים תוך שימוש בערכים האמיתיים עבור הגדרת Flink שלך ומערך הנתונים שברצונך ליצור:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

כדי להפעיל על אשכול Flink משובץ, אתה יכול להפעיל את העבודה באמצעות הפקודה למטה:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

עם סקריפט מותאם אישית

כדי ליצור את מערך הנתונים ב-Beam, ה-API זהה לזה של מערכי נתונים אחרים. אתה יכול להתאים אישית את beam.Pipeline באמצעות הארגומנטים beam_options (ו- beam_runner ) של DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

הטמעת מערך נתונים של Beam

דרישות מוקדמות

על מנת לכתוב מערכי נתונים של Apache Beam, עליך להכיר את המושגים הבאים:

הוראות

אם אתה מכיר את המדריך ליצירת מערך נתונים , הוספת מערך נתונים של Beam דורשת רק לשנות את הפונקציה _generate_examples . הפונקציה צריכה להחזיר אובייקט קרן, ולא מחולל:

מערך נתונים ללא קרן:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

מערך נתונים של קרן:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

כל השאר יכולים להיות זהים ב-100%, כולל בדיקות.

כמה שיקולים נוספים:

  • השתמש ב- tfds.core.lazy_imports כדי לייבא Apache Beam. על ידי שימוש בתלות עצלה, משתמשים עדיין יכולים לקרוא את מערך הנתונים לאחר יצירתו מבלי להתקין את Beam.
  • היזהר עם סגירות Python. בעת הפעלת הצינור, פונקציות beam.Map ו- beam.DoFn מסודרות באמצעות pickle ונשלחות לכל העובדים. אל תשתמש באובייקטים הניתנים לשינוי בתוך beam.PTransform אם המצב צריך להיות משותף בין עובדים.
  • בשל האופן שבו tfds.core.DatasetBuilder מסודר עם חמוצים, שינוי tfds.core.DatasetBuilder במהלך יצירת הנתונים יתעלם מהעובדים (למשל, לא ניתן להגדיר self.info.metadata['offset'] = 123 ב- _split_generators וגישה אליו מהעובדים כמו beam.Map(lambda x: x + self.info.metadata['offset']) )
  • אם אתה צריך לשתף כמה שלבי צינור בין הפיצולים, אתה יכול להוסיף להוסיף pipeline: beam.Pipeline kwarg ל _split_generator ולשלוט בצינור הדור המלא. ראה תיעוד _generate_examples של tfds.core.GeneratorBasedBuilder .

דוּגמָה

הנה דוגמה למערך נתונים של Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

הפעלת הצינור שלך

כדי להפעיל את הצינור, עיין בסעיף לעיל.

tfds build my_dataset --register_checksums

צינור באמצעות TFDS כקלט

אם אתה רוצה ליצור צינור קרן שלוקח מערך נתונים של TFDS כמקור, אתה יכול להשתמש ב- tfds.beam.ReadFromTFDS :

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

זה יעבד כל רסיס של מערך הנתונים במקביל.