DataStudio Architecture Guide

1. Project Overview

DataStudio is a multimodal data processing toolkit designed for:

  • Cleaning and filtering training data (Filter)

  • Rewriting and standardizing data formats (Rewriter)

  • Intelligent data processing using MLLMs


2. Core Data Flow and Class Relationships

2.1 Overall Architecture

┌─────────────────────────────────────────────────────────────────────────────────┐
│                                 run.py                                          │
│  ┌─────────────────────────────────────────────────────────────────────────┐   │
│  │                              Runner                                      │   │
│  │                                                                          │   │
│  │   __init__():                                                            │   │
│  │     1. cfg = get_cfg()                    # Load config                  │   │
│  │     2. self.logger = build_from_cfg(...)  # Build logger                 │   │
│  │     3. self.model = build_from_cfg(...)   # Build model (OpenAIAPI)      │   │
│  │     4. self.dataloaders = build_dataloaders(...)  # Build data loaders   │   │
│  │     5. self.datasaver = build_from_cfg(...)       # Build data saver     │   │
│  │     6. self.pipeline = build_from_cfg(...)        # Build pipeline       │   │
│  │                                                                          │   │
│  │   run():                                                                 │   │
│  │     for dataloader in self.dataloaders:                                  │   │
│  │         for batch in dataloader:          # Iterate batches              │   │
│  │             datas = self.pipeline(batch)  # Pipeline processing          │   │
│  │             self.datasaver(datas)         # Accumulate results           │   │
│  │         self.datasaver.save()             # Save to disk                 │   │
│  └─────────────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────────────┘

2.2 Data Loading (StandardDataLoader)

┌─────────────────────────────────────────────────────────────────────────────────┐
│                          StandardDataLoader                                     │
│                                                                                 │
│   __init__(data_root, dataset, batch_size, ...):                               │
│     │                                                                           │
│     ├─▶ FormatRegistry.load(data_file)      # Auto-select format by extension  │
│     │       │                                                                   │
│     │       ├─▶ JsonFormat.load()           # .json files                      │
│     │       └─▶ JsonlFormat.load()          # .jsonl files                     │
│     │                                                                           │
│     ├─▶ ShardedLMDBManager.get_instance()   # Get LMDB singleton               │
│     │                                                                           │
│     └─▶ write_images_to_sharded_lmdb()      # Pre-process images to cache      │
│             │                                                                   │
│             └─▶ Multi-threaded read → resize → write to LMDB                   │
│                                                                                 │
│   __iter__() / __next__():                                                     │
│     │                                                                           │
│     └─▶ Returns List[Dict]                  # Each Dict is one raw data entry  │
│             │                                                                   │
│             └─▶ Dict structure:                                                │
│                   {                                                             │
│                     "id": "xxx",                                                │
│                     "image": "path/to/img.jpg",                                 │
│                     "image_pil": <PIL.Image>,    # Pre-loaded image object      │
│                     "conversations": [...]                                      │
│                   }                                                             │
└─────────────────────────────────────────────────────────────────────────────────┘

2.3 Pipeline Processing (Pipeline → SubPipeline → Operator)

┌─────────────────────────────────────────────────────────────────────────────────┐
│                               Pipeline                                          │
│                                                                                 │
│   __call__(datas: List[Dict]) -> List[Dict]:                                   │
│     │                                                                           │
│     │  # Execute SubPipelines in priority order                                │
│     │                                                                           │
│     for sub_pipeline, name in self.sub_pipelines:  # By priority               │
│         │                                                                       │
│         ├─▶ result = sub_pipeline(datas)                                       │
│         │       │                                                               │
│         │       │  ┌─────────────────────────────────────────────────────────┐ │
│         │       │  │              SubPipeline.__call__()                     │ │
│         │       │  │                                                         │ │
│         │       │  │  # Step 1: Wrap raw data                                │ │
│         │       │  │  items = [DataItem(d, i) for i, d in enumerate(datas)]  │ │
│         │       │  │                                                         │ │
│         │       │  │  # Step 2: Execute each Operator sequentially           │ │
│         │       │  │  for op in self.operators:                              │ │
│         │       │  │      │                                                  │ │
│         │       │  │      ├─▶ results = op.process_batch(items)              │ │
│         │       │  │      │                                                  │ │
│         │       │  │      │  # Step 3: Apply results to data                 │ │
│         │       │  │      └─▶ for item, result in zip(items, results):       │ │
│         │       │  │              └─▶ kept, rejected = result.apply_to(item) │ │
│         │       │  │                                                         │ │
│         │       │  │  # Step 4: Separate kept and rejected data              │ │
│         │       │  │  items = [i for i in result if not i.is_rejected]       │ │
│         │       │  │  rejected.extend([i for i in result if i.is_rejected])  │ │
│         │       │  │                                                         │ │
│         │       │  │  return [item.data for item in items + rejected]        │ │
│         │       │  └─────────────────────────────────────────────────────────┘ │
│         │       │                                                               │
│         │       └─▶ List[Dict]  # Contains kept and rejected                   │
│         │                                                                       │
│         ├─▶ datas = [d for d in result if not d.get("rejected")]               │
│         └─▶ all_rejected.extend([d for d in result if d.get("rejected")])      │
│                                                                                 │
│     return datas + all_rejected                                                │
└─────────────────────────────────────────────────────────────────────────────────┘

