Apache Beam จัดเตรียมเฟรมเวิร์กสำหรับการรันงานการประมวลผลข้อมูลแบบแบตช์และการสตรีมที่ทำงานบนกลไกการดำเนินการที่หลากหลาย ไลบรารี TFX หลายแห่งใช้ Beam เพื่อรันงาน ซึ่งทำให้มีความสามารถในการปรับขนาดในคลัสเตอร์การประมวลผลในระดับสูง Beam มีการรองรับเอ็นจิ้นการประมวลผลหรือ "รันเนอร์" ที่หลากหลาย รวมถึงไดเร็กรันเนอร์ที่ทำงานบนโหนดประมวลผลเดียว และมีประโยชน์มากสำหรับการพัฒนา การทดสอบ หรือการปรับใช้ขนาดเล็ก Beam จัดเตรียมเลเยอร์นามธรรมซึ่งช่วยให้ TFX สามารถทำงานบนรันเนอร์ที่รองรับโดยไม่ต้องแก้ไขโค้ด TFX ใช้ Beam Python API ดังนั้นจึงจำกัดเฉพาะรันเนอร์ที่ Python API รองรับ
การปรับใช้และความสามารถในการขยายขนาด
เนื่องจากข้อกำหนดด้านปริมาณงานเพิ่มขึ้น Beam จึงสามารถปรับขนาดไปสู่การใช้งานขนาดใหญ่มากทั่วทั้งคลัสเตอร์การประมวลผลขนาดใหญ่ สิ่งนี้ถูกจำกัดด้วยความสามารถในการปรับขนาดของนักวิ่งต้นแบบเท่านั้น โดยทั่วไปแล้วนักวิ่งในการปรับใช้ขนาดใหญ่จะถูกปรับใช้กับระบบการจัดการคอนเทนเนอร์เช่น Kubernetes หรือ Apache Mesos สำหรับการปรับใช้แอปพลิเคชันอัตโนมัติ การปรับขนาด และการจัดการ
ดูเอกสารประกอบของ Apache Beam สำหรับข้อมูลเพิ่มเติมเกี่ยวกับ Apache Beam
สำหรับผู้ใช้ Google Cloud นั้น Dataflow คือตัวดำเนินการที่แนะนำ ซึ่งมีแพลตฟอร์มแบบไร้เซิร์ฟเวอร์และคุ้มค่าผ่านการปรับขนาดทรัพยากรอัตโนมัติ การปรับสมดุลงานแบบไดนามิก การผสานรวมเชิงลึกกับบริการอื่นๆ ของ Google Cloud การรักษาความปลอดภัยในตัว และการตรวจสอบ
รหัส Python ที่กำหนดเองและการพึ่งพา
ความซับซ้อนที่โดดเด่นประการหนึ่งของการใช้ Beam ในไปป์ไลน์ TFX คือการจัดการโค้ดที่กำหนดเองและ/หรือการขึ้นต่อกันที่จำเป็นจากโมดูล Python เพิ่มเติม ต่อไปนี้คือตัวอย่างกรณีที่อาจเป็นปัญหา:
- preprocessing_fn จำเป็นต้องอ้างอิงถึงโมดูล Python ของผู้ใช้เอง
- ตัวแยกแบบกำหนดเองสำหรับส่วนประกอบ Evaluator
- โมดูลที่กำหนดเองซึ่งมีคลาสย่อยจากส่วนประกอบ TFX
TFX อาศัยการสนับสนุนของ Beam ใน การจัดการการพึ่งพา Python Pipeline เพื่อจัดการการพึ่งพา Python ขณะนี้มีสองวิธีในการจัดการสิ่งนี้:
- การจัดเตรียมรหัส Python และการพึ่งพาเป็นแพ็คเกจซอร์ส
- [กระแสข้อมูลเท่านั้น] การใช้อิมเมจคอนเทนเนอร์เป็นผู้ปฏิบัติงาน
สิ่งเหล่านี้จะกล่าวถึงต่อไป
จัดเตรียมโค้ด Python และการพึ่งพาเป็นแพ็คเกจซอร์ส
ขอแนะนำสำหรับผู้ใช้ที่:
- คุ้นเคยกับแพ็คเกจ Python และ
- ใช้ซอร์สโค้ด Python เท่านั้น (เช่น ไม่มีโมดูล C หรือไลบรารีที่แบ่งใช้)
โปรดปฏิบัติตามหนึ่งในเส้นทางใน การจัดการการพึ่งพาไพพ์ไลน์ของ Python เพื่อระบุสิ่งนี้โดยใช้หนึ่งในลำแสงต่อไปนี้:
- --setup_file
- --extra_package
- --requirements_file
หมายเหตุ: ในกรณีใดๆ ข้างต้น โปรดตรวจสอบให้แน่ใจว่า tfx
เวอร์ชันเดียวกันนั้นอยู่ในรายการการอ้างอิง
[กระแสข้อมูลเท่านั้น] การใช้อิมเมจคอนเทนเนอร์สำหรับผู้ปฏิบัติงาน
TFX 0.26.0 ขึ้นไปมีการรองรับแบบทดลองสำหรับการใช้ อิมเมจคอนเทนเนอร์แบบกำหนดเอง สำหรับผู้ปฏิบัติงาน Dataflow
เพื่อที่จะใช้สิ่งนี้ คุณต้อง:
- สร้างอิมเมจ Docker ซึ่งมีทั้ง
tfx
และโค้ดที่กำหนดเองของผู้ใช้และการอ้างอิงที่ติดตั้งไว้ล่วงหน้า- สำหรับผู้ใช้ที่ (1) ใช้
tfx>=0.26
และ (2) ใช้ python 3.7 เพื่อพัฒนาไปป์ไลน์ วิธีที่ง่ายที่สุดในการทำเช่นนี้คือการขยายเวอร์ชันที่สอดคล้องกันของอิมเมจtensorflow/tfx
อย่างเป็นทางการ:
- สำหรับผู้ใช้ที่ (1) ใช้
# You can use a build-arg to dynamically pass in the
# version of TFX being used to your Dockerfile.
ARG TFX_VERSION
FROM tensorflow/tfx:${TFX_VERSION}
# COPY your code and dependencies in
- พุชอิมเมจที่สร้างไปยังรีจิสทรีอิมเมจคอนเทนเนอร์ซึ่งโปรเจ็กต์ที่ Dataflow เข้าถึงได้
- ผู้ใช้ Google Cloud สามารถพิจารณาใช้ Cloud Build ซึ่งดำเนินการตามขั้นตอนข้างต้นโดยอัตโนมัติ
- ระบุ
beam_pipeline_args
ต่อไปนี้:
beam_pipeline_args.extend([
'--runner=DataflowRunner',
'--project={project-id}',
'--worker_harness_container_image={image-ref}',
'--experiments=use_runner_v2',
])
TODO(b/171733562): ลบ use_runner_v2 เมื่อเป็นค่าเริ่มต้นสำหรับ Dataflow
สิ่งที่ต้องทำ (b/179738639): สร้างเอกสารสำหรับวิธีทดสอบคอนเทนเนอร์แบบกำหนดเองในเครื่องหลังจาก https://issues.apache.org/jira/browse/BEAM-5440
อาร์กิวเมนต์ไปป์ไลน์ของบีม
ส่วนประกอบ TFX หลายอย่างอาศัย Beam สำหรับการประมวลผลข้อมูลแบบกระจาย มีการกำหนดค่าด้วย beam_pipeline_args
ซึ่งระบุไว้ระหว่างการสร้างไปป์ไลน์:
my_pipeline = Pipeline(
...,
beam_pipeline_args=[...])
TFX 0.30 และสูงกว่าเพิ่มอินเทอร์เฟซ with_beam_pipeline_args
สำหรับการขยาย args ลำแสงระดับไปป์ไลน์ต่อส่วนประกอบ:
example_gen = CsvExampleGen(input_base=data_root).with_beam_pipeline_args([...])