Operators 模块
核心类型 (core)
Operator result types.
Immutable decision objects representing filter and rewrite outcomes.
Result bundles decisions and applies them to a DataItem.
- class datastudio.operators.core.result.FilterDecision[source]
Bases:
objectFilter decision for a QA pair or entire item.
- qa_idx
QA pair index (0-based), or -1 for a global decision.
- rejected
Whether this QA pair (or entire item) should be rejected.
- reason
Explanation for the decision.
- class datastudio.operators.core.result.RewriteDecision[source]
Bases:
objectRewrite decision for a single QA pair.
- qa_idx
QA pair index (0-based).
- new_question
Rewritten question text (None = no change).
- new_answer
Rewritten answer text (None = no change).
- message
Description of what was changed.
- class datastudio.operators.core.result.Result[source]
Bases:
objectOperator execution result.
Responsibilities: 1. Store decision lists 2. Apply decisions to DataItem 3. Manage records
Can contain both filter and rewrite decisions, allowing a single operator (e.g., MLLM) to both filter and rewrite in one pass.
-
filter_decisions:
List[FilterDecision]
-
rewrite_decisions:
List[RewriteDecision]
- add_rewrite(qa_idx, new_question=None, new_answer=None, message='')[source]
Add a rewrite decision.
- apply_to(item, op_name)[source]
Apply all decisions to a data item.
Order: rewrite first, then filter (to preserve rewritten content).
- __init__(item_idx, filter_decisions=<factory>, rewrite_decisions=<factory>)
- Parameters:
item_idx (int)
filter_decisions (List[FilterDecision])
rewrite_decisions (List[RewriteDecision])
- Return type:
None
-
filter_decisions:
类层级图
算子基类
Operators for data filtering, rewriting, and MLLM-powered processing.
- Submodules:
filters: Rule-based quality control filters. rewriters: Content transformation rewriters. mllm: MLLM-powered filter and rewrite operators.
- class datastudio.operators.FilterDecision[source]
Bases:
objectFilter decision for a QA pair or entire item.
- qa_idx
QA pair index (0-based), or -1 for a global decision.
- rejected
Whether this QA pair (or entire item) should be rejected.
- reason
Explanation for the decision.
- __init__(qa_idx, rejected, reason='')
- class datastudio.operators.RewriteDecision[source]
Bases:
objectRewrite decision for a single QA pair.
- qa_idx
QA pair index (0-based).
- new_question
Rewritten question text (None = no change).
- new_answer
Rewritten answer text (None = no change).
- message
Description of what was changed.
- __init__(qa_idx, new_question=None, new_answer=None, message='')
- class datastudio.operators.Result[source]
Bases:
objectOperator execution result.
Responsibilities: 1. Store decision lists 2. Apply decisions to DataItem 3. Manage records
Can contain both filter and rewrite decisions, allowing a single operator (e.g., MLLM) to both filter and rewrite in one pass.
- __init__(item_idx, filter_decisions=<factory>, rewrite_decisions=<factory>)
- Parameters:
item_idx (int)
filter_decisions (List[FilterDecision])
rewrite_decisions (List[RewriteDecision])
- Return type:
None
- add_rewrite(qa_idx, new_question=None, new_answer=None, message='')[source]
Add a rewrite decision.
- apply_to(item, op_name)[source]
Apply all decisions to a data item.
Order: rewrite first, then filter (to preserve rewritten content).
-
filter_decisions:
List[FilterDecision]
-
rewrite_decisions:
List[RewriteDecision]
- class datastudio.operators.QA[source]
Bases:
objectImmutable view of a single QA pair.
- idx
QA pair index (0-based).
- question
Question text.
- answer
Answer text.
- ori_answer
Original answer before any rewriting.
- ori_question
Original question before any rewriting.
- __init__(idx, question, answer, ori_answer=None, ori_question=None)
- class datastudio.operators.DataItem[source]
Bases:
objectWrapper around a raw data dict with typed QA pair access.
Example:
item = DataItem(raw_data, idx=0) for qa in item.qa_pairs: print(qa.question, qa.answer) item.set_answer(0, "new answer")
- add_full_filter_record(op_name, reason)[source]
Add a filter record for the entire item (not per-QA).
- add_model_record(op_name, model_name)[source]
Record which model was used by an operator to process this item.
Stored as data[“model”] = {op_name: model_name, …}.
- format_all_qa(with_question=True, with_answer=False, with_original=False)[source]
Format all QA pairs for prompt.
- format_qa(qa_idx, with_question=True, with_answer=False, with_original=False)[source]
Format a single QA pair for prompt.
Display index is always 0 (standard for single-QA prompts).
- get_qa(qa_idx)[source]
Get a specific QA pair.
- Parameters:
qa_idx (
int) – Index of the QA pair.- Return type:
- Returns:
QA object at the specified index.
- Raises:
IndexError – If qa_idx is out of range.
- set_answer(qa_idx, value, save_original=True)[source]
Set answer at index.
- Parameters:
- Raises:
IndexError – If qa_idx is out of range.
- set_question(qa_idx, value, save_original=True)[source]
Set question at index.
- Parameters:
- Raises:
IndexError – If qa_idx is out of range.
- split(kept_indices, rejected_indices)[source]
Split this item into kept and rejected parts.
Creates two new DataItems: - kept_item: Contains only the kept QA pairs - rejected_item: Contains only the rejected QA pairs
Both items have their metadata (filter_ops, rewrite_ops, etc.) properly re-indexed.
- class datastudio.operators.Operator[source]
Bases:
ABCBase class for all operators.
Subclasses implement
process()for single-item logic, or overrideprocess_batch()for batch-level optimization (e.g., MLLM operators).
- class datastudio.operators.Filter[source]
Bases:
OperatorBase class for filter operators.
Subclasses implement check(item, qa_idx) for per-QA filtering.
Example
- class MyFilter(Filter):
- def check(self, item: DataItem, qa_idx: int) -> Tuple[bool, str]:
- if len(item.get_answer(qa_idx)) < 10:
return True, “answer too short”
return False, “”
- class datastudio.operators.Rewriter[source]
Bases:
OperatorBase class for rewrite operators.
Subclasses implement rewrite(item, qa_idx) for per-QA rewriting.
Example
- class MyRewriter(Rewriter):
- def rewrite(self, item: DataItem, qa_idx: int) -> Optional[str]:
answer = item.get_answer(qa_idx) stripped = answer.strip() return stripped if stripped != answer else None
- class datastudio.operators.ConvLengthFilter[source]
Bases:
FilterFilter data items based on conversation length (number of QA pairs).
Example
filter = ConvLengthFilter(min_length=1, max_length=10)
# Check a single item result = filter.process(item)
# Use in pipeline pipeline = Pipeline([filter]) kept, rejected = pipeline(data_list)
- class datastudio.operators.ImageSizeFilter[source]
Bases:
FilterFilter data items based on image dimensions.
Rejects images that are too small or have extreme aspect ratios.
Example
filter = ImageSizeFilter(min_size=100, max_ratio=10.0) pipeline = Pipeline([filter]) kept, rejected = pipeline(data_list)
- __init__(min_size=28, max_ratio=20.0, require_image=False, logger=None, **kwargs)[source]
Initialize the filter.
- class datastudio.operators.ImageAspectRatioFilter[source]
Bases:
FilterFilter data items based on image aspect ratio.
Removes items containing images with extreme aspect ratios.
- class datastudio.operators.ImageExtFilter[source]
Bases:
FilterFilter data items based on image file extensions.
Removes items containing images with excluded file extensions.
- class datastudio.operators.LengthAnomalyFilter[source]
Bases:
FilterFilter data items based on text length anomalies.
Detects and filters QA pairs with abnormally short or long text. Supports per-QA-pair filtering (partial filtering).
- __init__(min_length=1, max_length=8192, check_question=False, check_answer=False, check_ori_answer=False, use_tokenizer=False, logger=None, **kwargs)[source]
Initialize the filter.
- Parameters:
min_length (
int) – Minimum allowed length.max_length (
int) – Maximum allowed length.check_question (
bool) – Whether to check question length.check_answer (
bool) – Whether to check answer length.check_ori_answer (
bool) – Whether to check original answer length.use_tokenizer (
bool) – If True, use tiktoken for length calculation.
- class datastudio.operators.ResponseTagFilter[source]
Bases:
FilterFilter data items that have image/video tags in responses.
Responses should not contain <image> or <video> tags as these indicate potential data quality issues.
- class datastudio.operators.TextRepeatFilter[source]
Bases:
FilterFilter data items with repetitive text patterns.
Detects and filters QA pairs containing consecutive repetitive patterns. Supports per-QA-pair filtering (partial filtering).
- class datastudio.operators.RemoveThinkRewriter[source]
Bases:
RewriterRemove <think>…</think> tags from answers.
These tags often contain model reasoning that shouldn’t be in the final output.
Example
rewriter = RemoveThinkRewriter() pipeline = Pipeline([rewriter]) result, _ = pipeline(data_list)
- class datastudio.operators.NormThinkRewriter[source]
Bases:
RewriterNormalize <think> tag format in answers.
Ensures consistent format: - Lowercase tags - Newline after opening tag - Newline before closing tag
Example
Input: “<THINK>reasoning</THINK>answer” Output: “<think>nreasoningn</think>nanswer”
- THINK_VARIANTS = [(re.compile('<THINK>', re.IGNORECASE), '<think>'), (re.compile('</THINK>', re.IGNORECASE), '</think>'), (re.compile('<think>(?!\\n)'), '<think>\n'), (re.compile('(?<!\\n)</think>'), '\n</think>'), (re.compile('</think>(?!\\n)(?=\\S)'), '</think>\n')]
- class datastudio.operators.AddNoThinkRewriter[source]
Bases:
RewriterAdd empty <think> tags to responses that don’t have them.
This ensures consistent format for responses that should have think tags but are missing them.
- class datastudio.operators.NormImageTagRewriter[source]
Bases:
RewriterNormalize image tag positions in conversations.
Moves all <image> tags to the beginning of the first human message, ensuring consistent format across all data items.
Warning
This rewriter is designed for single-image or multi-image scenarios where all images logically belong to the first question. It does NOT support interleaved image-text conversations (e.g. images scattered across multiple turns), as it will forcibly move all <image> tags to the first message and break the original image-text correspondence.
- class datastudio.operators.NormPromptRewriter[source]
Bases:
RewriterNormalize hint/instruction prompt formatting.
Ensures hints like
"Answer concisely"are properly separated with newlines from the main question text.
- class datastudio.operators.NormMultiTurnPromptRewriter[source]
Bases:
RewriterNormalize instruction patterns in multi-turn conversations.
If the first turn contains instruction patterns (like “Answer concisely”), removes similar patterns from subsequent turns to avoid redundancy.
- class datastudio.operators.RemoveAnswerRewriter[source]
Bases:
RewriterRemove [ANSWER 0] prefix from responses.
Some data may have “[ANSWER 0]” prefixes in responses that should be removed for clean output.
- class datastudio.operators.RemoveReasonRewriter[source]
Bases:
RewriterRemove “Reasoning:” prefix from responses.
Some data may have “Reasoning:” prefixes in responses that should be removed for clean output.
- class datastudio.operators.SplitRewriter[source]
Bases:
OperatorSplit multi-turn conversations into single-turn conversations.
Each QA pair becomes a separate data item, preserving all metadata and adjusting indices accordingly.
Note: This is a special Operator that returns multiple items, handled specially by Pipeline.
- expand_items(items)[source]
Expand multi-turn items into single-turn items.
This should be called directly instead of through Pipeline for splitting operations.
- class datastudio.operators.RequestBuilder[source]
Bases:
objectMLLM request builder.
Responsibilities: 1. Store request_builder configuration 2. Build requests (build_request, build_requests) 3. Format content (format_content) 4. Parse model responses (parse_response)
- Config format:
- request_builder = dict(
type=”RequestBuilder”, prompt=”prompts/filter/xxx.txt”, system_prompt=”prompts/grounding/grounding_system.txt”, key_templates={“result”: “q{idx}”, “reason”: “q{idx}_reason”}, with_image=True, with_answer=False,
)
- prompt
Prompt text or path to .txt file (user message).
- system_prompt
System prompt text or path to .txt file (system message).
- key_templates
Dict mapping field names to key templates with {idx}.
- with_image/with_question/with_answer/with_original
Request flags.
- __init__(prompt=None, system_prompt=None, key_templates=<object object>, with_image=None, with_question=None, with_answer=None, with_original=None, **kwargs)[source]
Initialize request builder.
Instance values override class defaults. For key_templates, explicitly passing None disables JSON parsing.
- build_requests_selective(item, qa_indices)[source]
Build requests for specific QA indices only.
Used by SelectiveMLLMRewriter for conditional rewriting.
- class datastudio.operators.MLLMOperator[source]
Bases:
OperatorBase class for MLLM-powered operators.
Builds requests via
RequestBuilder, callsmodel.generate(), and delegates response parsing to subclasses via_add_decision().- __init__(model, request_builder=None, batch_qa=False, logger=None, **kwargs)[source]
Initialize the MLLM operator.
- class datastudio.operators.MLLMFilter[source]
Bases:
MLLMOperatorFilter data items using MLLM quality assessment.
Expected response format:
{"q0": true/false, "q0_reason": "..."}wheretruemeans reject andfalsemeans keep.Example config:
request_builder = dict( type="RequestBuilder", prompt="prompts/filter/xxx.txt", key_templates={"result": "q{idx}", "reason": "q{idx}_reason"}, with_image=True, )
- __init__(model, request_builder=None, batch_qa=False, logger=None, **kwargs)[source]
Initialize the MLLM operator.
- class datastudio.operators.MLLMRewriter[source]
Bases:
MLLMOperatorRewrite data using MLLM.
Supports both structured (JSON dict) and plain text responses. Set
key_templates=Nonein the request builder for plain text mode.Example config:
request_builder = dict( type="RequestBuilder", prompt="prompts/rewriter/xxx.txt", key_templates={"result": "q{idx}_answer"}, with_image=True, with_answer=True, )
- class datastudio.operators.SelectiveMLLMRewriter[source]
Bases:
MLLMRewriterMLLM rewriter that only processes items matching a condition.
Example:
rewriter = SelectiveMLLMRewriter( model=model, request_builder=dict(type="RequestBuilder", prompt="prompts/translate.txt"), should_rewrite_fn=is_mixed_language, )