Pipelines module
Pipeline orchestration for composing sub-pipelines with priority.
- class datastudio.pipelines.SubPipeline
Bases: object
Execute a sequence of operators on data items.
Runs each operator in order, splitting items into kept and rejected after each step. Rejected items do not continue to subsequent operators.
- Parameters:
Example:
sub_pipeline = SubPipeline([
    ConvLengthFilter(min_length=1, max_length=10),
    RemoveThinkRewriter(),
])
result = sub_pipeline(data_list)
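The kept/rejected behavior described above can be illustrated with a small standalone sketch. It does not use datastudio: `run_sequence`, the function-style operators, and the `(kept, rejected)` return value are all assumptions made for illustration, and the real `SubPipeline` interface and return type may differ.

```python
from typing import Callable, Iterable, List, Tuple

# Hypothetical operator shape: takes one item, returns (keep, possibly rewritten item).
Operator = Callable[[str], Tuple[bool, str]]


def length_filter(min_length: int, max_length: int) -> Operator:
    """Filter: keep items whose length lies in [min_length, max_length]."""
    def op(item: str) -> Tuple[bool, str]:
        return min_length <= len(item) <= max_length, item
    return op


def strip_rewriter() -> Operator:
    """Rewriter: strip surrounding whitespace; never rejects an item."""
    def op(item: str) -> Tuple[bool, str]:
        return True, item.strip()
    return op


def run_sequence(
    operators: Iterable[Operator], items: List[str]
) -> Tuple[List[str], List[str]]:
    """Apply operators in order; rejected items skip all later operators."""
    kept = list(items)
    rejected: List[str] = []
    for op in operators:
        survivors: List[str] = []
        for item in kept:
            keep, new_item = op(item)
            if keep:
                survivors.append(new_item)
            else:
                rejected.append(item)
        kept = survivors  # only survivors reach the next operator
    return kept, rejected


kept, rejected = run_sequence(
    [length_filter(1, 10), strip_rewriter()],
    ["  hi  ", "", "this string is far too long"],
)
print(kept)      # ['hi']
print(rejected)  # ['', 'this string is far too long']
```

Because the filter runs first, the rewriter only ever sees the single item that passed the length check.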
- class datastudio.pipelines.Pipeline
Bases: object
Composes sub-pipelines and executes them by priority (lower = earlier).
Operators are resolved from the unified OPERATORS registry, so sub-pipelines can freely mix filters and rewriters.
- sub_pipelines
List of (sub_pipeline, name) tuples, sorted by priority.
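As a rough illustration of how a unified registry lets filters and rewriters be resolved the same way, here is a generic sketch of the pattern. The `OPERATORS` dict, the `register` decorator, the `build` helper, and both operator classes below are hypothetical stand-ins, not datastudio's actual registry API.

```python
from typing import Any, Dict

# Hypothetical stand-in for a unified registry: type name -> class.
OPERATORS: Dict[str, type] = {}


def register(cls: type) -> type:
    """Class decorator that records the class under its own name."""
    OPERATORS[cls.__name__] = cls
    return cls


@register
class MinLengthFilter:  # hypothetical filter
    def __init__(self, min_length: int = 1) -> None:
        self.min_length = min_length


@register
class LowercaseRewriter:  # hypothetical rewriter
    pass


def build(cfg: Dict[str, Any]) -> Any:
    """Instantiate an operator from a config dict that has a 'type' key."""
    params = dict(cfg)  # copy so the caller's config is left untouched
    cls = OPERATORS[params.pop("type")]
    return cls(**params)


operators = [
    build(dict(type="MinLengthFilter", min_length=3)),
    build(dict(type="LowercaseRewriter")),
]
print([type(op).__name__ for op in operators])
# ['MinLengthFilter', 'LowercaseRewriter']
```

Since both kinds of operator live in one lookup table, a single operator list can name either kind by its `type` string.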
Main pipeline that composes sub-pipelines with priority-based ordering.
Example config:
pipeline = dict(
    type='Pipeline',
    operations={
        'basic_filters': dict(
            cfg=dict(type='SubPipeline', operators=[
                dict(type='ConvLengthFilter', min_length=1),
                dict(type='RemoveThinkRewriter'),
            ]),
            priority=0,
        ),
        'mllm_filter': dict(
            cfg=dict(type='SubPipeline', operators=[
                dict(type='MLLMFilter', ...),
            ]),
            priority=1,
        ),
    }
)
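The priority rule (lower = earlier) means execution order follows the `priority` values, not the order in which entries are declared. The sketch below shows this with plain Python sorting on a config shaped like the one above. The empty `operators` lists and the variable names are illustrative assumptions, and this is not the library's own ordering code.

```python
# Entries are deliberately declared out of priority order.
operations = {
    "mllm_filter": dict(cfg=dict(type="SubPipeline", operators=[]), priority=1),
    "basic_filters": dict(cfg=dict(type="SubPipeline", operators=[]), priority=0),
}

# Sort (name, spec) pairs by priority; lower values come first.
ordered = sorted(operations.items(), key=lambda pair: pair[1]["priority"])
execution_order = [name for name, _ in ordered]
print(execution_order)  # ['basic_filters', 'mllm_filter']
```

Python's `sorted` is stable, so in this sketch entries with equal priority keep their declaration order. Whether datastudio treats ties the same way is not stated here.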
- class datastudio.pipelines.pipeline.Pipeline
Bases: object
Composes sub-pipelines and executes them by priority (lower = earlier).
Operators are resolved from the unified OPERATORS registry, so sub-pipelines can freely mix filters and rewriters.
- sub_pipelines
List of (sub_pipeline, name) tuples, sorted by priority.
Sub-pipeline for sequential operator execution.
- class datastudio.pipelines.sub_pipeline.SubPipeline
Bases: object
Execute a sequence of operators on data items.
Runs each operator in order, splitting items into kept and rejected after each step. Rejected items do not continue to subsequent operators.
- Parameters:
Example:
sub_pipeline = SubPipeline([
    ConvLengthFilter(min_length=1, max_length=10),
    RemoveThinkRewriter(),
])
result = sub_pipeline(data_list)