คู่มือนี้จะอธิบายวิธีใช้ TFX API เพื่อสร้างคอมโพเนนต์ที่กำหนดเองโดยสมบูรณ์ ส่วนประกอบที่กำหนดเองโดยสมบูรณ์ช่วยให้คุณสร้างส่วนประกอบโดยกำหนดข้อกำหนดส่วนประกอบ ตัวดำเนินการ และคลาสอินเทอร์เฟซส่วนประกอบ แนวทางนี้ช่วยให้คุณใช้ซ้ำและขยายส่วนประกอบมาตรฐานเพื่อให้เหมาะกับความต้องการของคุณได้
หากคุณยังใหม่กับไปป์ไลน์ TFX เรียนรู้เพิ่มเติมเกี่ยวกับแนวคิดหลักของไปป์ไลน์ TFX
ตัวดำเนินการแบบกำหนดเองหรือส่วนประกอบแบบกำหนดเอง
หากจำเป็นต้องใช้ตรรกะการประมวลผลแบบกำหนดเองเท่านั้นในขณะที่คุณสมบัติอินพุต เอาท์พุต และการดำเนินการของส่วนประกอบเหมือนกับส่วนประกอบที่มีอยู่ ตัวดำเนินการแบบกำหนดเองก็เพียงพอแล้ว จำเป็นต้องมีส่วนประกอบแบบกำหนดเองทั้งหมดเมื่อคุณสมบัติอินพุต เอาต์พุต หรือการดำเนินการใดๆ แตกต่างจากส่วนประกอบ TFX ที่มีอยู่
จะสร้างส่วนประกอบที่กำหนดเองได้อย่างไร?
การพัฒนาองค์ประกอบที่กำหนดเองโดยสมบูรณ์จำเป็นต้องมี:
- ชุดข้อกำหนดเฉพาะของอินพุตและเอาท์พุตที่กำหนดไว้สำหรับส่วนประกอบใหม่ โดยเฉพาะอย่างยิ่ง ประเภทสำหรับสิ่งประดิษฐ์อินพุตควรสอดคล้องกับประเภทสิ่งประดิษฐ์เอาต์พุตของส่วนประกอบที่สร้างสิ่งประดิษฐ์ และประเภทของสิ่งประดิษฐ์เอาต์พุตควรสอดคล้องกับประเภทสิ่งประดิษฐ์อินพุตของส่วนประกอบที่ใช้สิ่งประดิษฐ์ หากมี
- พารามิเตอร์การดำเนินการที่ไม่ใช่สิ่งประดิษฐ์ที่จำเป็นสำหรับส่วนประกอบใหม่
ส่วนประกอบSpec
คลาส ComponentSpec
กำหนดสัญญาส่วนประกอบโดยการกำหนดส่วนอินพุตและเอาต์พุตให้กับส่วนประกอบตลอดจนพารามิเตอร์ที่ใช้สำหรับการดำเนินการส่วนประกอบ มีสามส่วน:
- INPUTS : พจนานุกรมของพารามิเตอร์ที่พิมพ์สำหรับส่วนอินพุตที่ส่งผ่านไปยังตัวดำเนินการคอมโพเนนต์ โดยปกติแล้ว สิ่งประดิษฐ์อินพุตคือเอาต์พุตจากส่วนประกอบอัปสตรีม ดังนั้นจึงใช้ประเภทเดียวกันร่วมกัน
- OUTPUTS : พจนานุกรมของพารามิเตอร์ที่พิมพ์สำหรับส่วนเอาต์พุตที่ส่วนประกอบสร้างขึ้น
- PARAMETERS : พจนานุกรมของรายการ ExecutionParameter เพิ่มเติมที่จะถูกส่งผ่านไปยังตัวดำเนินการส่วนประกอบ พารามิเตอร์เหล่านี้ไม่ใช่พารามิเตอร์ที่เราต้องการกำหนดอย่างยืดหยุ่นในไปป์ไลน์ DSL และส่งต่อไปสู่การดำเนินการ
นี่คือตัวอย่างของ ComponentSpec:
class HelloComponentSpec(types.ComponentSpec):
"""ComponentSpec for Custom TFX Hello World Component."""
PARAMETERS = {
# These are parameters that will be passed in the call to
# create an instance of this component.
'name': ExecutionParameter(type=Text),
}
INPUTS = {
# This will be a dictionary with input artifacts, including URIs
'input_data': ChannelParameter(type=standard_artifacts.Examples),
}
OUTPUTS = {
# This will be a dictionary which this component will populate
'output_data': ChannelParameter(type=standard_artifacts.Examples),
}
ผู้ดำเนินการ
จากนั้น ให้เขียนโค้ดตัวดำเนินการสำหรับส่วนประกอบใหม่ โดยพื้นฐานแล้ว จะต้องสร้างคลาสย่อยใหม่ของ base_executor.BaseExecutor
โดยมีฟังก์ชัน Do
แทนที่ ในฟังก์ชัน Do
อาร์กิวเมนต์ input_dict
, output_dict
และ exec_properties
ที่ถูกส่งผ่านในการแมปไปยัง INPUTS
, OUTPUTS
และ PARAMETERS
ที่กำหนดไว้ใน ComponentSpec ตามลำดับ สำหรับ exec_properties
สามารถดึงค่าได้โดยตรงผ่านการค้นหาพจนานุกรม สำหรับสิ่งประดิษฐ์ใน input_dict
และ output_dict
มีฟังก์ชันอำนวยความสะดวกที่มีอยู่ในคลาส artifact_utils ที่สามารถใช้เพื่อดึงอินสแตนซ์สิ่งประดิษฐ์หรือสิ่งประดิษฐ์ uri
class Executor(base_executor.BaseExecutor):
"""Executor for HelloComponent."""
def Do(self, input_dict: Dict[Text, List[types.Artifact]],
output_dict: Dict[Text, List[types.Artifact]],
exec_properties: Dict[Text, Any]) -> None:
...
split_to_instance = {}
for artifact in input_dict['input_data']:
for split in json.loads(artifact.split_names):
uri = artifact_utils.get_split_uri([artifact], split)
split_to_instance[split] = uri
for split, instance in split_to_instance.items():
input_dir = instance
output_dir = artifact_utils.get_split_uri(
output_dict['output_data'], split)
for filename in tf.io.gfile.listdir(input_dir):
input_uri = os.path.join(input_dir, filename)
output_uri = os.path.join(output_dir, filename)
io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)
หน่วยทดสอบตัวดำเนินการแบบกำหนดเอง
การทดสอบหน่วยสำหรับตัวดำเนินการแบบกำหนดเองสามารถสร้างขึ้นได้คล้ายกับ การทดสอบนี้
อินเทอร์เฟซส่วนประกอบ
ตอนนี้ชิ้นส่วนที่ซับซ้อนที่สุดเสร็จสมบูรณ์แล้ว ขั้นตอนต่อไปคือการประกอบชิ้นส่วนเหล่านี้เข้ากับอินเทอร์เฟซส่วนประกอบ เพื่อให้สามารถใช้ส่วนประกอบในไปป์ไลน์ได้ มีหลายขั้นตอน:
- ทำให้อินเทอร์เฟซคอมโพเนนต์เป็นคลาสย่อยของ
base_component.BaseComponent
- กำหนดตัวแปรคลาส
SPEC_CLASS
ด้วยคลาสComponentSpec
ที่กำหนดไว้ก่อนหน้านี้ - กำหนดตัวแปรคลาส
EXECUTOR_SPEC
ด้วยคลาส Executor ที่กำหนดไว้ก่อนหน้านี้ - กำหนดฟังก์ชันตัวสร้าง
__init__()
โดยใช้อาร์กิวเมนต์ของฟังก์ชันเพื่อสร้างอินสแตนซ์ของคลาส ComponentSpec และเรียกใช้ฟังก์ชัน super ด้วยค่านั้น พร้อมด้วยชื่อที่เป็นทางเลือก
เมื่ออินสแตนซ์ของส่วนประกอบถูกสร้างขึ้น ตรรกะการตรวจสอบประเภทในคลาส base_component.BaseComponent
จะถูกเรียกใช้เพื่อให้แน่ใจว่าอาร์กิวเมนต์ที่ส่งผ่านเข้ากันได้กับข้อมูลประเภทที่กำหนดไว้ในคลาส ComponentSpec
from tfx.types import standard_artifacts
from hello_component import executor
class HelloComponent(base_component.BaseComponent):
"""Custom TFX Hello World Component."""
SPEC_CLASS = HelloComponentSpec
EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)
def __init__(self,
input_data: types.Channel = None,
output_data: types.Channel = None,
name: Optional[Text] = None):
if not output_data:
examples_artifact = standard_artifacts.Examples()
examples_artifact.split_names = input_data.get()[0].split_names
output_data = channel_utils.as_channel([examples_artifact])
spec = HelloComponentSpec(input_data=input_data,
output_data=output_data, name=name)
super(HelloComponent, self).__init__(spec=spec)
ประกอบเป็นไปป์ไลน์ TFX
ขั้นตอนสุดท้ายคือการเสียบส่วนประกอบแบบกำหนดเองใหม่เข้ากับไปป์ไลน์ TFX นอกจากการเพิ่มอินสแตนซ์ของส่วนประกอบใหม่แล้ว ยังจำเป็นต้องมีสิ่งต่อไปนี้ด้วย:
- เชื่อมต่อส่วนประกอบต้นน้ำและปลายน้ำของส่วนประกอบใหม่เข้ากับส่วนประกอบอย่างเหมาะสม สิ่งนี้ทำได้โดยการอ้างอิงเอาท์พุตของส่วนประกอบอัพสตรีมในส่วนประกอบใหม่ และอ้างอิงเอาท์พุตของส่วนประกอบใหม่ในส่วนประกอบดาวน์สตรีม
- เพิ่มอินสแตนซ์ส่วนประกอบใหม่ลงในรายการส่วนประกอบเมื่อสร้างไปป์ไลน์
ตัวอย่างด้านล่างเน้นการเปลี่ยนแปลงที่กล่าวมาข้างต้น ตัวอย่างเต็มสามารถพบได้ใน TFX GitHub repo
def _create_pipeline():
...
example_gen = CsvExampleGen(input_base=examples)
hello = component.HelloComponent(
input_data=example_gen.outputs['examples'], name='HelloWorld')
statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
...
return pipeline.Pipeline(
...
components=[example_gen, hello, statistics_gen, ...],
...
)
ปรับใช้คอมโพเนนต์ที่กำหนดเองทั้งหมด
นอกจากการเปลี่ยนแปลงโค้ดแล้ว ส่วนที่เพิ่มใหม่ทั้งหมด ( ComponentSpec
, Executor
, อินเทอร์เฟซส่วนประกอบ) จะต้องสามารถเข้าถึงได้ในสภาพแวดล้อมการทำงานของไปป์ไลน์เพื่อที่จะรันไปป์ไลน์ได้อย่างถูกต้อง