2.4 Operator Execution Details

┌─────────────────────────────────────────────────────────────────────────────────┐
│                        Operator Types and Execution Flow                        │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│  ┌─────────────────────────────────────────────────────────────────────────┐   │
│  │                         Filter (Rule-based)                             │   │
│  │                                                                          │   │
│  │   process(item: DataItem) -> Result:                                    │   │
│  │     result = Result(item_idx=item.idx)                                  │   │
│  │     for qa in item.qa_pairs:                                            │   │
│  │         rejected, reason = self.check(item, qa.idx)  # Subclass impl   │   │
│  │         if rejected:                                                    │   │
│  │             result.add_filter(qa.idx, rejected=True, reason=reason)     │   │
│  │     return result                                                       │   │
│  │                                                                          │   │
│  │   # Subclasses only need to implement check():                          │   │
│  │   def check(item, qa_idx) -> (bool, str):                               │   │
│  │       # True, "reason" = filter out                                     │   │
│  │       # False, "" = keep                                                │   │
│  └─────────────────────────────────────────────────────────────────────────┘   │
│                                                                                 │
│  ┌─────────────────────────────────────────────────────────────────────────┐   │
│  │                        Rewriter (Rule-based)                            │   │
│  │                                                                          │   │
│  │   process(item: DataItem) -> Result:                                    │   │
│  │     result = Result(item_idx=item.idx)                                  │   │
│  │     for qa in item.qa_pairs:                                            │   │
│  │         new_content = self.rewrite(item, qa.idx)  # Subclass impl      │   │
│  │         if new_content is not None:                                     │   │
│  │             result.add_rewrite(qa.idx, new_answer=new_content)          │   │
│  │     return result                                                       │   │
│  │                                                                          │   │
│  │   # Subclasses only need to implement rewrite():                        │   │
│  │   def rewrite(item, qa_idx) -> Optional[str]:                           │   │
│  │       # Return new answer = modify                                      │   │
│  │       # Return None = no change                                         │   │
│  └─────────────────────────────────────────────────────────────────────────┘   │
│                                                                                 │
│  ┌─────────────────────────────────────────────────────────────────────────┐   │
│  │                       MLLMOperator (LLM-powered)                        │   │
│  │                                                                          │   │
│  │   process_batch(items: List[DataItem]) -> List[Result]:                 │   │
│  │     │                                                                    │   │
│  │     ├─▶ requests = self._build_requests(items)                          │   │
│  │     │       └─▶ RequestBuilder.build_requests(item, batch_qa)           │   │
│  │     │               ├─▶ Read prompt template file                       │   │
│  │     │               ├─▶ Format QA content into prompt                   │   │
│  │     │               └─▶ Build payload: [{type: "text/image", ...}]      │   │
│  │     │                                                                    │   │
│  │     ├─▶ responses = self.model.generate(payloads)                       │   │
│  │     │       └─▶ OpenAIAPI.generate()                                    │   │
│  │     │               ├─▶ Multi-threaded concurrent requests              │   │
│  │     │               ├─▶ Automatic retry on failure                      │   │
│  │     │               └─▶ Returns List[str] (JSON strings)                │   │
│  │     │                                                                    │   │
│  │     └─▶ for req, resp in zip(requests, responses):                      │   │
│  │             parsed = RequestBuilder.parse_response(resp)                │   │
│  │             self._add_decision(result, qa_idx, parsed)  # Subclass impl│   │
│  └─────────────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────────────┘

2.5 Data Saving (StandardDataSaver)

┌─────────────────────────────────────────────────────────────────────────────────┐
│                          StandardDataSaver                                      │
│                                                                                 │
│   __call__(datas: List[Dict]):                                                 │
│     └─▶ Classify and accumulate by rejected field                              │
│                                                                                 │
│   save():                                                                       │
│     ├─▶ SimpleSaver.save(rejected_datas, is_rejected=True)                     │
│     │       ├─▶ Group data by source_file                                      │
│     │       ├─▶ Determine output path: output_dir/{source}/rejected/{filename} │
│     │       └─▶ FormatRegistry.save(data, path)  # Keep original format        │
│     │                                                                           │
│     └─▶ SimpleSaver.save(datas, is_rejected=False)                             │
│             ├─▶ Group data by source_file                                      │
│             ├─▶ Determine output path: output_dir/{source}/{filename}          │
│             └─▶ FormatRegistry.save(data, path)                                │
│                                                                                 │
│   save_yaml():                                                                  │
│     └─▶ YamlConfigGenerator.generate()  # Generate dataset config YAML         │
└─────────────────────────────────────────────────────────────────────────────────┘

