Models Module

API model backends for MLLM inference.

Provides OpenAIAPI (async single-process) and MPOpenAIAPI (multiprocessing) wrappers for OpenAI-compatible endpoints.

class datastudio.models.OpenAIAPI[source]

Bases: BaseAPI

Async OpenAI-compatible API wrapper.

Uses a shared aiohttp.ClientSession with connection pooling and asyncio.Semaphore for concurrency control. Each request handles retries independently with exponential backoff.

__init__(model, key=None, temperature=0.6, top_p=0.95, top_k=20, min_p=0.0, presence_penalty=1.0, repetition_penalty=1.0, api_base=None, port=None, retry=10, wait=3, timeout=(30, 1800), max_tokens=16384, thread_num=8192, return_dict=False, logger=None, max_connections=None, enable_thinking=False, **kwargs)[source]

Initialize OpenAI-compatible API wrapper.

Parameters:
  • model (str) – Model name, e.g., gpt-4, Qwen2-VL-72B.

  • key (str) – API key (uses OPENAI_API_KEY env var if not provided).

  • temperature (float) – Generation temperature.

  • top_p (float) – Nucleus sampling threshold (0 to 1).

  • top_k (int) – Top-K sampling (0 to disable). vLLM/SGLang extra param.

  • min_p (float) – Minimum probability threshold. vLLM/SGLang extra param.

  • presence_penalty (float) – Penalize tokens already present in the output.

  • repetition_penalty (float) – Penalize repeated tokens. vLLM/SGLang extra param.

  • api_base (str) – API base URL (full URL to chat/completions endpoint).

  • port (int) – Port number for local deployments.

  • retry (int) – Number of retry attempts on API failure.

  • wait (int) – Max wait time between retries (seconds).

  • timeout (tuple) – Request timeout as (connect_timeout, read_timeout) tuple.

  • max_tokens (int) – Maximum tokens in response.

  • thread_num (int) – Maximum number of concurrent requests (semaphore limit).

  • return_dict (bool) – Whether to parse response as dict via load_str_to_dict.

  • logger – Logger instance.

  • max_connections (int) – Max TCP connections in the pool. Defaults to min(thread_num, 16384).

  • enable_thinking (bool) – Whether to enable thinking mode for models that support it (e.g., Qwen3 on vLLM/SGLang). Default False.

  • **kwargs – Additional API parameters passed to payload.
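How these parameters land in the request body can be sketched as follows. This is a hypothetical `build_payload` helper, not the library's actual internals; the field names follow the OpenAI chat-completions schema, with top_k / min_p / repetition_penalty sent as extra body fields as vLLM and SGLang accept them, and the thinking switch shown in the Qwen3-on-vLLM style (the exact field name depends on the server).

```python
def build_payload(model, messages, *, temperature=0.6, top_p=0.95,
                  top_k=20, min_p=0.0, presence_penalty=1.0,
                  repetition_penalty=1.0, max_tokens=16384,
                  enable_thinking=False, **extra):
    """Assemble an OpenAI-style chat-completions payload (illustrative)."""
    payload = {
        "model": model,
        "messages": messages,
        "temperature": temperature,
        "top_p": top_p,
        "presence_penalty": presence_penalty,
        "max_tokens": max_tokens,
        # Not in the official OpenAI schema; vLLM/SGLang extensions:
        "top_k": top_k,
        "min_p": min_p,
        "repetition_penalty": repetition_penalty,
    }
    if not enable_thinking:
        # Qwen3-style switch; server-dependent field name (assumption).
        payload["chat_template_kwargs"] = {"enable_thinking": False}
    payload.update(extra)  # arbitrary **kwargs pass through to the body
    return payload
```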

generate(messages, **kwargs)[source]

Main method to generate responses.

Uses asyncio.run to execute the async generation pipeline. Pre-encodes images before async processing.

Parameters:
  • messages (List[Any]) – List of input messages.

  • **kwargs – Additional generation arguments.

Return type:

List[str]

Returns:

List of response strings, in same order as input.

generate_inner(inputs, **kwargs)[source]

Synchronous single-request version of generate() (for BaseAPI compatibility).

Parameters:
  • inputs – Preprocessed message list (OpenAI format).

  • **kwargs – Additional parameters.

Returns:

Tuple of (ret_code, answer, response).

shutdown()[source]

Shutdown and release resources.

Session is created and destroyed per generate() call.

class datastudio.models.MPOpenAIAPI[source]

Bases: BaseAPI

Multiprocessing OpenAI-compatible API using fork + COW shared memory.

Distributes requests across multiple worker processes, each running an independent OpenAIAPI with its own async event loop.

Designed for workloads where single-process async saturates a CPU core on image encoding or JSON serialization, the total request count is large, and payload sizes make pickle-based IPC impractical.

Memory model:

Pre-encoded messages are stored in a module-level global before fork(). Workers inherit the parent's address space via copy-on-write and only read the shared data, so N workers use roughly 1x the parent's memory.
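A minimal sketch of this memory model (the `_SHARED_MESSAGES` global, `_worker`, and `run` names are illustrative): the data is stashed in a module-level global before forking, and only small index ranges cross the IPC boundary.

```python
import multiprocessing as mp

# Module-level global populated BEFORE fork; children inherit it
# via copy-on-write and only read it, so the data is never pickled.
_SHARED_MESSAGES = []

def _worker(index_range):
    # Only the (lo, hi) tuple crosses the IPC boundary, not payloads.
    lo, hi = index_range
    return [len(_SHARED_MESSAGES[i]) for i in range(lo, hi)]

def run(messages, num_workers=2):
    global _SHARED_MESSAGES
    _SHARED_MESSAGES = messages            # stash before forking
    ctx = mp.get_context("fork")           # COW requires fork, not spawn
    step = (len(messages) + num_workers - 1) // num_workers
    ranges = [(i, min(i + step, len(messages)))
              for i in range(0, len(messages), step)]
    with ctx.Pool(num_workers) as pool:
        chunks = pool.map(_worker, ranges)
    return [x for chunk in chunks for x in chunk]
```

The "fork" start method is essential here: "spawn" would re-import the module in each child and lose the pre-populated global.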

__init__(model, key=None, temperature=0.6, top_p=0.95, top_k=20, min_p=0.0, presence_penalty=1.0, repetition_penalty=1.0, api_base=None, port=None, retry=10, wait=3, timeout=(30, 1800), max_tokens=16384, thread_num=8192, return_dict=False, logger=None, max_connections=None, num_workers=None, worker_concurrency=None, enable_thinking=False, **kwargs)[source]

Initialize multiprocessing OpenAI API wrapper.

Parameters:
  • model (str) – Model name, e.g., gpt-4, Qwen2-VL-72B.

  • key (str) – API key (uses OPENAI_API_KEY env var if not provided).

  • temperature (float) – Generation temperature.

  • top_p (float) – Nucleus sampling threshold (0 to 1).

  • top_k (int) – Top-K sampling (0 to disable). vLLM/SGLang extra param.

  • min_p (float) – Minimum probability threshold. vLLM/SGLang extra param.

  • presence_penalty (float) – Penalize tokens already present in the output.

  • repetition_penalty (float) – Penalize repeated tokens. vLLM/SGLang extra param.

  • api_base (str) – API base URL (full URL to chat/completions endpoint).

  • port (int) – Port number for local deployments.

  • retry (int) – Number of retry attempts on API failure.

  • wait (int) – Max wait time between retries (seconds).

  • timeout (tuple) – Request timeout as (connect_timeout, read_timeout) tuple.

  • max_tokens (int) – Maximum tokens in response.

  • thread_num (int) – Total maximum concurrent requests across all workers.

  • return_dict (bool) – Whether to parse response as dict.

  • logger – Logger instance.

  • max_connections (int) – Max TCP connections per worker. Defaults to min(worker_concurrency, 16384).

  • num_workers (int) – Number of worker processes. Defaults to min(cpu_count, 8). Set based on available CPU cores.

  • worker_concurrency (int) – Async concurrency limit per worker. Defaults to thread_num // num_workers.

  • enable_thinking (bool) – Whether to enable thinking mode for models that support it (e.g., Qwen3 on vLLM/SGLang). Default False.

  • **kwargs – Additional API parameters passed to payload.
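The interaction of the three concurrency knobs follows directly from the documented defaults; a small helper (hypothetical name `resolve_concurrency`) makes the arithmetic explicit:

```python
import os

def resolve_concurrency(thread_num=8192, num_workers=None,
                        worker_concurrency=None, max_connections=None):
    """Apply the documented defaults for MPOpenAIAPI's concurrency knobs."""
    if num_workers is None:
        num_workers = min(os.cpu_count() or 1, 8)
    if worker_concurrency is None:
        # thread_num is the TOTAL cap, split evenly across workers
        worker_concurrency = thread_num // num_workers
    if max_connections is None:
        # per-worker TCP pool, capped at 16384
        max_connections = min(worker_concurrency, 16384)
    return num_workers, worker_concurrency, max_connections
```

For example, with the defaults and 8 workers, each worker runs up to 1024 concurrent requests over a 1024-connection pool.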

generate(messages, **kwargs)[source]

Generate responses using fork-based multiprocessing.

Parameters:
  • messages (List[Any]) – List of input messages.

  • **kwargs – Additional generation arguments.

Return type:

List[str]

Returns:

List of response strings, in same order as input.

generate_inner(inputs, **kwargs)[source]

Synchronous single-request version (for BaseAPI compatibility).

Delegates to a temporary OpenAIAPI instance.

Parameters:
  • inputs – Preprocessed message list (OpenAI format).

  • **kwargs – Additional parameters.

Returns:

Tuple of (ret_code, answer, response).

shutdown()[source]

Shutdown and release resources.

Workers are ephemeral (created per generate() call), so there is nothing to clean up.

Base API class for model inference.

Provides common functionality shared by all API backends: message preprocessing, image encoding, retry logic, and parallel request processing.

class datastudio.models.base.BaseAPI[source]

Bases: object

Base API class providing common functionality for all API models.

This class handles:
  • Message preprocessing and formatting
  • Image encoding (base64)
  • Retry logic with exponential backoff
  • Parallel request processing

allowed_types = ['text', 'image', 'image_url', 'system']
__init__(retry=10, wait=5, timeout=(30, 1800), logger=None, thread_num=384, fail_msg='Failed to obtain answer via API.', openai_format=True, image_key_name='image', return_dict=True, use_system_proxy=False, **kwargs)[source]

Initialize the base API.

Parameters:
  • retry – Number of retry attempts on API failure.

  • wait – Wait time between retries (seconds).

  • timeout – Request timeout as (connect_timeout, read_timeout) tuple.

  • logger – Logger instance.

  • thread_num – Number of parallel threads.

  • fail_msg – Message returned on failure.

  • openai_format – Whether to use OpenAI message format.

  • image_key_name – Key name for image data.

  • return_dict – Whether to parse response as dict.

  • use_system_proxy – Whether to use system proxy.

  • **kwargs – Additional arguments passed to generate_inner.

abstractmethod generate_inner(inputs, **kwargs)[source]

Generate response for given inputs. Must be implemented by subclasses.

encode_image_directly(img_item)[source]

Encode image directly without caching.

Parameters:

img_item – PIL Image object or other image data.

Returns:

Base64 encoded string or original data.
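A dependency-free sketch of this contract (the real method also serializes PIL Image objects before encoding; that branch is omitted here, so this is an assumption-laden simplification, not the library's implementation):

```python
import base64

def encode_image_directly(img_item):
    """Base64-encode raw image bytes; pass other data through unchanged."""
    if isinstance(img_item, (bytes, bytearray)):
        return base64.b64encode(bytes(img_item)).decode("utf-8")
    # Already-encoded strings, URLs, etc. are returned as-is.
    return img_item
```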

check_content(msgs)[source]

Check input content type.

Parameters:

msgs – Raw input messages.

Returns:

Message type: one of 'str', 'dict', 'liststr', or 'listdict'.

Return type:

str

preproc_content(inputs)[source]

Convert raw input messages to unified dict list format.

Parameters:

inputs – Raw input.

Returns:

Processed input message list.

Return type:

list

prepare_inputs(inputs)[source]

Prepare message format for API interface.

Parameters:

inputs – Input message list.

Returns:

(formatted_messages, text_content).

Return type:

tuple

process_single_message(message, all_kwargs)[source]

Process a single message with retry logic.

Parameters:
  • message – Single processed message.

  • all_kwargs – Arguments passed to generate_inner.

Returns:

Response content or fail_msg on failure.

pre_process(msg)[source]

Validate and preprocess raw input messages into API-ready format.

Parameters:

msg – Raw input message (str, dict, or list).

Returns:

Preprocessed message list in OpenAI format.

Return type:

list

Raises:

AssertionError – If input type is unsupported.
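The documented contract can be sketched as follows. This is a plausible reconstruction, not the library's actual code (the real method also routes through check_content and preproc_content): a bare string becomes a single user text turn, a dict is wrapped in a list, a list is validated as-is, and anything else trips the assertion.

```python
def pre_process(msg):
    """Normalize raw input into an OpenAI-style message list (sketch)."""
    if isinstance(msg, str):
        # Bare string -> single user turn with one text part.
        return [{"role": "user",
                 "content": [{"type": "text", "text": msg}]}]
    if isinstance(msg, dict):
        msg = [msg]  # single message dict -> one-element list
    assert isinstance(msg, list), f"unsupported input type: {type(msg)}"
    for item in msg:
        assert isinstance(item, dict) and "role" in item
    return msg
```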

generate(messages, **kwargs)[source]

Main method to generate responses.

Parameters:
  • messages – Input messages (list of message dicts).

  • **kwargs – Additional arguments.

Returns:

Generated responses for each message.

Return type:

list

shutdown()[source]

Shutdown and release resources.

Async OpenAI-compatible API wrapper.

Uses aiohttp with semaphore-based concurrency control and connection pooling to handle high-throughput batch inference against OpenAI-compatible endpoints.

class datastudio.models.openai_api.OpenAIAPI[source]

Bases: BaseAPI

Re-exported as datastudio.models.OpenAIAPI; see the class documentation above.