Source code for datastudio.operators.core.result

"""Operator result types.

Immutable decision objects representing filter and rewrite outcomes.
:class:`Result` bundles decisions and applies them to a :class:`DataItem`.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import TYPE_CHECKING, List, Optional, Tuple

if TYPE_CHECKING:
    from .data_item import DataItem


@dataclass(frozen=True)
class FilterDecision:
    """Immutable filter verdict for one QA pair or for a whole item.

    Attributes:
        qa_idx: Zero-based QA pair index; ``-1`` marks a global
            (item-level) decision.
        rejected: ``True`` when the QA pair (or the whole item) should
            be dropped.
        reason: Human-readable justification; empty by default.
    """

    qa_idx: int
    rejected: bool
    reason: str = field(default="")
@dataclass(frozen=True)
class RewriteDecision:
    """Immutable rewrite instruction targeting a single QA pair.

    Attributes:
        qa_idx: Zero-based index of the QA pair to rewrite.
        new_question: Replacement question text; ``None`` leaves the
            question untouched.
        new_answer: Replacement answer text; ``None`` leaves the answer
            untouched.
        message: Free-form description of the change; empty by default.
    """

    qa_idx: int
    new_question: Optional[str] = field(default=None)
    new_answer: Optional[str] = field(default=None)
    message: str = field(default="")
@dataclass
class Result:
    """Operator execution result.

    Responsibilities:
        1. Store decision lists.
        2. Apply decisions to a :class:`DataItem`.
        3. Manage records.

    A single result can hold both filter and rewrite decisions, so one
    operator (e.g., an MLLM) can filter and rewrite in one pass.

    Attributes:
        item_idx: Index of the data item this result refers to.
        filter_decisions: Filter decisions, in the order they were added.
        rewrite_decisions: Rewrite decisions, in the order they were added.
    """

    item_idx: int
    filter_decisions: List[FilterDecision] = field(default_factory=list)
    rewrite_decisions: List[RewriteDecision] = field(default_factory=list)

    # === Add decisions ===

    def add_filter(self, qa_idx: int, rejected: bool, reason: str = ""):
        """Append a filter decision.

        Args:
            qa_idx: QA pair index (0-based), or -1 for a global decision.
            rejected: Whether the QA pair (or whole item) should be rejected.
            reason: Explanation recorded with the decision.
        """
        self.filter_decisions.append(FilterDecision(qa_idx, rejected, reason))

    def add_rewrite(
        self,
        qa_idx: int,
        new_question: Optional[str] = None,
        new_answer: Optional[str] = None,
        message: str = "",
    ):
        """Append a rewrite decision.

        Args:
            qa_idx: QA pair index (0-based).
            new_question: Replacement question text (None = no change).
            new_answer: Replacement answer text (None = no change).
            message: Description of what was changed.
        """
        self.rewrite_decisions.append(
            RewriteDecision(qa_idx, new_question, new_answer, message)
        )

    # === Properties ===

    @property
    def has_filter(self) -> bool:
        """Whether any decision (global or per-QA) has ``rejected=True``."""
        return any(d.rejected for d in self.filter_decisions)

    @property
    def has_rewrite(self) -> bool:
        """Whether any rewrite decision changes a question or an answer.

        Decisions that carry only a ``message`` do not count.
        """
        return any(
            d.new_question is not None or d.new_answer is not None
            for d in self.rewrite_decisions
        )

    # === Apply to DataItem ===

    def apply_to(
        self,
        item: "DataItem",
        op_name: str,
    ) -> Tuple[Optional["DataItem"], Optional["DataItem"]]:
        """Apply all decisions to a data item.

        Order: rewrite first, then filter, so the kept/rejected outputs
        carry the rewritten content. Rewrites modify ``item`` in place.

        Args:
            item: DataItem to modify.
            op_name: Operator name used in the records.

        Returns:
            ``(kept_item, rejected_item)``; either side may be ``None``.
        """
        # 1. Apply rewrites
        self._apply_rewrites(item, op_name)
        # 2. Apply filters
        return self._apply_filters(item, op_name)

    def _apply_rewrites(self, item: "DataItem", op_name: str):
        """Apply rewrite decisions to ``item`` in place and record messages."""
        for d in self.rewrite_decisions:
            if d.new_answer is not None:
                item.set_answer(d.qa_idx, d.new_answer, save_original=True)
            if d.new_question is not None:
                item.set_question(d.qa_idx, d.new_question, save_original=True)
            # A message is recorded even when neither text field changed.
            if d.message:
                item.add_rewrite_record(op_name, d.qa_idx, d.message)

    def _apply_filters(
        self,
        item: "DataItem",
        op_name: str,
    ) -> Tuple[Optional["DataItem"], Optional["DataItem"]]:
        """Apply filter decisions.

        Returns:
            - ``(item, None)``: keep everything.
            - ``(None, item)``: reject everything.
            - ``(kept, rejected)``: item split into two new items.
        """
        # No filter decisions = keep everything
        if not self.filter_decisions:
            item.mark_kept()
            return item, None

        # A global rejection (qa_idx == -1) rejects the entire item.
        for d in self.filter_decisions:
            if d.qa_idx == -1 and d.rejected:
                item.mark_rejected()
                item.add_filter_record(op_name, -1, d.reason)
                return None, item

        # Per-QA rejected indices, excluding global/negative ones. Several
        # decisions may target the same pair, so deduplicate; sort so the
        # rejected split keeps the original QA order, like kept_indices.
        rejected_indices = sorted(
            {d.qa_idx for d in self.filter_decisions if d.rejected and d.qa_idx >= 0}
        )

        # No QA rejected = keep everything
        if not rejected_indices:
            item.mark_kept()
            # Record keep reasons, if any
            for d in self.filter_decisions:
                if d.reason and not d.rejected:
                    item.add_keep_record(op_name, d.qa_idx, d.reason)
            return item, None

        kept_indices = sorted(set(range(item.qa_count)).difference(rejected_indices))

        # All QA rejected = reject entire item (original indices still valid)
        if not kept_indices:
            item.mark_rejected()
            for d in self.filter_decisions:
                if d.rejected:
                    item.add_filter_record(op_name, d.qa_idx, d.reason)
            return None, item

        # Partial rejection = split the item.
        kept_item, rejected_item = item.split(kept_indices, rejected_indices)

        # Assumes split renumbers QA pairs from 0 in the order of the index
        # lists passed to it (the re-indexing below relies on this) — confirm
        # against DataItem.split. Map original index -> position in each part.
        rejected_pos = {old: new for new, old in enumerate(rejected_indices)}
        kept_pos = {old: new for new, old in enumerate(kept_indices)}
        for d in self.filter_decisions:
            if d.rejected and d.qa_idx in rejected_pos:
                rejected_item.add_filter_record(
                    op_name, rejected_pos[d.qa_idx], d.reason
                )
            elif d.reason and not d.rejected and d.qa_idx in kept_pos:
                kept_item.add_keep_record(op_name, kept_pos[d.qa_idx], d.reason)
        return kept_item, rejected_item
# Backward compatibility alias: older code imports ``OperatorResult``.
OperatorResult = Result