DataStudio 架构快速理解指南

一、项目定位

DataStudio 是一个多模态数据处理工具包,专门用于:

  • 清洗和过滤训练数据(Filter)

  • 重写和标准化数据格式(Rewriter)

  • 利用大模型(MLLM)进行智能数据处理


二、核心数据流与类调用关系

2.1 整体架构图

┌─────────────────────────────────────────────────────────────────────────────────┐
│                                 run.py                                          │
│  ┌─────────────────────────────────────────────────────────────────────────┐   │
│  │                              Runner                                      │   │
│  │                                                                          │   │
│  │   __init__():                                                            │   │
│  │     1. cfg = get_cfg()                    # 加载配置                      │   │
│  │     2. self.logger = build_from_cfg(...)  # 构建日志                      │   │
│  │     3. self.model = build_from_cfg(...)   # 构建模型 (OpenAIAPI)          │   │
│  │     4. self.dataloaders = build_dataloaders(...)  # 构建数据加载器         │   │
│  │     5. self.datasaver = build_from_cfg(...)       # 构建数据保存器         │   │
│  │     6. self.pipeline = build_from_cfg(...)        # 构建处理管线          │   │
│  │                                                                          │   │
│  │   run():                                                                 │   │
│  │     for dataloader in self.dataloaders:                                  │   │
│  │         for batch in dataloader:          # 迭代获取批次数据              │   │
│  │             datas = self.pipeline(batch)  # 管线处理                      │   │
│  │             self.datasaver(datas)         # 累积结果                      │   │
│  │         self.datasaver.save()             # 保存到磁盘                    │   │
│  └─────────────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────────────┘

2.2 数据加载流程 (StandardDataLoader)

┌─────────────────────────────────────────────────────────────────────────────────┐
│                          StandardDataLoader                                     │
│                                                                                 │
│   __init__(data_root, dataset, batch_size, ...):                               │
│     │                                                                           │
│     ├─▶ FormatRegistry.load(data_file)      # 根据扩展名自动选择格式器           │
│     │       │                                                                   │
│     │       ├─▶ JsonFormat.load()           # .json 文件                        │
│     │       └─▶ JsonlFormat.load()          # .jsonl 文件                       │
│     │                                                                           │
│     ├─▶ ShardedLMDBManager.get_instance()   # 获取 LMDB 单例                    │
│     │                                                                           │
│     └─▶ write_images_to_sharded_lmdb()      # 预处理图像到缓存                   │
│             │                                                                   │
│             └─▶ 多线程读取图像 → resize → 写入 LMDB                              │
│                                                                                 │
│   __iter__() / __next__():                                                     │
│     │                                                                           │
│     └─▶ 返回 List[Dict]                     # 每个 Dict 是一条原始数据           │
│             │                                                                   │
│             └─▶ Dict 结构:                                                      │
│                   {                                                             │
│                     "id": "xxx",                                                │
│                     "image": "path/to/img.jpg",                                 │
│                     "image_pil": <PIL.Image>,    # 预加载的图像对象              │
│                     "conversations": [...]                                      │
│                   }                                                             │
└─────────────────────────────────────────────────────────────────────────────────┘

2.3 管线处理流程 (Pipeline → SubPipeline → Operator)

