قم بإنشاء خط أنابيب TFX باستخدام القوالب

مقدمة

وهذه الوثيقة توفير الإرشادات لإنشاء TensorFlow الموسعة (TFX) خط أنابيب باستخدام القوالب التي يتم توفيرها مع حزمة TFX بيثون. العديد من الإرشادات عبارة عن أوامر Linux shell ، والتي سيتم تشغيلها على مثيل AI Platform Notebooks. المقابلة Jupyter خلايا كود الدفتري التي تحتج تلك الأوامر باستخدام ! تم تقديمة.

سوف بناء خط أنابيب باستخدام تاكسي رحلات بيانات الصادرة عن مدينة شيكاغو. نحن نشجعك بشدة على محاولة بناء خط الأنابيب الخاص بك باستخدام مجموعة البيانات الخاصة بك من خلال استخدام خط الأنابيب هذا كخط أساس.

الخطوة 1. قم بإعداد بيئتك.

ستقوم AI Platform Pipelines بإعداد بيئة تطوير لبناء خط أنابيب ، ومجموعة Kubeflow Pipeline لتشغيل خط الأنابيب المبني حديثًا.

تثبيت tfx حزمة الثعبان مع kfp متطلبات اضافية.

import sys
# Use the latest version of pip.
!pip install --upgrade pip
# Install tfx and kfp Python packages.
!pip install --upgrade "tfx[kfp]<2"

دعنا نتحقق من إصدارات TFX.

python3 -c "from tfx import version ; print('TFX version: {}'.format(version.__version__))"
TFX version: 0.29.0

في منصة AI خطوط الأنابيب، TFX قيد التشغيل في بيئة Kubernetes استضافت باستخدام Kubeflow خطوط الأنابيب .

دعنا نضع بعض متغيرات البيئة لاستخدام خطوط أنابيب Kubeflow.

أولاً ، احصل على معرّف مشروع GCP.

# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]
%env GOOGLE_CLOUD_PROJECT={GOOGLE_CLOUD_PROJECT}
print("GCP project ID:" + GOOGLE_CLOUD_PROJECT)
env: GOOGLE_CLOUD_PROJECT=tf-benchmark-dashboard
GCP project ID:tf-benchmark-dashboard

نحتاج أيضًا إلى الوصول إلى مجموعة KFP الخاصة بك. يمكنك الوصول إليه في Google Cloud Console ضمن قائمة "AI Platform> Pipeline". يمكن العثور على "نقطة النهاية" لمجموعة KFP من عنوان URL الخاص بلوحة معلومات خطوط الأنابيب ، أو يمكنك الحصول عليها من عنوان URL الخاص بصفحة "البدء" حيث أطلقت هذا الكمبيوتر الدفتري. دعونا تخلق ENDPOINT متغير البيئة وتعيينه إلى نقطة النهاية مجموعة KFP. يجب أن تحتوي نقطة النهاية على جزء اسم المضيف فقط من عنوان URL. على سبيل المثال، إذا كان URL من لوحة القيادة KFP هو <a href="https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start">https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start</a> ، قيمة ENDPOINT يصبح 1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com .

# This refers to the KFP cluster endpoint
ENDPOINT='' # Enter your ENDPOINT here.
if not ENDPOINT:
    from absl import logging
    logging.error('Set your ENDPOINT in this cell.')
ERROR:absl:Set your ENDPOINT in this cell.

تعيين اسم الصورة كما tfx-pipeline في إطار مشروع GCP الحالي.

# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'

وقد انتهى الأمر. نحن على استعداد لإنشاء خط أنابيب.

الخطوة 2. انسخ النموذج المحدد مسبقًا إلى دليل المشروع.

في هذه الخطوة ، سننشئ دليل وملفات مشروع خط أنابيب عمل عن طريق نسخ ملفات إضافية من قالب محدد مسبقًا.

يمكنك إعطاء خط أنابيب اسما مختلفا عن طريق تغيير PIPELINE_NAME أدناه. سيصبح هذا أيضًا اسم دليل المشروع حيث سيتم وضع ملفاتك.