2.6 Core Data Structure Transformations

┌─────────────────────────────────────────────────────────────────────────────────┐
│                        Data Structure Transformation Flow                       │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│   Raw JSON/JSONL                                                                │
│   ─────────────────                                                             │
│   {                                                                             │
│     "id": "001",                                                                │
│     "image": "img.jpg",                                                         │
│     "conversations": [                                                          │
│       {"from": "human", "value": "Q1"},                                        │
│       {"from": "gpt", "value": "A1"},                                          │
│       {"from": "human", "value": "Q2"},                                        │
│       {"from": "gpt", "value": "A2"}                                           │
│     ]                                                                           │
│   }                                                                             │
│         │                                                                       │
│         │  DataLoader loads                                                     │
│         ▼                                                                       │
│   Dict (in memory)                                                              │
│   ─────────────                                                                 │
│   {                                                                             │
│     "id": "001",                                                                │
│     "image": "img.jpg",                                                         │
│     "image_pil": <PIL.Image>,        # Pre-loaded image                        │
│     "source_file": "/path/to/file",  # Source file path                        │
│     "conversations": [...]                                                      │
│   }                                                                             │
│         │                                                                       │
│         │  SubPipeline wraps                                                    │
│         ▼                                                                       │
│   DataItem                                                                      │
│   ────────                                                                      │
│   DataItem(                                                                     │
│     _data = {...},                   # Reference to raw Dict                   │
│     idx = 0,                         # Index within batch                      │
│     _qa_pairs = [                    # Parsed QA pairs                         │
│       QA(idx=0, question="Q1", answer="A1"),                                   │
│       QA(idx=1, question="Q2", answer="A2"),                                   │
│     ]                                                                           │
│   )                                                                             │
│         │                                                                       │
│         │  Operator processes                                                   │
│         ▼                                                                       │
│   Result                                                                        │
│   ──────                                                                        │
│   Result(                                                                       │
│     item_idx = 0,                                                               │
│     filter_decisions = [                                                        │
│       FilterDecision(qa_idx=1, rejected=True, reason="Too short"),             │
│     ],                                                                          │
│     rewrite_decisions = [                                                       │
│       RewriteDecision(qa_idx=0, new_answer="Modified A1"),                     │
│     ]                                                                           │
│   )                                                                             │
│         │                                                                       │
│         │  Result.apply_to(item)                                                │
│         ▼                                                                       │
│   Processed Dict                                                                │
│   ─────────────                                                                 │
│   # Kept data                              # Rejected data                     │
│   {                                      {                                      │
│     "id": "001",                           "id": "001",                         │
│     "image": "img.jpg",                    "image": "img.jpg",                  │
│     "conversations": [                     "conversations": [                   │
│       {"from": "human", "value": "Q1"},      {"from": "human", "value": "Q2"}, │
│       {"from": "gpt", "value": "Mod A1"},    {"from": "gpt", "value": "A2"}    │
│     ],                                     ],                                   │
│     "ori_answer": {"0": "A1"},             "rejected": True,                   │
│     "rewrite_ops": {                       "filter_ops": {                      │
│       "MyRewriter": {"0": "rewritten"}       "MyFilter": {"0": "Too short"}    │
│     }                                      }                                    │
│   }                                      }                                      │
└─────────────────────────────────────────────────────────────────────────────────┘

2.7 Class Dependency Diagram

