DataStudio Architecture Guide
1. Project Overview
DataStudio is a multimodal data processing toolkit designed for:
Cleaning and filtering training data (Filter)
Rewriting and standardizing data formats (Rewriter)
Intelligent data processing using MLLMs
2. Core Data Flow and Class Relationships
2.1 Overall Architecture
┌─────────────────────────────────────────────────────────────────────────────────┐
│ run.py │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ Runner │ │
│ │ │ │
│ │ __init__(): │ │
│ │ 1. cfg = get_cfg() # Load config │ │
│ │ 2. self.logger = build_from_cfg(...) # Build logger │ │
│ │ 3. self.model = build_from_cfg(...) # Build model (OpenAIAPI) │ │
│ │ 4. self.dataloaders = build_dataloaders(...) # Build data loaders │ │
│ │ 5. self.datasaver = build_from_cfg(...) # Build data saver │ │
│ │ 6. self.pipeline = build_from_cfg(...) # Build pipeline │ │
│ │ │ │
│ │ run(): │ │
│ │ for dataloader in self.dataloaders: │ │
│ │ for batch in dataloader: # Iterate batches │ │
│ │ datas = self.pipeline(batch) # Pipeline processing │ │
│ │ self.datasaver(datas) # Accumulate results │ │
│ │ self.datasaver.save() # Save to disk │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────────┘
2.2 Data Loading (StandardDataLoader)
┌─────────────────────────────────────────────────────────────────────────────────┐
│ StandardDataLoader │
│ │
│ __init__(data_root, dataset, batch_size, ...): │
│ │ │
│ ├─▶ FormatRegistry.load(data_file) # Auto-select format by extension │
│ │ │ │
│ │ ├─▶ JsonFormat.load() # .json files │
│ │ └─▶ JsonlFormat.load() # .jsonl files │
│ │ │
│ ├─▶ ShardedLMDBManager.get_instance() # Get LMDB singleton │
│ │ │
│ └─▶ write_images_to_sharded_lmdb() # Pre-process images to cache │
│ │ │
│ └─▶ Multi-threaded read → resize → write to LMDB │
│ │
│ __iter__() / __next__(): │
│ │ │
│ └─▶ Returns List[Dict] # Each Dict is one raw data entry │
│ │ │
│ └─▶ Dict structure: │
│ { │
│ "id": "xxx", │
│ "image": "path/to/img.jpg", │
│ "image_pil": <PIL.Image>, # Pre-loaded image object │
│ "conversations": [...] │
│ } │
└─────────────────────────────────────────────────────────────────────────────────┘
2.3 Pipeline Processing (Pipeline → SubPipeline → Operator)
┌─────────────────────────────────────────────────────────────────────────────────┐
│ Pipeline │
│ │
│ __call__(datas: List[Dict]) -> List[Dict]: │
│ │ │
│ │ # Execute SubPipelines in priority order │
│ │ │
│ for sub_pipeline, name in self.sub_pipelines: # By priority │
│ │ │
│ ├─▶ result = sub_pipeline(datas) │
│ │ │ │
│ │ │ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │ SubPipeline.__call__() │ │
│ │ │ │ │ │
│ │ │ │ # Step 1: Wrap raw data │ │
│ │ │ │ items = [DataItem(d, i) for i, d in enumerate(datas)] │ │
│ │ │ │ │ │
│ │ │ │ # Step 2: Execute each Operator sequentially │ │
│ │ │ │ for op in self.operators: │ │
│ │ │ │ │ │ │
│ │ │ │ ├─▶ results = op.process_batch(items) │ │
│ │ │ │ │ │ │
│ │ │ │ │ # Step 3: Apply results to data │ │
│ │ │ │ └─▶ for item, result in zip(items, results): │ │
│ │ │ │ └─▶ kept, rejected = result.apply_to(item) │ │
│ │ │ │ │ │
│ │ │ │ # Step 4: Separate kept and rejected data │ │
│ │ │ │ items = [i for i in result if not i.is_rejected] │ │
│ │ │ │ rejected.extend([i for i in result if i.is_rejected]) │ │
│ │ │ │ │ │
│ │ │ │ return [item.data for item in items + rejected] │ │
│ │ │ └─────────────────────────────────────────────────────────┘ │
│ │ │ │
│ │ └─▶ List[Dict] # Contains kept and rejected │
│ │ │
│ ├─▶ datas = [d for d in result if not d.get("rejected")] │
│ └─▶ all_rejected.extend([d for d in result if d.get("rejected")]) │
│ │
│ return datas + all_rejected │
└─────────────────────────────────────────────────────────────────────────────────┘
2.4 Operator Execution Details
┌─────────────────────────────────────────────────────────────────────────────────┐
│ Operator Types and Execution Flow │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ Filter (Rule-based) │ │
│ │ │ │
│ │ process(item: DataItem) -> Result: │ │
│ │ result = Result(item_idx=item.idx) │ │
│ │ for qa in item.qa_pairs: │ │
│ │ rejected, reason = self.check(item, qa.idx) # Subclass impl │ │
│ │ if rejected: │ │
│ │ result.add_filter(qa.idx, rejected=True, reason=reason) │ │
│ │ return result │ │
│ │ │ │
│ │ # Subclasses only need to implement check(): │ │
│ │ def check(item, qa_idx) -> (bool, str): │ │
│ │ # True, "reason" = filter out │ │
│ │ # False, "" = keep │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ Rewriter (Rule-based) │ │
│ │ │ │
│ │ process(item: DataItem) -> Result: │ │
│ │ result = Result(item_idx=item.idx) │ │
│ │ for qa in item.qa_pairs: │ │
│ │ new_content = self.rewrite(item, qa.idx) # Subclass impl │ │
│ │ if new_content is not None: │ │
│ │ result.add_rewrite(qa.idx, new_answer=new_content) │ │
│ │ return result │ │
│ │ │ │
│ │ # Subclasses only need to implement rewrite(): │ │
│ │ def rewrite(item, qa_idx) -> Optional[str]: │ │
│ │ # Return new answer = modify │ │
│ │ # Return None = no change │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ MLLMOperator (LLM-powered) │ │
│ │ │ │
│ │ process_batch(items: List[DataItem]) -> List[Result]: │ │
│ │ │ │ │
│ │ ├─▶ requests = self._build_requests(items) │ │
│ │ │ └─▶ RequestBuilder.build_requests(item, batch_qa) │ │
│ │ │ ├─▶ Read prompt template file │ │
│ │ │ ├─▶ Format QA content into prompt │ │
│ │ │ └─▶ Build payload: [{type: "text/image", ...}] │ │
│ │ │ │ │
│ │ ├─▶ responses = self.model.generate(payloads) │ │
│ │ │ └─▶ OpenAIAPI.generate() │ │
│ │ │ ├─▶ Multi-threaded concurrent requests │ │
│ │ │ ├─▶ Automatic retry on failure │ │
│ │ │ └─▶ Returns List[str] (JSON strings) │ │
│ │ │ │ │
│ │ └─▶ for req, resp in zip(requests, responses): │ │
│ │ parsed = RequestBuilder.parse_response(resp) │ │
│ │ self._add_decision(result, qa_idx, parsed) # Subclass impl│ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────────┘
2.5 Data Saving (StandardDataSaver)
┌─────────────────────────────────────────────────────────────────────────────────┐
│ StandardDataSaver │
│ │
│ __call__(datas: List[Dict]): │
│ └─▶ Classify and accumulate by rejected field │
│ │
│ save(): │
│ ├─▶ SimpleSaver.save(rejected_datas, is_rejected=True) │
│ │ ├─▶ Group data by source_file │
│ │ ├─▶ Determine output path: output_dir/{source}/rejected/{filename} │
│ │ └─▶ FormatRegistry.save(data, path) # Keep original format │
│ │ │
│ └─▶ SimpleSaver.save(datas, is_rejected=False) │
│ ├─▶ Group data by source_file │
│ ├─▶ Determine output path: output_dir/{source}/{filename} │
│ └─▶ FormatRegistry.save(data, path) │
│ │
│ save_yaml(): │
│ └─▶ YamlConfigGenerator.generate() # Generate dataset config YAML │
└─────────────────────────────────────────────────────────────────────────────────┘
2.6 Core Data Structure Transformations
┌─────────────────────────────────────────────────────────────────────────────────┐
│ Data Structure Transformation Flow │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ Raw JSON/JSONL │
│ ───────────────── │
│ { │
│ "id": "001", │
│ "image": "img.jpg", │
│ "conversations": [ │
│ {"from": "human", "value": "Q1"}, │
│ {"from": "gpt", "value": "A1"}, │
│ {"from": "human", "value": "Q2"}, │
│ {"from": "gpt", "value": "A2"} │
│ ] │
│ } │
│ │ │
│ │ DataLoader loads │
│ ▼ │
│ Dict (in memory) │
│ ───────────── │
│ { │
│ "id": "001", │
│ "image": "img.jpg", │
│ "image_pil": <PIL.Image>, # Pre-loaded image │
│ "source_file": "/path/to/file", # Source file path │
│ "conversations": [...] │
│ } │
│ │ │
│ │ SubPipeline wraps │
│ ▼ │
│ DataItem │
│ ──────── │
│ DataItem( │
│ _data = {...}, # Reference to raw Dict │
│ idx = 0, # Index within batch │
│ _qa_pairs = [ # Parsed QA pairs │
│ QA(idx=0, question="Q1", answer="A1"), │
│ QA(idx=1, question="Q2", answer="A2"), │
│ ] │
│ ) │
│ │ │
│ │ Operator processes │
│ ▼ │
│ Result │
│ ────── │
│ Result( │
│ item_idx = 0, │
│ filter_decisions = [ │
│ FilterDecision(qa_idx=1, rejected=True, reason="Too short"), │
│ ], │
│ rewrite_decisions = [ │
│ RewriteDecision(qa_idx=0, new_answer="Modified A1"), │
│ ] │
│ ) │
│ │ │
│ │ Result.apply_to(item) │
│ ▼ │
│ Processed Dict │
│ ───────────── │
│ # Kept data # Rejected data │
│ { { │
│ "id": "001", "id": "001", │
│ "image": "img.jpg", "image": "img.jpg", │
│ "conversations": [ "conversations": [ │
│ {"from": "human", "value": "Q1"}, {"from": "human", "value": "Q2"}, │
│ {"from": "gpt", "value": "Mod A1"}, {"from": "gpt", "value": "A2"} │
│ ], ], │
│ "ori_answer": {"0": "A1"}, "rejected": True, │
│ "rewrite_ops": { "filter_ops": { │
│ "MyRewriter": {"0": "rewritten"} "MyFilter": {"0": "Too short"} │
│ } } │
│ } } │
└─────────────────────────────────────────────────────────────────────────────────┘
2.7 Class Dependency Diagram
┌─────────────────────────────────────────────────────────────────────────────────┐
│ Class Dependencies │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ │
│ │ Runner │ │
│ └────┬────┘ │
│ ┌──────────────────────┼──────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────────────┐ ┌────────────┐ ┌──────────────────┐ │
│ │StandardData │ │ Pipeline │ │StandardDataSaver │ │
│ │ Loader │ └─────┬──────┘ └────────┬─────────┘ │
│ └───────┬───────┘ │ │ │
│ │ │ │ │
│ │ ┌─────┴─────┐ ┌─────┴─────┐ │
│ │ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼ │
│ ┌───────────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ShardedLMDB │ │SubPipe │ │SubPipe │ │Simple │ │YamlGen │ │
│ │ Manager │ │ line 1 │ │ line N │ │ Saver │ │erator │ │
│ └───────────────┘ └───┬────┘ └────────┘ └────────┘ └────────┘ │
│ │ │
│ ┌─────┴─────────────────┐ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ │
│ │ Operator │ │ Operator │ │
│ └────┬─────┘ └──────────┘ │
│ │ │
│ ┌─────────┼─────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────┐ ┌────────────┐ │
│ │ Filter │ │Rewri │ │MLLMOperator│ │
│ │ │ │ ter │ │ │ │
│ └────┬─────┘ └──┬───┘ └─────┬──────┘ │
│ │ │ │ │
│ │ │ ┌─────┴─────┐ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────┐ ┌───────┐ ┌──────────────┐ │
│ │DataItem │ │Resul│ │Request│ │ OpenAIAPI │ │
│ │ QA │ │ t │ │Builder│ │ (Model) │ │
│ └─────────┘ └─────┘ └───────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
3. Key Class Reference
Class |
File |
Purpose |
|---|---|---|
|
|
Main entry point, coordinates all components |
|
|
Loads data, manages LMDB cache |
|
|
Saves processing results |
|
|
Main pipeline, executes sub-pipelines by priority |
|
|
Sub-pipeline, executes operators sequentially |
|
|
Data wrapper, provides QA access |
|
|
Filter base class |
|
|
Rewriter base class |
|
|
Operator execution result |
|
|
MLLM operator base class |
|
|
Builds MLLM requests |
4. Directory Structure
datastudio/
├── datasets/ # Data loading/saving
│ ├── data_loader.py # StandardDataLoader - load data + LMDB image cache
│ ├── data_saver.py # StandardDataSaver - save processing results
│ ├── formatters/ # JSON/JSONL format handlers
│ └── config/ # YAML config loading
│
├── operators/ # Data processing operators
│ ├── core/ # Core abstractions
│ │ ├── data_item.py # DataItem, QA - data wrapper
│ │ ├── operator.py # Operator, Filter, Rewriter - base classes
│ │ └── result.py # Result, FilterDecision, RewriteDecision
│ ├── filters/ # Rule-based filters
│ │ ├── conv_length.py # Conversation turn count filter
│ │ ├── image_size.py # Image size filter
│ │ └── text_repeat.py # Text repetition detection
│ ├── rewriters/ # Rule-based rewriters
│ │ ├── norm_prompt.py # Normalize prompts
│ │ ├── remove_think.py# Remove think content
│ │ └── split.py # Split multi-turn conversations
│ └── mllm/ # MLLM operators
│ ├── base.py # MLLMOperator base class
│ ├── filter.py # MLLMFilter - LLM-powered filtering
│ ├── rewriter.py # MLLMRewriter - LLM-powered rewriting
│ └── request.py # RequestBuilder - build model requests
│
├── pipelines/ # Pipeline orchestration
│ ├── pipeline.py # Pipeline - main pipeline
│ └── sub_pipeline.py # SubPipeline - sub-pipeline
│
├── models/ # Model wrappers
│ ├── base.py # BaseAPI - base class (retry, parallelism)
│ └── openai_api.py # OpenAIAPI - OpenAI-compatible API
│
└── utils/ # Utilities
├── registry.py # Component registry
├── checkpoint.py # Checkpoint resume
├── database.py # LMDB image cache
└── vision.py # Image processing
5. Core Design Principles
Operators don’t modify data directly — They return Result objects; the Pipeline applies changes uniformly
Config-driven — All components are instantiated via config, ensuring reproducibility
Registry pattern — Components are registered via
@OPERATORS.register_module()Checkpoint resume — Supports checkpoints for interruptible processing of large datasets
Image caching — LMDB sharded storage avoids repeated image reads