┌─────────────────────────────────────────────────────────────────────────────────┐
│                               Pipeline                                          │
│                                                                                 │
│   __call__(datas: List[Dict]) -> List[Dict]:                                   │
│     │                                                                           │
│     │  # 按 priority 排序后依次执行 SubPipeline                                  │
│     │                                                                           │
│     for sub_pipeline, name in self.sub_pipelines:  # 按优先级顺序               │
│         │                                                                       │
│         ├─▶ result = sub_pipeline(datas)                                       │
│         │       │                                                               │
│         │       │  ┌─────────────────────────────────────────────────────────┐ │
│         │       │  │              SubPipeline.__call__()                     │ │
│         │       │  │                                                         │ │
│         │       │  │  # Step 1: 包装原始数据                                  │ │
│         │       │  │  items = [DataItem(d, i) for i, d in enumerate(datas)]  │ │
│         │       │  │                                                         │ │
│         │       │  │  # Step 2: 依次执行每个 Operator                         │ │
│         │       │  │  for op in self.operators:                              │ │
│         │       │  │      │                                                  │ │
│         │       │  │      ├─▶ results = op.process_batch(items)              │ │
│         │       │  │      │       │                                          │ │
│         │       │  │      │       │  ┌───────────────────────────────────┐   │ │
│         │       │  │      │       │  │  Operator.process_batch()         │   │ │
│         │       │  │      │       │  │                                   │   │ │
│         │       │  │      │       │  │  # 默认实现:逐个处理              │   │ │
│         │       │  │      │       │  │  return [self.process(item)       │   │ │
│         │       │  │      │       │  │          for item in items]       │   │ │
│         │       │  │      │       │  │                                   │   │ │
│         │       │  │      │       │  │  # MLLM 算子重写此方法实现批处理   │   │ │
│         │       │  │      │       │  └───────────────────────────────────┘   │ │
│         │       │  │      │       │                                          │ │
│         │       │  │      │       └─▶ List[Result]                           │ │
│         │       │  │      │                                                  │ │
│         │       │  │      │  # Step 3: 应用结果到数据                         │ │
│         │       │  │      │                                                  │ │
│         │       │  │      └─▶ for item, result in zip(items, results):       │ │
│         │       │  │              │                                          │ │
│         │       │  │              └─▶ kept, rejected = result.apply_to(item) │ │
│         │       │  │                      │                                  │ │
│         │       │  │                      │  ┌─────────────────────────────┐ │ │
│         │       │  │                      │  │  Result.apply_to()          │ │ │
│         │       │  │                      │  │                             │ │ │
│         │       │  │                      │  │  1. _apply_rewrites()       │ │ │
│         │       │  │                      │  │     → item.set_answer()     │ │ │
│         │       │  │                      │  │                             │ │ │
│         │       │  │                      │  │  2. _apply_filters()        │ │ │
│         │       │  │                      │  │     → item.mark_rejected()  │ │ │
│         │       │  │                      │  │     → item.split()          │ │ │
│         │       │  │                      │  │                             │ │ │
│         │       │  │                      │  │  return (kept, rejected)    │ │ │
│         │       │  │                      │  └─────────────────────────────┘ │ │
│         │       │  │                      │                                  │ │
│         │       │  │                      └─▶ (DataItem | None, DataItem | None)│
│         │       │  │                                                         │ │
│         │       │  │  # Step 4: 分离保留和拒绝的数据                          │ │
│         │       │  │  items = [i for i in result if not i.is_rejected]       │ │
│         │       │  │  rejected.extend([i for i in result if i.is_rejected])  │ │
│         │       │  │                                                         │ │
│         │       │  │  return [item.data for item in items + rejected]        │ │
│         │       │  └─────────────────────────────────────────────────────────┘ │
│         │       │                                                               │
│         │       └─▶ List[Dict]  # 包含 kept 和 rejected                        │
│         │                                                                       │
│         ├─▶ datas = [d for d in result if not d.get("rejected")]  # 继续处理   │
│         └─▶ all_rejected.extend([d for d in result if d.get("rejected")])      │
│                                                                                 │
│     return datas + all_rejected                                                │
└─────────────────────────────────────────────────────────────────────────────────┘

2.4 Operator 执行细节