┌─────────────────────────────────────────────────────────────────────────────────┐
│                           Class Dependencies                                    │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│                              ┌─────────┐                                        │
│                              │ Runner  │                                        │
│                              └────┬────┘                                        │
│           ┌──────────────────────┼──────────────────────┐                       │
│           │                      │                      │                       │
│           ▼                      ▼                      ▼                       │
│   ┌───────────────┐      ┌────────────┐      ┌──────────────────┐              │
│   │StandardData   │      │  Pipeline  │      │StandardDataSaver │              │
│   │   Loader      │      └─────┬──────┘      └────────┬─────────┘              │
│   └───────┬───────┘            │                      │                         │
│           │                    │                      │                         │
│           │              ┌─────┴─────┐          ┌─────┴─────┐                   │
│           │              │           │          │           │                   │
│           ▼              ▼           ▼          ▼           ▼                   │
│   ┌───────────────┐  ┌────────┐  ┌────────┐  ┌────────┐  ┌────────┐            │
│   │ShardedLMDB    │  │SubPipe │  │SubPipe │  │Simple  │  │YamlGen │            │
│   │   Manager     │  │ line 1 │  │ line N │  │ Saver  │  │erator  │            │
│   └───────────────┘  └───┬────┘  └────────┘  └────────┘  └────────┘            │
│                          │                                                      │
│                    ┌─────┴─────────────────┐                                    │
│                    │                       │                                    │
│                    ▼                       ▼                                    │
│              ┌──────────┐           ┌──────────┐                                │
│              │ Operator │           │ Operator │                                │
│              └────┬─────┘           └──────────┘                                │
│                   │                                                             │
│         ┌─────────┼─────────┐                                                   │
│         │         │         │                                                   │
│         ▼         ▼         ▼                                                   │
│   ┌──────────┐ ┌──────┐ ┌────────────┐                                         │
│   │  Filter  │ │Rewri │ │MLLMOperator│                                         │
│   │          │ │ ter  │ │            │                                         │
│   └────┬─────┘ └──┬───┘ └─────┬──────┘                                         │
│        │          │           │                                                 │
│        │          │     ┌─────┴─────┐                                           │
│        │          │     │           │                                           │
│        ▼          ▼     ▼           ▼                                           │
│   ┌─────────┐ ┌─────┐ ┌───────┐ ┌──────────────┐                               │
│   │DataItem │ │Resul│ │Request│ │  OpenAIAPI   │                               │
│   │   QA    │ │  t  │ │Builder│ │   (Model)    │                               │
│   └─────────┘ └─────┘ └───────┘ └──────────────┘                               │
│                                                                                 │
└─────────────────────────────────────────────────────────────────────────────────┘

3. Key Class Reference

Class

File

Purpose

Runner

run.py

Main entry point, coordinates all components

StandardDataLoader

datasets/data_loader.py

Loads data, manages LMDB cache

StandardDataSaver

datasets/data_saver.py

Saves processing results

Pipeline

pipelines/pipeline.py

Main pipeline, executes sub-pipelines by priority

SubPipeline

pipelines/sub_pipeline.py

Sub-pipeline, executes operators sequentially

DataItem

operators/core/data_item.py

Data wrapper, provides QA access

Filter

operators/core/operator.py

Filter base class

Rewriter

operators/core/operator.py

Rewriter base class

Result

operators/core/result.py

Operator execution result

MLLMOperator

operators/mllm/base.py

MLLM operator base class

RequestBuilder

operators/mllm/request.py

Builds MLLM requests


4. Directory Structure

datastudio/
├── datasets/              # Data loading/saving
│   ├── data_loader.py     # StandardDataLoader - load data + LMDB image cache
│   ├── data_saver.py      # StandardDataSaver - save processing results
│   ├── formatters/        # JSON/JSONL format handlers
│   └── config/            # YAML config loading
│
├── operators/             # Data processing operators
│   ├── core/              # Core abstractions
│   │   ├── data_item.py   # DataItem, QA - data wrapper
│   │   ├── operator.py    # Operator, Filter, Rewriter - base classes
│   │   └── result.py      # Result, FilterDecision, RewriteDecision
│   ├── filters/           # Rule-based filters
│   │   ├── conv_length.py # Conversation turn count filter
│   │   ├── image_size.py  # Image size filter
│   │   └── text_repeat.py # Text repetition detection
│   ├── rewriters/         # Rule-based rewriters
│   │   ├── norm_prompt.py # Normalize prompts
│   │   ├── remove_think.py# Remove think content
│   │   └── split.py       # Split multi-turn conversations
│   └── mllm/              # MLLM operators
│       ├── base.py        # MLLMOperator base class
│       ├── filter.py      # MLLMFilter - LLM-powered filtering
│       ├── rewriter.py    # MLLMRewriter - LLM-powered rewriting
│       └── request.py     # RequestBuilder - build model requests
│
├── pipelines/             # Pipeline orchestration
│   ├── pipeline.py        # Pipeline - main pipeline
│   └── sub_pipeline.py    # SubPipeline - sub-pipeline
│
├── models/                # Model wrappers
│   ├── base.py            # BaseAPI - base class (retry, parallelism)
│   └── openai_api.py      # OpenAIAPI - OpenAI-compatible API
│
└── utils/                 # Utilities
    ├── registry.py        # Component registry
    ├── checkpoint.py      # Checkpoint resume
    ├── database.py        # LMDB image cache
    └── vision.py          # Image processing

5. Core Design Principles

  1. Operators don’t modify data directly — They return Result objects; the Pipeline applies changes uniformly

  2. Config-driven — All components are instantiated via config, ensuring reproducibility

  3. Registry pattern — Components are registered via @OPERATORS.register_module()

  4. Checkpoint resume — Supports checkpoints for interruptible processing of large datasets

  5. Image caching — LMDB sharded storage avoids repeated image reads