PIPELINE_NAME="my_pipeline"
import os
PROJECT_DIR=os.path.join(os.path.expanduser("~"),"imported",PIPELINE_NAME)

يشمل TFX على taxi قالب مع حزمة TFX الثعبان. إذا كنت تخطط لحل مشكلة تنبؤ نقطي ، بما في ذلك التصنيف والانحدار ، فيمكن استخدام هذا النموذج كنقطة بداية.

في tfx template copy CLI الأمر بنسخ محددة مسبقا ملفات القالب إلى دليل المشروع الخاص بك.

!tfx template copy \
  --pipeline-name={PIPELINE_NAME} \
  --destination-path={PROJECT_DIR} \
  --model=taxi
CLI
Copying taxi pipeline template
kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py
kubeflow_v2_dag_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_v2_dag_runner.py
features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/estimator/model_test.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/estimator/constants.py
model.py -> /home/kbuilder/imported/my_pipeline/models/estimator/model.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/estimator/__init__.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/keras/model_test.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/keras/constants.py
model.py -> /home/kbuilder/imported/my_pipeline/models/keras/model.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/keras/__init__.py
preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py
preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py
features.py -> /home/kbuilder/imported/my_pipeline/models/features.py
pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py
configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py
__init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py
local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py
model_analysis.ipynb -> /home/kbuilder/imported/my_pipeline/model_analysis.ipynb
__init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py
data_validation.ipynb -> /home/kbuilder/imported/my_pipeline/data_validation.ipynb
.gitignore -> /home/kbuilder/imported/my_pipeline/.gitignore
Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/commands/template.py", line 73, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/template_handler.py", line 185, in copy_template
    fileio.copy(src_path, dst_path)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/dsl/io/fileio.py", line 51, in copy
    src_fs.copy(src, dst, overwrite=overwrite)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/dsl/io/plugins/tensorflow_gfile.py", line 48, in copy
    tf.io.gfile.copy(src, dst, overwrite=overwrite)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/lib/io/file_io.py", line 516, in copy_v2
    compat.path_to_bytes(src), compat.path_to_bytes(dst), overwrite)
tensorflow.python.framework.errors_impl.AlreadyExistsError: file already exists

قم بتغيير سياق دليل العمل في دفتر الملاحظات هذا إلى دليل المشروع.

%cd {PROJECT_DIR}
/home/kbuilder/imported/my_pipeline

الخطوة 3. تصفح ملفات المصدر المنسوخة

يوفر قالب TFX ملفات سقالة أساسية لإنشاء خط أنابيب ، بما في ذلك كود مصدر Python وبيانات العينة ودفاتر Jupyter لتحليل إخراج خط الأنابيب. في taxi يستخدم قالب نفس مجموعة البيانات شيكاغو تاكسي ونموذج ML مثل تدفق الهواء التعليمي .

فيما يلي مقدمة موجزة عن كل ملف من ملفات Python.

  • pipeline - يحتوي هذا الدليل على تعريف خط أنابيب
    • configs.py - يحدد الثوابت المشتركة للعدائين خط أنابيب
    • pipeline.py - يعرف مكونات TFX وخط أنابيب
  • models - يحتوي هذا الدليل على تعريفات نموذج ML.
    • features.py ، features_test.py - يعرف ملامح لنموذج
    • preprocessing.py ، preprocessing_test.py - يعرف تجهيزها وظائف باستخدام tf::Transform
    • estimator - يحتوي هذا الدليل على نموذج يستند مقدر.
      • constants.py - يعرف الثوابت من طراز
      • model.py ، model_test.py - يحدد نموذج DNN باستخدام TF مقدر
    • keras - يحتوي هذا الدليل نموذج يستند Keras.
      • constants.py - يعرف الثوابت من طراز
      • model.py ، model_test.py - يحدد نموذج DNN باستخدام Keras
  • local_runner.py ، kubeflow_runner.py - تعريف المتسابقين لكل محرك تزامن

