إنشاء مجموعات بيانات كبيرة باستخدام Apache Beam

بعض مجموعات البيانات كبيرة جدًا بحيث لا يمكن معالجتها على جهاز واحد. يدعم tfds إنشاء البيانات عبر العديد من الأجهزة باستخدام Apache Beam .

يحتوي هذا المستند على قسمين:

  • للمستخدم الذي يريد إنشاء مجموعة بيانات Beam موجودة
  • للمطورين الذين يرغبون في إنشاء مجموعة بيانات Beam جديدة

إنشاء مجموعة بيانات الشعاع

فيما يلي أمثلة مختلفة لإنشاء مجموعة بيانات Beam، سواء على السحابة أو محليًا.

على Google Cloud Dataflow

لتشغيل المسار باستخدام Google Cloud Dataflow والاستفادة من الحساب الموزع، اتبع أولاً تعليمات Quickstart .

بمجرد إعداد البيئة الخاصة بك، يمكنك تشغيل tfds build CLI باستخدام دليل البيانات على GCS وتحديد الخيارات المطلوبة لعلامة --beam_pipeline_options .

لتسهيل تشغيل البرنامج النصي، من المفيد تحديد المتغيرات التالية باستخدام القيم الفعلية لإعداد GCP/GCS ومجموعة البيانات التي تريد إنشاءها:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

ستحتاج بعد ذلك إلى إنشاء ملف لإخبار Dataflow بتثبيت tfds على العمال:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

إذا كنت تستخدم tfds-nightly ، فتأكد من تكرار صدى tfds-nightly في حالة تحديث مجموعة البيانات منذ الإصدار الأخير.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

إذا كنت تستخدم تبعيات إضافية غير مضمنة في مكتبة TFDS، فاتبع الإرشادات الخاصة بإدارة تبعيات خطوط أنابيب Python .

وأخيرًا، يمكنك بدء المهمة باستخدام الأمر أدناه:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

محليا

لتشغيل البرنامج النصي محليًا باستخدام مشغل Apache Beam الافتراضي (يجب أن يلائم جميع البيانات الموجودة في الذاكرة)، يكون الأمر هو نفسه المستخدم في مجموعات البيانات الأخرى:

tfds build my_dataset

لتشغيل خط الأنابيب باستخدام Apache Flink، يمكنك قراءة الوثائق الرسمية . تأكد من أن Beam الخاص بك متوافق مع توافق إصدار Flink

لتسهيل تشغيل البرنامج النصي، من المفيد تحديد المتغيرات التالية باستخدام القيم الفعلية لإعداد Flink ومجموعة البيانات التي تريد إنشاءها:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

للتشغيل على مجموعة Flink المضمنة، يمكنك تشغيل المهمة باستخدام الأمر أدناه:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

مع برنامج نصي مخصص

لإنشاء مجموعة البيانات على Beam، تكون واجهة برمجة التطبيقات هي نفسها المستخدمة في مجموعات البيانات الأخرى. يمكنك تخصيص beam.Pipeline باستخدام وسيطات beam_optionsbeam_runner ) الخاصة بـ DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

تنفيذ مجموعة بيانات الشعاع

المتطلبات الأساسية

لكتابة مجموعات بيانات Apache Beam، يجب أن تكون على دراية بالمفاهيم التالية:

تعليمات

إذا كنت على دراية بدليل إنشاء مجموعة البيانات ، فإن إضافة مجموعة بيانات Beam تتطلب فقط تعديل وظيفة _generate_examples . يجب أن تقوم الدالة بإرجاع كائن شعاع، بدلاً من مولد:

مجموعة البيانات غير الشعاعية:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

مجموعة بيانات الشعاع:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

كل الباقي يمكن أن يكون متطابقًا بنسبة 100%، بما في ذلك الاختبارات.

بعض الاعتبارات الإضافية:

  • استخدم tfds.core.lazy_imports لاستيراد Apache Beam. باستخدام التبعية البطيئة، لا يزال بإمكان المستخدمين قراءة مجموعة البيانات بعد إنشائها دون الحاجة إلى تثبيت Beam.
  • كن حذرًا مع عمليات إغلاق بايثون. عند تشغيل خط الأنابيب، يتم إجراء تسلسل لوظائف beam.Map و beam.DoFn باستخدام pickle وإرسالها إلى جميع العمال. لا تستخدم كائنات قابلة للتغيير داخل beam.PTransform إذا كان يجب مشاركة الحالة بين العمال.
  • نظرًا للطريقة التي يتم بها تسلسل tfds.core.DatasetBuilder باستخدام المخلل، سيتم تجاهل تغيير tfds.core.DatasetBuilder أثناء إنشاء البيانات على العاملين (على سبيل المثال، ليس من الممكن تعيين self.info.metadata['offset'] = 123 في _split_generators والوصول إليها من العمال مثل beam.Map(lambda x: x + self.info.metadata['offset']) )
  • إذا كنت بحاجة إلى مشاركة بعض خطوات خط الأنابيب بين الانقسامات، فيمكنك إضافة إضافة pipeline: beam.Pipeline kwarg to _split_generator والتحكم في خط أنابيب الجيل الكامل. راجع وثائق _generate_examples الخاصة بـ tfds.core.GeneratorBasedBuilder .

مثال

فيما يلي مثال لمجموعة بيانات Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

تشغيل خط الأنابيب الخاص بك

لتشغيل خط الأنابيب، قم بإلقاء نظرة على القسم أعلاه.

tfds build my_dataset --register_checksums

خط أنابيب يستخدم TFDS كمدخل

إذا كنت تريد إنشاء خط أنابيب شعاعي يأخذ مجموعة بيانات TFDS كمصدر، فيمكنك استخدام tfds.beam.ReadFromTFDS :

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

سيتم معالجة كل جزء من مجموعة البيانات بالتوازي.