Pipelines Module

Pipeline orchestration for composing sub-pipelines with priority.

class datastudio.pipelines.SubPipeline[source]

Bases: object

Execute a sequence of operators on data items.

Runs each operator in order, splitting items into kept and rejected after each step. Rejected items do not continue to subsequent operators.

Parameters:
  • operators (List[Operator]) – List of operators to execute in order.

  • name (Optional[str]) – Optional name for the sub-pipeline.

  • priority (int) – Execution priority (lower = earlier).

  • logger (Optional[Any]) – Logger instance.

Example:

sub_pipeline = SubPipeline([
    ConvLengthFilter(min_length=1, max_length=10),
    RemoveThinkRewriter(),
])
result = sub_pipeline(data_list)
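
The kept/rejected split described above can be sketched as follows. `ToySubPipeline` and its boolean-returning operators are hypothetical stand-ins for illustration, not the real datastudio classes:

```python
# Minimal sketch (not the actual implementation) of the kept/rejected
# split: each operator only sees items kept by all previous operators.
class ToySubPipeline:
    """Hypothetical stand-in for SubPipeline, for illustration only."""

    def __init__(self, operators, name=None, priority=0):
        self.operators = operators
        self.name = name
        self.priority = priority

    def __call__(self, items):
        kept = list(items)
        rejected = []
        for op in self.operators:
            next_kept = []
            for item in kept:
                # Assume each toy operator returns True to keep an item.
                if op(item):
                    next_kept.append(item)
                else:
                    rejected.append(item)
            kept = next_kept  # rejected items never reach later operators
        return kept, rejected

# Usage: two toy "filters" acting on plain ints.
kept, rejected = ToySubPipeline([lambda x: x > 0, lambda x: x < 10])([-1, 5, 20])
# kept == [5]; rejected == [-1, 20]
```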

__init__(operators, name=None, priority=0, logger=None)[source]

See the class parameters above.
class datastudio.pipelines.Pipeline[source]

Bases: object

Composes sub-pipelines and executes them by priority (lower = earlier).

Operators are resolved from the unified OPERATORS registry, so sub-pipelines can freely mix filters and rewriters.

sub_pipelines

List of (sub_pipeline, name) tuples, sorted by priority.

__init__(operations, logger, model=None, stats_collector=None)[source]

Initialize the pipeline.

Parameters:
  • operations (Dict[str, Dict]) – Mapping from operation name to a config dict with cfg (the sub-pipeline config) and priority keys.

  • logger (Any) – Logger instance.

  • model (Optional[Any]) – Optional model instance for MLLM operators.

  • stats_collector (Optional[StatsCollector]) – Optional statistics collector for tracking operator stats.
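
As a rough sketch of what this constructor sets up: `ToyPipeline` below is a hypothetical stand-in; the real class builds each `cfg` into a SubPipeline through the OPERATORS registry, which is elided here:

```python
class ToyPipeline:
    """Hypothetical stand-in for Pipeline, for illustration only."""

    def __init__(self, operations, logger=None):
        # Store (sub_pipeline, name) tuples sorted by priority (lower = earlier).
        # The real implementation resolves each 'cfg' via the OPERATORS
        # registry; this sketch passes the cfg value through untouched.
        self.sub_pipelines = [
            (spec['cfg'], name)
            for name, spec in sorted(operations.items(),
                                     key=lambda kv: kv[1]['priority'])
        ]

# Usage: insertion order does not matter, only 'priority' does.
ops = {
    'mllm_filter': {'cfg': 'cfg-B', 'priority': 1},
    'basic_filters': {'cfg': 'cfg-A', 'priority': 0},
}
names = [name for _, name in ToyPipeline(ops).sub_pipelines]
# names == ['basic_filters', 'mllm_filter']
```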

datastudio.pipelines.wrap_items(datas)[source]

Wrap raw dicts into DataItems.

Parameters:

datas (List[Dict])

Return type:

List[DataItem]

datastudio.pipelines.unwrap_items(items)[source]

Extract raw dicts from DataItems.

Parameters:

items (List[DataItem])

Return type:

List[Dict]
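
A toy round trip of the two helpers above. `ToyDataItem` and its `data` attribute are assumptions for illustration, not the real DataItem API:

```python
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class ToyDataItem:
    """Hypothetical stand-in for DataItem."""
    data: Dict

def wrap_items(datas: List[Dict]) -> List[ToyDataItem]:
    # Wrap each raw dict in a DataItem-like container.
    return [ToyDataItem(d) for d in datas]

def unwrap_items(items: List[ToyDataItem]) -> List[Dict]:
    # Recover the raw dicts; wrap followed by unwrap is the identity.
    return [item.data for item in items]

records = [{'id': 1}, {'id': 2}]
assert unwrap_items(wrap_items(records)) == records
```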

Main pipeline that composes sub-pipelines with priority-based ordering.

Example config:

pipeline = dict(
    type='Pipeline',
    operations={
        'basic_filters': dict(
            cfg=dict(type='SubPipeline', operators=[
                dict(type='ConvLengthFilter', min_length=1),
                dict(type='RemoveThinkRewriter'),
            ]),
            priority=0,
        ),
        'mllm_filter': dict(
            cfg=dict(type='SubPipeline', operators=[
                dict(type='MLLMFilter', ...),
            ]),
            priority=1,
        ),
    }
)