كنت قد لاحظت أن هناك بعض الملفات مع _test.py باسمهم. هذه اختبارات وحدة لخط الأنابيب ويوصى بإضافة المزيد من اختبارات الوحدة أثناء تنفيذ خطوط الأنابيب الخاصة بك. يمكنك تشغيل وحدة الاختبارات من خلال تقديم اسم وحدة من الملفات اختبار مع -m العلم. يمكنك عادة الحصول على اسم وحدة عن طريق حذف .py تمديد واستبدال / مع . . فمثلا:

{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] FeaturesTest.testNumberOfBucketFeatureBucketCount
INFO:tensorflow:time(__main__.FeaturesTest.testNumberOfBucketFeatureBucketCount): 0.0s
I1204 11:33:54.064224 139808961349440 test_util.py:2076] time(__main__.FeaturesTest.testNumberOfBucketFeatureBucketCount): 0.0s
[       OK ] FeaturesTest.testNumberOfBucketFeatureBucketCount
[ RUN      ] FeaturesTest.testTransformedNames
INFO:tensorflow:time(__main__.FeaturesTest.testTransformedNames): 0.0s
I1204 11:33:54.064666 139808961349440 test_util.py:2076] time(__main__.FeaturesTest.testTransformedNames): 0.0s
[       OK ] FeaturesTest.testTransformedNames
[ RUN      ] FeaturesTest.test_session
[  SKIPPED ] FeaturesTest.test_session
----------------------------------------------------------------------
Ran 3 tests in 0.001s

