بعض مجموعات البيانات كبيرة جدًا بحيث لا يمكن معالجتها على جهاز واحد. يدعم tfds
إنشاء البيانات عبر العديد من الأجهزة باستخدام Apache Beam .
يحتوي هذا المستند على قسمين:
- للمستخدم الذي يريد إنشاء مجموعة بيانات Beam موجودة
- للمطورين الذين يرغبون في إنشاء مجموعة بيانات Beam جديدة
إنشاء مجموعة بيانات الشعاع
فيما يلي أمثلة مختلفة لإنشاء مجموعة بيانات Beam، سواء على السحابة أو محليًا.
على Google Cloud Dataflow
لتشغيل المسار باستخدام Google Cloud Dataflow والاستفادة من الحساب الموزع، اتبع أولاً تعليمات Quickstart .
بمجرد إعداد البيئة الخاصة بك، يمكنك تشغيل tfds build
CLI باستخدام دليل البيانات على GCS وتحديد الخيارات المطلوبة لعلامة --beam_pipeline_options
.
لتسهيل تشغيل البرنامج النصي، من المفيد تحديد المتغيرات التالية باستخدام القيم الفعلية لإعداد GCP/GCS ومجموعة البيانات التي تريد إنشاءها:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
ستحتاج بعد ذلك إلى إنشاء ملف لإخبار Dataflow بتثبيت tfds
على العمال:
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
إذا كنت تستخدم tfds-nightly
، فتأكد من تكرار صدى tfds-nightly
في حالة تحديث مجموعة البيانات منذ الإصدار الأخير.
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
إذا كنت تستخدم تبعيات إضافية غير مضمنة في مكتبة TFDS، فاتبع الإرشادات الخاصة بإدارة تبعيات خطوط أنابيب Python .
وأخيرًا، يمكنك بدء المهمة باستخدام الأمر أدناه:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
محليا
لتشغيل البرنامج النصي محليًا باستخدام مشغل Apache Beam الافتراضي (يجب أن يلائم جميع البيانات الموجودة في الذاكرة)، يكون الأمر هو نفسه المستخدم في مجموعات البيانات الأخرى:
tfds build my_dataset
مع أباتشي فلينك
لتشغيل خط الأنابيب باستخدام Apache Flink، يمكنك قراءة الوثائق الرسمية . تأكد من أن Beam الخاص بك متوافق مع توافق إصدار Flink
لتسهيل تشغيل البرنامج النصي، من المفيد تحديد المتغيرات التالية باستخدام القيم الفعلية لإعداد Flink ومجموعة البيانات التي تريد إنشاءها:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
للتشغيل على مجموعة Flink المضمنة، يمكنك تشغيل المهمة باستخدام الأمر أدناه:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
مع برنامج نصي مخصص
لإنشاء مجموعة البيانات على Beam، تكون واجهة برمجة التطبيقات هي نفسها المستخدمة في مجموعات البيانات الأخرى. يمكنك تخصيص beam.Pipeline
باستخدام وسيطات beam_options
(و beam_runner
) الخاصة بـ DownloadConfig
.
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
تنفيذ مجموعة بيانات الشعاع
المتطلبات الأساسية
لكتابة مجموعات بيانات Apache Beam، يجب أن تكون على دراية بالمفاهيم التالية:
- كن على دراية بدليل إنشاء مجموعة بيانات
tfds
حيث أن معظم المحتوى لا يزال ينطبق على مجموعات بيانات Beam. - احصل على مقدمة عن Apache Beam من خلال دليل برمجة Beam .
- إذا كنت تريد إنشاء مجموعة البيانات الخاصة بك باستخدام Cloud Dataflow، فاقرأ وثائق Google Cloud ودليل تبعية Apache Beam .
تعليمات
إذا كنت على دراية بدليل إنشاء مجموعة البيانات ، فإن إضافة مجموعة بيانات Beam تتطلب فقط تعديل وظيفة _generate_examples
. يجب أن تقوم الدالة بإرجاع كائن شعاع، بدلاً من مولد:
مجموعة البيانات غير الشعاعية:
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
مجموعة بيانات الشعاع:
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
كل الباقي يمكن أن يكون متطابقًا بنسبة 100%، بما في ذلك الاختبارات.
بعض الاعتبارات الإضافية:
- استخدم
tfds.core.lazy_imports
لاستيراد Apache Beam. باستخدام التبعية البطيئة، لا يزال بإمكان المستخدمين قراءة مجموعة البيانات بعد إنشائها دون الحاجة إلى تثبيت Beam. - كن حذرًا مع عمليات إغلاق بايثون. عند تشغيل خط الأنابيب، يتم إجراء تسلسل لوظائف
beam.Map
وbeam.DoFn
باستخدامpickle
وإرسالها إلى جميع العمال. لا تستخدم كائنات قابلة للتغيير داخلbeam.PTransform
إذا كان يجب مشاركة الحالة بين العمال. - نظرًا للطريقة التي يتم بها تسلسل
tfds.core.DatasetBuilder
باستخدام المخلل، سيتم تجاهل تغييرtfds.core.DatasetBuilder
أثناء إنشاء البيانات على العاملين (على سبيل المثال، ليس من الممكن تعيينself.info.metadata['offset'] = 123
في_split_generators
والوصول إليها من العمال مثلbeam.Map(lambda x: x + self.info.metadata['offset'])
) - إذا كنت بحاجة إلى مشاركة بعض خطوات خط الأنابيب بين الانقسامات، فيمكنك إضافة إضافة
pipeline: beam.Pipeline
kwarg to_split_generator
والتحكم في خط أنابيب الجيل الكامل. راجع وثائق_generate_examples
الخاصة بـtfds.core.GeneratorBasedBuilder
.
مثال
فيما يلي مثال لمجموعة بيانات Beam.
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
تشغيل خط الأنابيب الخاص بك
لتشغيل خط الأنابيب، قم بإلقاء نظرة على القسم أعلاه.
tfds build my_dataset --register_checksums
خط أنابيب يستخدم TFDS كمدخل
إذا كنت تريد إنشاء خط أنابيب شعاعي يأخذ مجموعة بيانات TFDS كمصدر، فيمكنك استخدام tfds.beam.ReadFromTFDS
:
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
سيتم معالجة كل جزء من مجموعة البيانات بالتوازي.