Models Module
API model backends for MLLM inference.
Provides OpenAIAPI (async single-process) and MPOpenAIAPI
(multiprocessing) wrappers for OpenAI-compatible endpoints.
- class datastudio.models.OpenAIAPI[source]
Bases: BaseAPI
Async OpenAI-compatible API wrapper.
Uses a shared aiohttp.ClientSession with connection pooling and an asyncio.Semaphore for concurrency control. Each request handles retries independently with exponential backoff.
- __init__(model, key=None, temperature=0.6, top_p=0.95, top_k=20, min_p=0.0, presence_penalty=1.0, repetition_penalty=1.0, api_base=None, port=None, retry=10, wait=3, timeout=(30, 1800), max_tokens=16384, thread_num=8192, return_dict=False, logger=None, max_connections=None, enable_thinking=False, **kwargs)[source]
Initialize OpenAI-compatible API wrapper.
- Parameters:
  - model (str) – Model name, e.g., gpt-4, Qwen2-VL-72B.
  - key (str) – API key (uses the OPENAI_API_KEY env var if not provided).
  - temperature (float) – Generation temperature.
  - top_p (float) – Nucleus sampling threshold (0–1).
  - top_k (int) – Top-K sampling (0 to disable). vLLM/SGLang extra param.
  - min_p (float) – Minimum probability threshold. vLLM/SGLang extra param.
  - presence_penalty (float) – Penalize tokens already present in the output.
  - repetition_penalty (float) – Penalize repeated tokens. vLLM/SGLang extra param.
  - api_base (str) – API base URL (full URL to the chat/completions endpoint).
  - port (int) – Port number for local deployments.
  - retry (int) – Number of retry attempts on API failure.
  - wait (int) – Max wait time between retries (seconds).
  - timeout (tuple) – Request timeout as a (connect_timeout, read_timeout) tuple.
  - max_tokens (int) – Maximum tokens in the response.
  - thread_num (int) – Maximum number of concurrent requests (semaphore limit).
  - return_dict (bool) – Whether to parse the response as a dict via load_str_to_dict.
  - logger – Logger instance.
  - max_connections (int) – Max TCP connections in the pool. Defaults to min(thread_num, 16384).
  - enable_thinking (bool) – Whether to enable thinking mode for models that support it (e.g., Qwen3 on vLLM/SGLang). Default False.
  - **kwargs – Additional API parameters passed to the payload.
- generate(messages, **kwargs)[source]
Main method to generate responses.
Uses asyncio.run to execute the async generation pipeline. Pre-encodes images before async processing.
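The concurrency model described above (a shared semaphore bounding in-flight requests, with each request retrying independently under exponential backoff) can be sketched with the standard library alone. The endpoint call is simulated; the function names here are illustrative and not part of datastudio.

```python
import asyncio
import random

async def call_with_retry(sem, request_id, retry=3, wait=1.0):
    """One request: hold the shared semaphore, retry on failure with
    exponential backoff (sleep capped at `wait` seconds)."""
    async with sem:
        for attempt in range(retry):
            try:
                # Stand-in for the real aiohttp POST to the endpoint.
                if random.random() < 0.3:
                    raise ConnectionError("simulated transient failure")
                return f"response-{request_id}"
            except ConnectionError:
                # Exponential backoff between attempts.
                await asyncio.sleep(min(2 ** attempt * 0.01, wait))
        return "Failed to obtain answer via API."  # fail_msg after all retries

async def generate(n, thread_num=8):
    sem = asyncio.Semaphore(thread_num)  # bounds concurrent requests
    tasks = [call_with_retry(sem, i) for i in range(n)]
    return await asyncio.gather(*tasks)

results = asyncio.run(generate(20))
```

Because each task owns its retry loop, one flaky request backs off without stalling the others; the semaphore only caps how many are in flight at once.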
- class datastudio.models.MPOpenAIAPI[source]
Bases: BaseAPI
Multiprocessing OpenAI-compatible API using fork + copy-on-write (COW) shared memory.
Distributes requests across multiple worker processes, each running an independent OpenAIAPI with its own async event loop. Designed for workloads where single-process async saturates the CPU on image encoding / JSON serialization, the total request count is large, and payload size makes pickle-based IPC infeasible.
- Memory model:
Pre-encoded messages are stored in a module-level global before fork(). Workers inherit the parent's address space via copy-on-write and only read the shared data, so N workers use roughly 1× memory.
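This copy-on-write pattern can be sketched as follows: payloads go into a module-level global before the worker pool is forked, so children read them from inherited memory instead of receiving them through pickle-based IPC. This is a simplified stand-in for illustration, not MPOpenAIAPI's actual implementation, and it requires a platform that supports the fork start method (e.g., Linux; macOS defaults to spawn).

```python
import multiprocessing as mp

# Module-level global, populated BEFORE fork() so workers inherit it
# via copy-on-write instead of receiving it over pickle-based IPC.
_SHARED_MESSAGES = []

def _worker(index):
    # Workers only read the inherited data; no page is copied until a write.
    return len(_SHARED_MESSAGES[index])

def run(messages, num_workers=2):
    global _SHARED_MESSAGES
    _SHARED_MESSAGES = messages          # store before forking
    ctx = mp.get_context("fork")         # COW requires fork, not spawn
    with ctx.Pool(num_workers) as pool:
        return pool.map(_worker, range(len(messages)))

if __name__ == "__main__":
    print(run([["msg"] * 3, ["msg"] * 5]))  # [3, 5]
```

Only the small integer results cross the process boundary; the (potentially huge) message payloads never do, which is the point of the design.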
- __init__(model, key=None, temperature=0.6, top_p=0.95, top_k=20, min_p=0.0, presence_penalty=1.0, repetition_penalty=1.0, api_base=None, port=None, retry=10, wait=3, timeout=(30, 1800), max_tokens=16384, thread_num=8192, return_dict=False, logger=None, max_connections=None, num_workers=None, worker_concurrency=None, enable_thinking=False, **kwargs)[source]
Initialize multiprocessing OpenAI API wrapper.
- Parameters:
  - model (str) – Model name, e.g., gpt-4, Qwen2-VL-72B.
  - key (str) – API key (uses the OPENAI_API_KEY env var if not provided).
  - temperature (float) – Generation temperature.
  - top_p (float) – Nucleus sampling threshold (0–1).
  - top_k (int) – Top-K sampling (0 to disable). vLLM/SGLang extra param.
  - min_p (float) – Minimum probability threshold. vLLM/SGLang extra param.
  - presence_penalty (float) – Penalize tokens already present in the output.
  - repetition_penalty (float) – Penalize repeated tokens. vLLM/SGLang extra param.
  - api_base (str) – API base URL (full URL to the chat/completions endpoint).
  - port (int) – Port number for local deployments.
  - retry (int) – Number of retry attempts on API failure.
  - wait (int) – Max wait time between retries (seconds).
  - timeout (tuple) – Request timeout as a (connect_timeout, read_timeout) tuple.
  - max_tokens (int) – Maximum tokens in the response.
  - thread_num (int) – Total maximum concurrent requests across all workers.
  - return_dict (bool) – Whether to parse the response as a dict.
  - logger – Logger instance.
  - max_connections (int) – Max TCP connections per worker. Defaults to min(worker_concurrency, 16384).
  - num_workers (int) – Number of worker processes. Defaults to min(cpu_count, 8). Set based on available CPU cores.
  - worker_concurrency (int) – Async concurrency limit per worker. Defaults to thread_num // num_workers.
  - enable_thinking (bool) – Whether to enable thinking mode for models that support it (e.g., Qwen3 on vLLM/SGLang). Default False.
  - **kwargs – Additional API parameters passed to the payload.
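The worker sizing defaults above can be sketched as plain arithmetic (a hypothetical helper for illustration, not part of the library):

```python
import os

def worker_defaults(thread_num=8192, num_workers=None, worker_concurrency=None):
    """Derive MPOpenAIAPI-style worker sizing: total concurrency
    (thread_num) is split evenly across worker processes."""
    if num_workers is None:
        num_workers = min(os.cpu_count() or 1, 8)
    if worker_concurrency is None:
        worker_concurrency = max(1, thread_num // num_workers)
    # Per-worker connection pool is capped the same way the docs state.
    max_connections = min(worker_concurrency, 16384)
    return num_workers, worker_concurrency, max_connections

print(worker_defaults(thread_num=64, num_workers=4))  # (4, 16, 16)
```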
Base API class for model inference.
Provides common functionality shared by all API backends: message preprocessing, image encoding, retry logic, and parallel request processing.
- class datastudio.models.base.BaseAPI[source]
Bases: object
Base API class providing common functionality for all API models.
This class handles:
- Message preprocessing and formatting
- Image encoding (base64)
- Retry logic with exponential backoff
- Parallel request processing
- allowed_types = ['text', 'image', 'image_url', 'system']
- __init__(retry=10, wait=5, timeout=(30, 1800), logger=None, thread_num=384, fail_msg='Failed to obtain answer via API.', openai_format=True, image_key_name='image', return_dict=True, use_system_proxy=False, **kwargs)[source]
Initialize the base API.
- Parameters:
retry – Number of retry attempts on API failure.
wait – Wait time between retries (seconds).
timeout – Request timeout as (connect_timeout, read_timeout) tuple.
logger – Logger instance.
thread_num – Number of parallel threads.
fail_msg – Message returned on failure.
openai_format – Whether to use OpenAI message format.
image_key_name – Key name for image data.
return_dict – Whether to parse response as dict.
use_system_proxy – Whether to use system proxy.
**kwargs – Additional arguments passed to generate_inner.
- abstractmethod generate_inner(inputs, **kwargs)[source]
Generate response for given inputs. Must be implemented by subclasses.
- encode_image_directly(img_item)[source]
Encode image directly without caching.
- Parameters:
img_item – PIL Image object or other image data.
- Returns:
Base64 encoded string or original data.
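Base64 encoding of the kind encode_image_directly performs can be sketched with the standard library alone. Raw bytes stand in for a PIL image here, the data-URL shape follows the OpenAI image_url convention, and the helper name is illustrative rather than datastudio's:

```python
import base64

def encode_image_bytes(raw: bytes, mime: str = "image/png") -> str:
    """Encode raw image bytes as a base64 data URL suitable for an
    OpenAI-style image_url content part."""
    b64 = base64.b64encode(raw).decode("ascii")
    return f"data:{mime};base64,{b64}"

url = encode_image_bytes(b"\x89PNG\r\n")
```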
- check_content(msgs)[source]
Check input content type.
- Parameters:
msgs – Raw input messages.
- Returns:
Message type (str / dict / list[str] / list[dict]).
- Return type:
str
- preproc_content(inputs)[source]
Convert raw input messages to unified dict list format.
- Parameters:
inputs – Raw input.
- Returns:
Processed input message list.
- Return type:
list
- prepare_inputs(inputs)[source]
Prepare message format for API interface.
- Parameters:
inputs – Input message list.
- Returns:
(formatted_messages, text_content).
- Return type:
tuple
- process_single_message(message, all_kwargs)[source]
Process a single message with retry logic.
- Parameters:
message – Single processed message.
all_kwargs – Arguments passed to generate_inner.
- Returns:
Response content or fail_msg on failure.
- pre_process(msg)[source]
Validate and preprocess raw input messages into API-ready format.
- Parameters:
msg – Raw input message (str, dict, or list).
- Returns:
Preprocessed message list in OpenAI format.
- Return type:
list
- Raises:
AssertionError – If input type is unsupported.
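The normalization contract (str, dict, or list in; OpenAI-format message list out) can be sketched as follows. This is a simplified stand-in for pre_process, not its actual code; the intermediate "value" key is an assumption for illustration:

```python
def to_openai_messages(msg):
    """Normalize raw input (str, dict, or a list of either) into an
    OpenAI chat message list with typed content parts; raise on
    unsupported input types, mirroring the documented AssertionError."""
    if isinstance(msg, str):
        items = [{"type": "text", "value": msg}]
    elif isinstance(msg, dict):
        items = [msg]
    elif isinstance(msg, list):
        items = [{"type": "text", "value": m} if isinstance(m, str) else m
                 for m in msg]
    else:
        raise AssertionError(f"Unsupported input type: {type(msg)}")

    content = []
    for item in items:
        if item["type"] == "text":
            content.append({"type": "text", "text": item["value"]})
        elif item["type"] in ("image", "image_url"):
            content.append({"type": "image_url",
                            "image_url": {"url": item["value"]}})
    return [{"role": "user", "content": content}]

msgs = to_openai_messages("Describe this image.")
```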
Async OpenAI-compatible API wrapper.
Uses aiohttp with semaphore-based concurrency control and connection pooling to handle high-throughput batch inference against OpenAI-compatible endpoints.
- class datastudio.models.openai_api.OpenAIAPI[source]
Bases: BaseAPI
Async OpenAI-compatible API wrapper, re-exported as datastudio.models.OpenAIAPI; see the full class documentation above.