OK (skipped=1)
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] ModelTest.testBuildKerasModel
2021-12-04 11:33:57.507456: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcusolver.so.10'; dlerror: libcusolver.so.10: cannot open shared object file: No such file or directory
2021-12-04 11:33:57.508566: W tensorflow/core/common_runtime/gpu/gpu_device.cc:1757] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...
I1204 11:33:57.581331 139740839778112 layer_utils.py:191] Model: "model"
I1204 11:33:57.581501 139740839778112 layer_utils.py:192] __________________________________________________________________________________________________
I1204 11:33:57.581558 139740839778112 layer_utils.py:189] Layer (type)                    Output Shape         Param #     Connected to                     
I1204 11:33:57.581596 139740839778112 layer_utils.py:194] ==================================================================================================
I1204 11:33:57.581741 139740839778112 layer_utils.py:189] pickup_latitude_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.581793 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.581883 139740839778112 layer_utils.py:189] trip_miles_xf (InputLayer)      [(None,)]            0                                            
I1204 11:33:57.581926 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582010 139740839778112 layer_utils.py:189] trip_start_hour_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.582052 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582189 139740839778112 layer_utils.py:189] dense_features (DenseFeatures)  (None, 1)            0           pickup_latitude_xf[0][0]         
I1204 11:33:57.582241 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.582280 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.582315 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582462 139740839778112 layer_utils.py:189] dense (Dense)                   (None, 1)            2           dense_features[0][0]             
I1204 11:33:57.582518 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582629 139740839778112 layer_utils.py:189] dense_1 (Dense)                 (None, 1)            2           dense[0][0]                      
I1204 11:33:57.582674 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582824 139740839778112 layer_utils.py:189] dense_features_1 (DenseFeatures (None, 34)           0           pickup_latitude_xf[0][0]         
I1204 11:33:57.582879 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.582921 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.582957 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583053 139740839778112 layer_utils.py:189] concatenate (Concatenate)       (None, 35)           0           dense_1[0][0]                    
I1204 11:33:57.583099 139740839778112 layer_utils.py:189]                                                                  dense_features_1[0][0]           
I1204 11:33:57.583143 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583260 139740839778112 layer_utils.py:189] dense_2 (Dense)                 (None, 1)            36          concatenate[0][0]                
I1204 11:33:57.583309 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583389 139740839778112 layer_utils.py:189] tf.compat.v1.squeeze (TFOpLambd (None,)              0           dense_2[0][0]                    
I1204 11:33:57.583432 139740839778112 layer_utils.py:256] ==================================================================================================
I1204 11:33:57.583687 139740839778112 layer_utils.py:267] Total params: 40
I1204 11:33:57.583751 139740839778112 layer_utils.py:268] Trainable params: 40
I1204 11:33:57.583794 139740839778112 layer_utils.py:269] Non-trainable params: 0
I1204 11:33:57.583832 139740839778112 layer_utils.py:270] __________________________________________________________________________________________________
I1204 11:33:57.649701 139740839778112 layer_utils.py:191] Model: "model_1"
I1204 11:33:57.649825 139740839778112 layer_utils.py:192] __________________________________________________________________________________________________
I1204 11:33:57.649878 139740839778112 layer_utils.py:189] Layer (type)                    Output Shape         Param #     Connected to                     
I1204 11:33:57.649932 139740839778112 layer_utils.py:194] ==================================================================================================
I1204 11:33:57.650066 139740839778112 layer_utils.py:189] pickup_latitude_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.650120 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650207 139740839778112 layer_utils.py:189] trip_miles_xf (InputLayer)      [(None,)]            0                                            
I1204 11:33:57.650259 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650356 139740839778112 layer_utils.py:189] trip_start_hour_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.650398 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650552 139740839778112 layer_utils.py:189] dense_features_2 (DenseFeatures (None, 1)            0           pickup_latitude_xf[0][0]         
I1204 11:33:57.650603 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.650644 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.650682 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650812 139740839778112 layer_utils.py:189] dense_3 (Dense)                 (None, 1)            2           dense_features_2[0][0]           
I1204 11:33:57.650864 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651007 139740839778112 layer_utils.py:189] dense_features_3 (DenseFeatures (None, 34)           0           pickup_latitude_xf[0][0]         
I1204 11:33:57.651061 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.651102 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.651146 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651229 139740839778112 layer_utils.py:189] concatenate_1 (Concatenate)     (None, 35)           0           dense_3[0][0]                    
I1204 11:33:57.651274 139740839778112 layer_utils.py:189]                                                                  dense_features_3[0][0]           
I1204 11:33:57.651311 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651462 139740839778112 layer_utils.py:189] dense_4 (Dense)                 (None, 1)            36          concatenate_1[0][0]              
I1204 11:33:57.651547 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651632 139740839778112 layer_utils.py:189] tf.compat.v1.squeeze_1 (TFOpLam (None,)              0           dense_4[0][0]                    
I1204 11:33:57.651675 139740839778112 layer_utils.py:256] ==================================================================================================
I1204 11:33:57.651959 139740839778112 layer_utils.py:267] Total params: 38
I1204 11:33:57.652019 139740839778112 layer_utils.py:268] Trainable params: 38
I1204 11:33:57.652061 139740839778112 layer_utils.py:269] Non-trainable params: 0
I1204 11:33:57.652098 139740839778112 layer_utils.py:270] __________________________________________________________________________________________________
INFO:tensorflow:time(__main__.ModelTest.testBuildKerasModel): 0.84s
I1204 11:33:57.652639 139740839778112 test_util.py:2076] time(__main__.ModelTest.testBuildKerasModel): 0.84s
[       OK ] ModelTest.testBuildKerasModel
[ RUN      ] ModelTest.test_session
[  SKIPPED ] ModelTest.test_session
----------------------------------------------------------------------
Ran 2 tests in 0.836s

OK (skipped=1)

الخطوة 4. قم بتشغيل خط أنابيب TFX الأول الخاص بك

والمكونات في خط أنابيب TFX توليد نواتج كل تشغيل كما قطع أثرية ML الفوقية ، وأنها بحاجة ليتم تخزينها في مكان ما. يمكنك استخدام أي مساحة تخزين يمكن لمجموعة KFP الوصول إليها ، وفي هذا المثال سنستخدم Google Cloud Storage (GCS). يجب إنشاء حاوية GCS الافتراضية تلقائيًا. سيكون اسمها <your-project-id>-kubeflowpipelines-default .

دعنا نحمل بيانات العينة الخاصة بنا إلى حاوية GCS حتى نتمكن من استخدامها في خط الأنابيب لاحقًا.

gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/taxi/data.csv
BucketNotFoundException: 404 gs://tf-benchmark-dashboard-kubeflowpipelines-default bucket does not exist.

