חלק ממערכי הנתונים גדולים מכדי לעבד אותם במכונה אחת. tfds
תומך ביצירת נתונים במכונות רבות באמצעות Apache Beam .
למסמך זה שני חלקים:
- למשתמש המעוניין ליצור מערך נתונים קיים של Beam
- למפתחים שרוצים ליצור מערך נתונים חדש של Beam
יצירת מערך נתונים של Beam
להלן דוגמאות שונות ליצירת מערך נתונים של Beam, הן בענן והן באופן מקומי.
ב-Google Cloud Dataflow
כדי להפעיל את הצינור באמצעות Google Cloud Dataflow ולנצל את היתרונות של חישוב מבוזר, תחילה עקוב אחר הוראות ההתחלה המהירה .
לאחר הגדרת הסביבה שלך, אתה יכול להפעיל את tfds build
CLI באמצעות ספריית נתונים ב- GCS ולציין את האפשרויות הנדרשות עבור הדגל --beam_pipeline_options
.
כדי להקל על הפעלת הסקריפט, כדאי להגדיר את המשתנים הבאים באמצעות הערכים בפועל עבור הגדרת ה-GCP/GCS שלך ומערך הנתונים שברצונך ליצור:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
לאחר מכן תצטרך ליצור קובץ כדי לומר ל-Dataflow להתקין tfds
על העובדים:
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
אם אתה משתמש ב- tfds-nightly
, הקפד לבצע הד מ- tfds-nightly
למקרה שמערך הנתונים עודכן מאז המהדורה האחרונה.
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
אם אתה משתמש בתלות נוספות שאינן כלולות בספריית TFDS, עקוב אחר ההוראות לניהול תלות בצנרת של Python .
לבסוף, אתה יכול להפעיל את העבודה באמצעות הפקודה למטה:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
באופן מקומי
כדי להפעיל את הסקריפט שלך באופן מקומי באמצעות רץ ברירת המחדל של Apache Beam (הוא חייב להתאים לכל הנתונים בזיכרון), הפקודה זהה למערכי נתונים אחרים:
tfds build my_dataset
עם אפאצ'י פלינק
כדי להפעיל את הצינור באמצעות Apache Flink אתה יכול לקרוא את התיעוד הרשמי . ודא שה-Beam שלך תואם לתאימות גרסת Flink
כדי להקל על הפעלת הסקריפט, כדאי להגדיר את המשתנים הבאים תוך שימוש בערכים האמיתיים עבור הגדרת Flink שלך ומערך הנתונים שברצונך ליצור:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
כדי להפעיל על אשכול Flink משובץ, אתה יכול להפעיל את העבודה באמצעות הפקודה למטה:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
עם סקריפט מותאם אישית
כדי ליצור את מערך הנתונים ב-Beam, ה-API זהה לזה של מערכי נתונים אחרים. אתה יכול להתאים אישית את beam.Pipeline
באמצעות הארגומנטים beam_options
(ו- beam_runner
) של DownloadConfig
.
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
הטמעת מערך נתונים של Beam
דרישות מוקדמות
על מנת לכתוב מערכי נתונים של Apache Beam, עליך להכיר את המושגים הבאים:
- הכירו את המדריך ליצירת מערכי נתונים
tfds
מכיוון שרוב התוכן עדיין חל על מערכי נתונים של Beam. - קבל היכרות עם Apache Beam עם מדריך התכנות Beam .
- אם אתה רוצה ליצור את מערך הנתונים שלך באמצעות Cloud Dataflow, קרא את התיעוד של Google Cloud ואת מדריך התלות של Apache Beam .
הוראות
אם אתה מכיר את המדריך ליצירת מערך נתונים , הוספת מערך נתונים של Beam דורשת רק לשנות את הפונקציה _generate_examples
. הפונקציה צריכה להחזיר אובייקט קרן, ולא מחולל:
מערך נתונים ללא קרן:
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
מערך נתונים של קרן:
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
כל השאר יכולים להיות זהים ב-100%, כולל בדיקות.
כמה שיקולים נוספים:
- השתמש ב-
tfds.core.lazy_imports
כדי לייבא Apache Beam. על ידי שימוש בתלות עצלה, משתמשים עדיין יכולים לקרוא את מערך הנתונים לאחר יצירתו מבלי להתקין את Beam. - היזהר עם סגירות Python. בעת הפעלת הצינור, פונקציות
beam.Map
ו-beam.DoFn
מסודרות באמצעותpickle
ונשלחות לכל העובדים. אל תשתמש באובייקטים הניתנים לשינוי בתוךbeam.PTransform
אם המצב צריך להיות משותף בין עובדים. - בשל האופן שבו
tfds.core.DatasetBuilder
מסודר עם חמוצים, שינויtfds.core.DatasetBuilder
במהלך יצירת הנתונים יתעלם מהעובדים (למשל, לא ניתן להגדירself.info.metadata['offset'] = 123
ב-_split_generators
וגישה אליו מהעובדים כמוbeam.Map(lambda x: x + self.info.metadata['offset'])
) - אם אתה צריך לשתף כמה שלבי צינור בין הפיצולים, אתה יכול להוסיף להוסיף
pipeline: beam.Pipeline
kwarg ל_split_generator
ולשלוט בצינור הדור המלא. ראה תיעוד_generate_examples
שלtfds.core.GeneratorBasedBuilder
.
דוּגמָה
הנה דוגמה למערך נתונים של Beam.
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
הפעלת הצינור שלך
כדי להפעיל את הצינור, עיין בסעיף לעיל.
tfds build my_dataset --register_checksums
צינור באמצעות TFDS כקלט
אם אתה רוצה ליצור צינור קרן שלוקח מערך נתונים של TFDS כמקור, אתה יכול להשתמש ב- tfds.beam.ReadFromTFDS
:
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
זה יעבד כל רסיס של מערך הנתונים במקביל.