# Source code for datastudio.datasets.formatters

"""Unified data loading/saving with automatic schema conversion.

Example::

    from datastudio.datasets.formatters import FormatRegistry
    data = FormatRegistry.load('/path/to/data.jsonl')
    FormatRegistry.save(data, '/path/to/output.jsonl')
"""

import os

from .base import BaseFormat
from .schema_converter import (
    SchemaConverter,
    SchemaType,
    denormalize_data,
    normalize_data,
)


class FormatRegistry:
    """Registry for data format handlers with automatic schema conversion.

    Detects file format by extension, converts between ``conversations`` and
    ``messages`` schemas automatically, and preserves original schema metadata
    for round-trip fidelity.

    The registry is class-level state: every handler registered through
    :meth:`register` is shared by all users of this class.
    """

    # Maps a lower-cased file extension (as returned by
    # ``format_class.extensions()``) to the handler class for it.
    _formats: dict = {}

    @classmethod
    def register(cls, format_class):
        """Register a format handler (usable as a class decorator).

        Every extension reported by ``format_class.extensions()`` is
        lower-cased and mapped to ``format_class``. Registering a second
        handler for the same extension replaces the earlier one.

        Args:
            format_class: Format class to register. Must expose an
                ``extensions()`` callable returning an iterable of strings.

        Returns:
            The registered format class, unchanged.
        """
        for ext in format_class.extensions():
            cls._formats[ext.lower()] = format_class
        return format_class

    @classmethod
    def get(cls, file_path: str) -> "BaseFormat":
        """Get a format handler instance based on the file extension.

        The extension is taken from ``os.path.splitext`` and compared
        case-insensitively against the registered extensions.

        Args:
            file_path: Path to data file.

        Returns:
            BaseFormat: A new instance of the handler registered for the
            file's extension.

        Raises:
            ValueError: If the file extension is not supported.
        """
        ext = os.path.splitext(file_path)[1].lower()
        if ext not in cls._formats:
            raise ValueError(
                f"Unsupported format: '{ext}'. Available: {list(cls._formats.keys())}"
            )
        return cls._formats[ext]()

    @classmethod
    def load(
        cls,
        file_path: str,
        add_source_file: bool = True,
        remove_rejected: bool = True,
        auto_normalize: bool = True,
    ) -> list:
        """Load data from file.

        Args:
            file_path: Path to data file.
            add_source_file: Whether to add a 'file_path' field to each item.
                An existing 'file_path' value on an item is overwritten.
            remove_rejected: Whether to remove 'filtered'/'rejected' fields.
            auto_normalize: Whether to auto-convert to standard schema
                (default: True). Original schema is stored in
                '_original_schema' for round-trip.

        Returns:
            list: List of data dictionaries in standard schema.

        Raises:
            ValueError: If the file extension is not supported.

        Note:
            This method modifies the loaded data items in place by:

            - Converting to standard schema if auto_normalize=True
            - Adding 'file_path' field if add_source_file=True
            - Removing 'filtered' and 'rejected' fields if remove_rejected=True

            If you need to preserve the original data, make a deep copy
            after loading.
        """
        data = cls.get(file_path).load(file_path)

        # Auto-normalize to standard schema
        if auto_normalize:
            data = normalize_data(data)

        for item in data:
            if add_source_file:
                item["file_path"] = file_path
            if remove_rejected:
                item.pop("filtered", None)  # Legacy field
                item.pop("rejected", None)  # Current field

        return data

    @classmethod
    def save(
        cls,
        data: list,
        file_path: str,
        auto_denormalize: bool = True,
        **kwargs,
    ) -> None:
        """Save data to file.

        Args:
            data: List of data dictionaries.
            file_path: Output file path; its extension selects the handler.
            auto_denormalize: Whether to auto-convert back to original schema
                (default: True).
            **kwargs: Format-specific options, forwarded to the handler's
                ``save`` method.

        Raises:
            ValueError: If the file extension is not supported.
        """
        # Auto-denormalize to original schema
        if auto_denormalize:
            data = denormalize_data(data)

        cls.get(file_path).save(data, file_path, **kwargs)

    @classmethod
    def supported_extensions(cls) -> list:
        """Get all supported file extensions.

        Returns:
            list: Registered extensions (lower-cased), in registration order.
        """
        return list(cls._formats.keys())
# Register built-in formats.
# These imports are placed after the FormatRegistry definition (hence the
# E402 suppressions) and the handlers are registered at import time.
from .json_formatter import JsonFormat  # noqa: E402
from .jsonl_formatter import JsonlFormat  # noqa: E402

FormatRegistry.register(JsonFormat)
FormatRegistry.register(JsonlFormat)

# Public API of this package.
__all__ = [
    "BaseFormat",
    "FormatRegistry",
    "JsonFormat",
    "JsonlFormat",
    "SchemaConverter",
    "SchemaType",
    "normalize_data",
    "denormalize_data",
]