┌─────────────────────────────────────────────────────────────────────────────────┐
│                        Operator 类型与执行流程                                   │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│  ┌─────────────────────────────────────────────────────────────────────────┐   │
│  │                         Filter (规则过滤器)                              │   │
│  │                                                                          │   │
│  │   process(item: DataItem) -> Result:                                    │   │
│  │     result = Result(item_idx=item.idx)                                  │   │
│  │     for qa in item.qa_pairs:                                            │   │
│  │         rejected, reason = self.check(item, qa.idx)  # 子类实现          │   │
│  │         if rejected:                                                    │   │
│  │             result.add_filter(qa.idx, rejected=True, reason=reason)     │   │
│  │     return result                                                       │   │
│  │                                                                          │   │
│  │   # 子类只需实现 check():                                                │   │
│  │   def check(item, qa_idx) -> (bool, str):                               │   │
│  │       # True, "原因" = 过滤掉                                            │   │
│  │       # False, "" = 保留                                                 │   │
│  └─────────────────────────────────────────────────────────────────────────┘   │
│                                                                                 │
│  ┌─────────────────────────────────────────────────────────────────────────┐   │
│  │                        Rewriter (规则重写器)                             │   │
│  │                                                                          │   │
│  │   process(item: DataItem) -> Result:                                    │   │
│  │     result = Result(item_idx=item.idx)                                  │   │
│  │     for qa in item.qa_pairs:                                            │   │
│  │         new_content = self.rewrite(item, qa.idx)  # 子类实现             │   │
│  │         if new_content is not None:                                     │   │
│  │             result.add_rewrite(qa.idx, new_answer=new_content)          │   │
│  │     return result                                                       │   │
│  │                                                                          │   │
│  │   # 子类只需实现 rewrite():                                              │   │
│  │   def rewrite(item, qa_idx) -> Optional[str]:                           │   │
│  │       # 返回新答案 = 修改                                                │   │
│  │       # 返回 None = 不修改                                               │   │
│  └─────────────────────────────────────────────────────────────────────────┘   │
│                                                                                 │
│  ┌─────────────────────────────────────────────────────────────────────────┐   │
│  │                       MLLMOperator (大模型算子)                          │   │
│  │                                                                          │   │
│  │   process_batch(items: List[DataItem]) -> List[Result]:                 │   │
│  │     │                                                                    │   │
│  │     ├─▶ requests = self._build_requests(items)                          │   │
│  │     │       │                                                            │   │
│  │     │       └─▶ RequestBuilder.build_requests(item, batch_qa)           │   │
│  │     │               │                                                    │   │
│  │     │               ├─▶ 读取 prompt 模板文件                              │   │
│  │     │               ├─▶ 格式化 QA 内容到 prompt                           │   │
│  │     │               └─▶ 构建 payload: [{type: "text/image", value: ...}] │   │
│  │     │                                                                    │   │
│  │     ├─▶ responses = self.model.generate(payloads)                       │   │
│  │     │       │                                                            │   │
│  │     │       └─▶ OpenAIAPI.generate()                                    │   │
│  │     │               │                                                    │   │
│  │     │               ├─▶ 多线程并发请求                                    │   │
│  │     │               ├─▶ 自动重试失败请求                                  │   │
│  │     │               └─▶ 返回 List[str] (JSON 字符串)                      │   │
│  │     │                                                                    │   │
│  │     └─▶ for req, resp in zip(requests, responses):                      │   │
│  │             parsed = RequestBuilder.parse_response(resp)                │   │
│  │             self._add_decision(result, qa_idx, parsed)  # 子类实现       │   │
│  │                                                                          │   │
│  │   # MLLMFilter._add_decision(): 根据 parsed["result"] 添加 FilterDecision│   │
│  │   # MLLMRewriter._add_decision(): 根据 parsed["answer"] 添加 RewriteDecision│
│  └─────────────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────────────┘

2.5 数据保存流程 (StandardDataSaver)

┌─────────────────────────────────────────────────────────────────────────────────┐
│                          StandardDataSaver                                      │
│                                                                                 │
│   __call__(datas: List[Dict]):                                                 │
│     │                                                                           │
│     └─▶ 按 rejected 字段分类累积到 self.datas / self.rejected_datas            │
│                                                                                 │
│   save():                                                                       │
│     │                                                                           │
│     ├─▶ SimpleSaver.save(rejected_datas, is_rejected=True)                     │
│     │       │                                                                   │
│     │       ├─▶ 按 source_file 分组数据                                         │
│     │       ├─▶ 确定输出路径: output_dir/{source}/rejected/{filename}           │
│     │       └─▶ FormatRegistry.save(data, path)  # 保持原格式                   │
│     │                                                                           │
│     └─▶ SimpleSaver.save(datas, is_rejected=False)                             │
│             │                                                                   │
│             ├─▶ 按 source_file 分组数据                                         │
│             ├─▶ 确定输出路径: output_dir/{source}/{filename}                    │
│             └─▶ FormatRegistry.save(data, path)                                │
│                                                                                 │
│   save_yaml():                                                                  │
│     │                                                                           │
│     └─▶ YamlConfigGenerator.generate()  # 生成数据集配置 YAML                   │
└─────────────────────────────────────────────────────────────────────────────────┘