دعونا إنشاء خط أنابيب TFX باستخدام tfx pipeline create الأوامر.

!tfx pipeline create  --pipeline-path=kubeflow_runner.py --endpoint={ENDPOINT} \
--build-image
CLI
Usage: tfx pipeline create [OPTIONS]
Try 'tfx pipeline create --help' for help.

Error: no such option: --build-image

أثناء إنشاء خط أنابيب، Dockerfile سيتم إنشاء لبناء صورة عامل الميناء. لا تنس إضافته إلى نظام التحكم بالمصادر (على سبيل المثال ، git) مع ملفات المصدر الأخرى.

الآن بدء تنفيذ تشغيل مع خط الأنابيب الذي تم إنشاؤه حديثا باستخدام tfx run create الأوامر.

tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

أو يمكنك أيضًا تشغيل خط الأنابيب في لوحة معلومات KFP. سيتم إدراج تشغيل التنفيذ الجديد ضمن التجارب في لوحة معلومات KFP. سيسمح لك النقر فوق التجربة بمراقبة التقدم وتصور القطع الأثرية التي تم إنشاؤها أثناء تشغيل التنفيذ.

ومع ذلك ، نوصي بزيارة KFP Dashboard. يمكنك الوصول إلى KFP Dashboard من قائمة Cloud AI Platform Pipelines في Google Cloud Console. بمجرد زيارة لوحة القيادة ، ستتمكن من العثور على خط الأنابيب والوصول إلى ثروة من المعلومات حول خط الأنابيب. على سبيل المثال، يمكنك أن تجد أشواط الخاص بك ضمن القائمة التجارب، وعند فتح المدى تنفيذ الخاص بك تحت التجارب يمكنك أن تجد جميع الأعمال الفنية الخاصة بك من خط أنابيب تحت القائمة قطع أثرية.

أحد المصادر الرئيسية للفشل هو المشاكل المتعلقة بالإذن. يرجى التأكد من أن مجموعة KFP الخاصة بك لديها أذونات للوصول إلى Google Cloud APIs. يمكن تكوين هذا عند إنشاء مجموعة KFP في GCP أو راجع وثيقة استكشاف الأخطاء وإصلاحها في GCP .

الخطوة 5. إضافة مكونات للتحقق من صحة البيانات.

في هذه الخطوة، سوف إضافة مكونات التحقق من صحة البيانات بما في ذلك StatisticsGen ، SchemaGen ، و ExampleValidator . إذا كنت مهتما في التحقق من صحة البيانات، الرجاء مراجعة تبدأ مع Tensorflow التحقق من صحة البيانات .

انقر نقرا مزدوجا فوق لتغيير الدليل إلى pipeline وانقر نقرا مزدوجا مرة أخرى لفتح pipeline.py . البحث و uncomment 3 خطوط التي تضيف StatisticsGen ، SchemaGen ، و ExampleValidator إلى خط أنابيب. (نصيحة: البحث عن تعليقات تحتوي على TODO(step 5): ). تأكد من حفظ pipeline.py بعد تحريره.

أنت الآن بحاجة إلى تحديث خط الأنابيب الحالي بتعريف خط الأنابيب المعدل. استخدام tfx pipeline update الأوامر لتحديث خط أنابيب الخاص بك، تليها tfx run create الأوامر لإنشاء المدى إعدام جديدة من خطوط الأنابيب التي تم تحديثها.

# Update the pipeline
!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
# You can run the pipeline the same way.
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

تحقق من مخرجات خطوط الأنابيب

قم بزيارة لوحة معلومات KFP للعثور على مخرجات خط الأنابيب في الصفحة لتشغيل خط الأنابيب الخاص بك. انقر فوق علامة التبويب تجارب على اليسار، وجميع أشواط في صفحة التجارب. يجب أن تكون قادرًا على العثور على أحدث تشغيل تحت اسم خط الأنابيب الخاص بك.

الخطوة 6. أضف مكونات للتدريب.

في هذه الخطوة، سوف إضافة مكونات لتدريب والتحقق من صحة النموذج بما في ذلك Transform ، Trainer ، Resolver ، Evaluator ، و Pusher .

