快速开始
本文档帮助你从零开始使用 DataStudio 处理多模态数据。
1. 安装
环境要求
Python >= 3.10
足够的磁盘空间(LMDB 图像缓存需要额外存储)
安装步骤
# 克隆仓库
git clone https://github.com/Open-Bee/DataStudio.git
cd DataStudio
# 创建虚拟环境(推荐)
python -m venv venv
source venv/bin/activate
# 安装
pip install -e .
# 验证
python -c "import datastudio; print(datastudio.__version__)"
可选依赖
# Weights & Biases 监控
pip install wandb
wandb login
快速体验
安装完成后,可以直接运行内置示例验证环境(无需额外数据):
# 规则过滤示例(纯 CPU,无需 MLLM)
python run.py -c configs/examples/rule_filter_only.py
# 文本规范化示例(移除 think 标签等)
python run.py -c configs/examples/text_normalization.py
更多示例请参考 示例指南,包含 5 个覆盖不同场景的完整配置。
2. 数据格式
DataStudio 使用标准的多模态对话格式,支持 JSON 和 JSONL 文件。
基本格式
{
"id": "sample_001",
"image": "/path/to/image.jpg",
"conversations": [
{"from": "human", "value": "<image>\n这张图片里有什么?"},
{"from": "gpt", "value": "图片展示了一个美丽的日落。"}
]
}
字段说明:
字段 |
必需 |
说明 |
|---|---|---|
|
否 |
样本唯一标识 |
|
否 |
图像路径,单图为字符串,多图为列表 |
|
是 |
对话列表, |
多图格式
{
"id": "multi_img_001",
"image": ["/path/to/img1.jpg", "/path/to/img2.jpg"],
"conversations": [
{"from": "human", "value": "<image>\n<image>\n比较这两张图片。"},
{"from": "gpt", "value": "第一张是白天,第二张是夜晚。"}
]
}
Schema 兼容
DataStudio 同时支持 conversations 格式和 messages 格式(OpenAI 风格),加载时自动转换为内部标准格式。
3. 数据集 YAML 配置
数据集通过 YAML 文件描述,一个 YAML 可包含多个数据源:
# dataset.yaml
datasets:
- json_path: /data/dataset_a.jsonl
source: dataset_a
- json_path: /data/dataset_b.json
source: dataset_b
字段说明:
字段 |
说明 |
|---|---|
|
数据文件路径(三者等价) |
|
数据源名称,用于输出时分目录保存 |
4. 核心概念
算子类型
DataStudio 提供两类算子,每类都有规则版本和 MLLM 版本:
类型 |
作用 |
核心方法 |
|---|---|---|
Filter |
判断是否保留样本 |
|
Rewriter |
修改内容(不删除样本) |
|
MLLMFilter |
调用 MLLM 进行质量过滤 |
通过 |
MLLMRewriter |
调用 MLLM 进行内容重写 |
通过 |
流水线结构
Pipeline 由多个 SubPipeline 组成,按 priority(数值越小越先执行)依次执行。每个 SubPipeline 内的算子顺序执行,被前序算子拒绝的样本不会传入后续算子。
Pipeline
├── SubPipeline (priority=0) ← 先执行
│ ├── ConvLengthFilter
│ └── ImageSizeFilter
├── SubPipeline (priority=1)
│ └── MLLMFilter
└── SubPipeline (priority=2) ← 后执行
└── MLLMRewriter
配置继承
配置文件通过 _base_ 字段实现继承,@/ 前缀表示从 configs/ 目录解析:
_base_ = [
"@/_base_/models/local_api_model.py",
"@/_base_/dataset.py",
"@/_base_/filters/filter_rule_base_for_question.py",
]
_base_列表中的配置文件按顺序合并子配置中的同名字段会覆盖基础配置
dict类型的字段会递归合并
预置的基础配置:
文件 |
说明 |
|---|---|
|
DataLoader 和 DataSaver 默认参数 |
|
本地 API 模型默认参数 |
|
常用问题过滤规则组合(priority=0) |
|
常用答案过滤规则组合(priority=50) |
|
重写器基础模板(priority=1) |
5. 内置算子
过滤器
算子 |
说明 |
关键参数 |
|---|---|---|
|
按对话轮数过滤 |
|
|
按图像尺寸过滤 |
|
|
按宽高比过滤 |
|
|
按图像格式过滤 |
— |
|
文本长度异常检测 |
|
|
按响应标签过滤 |
— |
|
文本重复检测 |
|
|
MLLM 质量过滤 |
|
重写器
算子 |
说明 |
|---|---|
|
移除 |
|
标准化 think 标签格式 |
|
添加 |
|
标准化 |
|
标准化提示词格式 |
|
标准化多轮对话提示词 |
|
移除答案内容 |
|
移除推理前缀 |
|
多轮对话拆分为单轮 |
|
MLLM 内容重写 |
|
条件触发的 MLLM 重写 |
6. 配置文件详解
最小配置示例
以下示例展示了一个仅使用规则算子的流水线:
# configs/my_first_pipeline.py
_base_ = ["@/_base_/dataset.py"]
work_dir = "./work_dirs/my_first_run"
logger = dict(type="Logger", log_file="logs/run.log")
dataset_yaml = "/path/to/dataset.yaml"
dataloader = dict(
dataset=dataset_yaml,
batch_size=1000,
use_image=False, # 纯文本处理,不加载图像
)
datasaver = dict(
dataset=dataset_yaml,
output_dir="./output",
save_yaml_name="my_output", # 输出 YAML 文件名前缀
)
pipeline = dict(
type="Pipeline",
operations={
"rule_filters": dict(
cfg=dict(type="SubPipeline", operators=[
dict(type="ConvLengthFilter", min_length=1, max_length=20),
dict(type="LengthAnomalyFilter", min_length=2, max_length=4096,
check_question=True, check_answer=True, use_tokenizer=True),
dict(type="TextRepeatFilter", check_question=True, check_answer=True),
]),
priority=0,
),
"rewriters": dict(
cfg=dict(type="SubPipeline", operators=[
dict(type="RemoveThinkRewriter"),
dict(type="NormImageTagRewriter"),
]),
priority=1,
),
}
)
包含 MLLM 算子的配置
使用 MLLM 算子时,需要配置 model 字段并部署一个 OpenAI API 兼容的推理服务:
# configs/my_mllm_pipeline.py
_base_ = [
"@/_base_/models/local_api_model.py",
"@/_base_/dataset.py",
"@/_base_/filters/filter_rule_base_for_question.py",
]
work_dir = "./work_dirs/mllm_run"
logger = dict(type="Logger", log_file="logs/mllm_run.log")
dataset_yaml = "/path/to/dataset.yaml"
dataloader = dict(
dataset=dataset_yaml,
batch_size=5000,
use_image=True,
cache_dir="~/cache/images_lmdb", # LMDB 图像缓存目录
resize_image_size=2048, # 图像最大边长
)
datasaver = dict(
dataset=dataset_yaml,
output_dir="./output",
save_yaml_name="mllm_processed",
)
model = dict(
model="Qwen3-VL-30B-A3B-Instruct",
api_base="http://127.0.0.1",
port=8000,
thread_num=512, # 并发请求数
return_dict=True, # JSON 输出模式
)
# MLLM 过滤器的请求构建器
filter_request = dict(
type="RequestBuilder",
prompt="prompts/filter/question_image_consist_v2.txt", # 提示词文件
key_templates={"result": "q{idx}", "reason": "q{idx}_reason"}, # 响应解析模板
with_image=True,
with_question=True,
)
# 在已有的规则过滤器(继承自 _base_)之后添加 MLLM 过滤器
mllm_filter = dict(
cfg=dict(type="SubPipeline", operators=[
dict(type="MLLMFilter", request_builder=filter_request),
]),
priority=10,
)
pipeline = dict(
type="Pipeline",
operations={
"filter_rule_base_for_question": filter_rule_base_for_question, # 来自 _base_
"mllm_filter": mllm_filter,
}
)
7. 运行流水线
基本运行
python run.py -c configs/my_pipeline.py
仅缓存图像
首次处理包含图像的数据时,可以选择先运行缓存模式将图像写入 LMDB,后续处理直接读取缓存:
python run.py -c configs/my_pipeline.py --cache-images
注意: 预缓存不是必须的。只要配置了
cache_dir,图像会在 pipeline 运行过程中自动边跑边缓存。--cache-images模式适用于希望在正式处理前先完成缓存的场景。
断点续传
DataStudio 基于 checkpoint 自动保存处理进度。如果运行中断,直接重新运行相同命令即可从上次位置恢复:
# 中断后重新运行,自动恢复
python run.py -c configs/my_pipeline.py
输出结构
处理完成后,输出目录结构如下:
output/
├── dataset_a/ # 按 source 分目录
│ └── data.jsonl # 保留的数据
├── dataset_b/
│ ├── data.jsonl
│ └── rejected/ # 被过滤的数据
│ └── data.jsonl
├── mllm_processed.yaml # 保留数据的 YAML 索引
└── mllm_processed_rejected.yaml # 过滤数据的 YAML 索引
输出 YAML 可直接作为下游任务的输入配置。
8. 模型后端
DataStudio 通过 OpenAIAPI 调用任何兼容 OpenAI Chat Completions API 的服务:
model = dict(
type="OpenAIAPI",
model="Qwen3-VL-30B-A3B-Instruct", # 模型名称
api_base="http://127.0.0.1", # API 地址
port=8000, # 端口
key="sk-123456", # API 密钥
thread_num=512, # 最大并发数
return_dict=True, # True: JSON 输出; False: 纯文本输出
max_tokens=16384, # 最大输出 token 数
temperature=0.6,
retry=10, # 失败重试次数
timeout=(30, 1800), # (连接超时, 读取超时) 秒
)
对于 CPU 密集型场景(大量高分辨率图像编码),可使用 MPOpenAIAPI 通过多进程并行处理:
model = dict(
type="MPOpenAIAPI",
model="Qwen3-VL-30B-A3B-Instruct",
thread_num=512,
num_workers=4, # 进程数
)
MPOpenAIAPI 通过 fork 创建多个工作进程,利用 Copy-on-Write 共享预编码的消息数据,内存开销接近单进程。
9. MLLM 算子详解
RequestBuilder
RequestBuilder 控制如何将数据组装为模型请求、如何解析模型响应:
request_builder = dict(
type="RequestBuilder",
prompt="prompts/filter/question_image_consist_v2.txt", # 提示词:文件路径或直接文本
system_prompt=None, # 系统提示词(可选)
key_templates={ # 响应字段映射(JSON 模式)
"result": "q{idx}",
"reason": "q{idx}_reason",
},
with_image=True, # 是否包含图像
with_question=True, # 是否包含问题文本
with_answer=False, # 是否包含答案文本
with_original=False, # 是否包含原始答案(重写前)
)
提示词文件(.txt)中可以使用 {question}、{answer}、{ori_answer} 占位符,运行时自动替换。
key_templates:
设置为
dict时,模型需要返回 JSON(需配合return_dict=True),{idx}会被替换为 QA 对索引设置为
None时,模型返回纯文本,直接作为重写结果
MLLMFilter
MLLM 过滤器期望模型返回 JSON,格式为:
{"q0": true, "q0_reason": "答案与图像不一致"}
true 表示拒绝,false 表示保留。
dict(
type="MLLMFilter",
request_builder=dict(
type="RequestBuilder",
prompt="prompts/filter/question_image_consist_v2.txt",
key_templates={"result": "q{idx}", "reason": "q{idx}_reason"},
with_image=True,
with_question=True,
),
)
MLLMRewriter
MLLM 重写器可以工作在 JSON 模式或纯文本模式:
# JSON 模式:适合批量处理,配合结构化输出
dict(
type="MLLMRewriter",
request_builder=dict(
type="RequestBuilder",
prompt="你的重写提示词,或 .txt 文件路径",
key_templates={"answer": "q{idx}_answer"},
with_image=True,
),
rewrite_type="answer", # "answer" 或 "question"
)
# 纯文本模式:模型输出直接作为新答案
dict(
type="MLLMRewriter",
request_builder=dict(
type="RequestBuilder",
prompt=None, # 不使用模板,直接发送原始问题
key_templates=None, # 纯文本响应
with_image=True,
),
rewrite_type="answer",
)
SelectiveMLLMRewriter
仅对满足条件的 QA 对执行重写,减少不必要的 API 调用:
dict(
type="SelectiveMLLMRewriter",
request_builder=rewriter_request,
should_rewrite_fn="lambda q, a: len(a) < 100", # 仅重写短答案
)
10. 自定义算子
自定义过滤器
继承 Filter 类,实现 check 方法:
from datastudio.operators import Filter, DataItem
from datastudio.utils.registry import OPERATORS
from typing import Tuple
@OPERATORS.register_module()
class AnswerLengthFilter(Filter):
"""按答案长度过滤。"""
def __init__(self, min_length: int = 10, max_length: int = 10000, **kwargs):
super().__init__(**kwargs)
self.min_length = min_length
self.max_length = max_length
def check(self, item: DataItem, qa_idx: int) -> Tuple[bool, str]:
answer = item.get_answer(qa_idx)
length = len(answer)
if length < self.min_length:
return True, f"答案过短: {length} < {self.min_length}"
if length > self.max_length:
return True, f"答案过长: {length} > {self.max_length}"
return False, ""
check 方法的返回值:
(True, reason)— 拒绝该 QA 对(False, "")— 保留该 QA 对
自定义重写器
继承 Rewriter 类,实现 rewrite 方法:
from datastudio.operators import Rewriter, DataItem
from datastudio.utils.registry import OPERATORS
from typing import Optional
@OPERATORS.register_module()
class CleanWhitespaceRewriter(Rewriter):
"""清理答案中的多余空白字符。"""
def rewrite(self, item: DataItem, qa_idx: int) -> Optional[str]:
answer = item.get_answer(qa_idx)
cleaned = " ".join(answer.split())
return cleaned if cleaned != answer else None
rewrite 方法的返回值:
返回新答案字符串 — 替换原答案(原答案自动保存到
ori_answer)返回
None— 不修改
然后在配置文件中使用:dict(type="AnswerLengthFilter", min_length=10)
11. 性能调优
LMDB 图像缓存
处理带图像的数据时,启用 LMDB 缓存可以避免重复读取和解码图像:
dataloader = dict(
cache_dir="~/cache/images_lmdb", # 缓存目录
use_lmdb_cache=True, # 默认开启
resize_image_size=2048, # 缓存时的最大边长
)
首次运行可先缓存图像(非必须,pipeline 运行时也会自动边跑边缓存):
python run.py -c my_config.py --cache-images
并发调优
thread_num 控制同时发往推理服务的最大请求数:
model = dict(
thread_num=512, # 根据推理服务的承载能力调整
)
参考值:
单卡 A100:256-1024
多卡 / 大集群:2048-8192
批量大小
batch_size 控制每批读取的数据量。较大的批次可以提高 MLLM 算子的吞吐效率:
dataloader = dict(
batch_size=5000, # 推荐范围:1000-50000
)
12. 完整示例
以下是一个包含规则过滤 + MLLM 过滤 + MLLM 重写 + 一致性检查 + 格式规范化的完整配置,与实际的 configs/examples/honeypipe.py 保持一致。
更多可直接运行的示例请参考 示例指南,包含 5 个配置和内置演示数据。
# configs/honeypipe.py
_base_ = [
"@/_base_/models/local_api_model.py",
"@/_base_/dataset.py",
]
work_dir = "./work_dirs/honeypipe"
logger = dict(type="Logger", log_file="logs/honeypipe.log")
dataset_yaml = "/path/to/dataset.yaml"
dataloader = dict(
dataset=dataset_yaml,
batch_size=5000,
use_image=True,
cache_dir="./cache/images_lmdb",
resize_image_size=2048,
)
datasaver = dict(
dataset=dataset_yaml,
output_dir="./output/full",
save_yaml_name="full_processed",
)
model = dict(
model="Qwen3-VL-30B-A3B-Instruct",
api_base="http://127.0.0.1",
port=8000,
thread_num=1024,
return_dict=True,
max_tokens=4096,
)
# --- 阶段 1:规则过滤(priority=0,最先执行)---
rule_filters = dict(
cfg=dict(type="SubPipeline", operators=[
dict(type="ConvLengthFilter", min_length=1, max_length=20),
dict(type="ImageSizeFilter", min_size=14),
dict(type="ImageAspectRatioFilter", max_aspect_ratio=200),
dict(type="ImageExtFilter"),
dict(type="LengthAnomalyFilter", min_length=2, max_length=4096,
check_question=True, check_answer=True, use_tokenizer=True),
dict(type="TextRepeatFilter", check_question=True, check_answer=True),
]),
priority=0,
)
# --- 阶段 2:MLLM 过滤(priority=10)---
mllm_filter = dict(
cfg=dict(type="SubPipeline", operators=[
dict(type="MLLMFilter", request_builder=dict(
type="RequestBuilder",
prompt="prompts/filter/question_image_consist_v2.txt",
key_templates={"result": "q{idx}", "reason": "q{idx}_reason"},
with_image=True,
with_question=True,
)),
]),
priority=10,
)
# --- 阶段 3:MLLM 重写(priority=20)---
# 重写器使用纯文本模式(return_dict=False),需要独立覆盖 model
mllm_rewriter_model = dict(
type="OpenAIAPI",
model="Qwen3-VL-30B-A3B-Instruct",
api_base="http://127.0.0.1",
port=8000,
thread_num=1024,
return_dict=False, # 纯文本输出,用于重写
max_tokens=4096,
)
mllm_rewriter = dict(
cfg=dict(type="SubPipeline", operators=[
dict(type="MLLMRewriter",
model=mllm_rewriter_model,
request_builder=dict(
type="RequestBuilder",
prompt=None,
key_templates=None,
with_image=True,
with_question=True,
),
rewrite_type="answer",
),
]),
priority=20,
)
# --- 阶段 4:一致性检查(priority=30)---
# 重写后比较新旧答案,过滤不一致的样本
consist_filter = dict(
cfg=dict(type="SubPipeline", operators=[
dict(type="MLLMFilter", request_builder=dict(
type="RequestBuilder",
prompt="prompts/filter/compare_answer_v2.txt",
key_templates={"result": "q{idx}", "reason": "q{idx}_reason"},
with_image=True,
with_answer=True,
with_original=True,
)),
]),
priority=30,
)
# --- 阶段 5:格式规范化(priority=40,最后执行)---
norm_rewriters = dict(
cfg=dict(type="SubPipeline", operators=[
dict(type="RemoveThinkRewriter"),
dict(type="NormImageTagRewriter"),
]),
priority=40,
)
pipeline = dict(
type="Pipeline",
operations={
"rule_filters": rule_filters,
"mllm_filter": mllm_filter,
"mllm_rewriter": mllm_rewriter,
"consist_filter": consist_filter,
"norm_rewriters": norm_rewriters,
}
)
运行:
python run.py -c configs/honeypipe.py
13. 部署推理服务
DataStudio 兼容任何 OpenAI Chat Completions API 格式的服务。推荐使用 vLLM 或 SGLang 部署:
# vLLM 示例
python -m vllm.entrypoints.openai.api_server \
--model Qwen/Qwen3-VL-30B-A3B-Instruct \
--port 8000 \
--tensor-parallel-size 4
# SGLang 示例
python -m sglang.launch_server \
--model Qwen/Qwen3-VL-30B-A3B-Instruct \
--port 8000 \
--tp 4
14. 辅助工具
DataStudio 在 tools/ 目录下提供了两个辅助工具:
LLMRouter
OpenAI API 兼容的反向代理路由器,用于管理多个 LLM 后端。核心特性:
负载均衡:支持加权随机、最少连接数(P2C)、最少等待(P2C + Prometheus)
RPM 限流:滑动窗口计数,按后端独立控制
自动故障转移:异步健康检查,连续失败自动标记不健康
配置热加载:监听 YAML 配置变更,增量更新后端
cd tools/LLMRouter
go build -o llmrouter ./cmd
./llmrouter -config config.yaml
DataVis
多模态数据可视化与分析平台(React + FastAPI):
分页浏览和搜索处理后的数据
按来源、语言、是否含图片、对话轮次、是否被拒绝等多维度过滤
统计分析:token 分布、对话轮次分布、图片数量分布、消息长度分布
支持 JSONL / JSON / YAML 格式数据文件
cd tools/DataVis
bash start.sh