2.6 核心数据结构转换

┌─────────────────────────────────────────────────────────────────────────────────┐
│                            数据结构转换流程                                      │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│   原始 JSON/JSONL                                                               │
│   ─────────────────                                                             │
│   {                                                                             │
│     "id": "001",                                                                │
│     "image": "img.jpg",                                                         │
│     "conversations": [                                                          │
│       {"from": "human", "value": "Q1"},                                        │
│       {"from": "gpt", "value": "A1"},                                          │
│       {"from": "human", "value": "Q2"},                                        │
│       {"from": "gpt", "value": "A2"}                                           │
│     ]                                                                           │
│   }                                                                             │
│         │                                                                       │
│         │  DataLoader 加载                                                      │
│         ▼                                                                       │
│   Dict (内存中)                                                                 │
│   ─────────────                                                                 │
│   {                                                                             │
│     "id": "001",                                                                │
│     "image": "img.jpg",                                                         │
│     "image_pil": <PIL.Image>,        # 预加载的图像                             │
│     "source_file": "/path/to/file",  # 来源文件                                 │
│     "conversations": [...]                                                      │
│   }                                                                             │
│         │                                                                       │
│         │  SubPipeline 包装                                                     │
│         ▼                                                                       │
│   DataItem                                                                      │
│   ────────                                                                      │
│   DataItem(                                                                     │
│     _data = {...},                   # 原始 Dict 引用                           │
│     idx = 0,                         # 批次内索引                               │
│     _qa_pairs = [                    # 解析后的 QA 对                           │
│       QA(idx=0, question="Q1", answer="A1"),                                   │
│       QA(idx=1, question="Q2", answer="A2"),                                   │
│     ]                                                                           │
│   )                                                                             │
│         │                                                                       │
│         │  Operator 处理                                                        │
│         ▼                                                                       │
│   Result                                                                        │
│   ──────                                                                        │
│   Result(                                                                       │
│     item_idx = 0,                                                               │
│     filter_decisions = [                                                        │
│       FilterDecision(qa_idx=1, rejected=True, reason="太短"),                  │
│     ],                                                                          │
│     rewrite_decisions = [                                                       │
│       RewriteDecision(qa_idx=0, new_answer="修改后的A1"),                       │
│     ]                                                                           │
│   )                                                                             │
│         │                                                                       │
│         │  Result.apply_to(item)                                                │
│         ▼                                                                       │
│   处理后的 Dict                                                                  │
│   ─────────────                                                                 │
│   # 保留的数据 (kept)                     # 被过滤的数据 (rejected)              │
│   {                                      {                                      │
│     "id": "001",                           "id": "001",                         │
│     "image": "img.jpg",                    "image": "img.jpg",                  │
│     "conversations": [                     "conversations": [                   │
│       {"from": "human", "value": "Q1"},      {"from": "human", "value": "Q2"}, │
│       {"from": "gpt", "value": "修改后A1"},  {"from": "gpt", "value": "A2"}     │
│     ],                                     ],                                   │
│     "ori_answer": {"0": "A1"},             "rejected": True,                   │
│     "rewrite_ops": {                       "filter_ops": {                      │
│       "MyRewriter": {"0": "rewritten"}       "MyFilter": {"0": "太短"}          │
│     }                                      }                                    │
│   }                                      }                                      │
│         │                                                                       │
│         │  DataSaver 保存                                                       │
│         ▼                                                                       │
│   output/                                                                       │
│   ├── source_name/                                                              │
│   │   └── data.jsonl                 # 保留的数据                               │
│   └── source_name/rejected/                                                     │
│       └── data.jsonl                 # 被过滤的数据                              │
└─────────────────────────────────────────────────────────────────────────────────┘