انقر نقرا مزدوجا فوق لفتح pipeline.py . البحث و uncomment 5 خطوط التي تضيف Transform ، Trainer ، Resolver ، Evaluator و Pusher إلى خط أنابيب. (نصيحة: البحث عن TODO(step 6): )

كما فعلت من قبل ، تحتاج الآن إلى تحديث خط الأنابيب الحالي بتعريف خط الأنابيب المعدل. التعليمات هي نفس الخطوة 5. تحديث خط الأنابيب باستخدام tfx pipeline update ، وخلق شوط التنفيذ باستخدام tfx run create .

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

عند انتهاء تشغيل هذا التنفيذ بنجاح ، تكون قد أنشأت الآن أول خط أنابيب TFX وتشغيله في خطوط أنابيب النظام الأساسي AI!

الخطوة 7. (اختياري) حاول BigQueryExampleGen

الاستعلام الشامل هو serverless، تدرجية عالية، ومستودع البيانات سحابة فعالة من حيث التكلفة. يمكن استخدام BigQuery كمصدر لتدريب الأمثلة في TFX. في هذه الخطوة، وسوف نضيف BigQueryExampleGen إلى خط أنابيب.

انقر نقرا مزدوجا فوق لفتح pipeline.py . التعليق الخروج CsvExampleGen وحرر السطر الذي يخلق مثيل BigQueryExampleGen . تحتاج أيضا إلى غير تعليق على query حجة create_pipeline وظيفة.

نحن بحاجة إلى تحديد أي مشروع GCP لاستخدامها في الاستعلام الشامل، ويتم ذلك عن طريق وضع --project في beam_pipeline_args عند إنشاء خط أنابيب.

انقر نقرا مزدوجا فوق لفتح configs.py . غير تعليق تعريف GOOGLE_CLOUD_REGION ، BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS و BIG_QUERY_QUERY . يجب استبدال قيمة المنطقة في هذا الملف بالقيم الصحيحة لمشروع Google Cloud Platform الخاص بك.

تغيير الدليل مستوى واحد لأعلى. انقر فوق اسم الدليل أعلى قائمة الملفات. اسم الدليل هو اسم خط الأنابيب الذي هو my_pipeline إذا لم تتغير.

انقر نقرا مزدوجا فوق لفتح kubeflow_runner.py . حجتين غير تعليق، query و beam_pipeline_args ، ل create_pipeline وظيفة.

الآن أصبح خط الأنابيب جاهزًا لاستخدام BigQuery كمصدر نموذجي. قم بتحديث خط الأنابيب كما كان من قبل وقم بإنشاء تشغيل تنفيذ جديد كما فعلنا في الخطوتين 5 و 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

الخطوة 8. (اختياري) حاول تدفق البيانات مع KFP

العديد من يستخدم مكونات TFX أباتشي شعاع لتنفيذ خطوط أنابيب بيانات موازية، وهذا يعني أنه يمكنك توزيع البيانات أعباء معالجة باستخدام جوجل الغيمة تدفق البيانات . في هذه الخطوة ، سنقوم بتعيين منسق Kubeflow لاستخدام تدفق البيانات كخلفية لمعالجة البيانات لـ Apache Beam.

انقر نقرا مزدوجا فوق pipeline لتغيير الدليل، وانقر نقرا مزدوجا لفتح configs.py . غير تعليق تعريف GOOGLE_CLOUD_REGION ، و DATAFLOW_BEAM_PIPELINE_ARGS .

تغيير الدليل مستوى واحد لأعلى. انقر فوق اسم الدليل أعلى قائمة الملفات. اسم الدليل هو اسم خط الأنابيب الذي هو my_pipeline إذا لم تتغير.

انقر نقرا مزدوجا فوق لفتح kubeflow_runner.py . غير تعليق beam_pipeline_args . (أيضا التأكد من أن التعليق خارج التيار beam_pipeline_args التي قمت بإضافتها في الخطوة 7.)

الآن أصبح خط الأنابيب جاهزًا لاستخدام Dataflow. قم بتحديث خط الأنابيب وإنشاء تشغيل تنفيذ كما فعلنا في الخطوتين 5 و 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

