Operators Module

Core Types (core)

Operator result types.

Immutable decision objects representing filter and rewrite outcomes. Result bundles decisions and applies them to a DataItem.

class datastudio.operators.core.result.FilterDecision[source]

Bases: object

Filter decision for a QA pair or entire item.

qa_idx

QA pair index (0-based), or -1 for a global decision.

rejected

Whether this QA pair (or entire item) should be rejected.

reason

Explanation for the decision.

qa_idx: int
rejected: bool
reason: str = ''
__init__(qa_idx, rejected, reason='')
Parameters:
  • qa_idx (int)

  • rejected (bool)

  • reason (str)

Return type:

None

class datastudio.operators.core.result.RewriteDecision[source]

Bases: object

Rewrite decision for a single QA pair.

qa_idx

QA pair index (0-based).

new_question

Rewritten question text (None = no change).

new_answer

Rewritten answer text (None = no change).

message

Description of what was changed.

qa_idx: int
new_question: Optional[str] = None
new_answer: Optional[str] = None
message: str = ''
__init__(qa_idx, new_question=None, new_answer=None, message='')
Parameters:
  • qa_idx (int)

  • new_question (str | None)

  • new_answer (str | None)

  • message (str)

Return type:

None
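The decision classes are described as immutable value objects. A minimal sketch of how such objects could look, assuming frozen dataclasses (the actual implementation is not shown in this reference). Field names and defaults are taken from the signatures above.

```python
from dataclasses import dataclass, FrozenInstanceError
from typing import Optional


@dataclass(frozen=True)
class FilterDecision:
    qa_idx: int            # 0-based QA index, or -1 for the whole item
    rejected: bool
    reason: str = ""


@dataclass(frozen=True)
class RewriteDecision:
    qa_idx: int
    new_question: Optional[str] = None   # None = leave the question unchanged
    new_answer: Optional[str] = None     # None = leave the answer unchanged
    message: str = ""


decision = FilterDecision(qa_idx=-1, rejected=True, reason="no image")
try:
    decision.rejected = False  # frozen dataclasses refuse attribute assignment
    mutated = True
except FrozenInstanceError:
    mutated = False
```

With `frozen=True`, a decision cannot be altered after an operator has produced it, so it is safe to pass between pipeline stages.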

class datastudio.operators.core.result.Result[source]

Bases: object

Operator execution result.

Responsibilities:

  1. Store decision lists

  2. Apply decisions to DataItem

  3. Manage records

Can contain both filter and rewrite decisions, allowing a single operator (e.g., MLLM) to both filter and rewrite in one pass.

item_idx: int
filter_decisions: List[FilterDecision]
rewrite_decisions: List[RewriteDecision]
add_filter(qa_idx, rejected, reason='')[source]

Add a filter decision.

Parameters:
  • qa_idx (int)

  • rejected (bool)

  • reason (str)

add_rewrite(qa_idx, new_question=None, new_answer=None, message='')[source]

Add a rewrite decision.

Parameters:
  • qa_idx (int)

  • new_question (str | None)

  • new_answer (str | None)

  • message (str)

property has_filter: bool

Whether any QA pair is marked for filtering.

property has_rewrite: bool

Whether any QA pair has rewrite changes.

apply_to(item, op_name)[source]

Apply all decisions to a data item.

Order: rewrite first, then filter (to preserve rewritten content).

Parameters:
  • item (DataItem) – DataItem to modify.

  • op_name (str) – Operator name for records.

Return type:

Tuple[Optional[DataItem], Optional[DataItem]]

Returns:

(kept_item, rejected_item)
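The ordering rule (rewrites first, then filters) can be illustrated on plain Python data. This is a sketch only: `apply_decisions`, the `(question, answer)` tuples, and returning `None` for an empty side are assumptions made for illustration, not the library's implementation.

```python
from typing import Dict, List, Optional, Set, Tuple

QAPair = Tuple[str, str]  # (question, answer); stand-in for a DataItem's QA pairs


def apply_decisions(
    qa_pairs: List[QAPair],
    new_answers: Dict[int, str],
    rejected: Set[int],
) -> Tuple[Optional[List[QAPair]], Optional[List[QAPair]]]:
    """Apply rewrites, then split into (kept, rejected)."""
    # Step 1: rewrite, so that rejected pairs also carry the rewritten text.
    rewritten = [
        (q, new_answers.get(i, a)) for i, (q, a) in enumerate(qa_pairs)
    ]
    # Step 2: filter. An index of -1 rejects every pair (global decision).
    if -1 in rejected:
        rejected = set(range(len(rewritten)))
    kept = [qa for i, qa in enumerate(rewritten) if i not in rejected]
    dropped = [qa for i, qa in enumerate(rewritten) if i in rejected]
    return (kept or None, dropped or None)


kept, dropped = apply_decisions(
    [("q0", " a0 "), ("q1", "a1")],
    new_answers={0: "a0"},
    rejected={1},
)
```

Doing the rewrite before the split means neither side of the result loses a rewrite that was decided in the same pass.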

__init__(item_idx, filter_decisions=<factory>, rewrite_decisions=<factory>)
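Parameters:
  • item_idx (int)

  • filter_decisions (List[FilterDecision])

  • rewrite_decisions (List[RewriteDecision])

Return type: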

None

datastudio.operators.core.result.OperatorResult

alias of Result

Class Hierarchy Diagram

Inheritance diagram of datastudio.operators.core.result.FilterDecision, datastudio.operators.core.result.RewriteDecision, datastudio.operators.core.result.OperatorResult

Operator Base Classes

Operators for data filtering, rewriting, and MLLM-powered processing.

Submodules:

  • filters: Rule-based quality control filters.

  • rewriters: Content transformation rewriters.

  • mllm: MLLM-powered filter and rewrite operators.

class datastudio.operators.FilterDecision[source]

Bases: object

Filter decision for a QA pair or entire item.

qa_idx

QA pair index (0-based), or -1 for a global decision.

rejected

Whether this QA pair (or entire item) should be rejected.

reason

Explanation for the decision.

__init__(qa_idx, rejected, reason='')
Parameters:
  • qa_idx (int)

  • rejected (bool)

  • reason (str)

Return type:

None

reason: str = ''
qa_idx: int
rejected: bool
class datastudio.operators.RewriteDecision[source]

Bases: object

Rewrite decision for a single QA pair.

qa_idx

QA pair index (0-based).

new_question

Rewritten question text (None = no change).

new_answer

Rewritten answer text (None = no change).

message

Description of what was changed.

__init__(qa_idx, new_question=None, new_answer=None, message='')
Parameters:
  • qa_idx (int)

  • new_question (str | None)

  • new_answer (str | None)

  • message (str)

Return type:

None

message: str = ''
new_answer: Optional[str] = None
new_question: Optional[str] = None
qa_idx: int
class datastudio.operators.Result[source]

Bases: object

Operator execution result.

Responsibilities:

  1. Store decision lists

  2. Apply decisions to DataItem

  3. Manage records

Can contain both filter and rewrite decisions, allowing a single operator (e.g., MLLM) to both filter and rewrite in one pass.

__init__(item_idx, filter_decisions=<factory>, rewrite_decisions=<factory>)
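Parameters:
  • item_idx (int)

  • filter_decisions (List[FilterDecision])

  • rewrite_decisions (List[RewriteDecision])

Return type: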

None

add_filter(qa_idx, rejected, reason='')[source]

Add a filter decision.

Parameters:
  • qa_idx (int)

  • rejected (bool)

  • reason (str)

add_rewrite(qa_idx, new_question=None, new_answer=None, message='')[source]

Add a rewrite decision.

Parameters:
  • qa_idx (int)

  • new_question (str | None)

  • new_answer (str | None)

  • message (str)

apply_to(item, op_name)[source]

Apply all decisions to a data item.

Order: rewrite first, then filter (to preserve rewritten content).

Parameters:
  • item (DataItem) – DataItem to modify.

  • op_name (str) – Operator name for records.

Return type:

Tuple[Optional[DataItem], Optional[DataItem]]

Returns:

(kept_item, rejected_item)

property has_filter: bool

Whether any QA pair is marked for filtering.

property has_rewrite: bool

Whether any QA pair has rewrite changes.

item_idx: int
filter_decisions: List[FilterDecision]
rewrite_decisions: List[RewriteDecision]
datastudio.operators.OperatorResult

alias of Result

class datastudio.operators.QA[source]

Bases: object

Immutable view of a single QA pair.

idx

QA pair index (0-based).

question

Question text.

answer

Answer text.

ori_answer

Original answer before any rewriting.

ori_question

Original question before any rewriting.

__init__(idx, question, answer, ori_answer=None, ori_question=None)
Parameters:
  • idx (int)

  • question (str)

  • answer (str)

  • ori_answer (str | None)

  • ori_question (str | None)

Return type:

None

ori_answer: Optional[str] = None
ori_question: Optional[str] = None
idx: int
question: str
answer: str
class datastudio.operators.DataItem[source]

Bases: object

Wrapper around a raw data dict with typed QA pair access.

Example:

item = DataItem(raw_data, idx=0)
for qa in item.qa_pairs:
    print(qa.question, qa.answer)
item.set_answer(0, "new answer")
__init__(data, idx=0)[source]

Initialize a DataItem.

Parameters:
  • data (Dict) – Raw data dict (will be modified in place for rewrites).

  • idx (int) – Index in the batch (for tracking).

add_filter_record(op_name, qa_idx, reason)[source]

Add a filter record (for rejected items).

Parameters:
  • op_name (str)

  • qa_idx (int)

  • reason (str)

add_full_filter_record(op_name, reason)[source]

Add a filter record for the entire item (not per-QA).

Parameters:
  • op_name (str)

  • reason (str)

add_keep_record(op_name, qa_idx, reason)[source]

Add a keep record (for kept items with reason).

Parameters:
  • op_name (str)

  • qa_idx (int)

  • reason (str)

add_model_record(op_name, model_name)[source]

Record which model was used by an operator to process this item.

Stored as data["model"] = {op_name: model_name, …}.

Parameters:
  • op_name (str) – Operator name (e.g., “MLLMFilter”).

  • model_name (str) – Model identifier (e.g., “Qwen2-VL-72B”).

add_rewrite_record(op_name, qa_idx, message)[source]

Add a rewrite record.

Parameters:
  • op_name (str)

  • qa_idx (int)

  • message (str)

property conversations: List[Dict]

Get the conversations list.

property data: Dict

Access the underlying data dict.

format_all_qa(with_question=True, with_answer=False, with_original=False)[source]

Format all QA pairs for prompt.

Return type:

str

Parameters:
  • with_question (bool)

  • with_answer (bool)

  • with_original (bool)

format_qa(qa_idx, with_question=True, with_answer=False, with_original=False)[source]

Format a single QA pair for prompt.

Display index is always 0 (standard for single-QA prompts).

Return type:

str

Parameters:
  • qa_idx (int)

  • with_question (bool)

  • with_answer (bool)

  • with_original (bool)

get_answer(qa_idx)[source]

Get answer at index.

Return type:

str

Parameters:

qa_idx (int)

get_qa(qa_idx)[source]

Get a specific QA pair.

Parameters:

qa_idx (int) – Index of the QA pair.

Return type:

QA

Returns:

QA object at the specified index.

Raises:

IndexError – If qa_idx is out of range.

get_question(qa_idx)[source]

Get question at index.

Return type:

str

Parameters:

qa_idx (int)

property has_image: bool

Whether this item has an image.

property image: Any | None

Get the image (PIL Image or None).

property is_rejected: bool

Whether this item is marked as rejected.

mark_kept()[source]

Mark this item as not rejected (kept).

mark_rejected()[source]

Mark this item as rejected.

property qa_count: int

Number of QA pairs.

property qa_pairs: List[QA]

Get all QA pairs (cached).

set_answer(qa_idx, value, save_original=True)[source]

Set answer at index.

Parameters:
  • qa_idx (int) – Index of the QA pair.

  • value (str) – New answer value.

  • save_original (bool) – Whether to save the original value.

Raises:

IndexError – If qa_idx is out of range.

set_question(qa_idx, value, save_original=True)[source]

Set question at index.

Parameters:
  • qa_idx (int) – Index of the QA pair.

  • value (str) – New question value.

  • save_original (bool) – Whether to save the original value.

Raises:

IndexError – If qa_idx is out of range.

split(kept_indices, rejected_indices)[source]

Split this item into kept and rejected parts.

Creates two new DataItems:

  • kept_item: Contains only the kept QA pairs

  • rejected_item: Contains only the rejected QA pairs

Both items have their metadata (filter_ops, rewrite_ops, etc.) properly re-indexed.

Parameters:
  • kept_indices (List[int]) – Original indices of QA pairs to keep.

  • rejected_indices (List[int]) – Original indices of QA pairs to reject.

Return type:

Tuple[DataItem, DataItem]

Returns:

Tuple of (kept_item, rejected_item).
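Re-indexing is the subtle part of `split()`: a record that pointed at original QA index 2 must point at a new, smaller index in the item that receives it. A small sketch of that index mapping, assuming records are stored as `(qa_idx, reason)` tuples (a hypothetical layout chosen for illustration; `reindex` is not a library function):

```python
from typing import Dict, List, Tuple

Record = Tuple[int, str]  # (qa_idx, reason); hypothetical record layout


def reindex(records: List[Record], selected: List[int]) -> List[Record]:
    """Keep records whose QA index is selected and renumber them from 0."""
    old_to_new: Dict[int, int] = {old: new for new, old in enumerate(selected)}
    return [
        (old_to_new[idx], reason) for idx, reason in records if idx in old_to_new
    ]


records = [(0, "too short"), (2, "repetitive")]
kept_records = reindex(records, selected=[1, 2])      # QA 1 and 2 are kept
rejected_records = reindex(records, selected=[0])     # QA 0 is rejected
```

In this example the record for original QA 2 ends up at index 1 of the kept item, because QA 1 now occupies index 0.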

class datastudio.operators.Operator[source]

Bases: ABC

Base class for all operators.

Subclasses implement process() for single-item logic, or override process_batch() for batch-level optimization (e.g., MLLM operators).

__init__(name=None, logger=None, **kwargs)[source]

Initialize the operator.

Parameters:
  • name (Optional[str]) – Operator name (defaults to class name).

  • logger (Optional[Any]) – Logger instance.

property name: str

Operator name for records.

abstractmethod process(item)[source]

Process a single data item.

Parameters:

item (DataItem) – DataItem to process.

Return type:

Result

Returns:

Result with filter and/or rewrite decisions.

process_batch(items)[source]

Process a batch of items.

Default implementation calls process() for each item. Override for batch optimization (e.g., MLLM operators).

Parameters:

items (List[DataItem]) – List of DataItems.

Return type:

List[Result]

Returns:

List of Results (same order as input).
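A minimal sketch of the contract described for `Operator`, using stand-in classes rather than the real `datastudio` types: `process()` is abstract, and the default `process_batch()` maps it over the batch in order. `SketchOperator`, `UpperCase`, and the use of plain strings as items are assumptions for illustration.

```python
from abc import ABC, abstractmethod
from typing import List, Optional


class SketchOperator(ABC):
    def __init__(self, name: Optional[str] = None) -> None:
        # The name defaults to the class name, as documented for Operator.
        self._name = name or type(self).__name__

    @property
    def name(self) -> str:
        return self._name

    @abstractmethod
    def process(self, item: str) -> str:
        """Handle one item."""

    def process_batch(self, items: List[str]) -> List[str]:
        # Default: one process() call per item, preserving input order.
        return [self.process(item) for item in items]


class UpperCase(SketchOperator):
    def process(self, item: str) -> str:
        return item.upper()


op = UpperCase()
results = op.process_batch(["a", "b"])
```

A subclass that can serve many items with one model call would override `process_batch()` instead and keep the same output ordering.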

class datastudio.operators.Filter[source]

Bases: Operator

Base class for filter operators.

Subclasses implement check(item, qa_idx) for per-QA filtering.

Example

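from typing import Tuple

from datastudio.operators import DataItem, Filter

class MyFilter(Filter):
    def check(self, item: DataItem, qa_idx: int) -> Tuple[bool, str]:
        # Reject answers shorter than 10 characters.
        if len(item.get_answer(qa_idx)) < 10:
            return True, "answer too short"
        return False, ""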

check(item, qa_idx)[source]

Check if a QA pair should be rejected.

Parameters:
  • item (DataItem) – DataItem to check.

  • qa_idx (int) – QA pair index.

Return type:

Tuple[bool, str]

Returns:

Tuple of (rejected, reason). rejected=True means filter out.

process(item)[source]

Process by calling check() for each QA pair.

Return type:

Result

Parameters:

item (DataItem)

class datastudio.operators.Rewriter[source]

Bases: Operator

Base class for rewrite operators.

Subclasses implement rewrite(item, qa_idx) for per-QA rewriting.

Example

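from typing import Optional

from datastudio.operators import DataItem, Rewriter

class MyRewriter(Rewriter):
    def rewrite(self, item: DataItem, qa_idx: int) -> Optional[str]:
        # Return the stripped answer, or None when nothing changed.
        answer = item.get_answer(qa_idx)
        stripped = answer.strip()
        return stripped if stripped != answer else None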

process(item)[source]

Process by calling rewrite() for each QA pair.

Return type:

Result

Parameters:

item (DataItem)

rewrite(item, qa_idx)[source]

Rewrite a QA pair’s answer.

Parameters:
  • item (DataItem) – DataItem to rewrite.

  • qa_idx (int) – QA pair index.

Return type:

Optional[str]

Returns:

New answer text, or None if no change.

class datastudio.operators.ConvLengthFilter[source]

Bases: Filter

Filter data items based on conversation length (number of QA pairs).

Example

filter = ConvLengthFilter(min_length=1, max_length=10)

# Check a single item
result = filter.process(item)

# Use in pipeline
pipeline = Pipeline([filter])
kept, rejected = pipeline(data_list)

__init__(min_length=1, max_length=36, logger=None, **kwargs)[source]

Initialize the filter.

Parameters:
  • min_length (int) – Minimum number of QA pairs (inclusive).

  • max_length (int) – Maximum number of QA pairs (inclusive).

  • logger (Optional[Any]) – Logger instance.

check(item, qa_idx)[source]

Not used - this filter operates at item level.

Return type:

Tuple[bool, str]

Parameters:
  • item (DataItem)

  • qa_idx (int)

process(item)[source]

Check if conversation length is within bounds.

This is an item-level filter, so we override process() directly.

Return type:

Result

Parameters:

item (DataItem)
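Because both bounds are inclusive, an item with exactly `min_length` or `max_length` QA pairs is kept. A sketch of that check as a standalone function: `conv_length_check` and its reason string are hypothetical, and the return shape mirrors the `(rejected, reason)` convention of `check()`.

```python
from typing import Tuple


def conv_length_check(
    qa_count: int, min_length: int = 1, max_length: int = 36
) -> Tuple[bool, str]:
    """Return (rejected, reason) for an item with qa_count QA pairs."""
    if min_length <= qa_count <= max_length:
        return False, ""
    return True, f"qa_count={qa_count} outside [{min_length}, {max_length}]"
```

The defaults here copy the documented constructor defaults (1 and 36).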

class datastudio.operators.ImageSizeFilter[source]

Bases: Filter

Filter data items based on image dimensions.

Rejects images that are too small or have extreme aspect ratios.

Example

filter = ImageSizeFilter(min_size=100, max_ratio=10.0)
pipeline = Pipeline([filter])
kept, rejected = pipeline(data_list)

__init__(min_size=28, max_ratio=20.0, require_image=False, logger=None, **kwargs)[source]

Initialize the filter.

Parameters:
  • min_size (int) – Minimum dimension (width or height).

  • max_ratio (float) – Maximum aspect ratio (larger / smaller).

  • require_image (bool) – If True, reject items without images.

  • logger (Optional[Any]) – Logger instance.

check(item, qa_idx)[source]

Not used - item-level filter.

Return type:

Tuple[bool, str]

Parameters:
  • item (DataItem)

  • qa_idx (int)

process(item)[source]

Check if image meets size requirements (item-level).

Return type:

Result

Parameters:

item (DataItem)
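The two size criteria can be written down directly from the parameter descriptions. In this sketch `image_size_check` is a hypothetical helper, and treating the thresholds as strict comparisons (`<` and `>`) is an assumption, since the reference does not say whether the limits are inclusive.

```python
from typing import Tuple


def image_size_check(
    width: int, height: int, min_size: int = 28, max_ratio: float = 20.0
) -> Tuple[bool, str]:
    """Return (rejected, reason) for an image of the given dimensions."""
    smaller, larger = sorted((width, height))
    # Guard against degenerate sizes before dividing.
    if smaller <= 0 or smaller < min_size:
        return True, f"dimension {smaller} < min_size {min_size}"
    # Aspect ratio is larger side / smaller side, so it is always >= 1.
    ratio = larger / smaller
    if ratio > max_ratio:
        return True, f"aspect ratio {ratio:.1f} > max_ratio {max_ratio}"
    return False, ""
```

Sorting the two sides first makes the check symmetric for portrait and landscape images.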

class datastudio.operators.ImageAspectRatioFilter[source]

Bases: Filter

Filter data items based on image aspect ratio.

Removes items containing images with extreme aspect ratios.

__init__(max_aspect_ratio=64.0, logger=None, **kwargs)[source]

Initialize the filter.

Parameters:
  • max_aspect_ratio (float) – Maximum allowed aspect ratio (width/height or height/width).

  • logger (Optional[Any]) – Logger instance.

check(item, qa_idx)[source]

Not used - item-level filter.

Return type:

Tuple[bool, str]

Parameters:
  • item (DataItem)

  • qa_idx (int)

process(item)[source]

Check if all images have acceptable aspect ratios (item-level).

Return type:

Result

Parameters:

item (DataItem)

class datastudio.operators.ImageExtFilter[source]

Bases: Filter

Filter data items based on image file extensions.

Removes items containing images with excluded file extensions.

__init__(excluded_exts=None, logger=None, **kwargs)[source]

Initialize the filter.

Parameters:
  • excluded_exts (Optional[List[str]]) – List of excluded extensions (with dot, e.g., ['.gif', '']). Defaults to ['.gif', ''].

  • logger (Optional[Any]) – Logger instance.

check(item, qa_idx)[source]

Not used - item-level filter.

Return type:

Tuple[bool, str]

Parameters:
  • item (DataItem)

  • qa_idx (int)

process(item)[source]

Check if any images have excluded extensions (item-level).

Return type:

Result

Parameters:

item (DataItem)

class datastudio.operators.LengthAnomalyFilter[source]

Bases: Filter

Filter data items based on text length anomalies.

Detects and filters QA pairs with abnormally short or long text. Supports per-QA-pair filtering (partial filtering).

__init__(min_length=1, max_length=8192, check_question=False, check_answer=False, check_ori_answer=False, use_tokenizer=False, logger=None, **kwargs)[source]

Initialize the filter.

Parameters:
  • min_length (int) – Minimum allowed length.

  • max_length (int) – Maximum allowed length.

  • check_question (bool) – Whether to check question length.

  • check_answer (bool) – Whether to check answer length.

  • check_ori_answer (bool) – Whether to check original answer length.

  • use_tokenizer (bool) – If True, use tiktoken for length calculation.

  • logger (Optional[Any]) – Logger instance.

check(item, qa_idx)[source]

Check for text length anomalies in a QA pair.

Return type:

Tuple[bool, str]

Parameters:
  • item (DataItem)

  • qa_idx (int)

class datastudio.operators.ResponseTagFilter[source]

Bases: Filter

Filter data items that have image/video tags in responses.

Responses should not contain <image> or <video> tags as these indicate potential data quality issues.
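A sketch of the tag check described above, written as a standalone function. `response_tag_check`, the exact regular expression, and the reason string are assumptions for illustration; the real filter may match the tags differently.

```python
import re
from typing import Tuple

# Matches the literal placeholders <image> and <video>.
MEDIA_TAG = re.compile(r"<(image|video)>")


def response_tag_check(answer: str) -> Tuple[bool, str]:
    """Return (rejected, reason); rejected is True if the answer has a media tag."""
    match = MEDIA_TAG.search(answer)
    if match:
        return True, f"response contains {match.group(0)}"
    return False, ""
```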

__init__(logger=None, **kwargs)[source]

Initialize the filter.

Parameters:

logger (Optional[Any]) – Logger instance.

check(item, qa_idx)[source]

Check if response contains image/video tags.

Return type:

Tuple[bool, str]

Parameters:
  • item (DataItem)

  • qa_idx (int)

class datastudio.operators.TextRepeatFilter[source]

Bases: Filter

Filter data items with repetitive text patterns.

Detects and filters QA pairs containing consecutive repetitive patterns. Supports per-QA-pair filtering (partial filtering).
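One common way to detect consecutive repetition is a regular expression with a backreference. The sketch below is not the library's detector: the pattern, the minimum unit length of 4 characters, and the threshold of 5 consecutive copies are all assumptions chosen for illustration.

```python
import re

# A unit of at least 4 characters followed by 4 or more back-to-back copies
# of itself, i.e. the same chunk appearing 5+ times in a row.
REPEAT = re.compile(r"(.{4,}?)\1{4,}", re.DOTALL)


def has_consecutive_repeat(text: str) -> bool:
    """Return True if some chunk of text repeats 5 or more times in a row."""
    return REPEAT.search(text) is not None
```

Backreference patterns like this can be slow on very long inputs, so a production detector would likely bound the unit length or use a different algorithm.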

__init__(check_question=True, check_answer=True, check_ori_answer=True, logger=None, **kwargs)[source]

Initialize the filter.

Parameters:
  • check_question (bool) – Whether to check questions.

  • check_answer (bool) – Whether to check answers.

  • check_ori_answer (bool) – Whether to check original answers.

  • logger (Optional[Any]) – Logger instance.

check(item, qa_idx)[source]

Check for repetitive patterns in a QA pair.

Return type:

Tuple[bool, str]

Parameters:
  • item (DataItem)

  • qa_idx (int)

class datastudio.operators.RemoveThinkRewriter[source]

Bases: Rewriter

Remove <think>…</think> tags from answers.

These tags often contain model reasoning that shouldn’t be in the final output.
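A sketch of what removing a think block could look like. `remove_think` is a hypothetical helper, and dropping the whitespace that follows the closing tag is an assumption. Returning `None` when nothing changed follows the documented `Rewriter.rewrite()` convention.

```python
import re
from typing import Optional

# DOTALL lets "." span newlines inside the reasoning block.
THINK_BLOCK = re.compile(r"<think>.*?</think>\s*", re.DOTALL)


def remove_think(answer: str) -> Optional[str]:
    """Return the answer without think blocks, or None if nothing changed."""
    cleaned = THINK_BLOCK.sub("", answer)
    return cleaned if cleaned != answer else None
```

The non-greedy `.*?` keeps two separate think blocks from being merged into one match that would swallow the text between them.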

Example

rewriter = RemoveThinkRewriter()
pipeline = Pipeline([rewriter])
result, _ = pipeline(data_list)

__init__(logger=None, **kwargs)[source]

Initialize the rewriter.

Parameters:

logger (Optional[Any]) – Logger instance.

rewrite(item, qa_idx)[source]

Remove think tags from an answer.

Return type:

Optional[str]

Parameters:
  • item (DataItem)

  • qa_idx (int)

class datastudio.operators.NormThinkRewriter[source]

Bases: Rewriter

Normalize <think> tag format in answers.

Ensures consistent format:

  • Lowercase tags

  • Newline after opening tag

  • Newline before closing tag

Example

Input: "<THINK>reasoning</THINK>answer"

Output: "<think>\nreasoning\n</think>\nanswer"

THINK_VARIANTS = [(re.compile('<THINK>', re.IGNORECASE), '<think>'), (re.compile('</THINK>', re.IGNORECASE), '</think>'), (re.compile('<think>(?!\\n)'), '<think>\n'), (re.compile('(?<!\\n)</think>'), '\n</think>'), (re.compile('</think>(?!\\n)(?=\\S)'), '</think>\n')]
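The patterns in `THINK_VARIANTS` can be tried out in isolation. The sketch below copies the five pattern and replacement pairs from the attribute above. Applying them in list order with `pattern.sub` is an assumption about how `rewrite()` uses them, and `normalize_think` is a hypothetical helper name.

```python
import re

# Same patterns as THINK_VARIANTS above, applied in order.
THINK_VARIANTS = [
    (re.compile(r"<THINK>", re.IGNORECASE), "<think>"),
    (re.compile(r"</THINK>", re.IGNORECASE), "</think>"),
    (re.compile(r"<think>(?!\n)"), "<think>\n"),
    (re.compile(r"(?<!\n)</think>"), "\n</think>"),
    (re.compile(r"</think>(?!\n)(?=\S)"), "</think>\n"),
]


def normalize_think(text: str) -> str:
    """Lowercase think tags and put each tag on its own line."""
    for pattern, replacement in THINK_VARIANTS:
        text = pattern.sub(replacement, text)
    return text


normalized = normalize_think("<THINK>reasoning</THINK>answer")
```

The lookarounds make the rewrite idempotent: running it on already normalized text leaves it unchanged, because each newline-inserting pattern only matches where the newline is missing.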
__init__(logger=None, **kwargs)[source]

Initialize the rewriter.

Parameters:

logger (Optional[Any]) – Logger instance.

rewrite(item, qa_idx)[source]

Normalize think tags in an answer.

Return type:

Optional[str]

Parameters:
  • item (DataItem)

  • qa_idx (int)

class datastudio.operators.AddNoThinkRewriter[source]

Bases: Rewriter

Add empty <think> tags to responses that don’t have them.

This ensures consistent format for responses that should have think tags but are missing them.

__init__(logger=None, **kwargs)[source]

Initialize the rewriter.

Parameters:

logger (Optional[Any]) – Logger instance.

rewrite(item, qa_idx)[source]

Add empty think tags to a response missing them.

Return type:

Optional[str]

Parameters:
  • item (DataItem)

  • qa_idx (int)

class datastudio.operators.NormImageTagRewriter[source]

Bases: Rewriter

Normalize image tag positions in conversations.

Moves all <image> tags to the beginning of the first human message, ensuring consistent format across all data items.

Warning

This rewriter is designed for single-image or multi-image scenarios where all images logically belong to the first question. It does NOT support interleaved image-text conversations (e.g. images scattered across multiple turns), as it will forcibly move all <image> tags to the first message and break the original image-text correspondence.

__init__(logger=None, **kwargs)[source]

Initialize the rewriter.

Parameters:

logger (Optional[Any]) – Logger instance.

process(item)[source]

Normalize image tags - move all to first question (item-level).

Return type:

Result

Parameters:

item (DataItem)

rewrite(item, qa_idx)[source]

Not used - this rewriter operates at item level.

Return type:

Optional[str]

Parameters:
  • item (DataItem)

  • qa_idx (int)

class datastudio.operators.NormPromptRewriter[source]

Bases: Rewriter

Normalize hint/instruction prompt formatting.

Ensures hints like "Answer concisely" are properly separated with newlines from the main question text.

__init__(logger=None, **kwargs)[source]

Initialize the rewriter.

Parameters:

logger (Optional[Any]) – Logger instance.

process(item)[source]

Normalize prompt formatting in questions (modifies question).

Return type:

Result

Parameters:

item (DataItem)

rewrite(item, qa_idx)[source]

Not used - this rewriter modifies questions.

Return type:

Optional[str]

Parameters:
  • item (DataItem)

  • qa_idx (int)

class datastudio.operators.NormMultiTurnPromptRewriter[source]

Bases: Rewriter

Normalize instruction patterns in multi-turn conversations.

If the first turn contains instruction patterns (like “Answer concisely”), removes similar patterns from subsequent turns to avoid redundancy.

__init__(logger=None, **kwargs)[source]

Initialize the rewriter.

Parameters:

logger (Optional[Any]) – Logger instance.

process(item)[source]

Remove redundant instruction patterns from multi-turn (item-level).

Return type:

Result

Parameters:

item (DataItem)

rewrite(item, qa_idx)[source]

Not used - item-level rewriter.

Return type:

Optional[str]

Parameters:
  • item (DataItem)

  • qa_idx (int)

class datastudio.operators.RemoveAnswerRewriter[source]

Bases: Rewriter

Remove [ANSWER 0] prefix from responses.

Some data may have “[ANSWER 0]” prefixes in responses that should be removed for clean output.
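A sketch of the prefix removal, assuming the prefix is matched only at the start of the answer and that surrounding whitespace is dropped with it. `remove_answer_prefix` is a hypothetical helper, not the library's method.

```python
import re
from typing import Optional

# Leading "[ANSWER 0]" plus any whitespace around it.
ANSWER_PREFIX = re.compile(r"^\s*\[ANSWER 0\]\s*")


def remove_answer_prefix(answer: str) -> Optional[str]:
    """Return the answer without the prefix, or None if nothing changed."""
    cleaned = ANSWER_PREFIX.sub("", answer, count=1)
    return cleaned if cleaned != answer else None
```

Anchoring with `^` leaves an "[ANSWER 0]" that appears later in the text untouched.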

__init__(logger=None, **kwargs)[source]

Initialize the rewriter.

Parameters:

logger (Optional[Any]) – Logger instance.

rewrite(item, qa_idx)[source]

Remove [ANSWER 0] prefix from an answer.

Return type:

Optional[str]

Parameters:
  • item (DataItem)

  • qa_idx (int)

class datastudio.operators.RemoveReasonRewriter[source]

Bases: Rewriter

Remove “Reasoning:” prefix from responses.

Some data may have “Reasoning:” prefixes in responses that should be removed for clean output.

__init__(logger=None, **kwargs)[source]

Initialize the rewriter.

Parameters:

logger (Optional[Any]) – Logger instance.

rewrite(item, qa_idx)[source]

Remove Reasoning: prefix from an answer.

Return type:

Optional[str]

Parameters:
  • item (DataItem)

  • qa_idx (int)

class datastudio.operators.SplitRewriter[source]

Bases: Operator

Split multi-turn conversations into single-turn conversations.

Each QA pair becomes a separate data item, preserving all metadata and adjusting indices accordingly.

Note: This is a special Operator that returns multiple items, handled specially by Pipeline.
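Splitting differs from the other operators because one input item yields several output items. A sketch of that expansion on plain dicts: the `{"id": ..., "qa": [...]}` layout is a hypothetical stand-in for the real data format, and this `expand_items` only illustrates the idea behind the method of the same name.

```python
from typing import Dict, List

# Hypothetical raw layout: {"id": ..., "qa": [(question, answer), ...]}


def expand_items(items: List[Dict]) -> List[Dict]:
    """Turn each multi-turn item into one single-turn item per QA pair."""
    expanded: List[Dict] = []
    for item in items:
        for qa in item["qa"]:
            single = dict(item)   # shallow copy keeps the other metadata
            single["qa"] = [qa]   # each new item holds exactly one pair, at index 0
            expanded.append(single)
    return expanded


out = expand_items([{"id": "x", "qa": [("q0", "a0"), ("q1", "a1")]}])
```

Since every new item holds a single pair at index 0, any per-QA metadata that referred to the original index has to be renumbered, which is the index adjustment the description mentions.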

__init__(logger=None, **kwargs)[source]

Initialize the rewriter.

Parameters:

logger (Optional[Any]) – Logger instance.

expand_items(items)[source]

Expand multi-turn items into single-turn items.

This should be called directly instead of through Pipeline for splitting operations.

Return type:

List[DataItem]

Parameters:

items (List[DataItem])

process(item)[source]

Split is handled specially - we mark items for expansion.

The actual splitting is done in expand_items().

Return type:

Result

Parameters:

item (DataItem)

process_batch(items)[source]

Process batch - returns keep result for each item.

Return type:

List[Result]

Parameters:

items (List[DataItem])

class datastudio.operators.RequestBuilder[source]

Bases: object

MLLM request builder.

Responsibilities:

  1. Store request_builder configuration

  2. Build requests (build_request, build_requests)

  3. Format content (format_content)

  4. Parse model responses (parse_response)

Config format:

request_builder = dict(
    type="RequestBuilder",
    prompt="prompts/filter/xxx.txt",
    system_prompt="prompts/grounding/grounding_system.txt",
    key_templates={"result": "q{idx}", "reason": "q{idx}_reason"},
    with_image=True,
    with_answer=False,
)

prompt

Prompt text or path to .txt file (user message).

system_prompt

System prompt text or path to .txt file (system message).

key_templates

Dict mapping field names to key templates with {idx}.

with_image/with_question/with_answer/with_original

Request flags.

__init__(prompt=None, system_prompt=None, key_templates=<object object>, with_image=None, with_question=None, with_answer=None, with_original=None, **kwargs)[source]

Initialize request builder.

Instance values override class defaults. For key_templates, explicitly passing None disables JSON parsing.

Parameters:
  • prompt (str | None)

  • system_prompt (str | None)

  • key_templates (Dict[str, str] | None | object)

  • with_image (bool | None)

  • with_question (bool | None)

  • with_answer (bool | None)

  • with_original (bool | None)
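The `<object object>` default for `key_templates` looks like the usual sentinel idiom: it lets the constructor tell "argument omitted" (use the class default) apart from an explicit `None` (disable JSON parsing). A sketch of the idiom with a hypothetical `SketchBuilder` class. The name `_UNSET` is also an assumption, and the real constructor may be organized differently.

```python
from typing import Dict, Optional

_UNSET = object()  # unique marker that no caller can pass by accident


class SketchBuilder:
    # Class-level default, as in the documented attribute.
    key_templates: Optional[Dict[str, str]] = {
        "result": "q{idx}",
        "reason": "q{idx}_reason",
    }

    def __init__(self, key_templates=_UNSET) -> None:
        if key_templates is not _UNSET:
            # Explicit value (including None) overrides the class default.
            self.key_templates = key_templates


default_builder = SketchBuilder()
plain_text_builder = SketchBuilder(key_templates=None)
```

A plain `None` default could not express both cases, because `None` is itself a meaningful value here.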

build_request(item, qa_idx, copy_img=False)[source]

Build a single request.

Parameters:
  • item (DataItem) – DataItem to build request for.

  • qa_idx (int) – QA pair index (-1 for batch_qa mode).

  • copy_img (bool) – Whether to deep copy the image (for multiple requests per item).

Return type:

Dict

Returns:

Dict with keys payload, item, qa_idx.

build_requests(item, batch_qa=False)[source]

Build all requests for a DataItem.

Parameters:
  • item (DataItem) – DataItem to build requests for.

  • batch_qa (bool) – If True, combine all QA pairs into one request.

Return type:

List[Dict]

Returns:

List of request dicts.

build_requests_selective(item, qa_indices)[source]

Build requests for specific QA indices only.

Used by SelectiveMLLMRewriter for conditional rewriting.

Parameters:
  • item (DataItem) – DataItem to build requests for.

  • qa_indices (List[int]) – List of QA indices to build requests for.

Return type:

List[Dict]

Returns:

List of request dicts.

format_content(item, qa_idx)[source]

Format request content.

Parameters:
  • item (DataItem) – DataItem to format.

  • qa_idx (int) – QA pair index (-1 for all QA).

Return type:

str

Returns:

Formatted prompt string.

key_templates: Dict[str, str] = {'reason': 'q{idx}_reason', 'result': 'q{idx}'}
parse_response(response, qa_idx, logger=None)[source]

Parse model response.

Parameters:
  • response (Any) – Raw response from model.

  • qa_idx (int) – QA index (used to format key_templates).

  • logger (Optional[Any]) – Optional logger for warnings.

Return type:

Dict

Returns:

Dict with parsed fields, e.g. {"result": …, "reason": …}. Returns {"result": None, "parse_error": True} if parsing fails.

prompt: str = ''
property prompt_text: str

Load and return the prompt text (lazy).

system_prompt: str = ''
property system_prompt_text: str

Load and return the system prompt text (lazy).

with_answer: bool = False
with_image: bool = True
with_original: bool = False
with_question: bool = True
class datastudio.operators.MLLMOperator[source]

Bases: Operator

Base class for MLLM-powered operators.

Builds requests via RequestBuilder, calls model.generate(), and delegates response parsing to subclasses via _add_decision().

__init__(model, request_builder=None, batch_qa=False, logger=None, **kwargs)[source]

Initialize the MLLM operator.

Parameters:
  • model (Any) – MLLM model with generate() method.

  • request_builder (Optional[Dict]) – RequestBuilder config dict.

  • batch_qa (bool) – If True, combine all QA pairs into one request. Model should return {q0: …, q1: …, …}. If False (default), each QA pair is a separate request.

  • logger (Optional[Any]) – Logger instance.

process(item)[source]

Process a single item (delegates to process_batch).

Return type:

Result

Parameters:

item (DataItem)

process_batch(items)[source]

Process a batch of items.

Flow:

  1. Build requests using RequestBuilder

  2. Execute model

  3. Parse responses and aggregate into Results

Return type:

List[Result]

Parameters:

items (List[DataItem])

class datastudio.operators.MLLMFilter[source]

Bases: MLLMOperator

Filter data items using MLLM quality assessment.

Expected response format: {"q0": true/false, "q0_reason": "..."} where true means reject and false means keep.
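To make the key convention concrete, here is a sketch of how such a response could be decoded with `key_templates`. `parse_filter_response` is a hypothetical helper. It assumes the model returns a JSON string and mirrors the documented `{"result": None, "parse_error": True}` fallback of `RequestBuilder.parse_response`.

```python
import json
from typing import Any, Dict

KEY_TEMPLATES = {"result": "q{idx}", "reason": "q{idx}_reason"}


def parse_filter_response(response: str, qa_idx: int) -> Dict[str, Any]:
    """Extract the fields for one QA pair from a JSON response string."""
    try:
        payload = json.loads(response)
    except (TypeError, ValueError):
        return {"result": None, "parse_error": True}
    if not isinstance(payload, dict):
        return {"result": None, "parse_error": True}
    # "q{idx}" becomes "q0", "q1", ... for the requested QA pair.
    return {
        field: payload.get(template.format(idx=qa_idx))
        for field, template in KEY_TEMPLATES.items()
    }


parsed = parse_filter_response('{"q0": true, "q0_reason": "blurry image"}', 0)
```

Here `parsed["result"]` is `True`, which under the convention above means QA pair 0 is rejected.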

Example config:

request_builder = dict(
    type="RequestBuilder",
    prompt="prompts/filter/xxx.txt",
    key_templates={"result": "q{idx}", "reason": "q{idx}_reason"},
    with_image=True,
)
__init__(model, request_builder=None, batch_qa=False, logger=None, **kwargs)[source]

Initialize the MLLM operator.

Parameters:
  • model (Any) – MLLM model with generate() method.

  • request_builder (Optional[Dict]) – RequestBuilder config dict.

  • batch_qa (bool) – If True, combine all QA pairs into one request. Model should return {q0: …, q1: …, …}. If False (default), each QA pair is a separate request.

  • logger (Optional[Any]) – Logger instance.

class datastudio.operators.MLLMRewriter[source]

Bases: MLLMOperator

Rewrite data using MLLM.

Supports both structured (JSON dict) and plain text responses. Set key_templates=None in the request builder for plain text mode.

Example config:

request_builder = dict(
    type="RequestBuilder",
    prompt="prompts/rewriter/xxx.txt",
    key_templates={"result": "q{idx}_answer"},
    with_image=True,
    with_answer=True,
)
__init__(model, request_builder=None, rewrite_type='answer', batch_qa=False, logger=None, **kwargs)[source]

Initialize the MLLM rewriter.

Parameters:
  • model (Any) – MLLM model instance.

  • request_builder (Optional[Dict]) – RequestBuilder config dict.

  • rewrite_type (str) – What to rewrite - “answer” or “question”.

  • batch_qa (bool) – If True, combine all QA pairs into one request.

  • logger (Optional[Any]) – Logger instance.

class datastudio.operators.SelectiveMLLMRewriter[source]

Bases: MLLMRewriter

MLLM rewriter that only processes items matching a condition.

Example:

rewriter = SelectiveMLLMRewriter(
    model=model,
    request_builder=dict(type="RequestBuilder", prompt="prompts/translate.txt"),
    should_rewrite_fn=is_mixed_language,
)
__init__(should_rewrite_fn=None, **kwargs)[source]

Initialize selective rewriter.

Parameters:
  • should_rewrite_fn (Optional[Callable[[str, str], bool]]) – Function(question, answer) -> bool. Returns True if this QA should be rewritten.

  • **kwargs – Arguments passed to MLLMRewriter.
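The example above passes `is_mixed_language` without defining it. One possible predicate with the required `(question, answer) -> bool` shape is sketched below. The definition of "mixed" used here (the answer contains both CJK ideographs and ASCII letters) is an assumption for illustration.

```python
import re

CJK = re.compile(r"[\u4e00-\u9fff]")     # CJK Unified Ideographs block
LATIN = re.compile(r"[A-Za-z]")


def is_mixed_language(question: str, answer: str) -> bool:
    """Return True if the answer mixes CJK ideographs and ASCII letters."""
    return bool(CJK.search(answer)) and bool(LATIN.search(answer))
```

Because the predicate is evaluated per QA pair, only the pairs for which it returns `True` would be sent to the model. This matches the purpose of `build_requests_selective`.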