2.7 类依赖关系图

┌─────────────────────────────────────────────────────────────────────────────────┐
│                              类依赖关系                                          │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│                              ┌─────────┐                                        │
│                              │ Runner  │                                        │
│                              └────┬────┘                                        │
│           ┌──────────────────────┼──────────────────────┐                       │
│           │                      │                      │                       │
│           ▼                      ▼                      ▼                       │
│   ┌───────────────┐      ┌────────────┐      ┌──────────────────┐              │
│   │StandardData   │      │  Pipeline  │      │StandardDataSaver │              │
│   │   Loader      │      └─────┬──────┘      └────────┬─────────┘              │
│   └───────┬───────┘            │                      │                         │
│           │                    │                      │                         │
│           │              ┌─────┴─────┐          ┌─────┴─────┐                   │
│           │              │           │          │           │                   │
│           ▼              ▼           ▼          ▼           ▼                   │
│   ┌───────────────┐  ┌────────┐  ┌────────┐  ┌────────┐  ┌────────┐            │
│   │ShardedLMDB    │  │SubPipe │  │SubPipe │  │Simple  │  │YamlGen │            │
│   │   Manager     │  │ line 1 │  │ line N │  │ Saver  │  │erator  │            │
│   └───────────────┘  └───┬────┘  └────────┘  └────────┘  └────────┘            │
│                          │                                                      │
│                    ┌─────┴─────────────────┐                                    │
│                    │                       │                                    │
│                    ▼                       ▼                                    │
│              ┌──────────┐           ┌──────────┐                                │
│              │ Operator │           │ Operator │                                │
│              └────┬─────┘           └──────────┘                                │
│                   │                                                             │
│         ┌─────────┼─────────┐                                                   │
│         │         │         │                                                   │
│         ▼         ▼         ▼                                                   │
│   ┌──────────┐ ┌──────┐ ┌────────────┐                                         │
│   │  Filter  │ │Rewri │ │MLLMOperator│                                         │
│   │          │ │ ter  │ │            │                                         │
│   └────┬─────┘ └──┬───┘ └─────┬──────┘                                         │
│        │          │           │                                                 │
│        │          │     ┌─────┴─────┐                                           │
│        │          │     │           │                                           │
│        ▼          ▼     ▼           ▼                                           │
│   ┌─────────┐ ┌─────┐ ┌───────┐ ┌──────────────┐                               │
│   │DataItem │ │Resul│ │Request│ │  OpenAIAPI   │                               │
│   │   QA    │ │  t  │ │Builder│ │   (Model)    │                               │
│   └─────────┘ └─────┘ └───────┘ └──────────────┘                               │
│                                                                                 │
└─────────────────────────────────────────────────────────────────────────────────┘

三、核心概念

3.1 数据格式

输入数据是 JSON/JSONL 格式的对话数据:

{
    "id": "unique_id",
    "image": "path/to/image.jpg",
    "conversations": [
        {"from": "human", "value": "这张图片里有什么?"},
        {"from": "gpt", "value": "图片显示..."}
    ]
}

核心抽象

  • DataItem: 单条数据的包装类,提供便捷的 QA 对访问

  • QA: 一个问答对(human + gpt 各一条消息)

# DataItem 使用示例
item = DataItem(raw_data, idx=0)
for qa in item.qa_pairs:
    print(f"Q: {qa.question}")
    print(f"A: {qa.answer}")

3.2 Operator(算子)

所有数据处理逻辑都是 Operator,分为两类:

类型

作用

返回

Filter

判断数据是否应该被过滤

(rejected: bool, reason: str)

Rewriter

修改数据内容

new_answer: strNone

# Filter 示例
class MyFilter(Filter):
    def check(self, item: DataItem, qa_idx: int) -> Tuple[bool, str]:
        if len(item.get_answer(qa_idx)) < 10:
            return True, "答案太短"  # 过滤掉
        return False, ""  # 保留

# Rewriter 示例
class MyRewriter(Rewriter):
    def rewrite(self, item: DataItem, qa_idx: int) -> Optional[str]:
        answer = item.get_answer(qa_idx)
        return answer.strip()  # 返回新答案

3.3 Result(结果)

Operator 不直接修改数据,而是返回 Result

Result
├── filter_decisions: List[FilterDecision]   # 过滤决策
   └── FilterDecision(qa_idx, rejected, reason)
└── rewrite_decisions: List[RewriteDecision] # 重写决策
    └── RewriteDecision(qa_idx, new_answer, message)

Pipeline 统一调用 result.apply_to(item) 来应用修改。

3.4 Pipeline(管线)

Pipeline
├── SubPipeline 1 (priority=0)  ◀── 先执行
│   ├── ConvLengthFilter
│   └── ImageSizeFilter
├── SubPipeline 2 (priority=1)
│   └── MLLMFilter (图文一致性)
├── SubPipeline 3 (priority=2)
│   └── MLLMRewriter (重写答案)
└── SubPipeline 4 (priority=3)  ◀── 后执行
    └── ResponseTagFilter

四、目录结构速查

datastudio/
├── datasets/              # 数据加载/保存
│   ├── data_loader.py     # StandardDataLoader - 加载数据 + LMDB 图像缓存
│   ├── data_saver.py      # StandardDataSaver - 保存处理结果
│   ├── formatters/        # JSON/JSONL 格式处理器
│   └── config/            # YAML 配置加载
│
├── operators/             # 数据处理算子
│   ├── core/              # 核心抽象
│   │   ├── data_item.py   # DataItem, QA - 数据封装
│   │   ├── operator.py    # Operator, Filter, Rewriter - 基类
│   │   └── result.py      # Result, FilterDecision, RewriteDecision
│   ├── filters/           # 规则过滤器
│   │   ├── conv_length.py # 对话轮数过滤
│   │   ├── image_size.py  # 图像尺寸过滤
│   │   └── text_repeat.py # 文本重复检测
│   ├── rewriters/         # 规则重写器
│   │   ├── norm_prompt.py # 标准化 prompt
│   │   ├── remove_think.py# 移除思考内容
│   │   └── split.py       # 拆分多轮对话
│   └── mllm/              # MLLM 算子
│       ├── base.py        # MLLMOperator 基类
│       ├── filter.py      # MLLMFilter - 用大模型过滤
│       ├── rewriter.py    # MLLMRewriter - 用大模型重写
│       └── request.py     # RequestBuilder - 构建模型请求
│
├── pipelines/             # 管线编排
│   ├── pipeline.py        # Pipeline - 主管线
│   └── sub_pipeline.py    # SubPipeline - 子管线
│
├── models/                # 模型封装
│   ├── base.py            # BaseAPI - 基类(重试、并行)
│   └── openai_api.py      # OpenAIAPI - OpenAI 兼容 API
│
└── utils/                 # 工具函数
    ├── registry.py        # 组件注册表
    ├── checkpoint.py      # 断点续传
    ├── database.py        # LMDB 图像缓存
    └── vision.py          # 图像处理

五、配置文件解读

配置文件是 Python 文件(不是 YAML),支持继承:

# configs/my_config.py

# 继承基础配置
_base_ = [
    "@/_base_/models/local_api_model.py",
    "@/_base_/dataset.py",
]

# 工作目录
work_dir = "work_dirs/my_experiment"

# 日志
logger = dict(type="Logger", log_file="logs/run.txt")

# 数据集配置
dataset_yaml = "/path/to/dataset.yaml"
dataloader = dict(
    dataset=dataset_yaml,
    batch_size=512,
    use_image=True,
    cache_dir="~/cache/images_lmdb",
)

# 数据保存
datasaver = dict(
    dataset=dataset_yaml,
    output_dir="output",
)

# 模型(用于 MLLM 算子)
model = dict(model="Qwen3-VL-8B-Instruct", thread_num=512)

# 管线定义
pipeline = dict(
    type="Pipeline",
    operations={
        # 规则过滤(优先级 0,最先执行)
        "basic_filter": dict(
            cfg=dict(
                type="SubPipeline",
                operators=[
                    dict(type="ConvLengthFilter", min_length=1, max_length=20),
                    dict(type="ImageSizeFilter", min_size=100),
                ],
            ),
            priority=0,
        ),
        # MLLM 过滤(优先级 1)
        "mllm_filter": dict(
            cfg=dict(
                type="SubPipeline",
                operators=[
                    dict(
                        type="MLLMFilter",
                        request_builder=dict(
                            type="RequestBuilder",
                            prompt="prompts/filter/question_image_consist_v2.txt",
                            with_image=True,
                        ),
                    ),
                ],
            ),
            priority=1,
        ),
    },
)

六、运行方式

# 运行处理
python run.py -c configs/my_config.py

# 输出结构
work_dirs/my_experiment/
├── config.yaml          # 保存的配置
├── logs/run.txt         # 日志
└── checkpoint.json      # 断点信息

output/
├── source_name/                # 按 source 分目录   ├── data.jsonl              # 保留的数据   └── rejected/               # 被过滤的数据       └── data.jsonl
├── output.yaml                 # 保留数据的 YAML 索引
└── output_rejected.yaml        # 过滤数据的 YAML 索引

七、关键类速查表

文件

作用

Runner

run.py

主入口,协调各组件

StandardDataLoader

datasets/data_loader.py

加载数据,管理 LMDB 缓存

StandardDataSaver

datasets/data_saver.py

保存处理结果

Pipeline

pipelines/pipeline.py

主管线,按优先级执行子管线

SubPipeline

pipelines/sub_pipeline.py

子管线,顺序执行算子

DataItem

operators/core/data_item.py

数据封装,提供 QA 访问

Filter

operators/core/operator.py

过滤器基类

Rewriter

operators/core/operator.py

重写器基类

Result

operators/core/result.py

算子执行结果

MLLMOperator

operators/mllm/base.py

MLLM 算子基类

RequestBuilder

operators/mllm/request.py

构建 MLLM 请求


八、扩展指南

添加新的 Filter

# datastudio/operators/filters/my_filter.py
from datastudio.operators.core import Filter, DataItem
from datastudio.utils.registry import OPERATORS

@OPERATORS.register_module()
class MyCustomFilter(Filter):
    def __init__(self, threshold: float = 0.5, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
    
    def check(self, item: DataItem, qa_idx: int) -> Tuple[bool, str]:
        answer = item.get_answer(qa_idx)
        if some_condition(answer, self.threshold):
            return True, "不符合条件"
        return False, ""

添加新的 Rewriter

# datastudio/operators/rewriters/my_rewriter.py
from datastudio.operators.core import Rewriter, DataItem
from datastudio.utils.registry import OPERATORS

@OPERATORS.register_module()
class MyCustomRewriter(Rewriter):
    def rewrite(self, item: DataItem, qa_idx: int) -> Optional[str]:
        answer = item.get_answer(qa_idx)
        new_answer = transform(answer)
        return new_answer if new_answer != answer else None

在配置中使用

pipeline = dict(
    type="Pipeline",
    operations={
        "my_step": dict(
            cfg=dict(
                type="SubPipeline",
                operators=[
                    dict(type="MyCustomFilter", threshold=0.8),
                    dict(type="MyCustomRewriter"),
                ],
            ),
            priority=0,
        ),
    },
)

九、核心设计原则

  1. 算子不直接修改数据 - 返回 Result,由 Pipeline 统一应用

  2. 配置驱动 - 所有组件通过配置实例化,支持复现

  3. 注册表模式 - 组件通过 @OPERATORS.register_module() 注册

  4. 断点续传 - 支持 checkpoint,大数据集处理可中断恢复

  5. 图像缓存 - LMDB 分片存储,避免重复读取图像


十、常见问题

Q: 如何只运行部分数据测试? A: 修改 dataloader.batch_size 为小值,或在数据文件中只保留少量样本

Q: 如何跳过图像处理? A: 设置 dataloader.use_image=False

Q: 如何查看处理日志? A: 查看 work_dir/logs/ 目录下的日志文件

Q: 如何从断点恢复? A: 直接重新运行相同命令,会自动读取 checkpoint 继续处理