يمكنك العثور على وظائف تدفق البيانات الخاصة بك في تدفق البيانات في وحدة التحكم الغيمة .

الخطوة 9. (اختياري) حاول سحابة AI منصة التدريب والتنبؤ مع KFP

interoperates TFX مع العديد من الخدمات GCP المدارة، مثل سحابة منصة AI للتدريب والتنبؤ به . يمكنك تعيين الخاص بك Trainer مكون لاستخدام سحابة AI منصة التدريب، والخدمات المدارة لتدريب نماذج ML. وعلاوة على ذلك، عندما تم بناء النموذج الخاص بك وجاهزة للعمل، يمكنك دفع النموذج الخاص بك إلى نظام التشغيل السحابي AI التنبؤ لخدمة. في هذه الخطوة، فإننا سوف نضع لدينا Trainer و Pusher مكون لاستخدام الخدمات السحابية منصة AI.

قبل تحرير الملفات، قد يكون لديك أولا لتمكين منصة AI التدريب وAPI التنبؤ.

انقر نقرا مزدوجا فوق pipeline لتغيير الدليل، وانقر نقرا مزدوجا لفتح configs.py . غير تعليق تعريف GOOGLE_CLOUD_REGION ، GCP_AI_PLATFORM_TRAINING_ARGS و GCP_AI_PLATFORM_SERVING_ARGS . سوف نستخدم عادتنا بنيت صورة حاوية لتدريب نموذجا في سحابة AI منصة التدريب، لذلك يجب علينا وضع masterConfig.imageUri في GCP_AI_PLATFORM_TRAINING_ARGS إلى نفس القيمة كما CUSTOM_TFX_IMAGE أعلاه.

تغيير الدليل مستوى واحد لأعلى، وانقر نقرا مزدوجا لفتح kubeflow_runner.py . غير تعليق ai_platform_training_args و ai_platform_serving_args .

قم بتحديث خط الأنابيب وإنشاء تشغيل تنفيذ كما فعلنا في الخطوتين 5 و 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

يمكنك العثور على وظائف التدريب الخاص بك في سحابة AI منصة وظائف . إذا اكتمل خط أنابيب بنجاح، يمكنك العثور على النموذج الخاص بك في نماذج سحابة منصة AI .

الخطوة 10. استيعاب البيانات الخاصة بك في خط الأنابيب

لقد صنعنا خط أنابيب لنموذج باستخدام مجموعة بيانات Chicago Taxi. حان الوقت الآن لوضع بياناتك في خط الأنابيب.

يمكن تخزين بياناتك في أي مكان يمكن لخط الأنابيب الوصول إليه ، بما في ذلك GCS أو BigQuery. ستحتاج إلى تعديل تعريف خط الأنابيب للوصول إلى بياناتك.

  1. إذا تم تخزين البيانات في الملفات، تعديل DATA_PATH في kubeflow_runner.py أو local_runner.py وتعيينه إلى موقع الملفات الخاصة بك. إذا تم تخزين البيانات في الاستعلام الشامل، تعديل BIG_QUERY_QUERY في pipeline/configs.py إلى صحيح الاستعلام عن البيانات الخاصة بك.
  2. إضافة ميزات في models/features.py .
  3. تعديل models/preprocessing.py ل تحويل البيانات المدخلة للتدريب .
  4. تعديل models/keras/model.py و models/keras/constants.py ل وصف نموذج ML الخاص بك .
    • يمكنك استخدام نموذج قائم على المقدر أيضًا. تغيير RUN_FN ثابت إلى models.estimator.model.run_fn في pipeline/configs.py .

يرجى الاطلاع على دليل المكون المدرب لمزيد من المقدمة.

تنظيف

لتنظيف كافة الموارد جوجل الغيمة المستخدمة في هذا المشروع، يمكنك حذف مشروع جوجل الغيمة التي استخدمتها لتعليمي.

بدلاً من ذلك ، يمكنك تنظيف الموارد الفردية من خلال زيارة كل وحدة تحكم: