Ascend-SACT/Kimi-K2.6-vllm-ascend-x86
模型介绍文件和版本Pull Requests讨论分析
下载使用量0

Kimi-K2.6 在 vLLM-Ascend 0.18.0rc1 上的部署指南

1.项目概述

Moonshot AI 在 Kimi-K2.5 发布 3 个月后,紧接着推出了 Kimi-K2.6 版本。该版本与 K2.5 的模型参数及权重文件大小完全相同。K2.6 对底层训练算法和专家路由机制进行了优化,推理运行更为稳定。其智能体调度能力得到大幅提升,支持更多子智能体并行工作,长任务执行上限和运行效率均显著提高。模型的代码开发能力进步明显,长周期工程编码、多语言程序编写能力有所增强,多项编程评测指标也随之提升。同时,通用逻辑推理和复杂问题解答能力得到优化,长上下文信息处理不易出现偏差。在多模态方面,重点优化了图文与代码的融合能力,整体部署兼容性保持不变,可适配现有推理框架。

本文详细介绍了最新的 Kimi-K2.6 模型基于 2026 年 4 月最新发布的 vllm/vllm-ascend 0.18 版本以及对应的 docker 统一镜像 3.5 版本,在 Ascend+X86 硬件环境下基于 vLLM 推理引擎的部署适配指南。具体适配过程与 Kimi-2.5 版本一致,只需更新一个 vllm 的文件即可。

2.运行环境

1). 硬件环境

硬件配置规格要求
NPUAscend 910B2
CPUX86 架构
NPU 卡数16 张
适配模型Kimi-K2.5

2). 镜像与软件环境

统一镜像:ascend-docker 3.5 最新版 核心软件版本

软件名版本号
CANN8.5.1
Python3.11.15
torch2.9.0+cpu
torch_npu2.9.0
transformers5.3.0
vllm0.18.0+empty
vllm_ascend0.18.0rc1

3.拉取统一镜像(A2平台)

docker pull swr.cn-north-4.myhuaweicloud.com/ascend-sact/ascend-a2-ubuntu:latest

4.模型权重下载

modelscope download --model moonshotai/Kimi-K2.6  --local_dir xx/Kimi-K2.6

5.检查vllm和vllm-ascend版本

conda activate ascend-infer
pip list | grep vllm

查询结果vllm和vllm_ascend为以下版本即可

软件版本安装路径
vllm0.18.0+empty/workspace/vllm
vllm_ascend0.18.0rc1/workspace/vllm-ascend

说明:vllm与vllm_ascend版本必须严格对应,官方已适配Kimi-K2.5-W4A8版,但Kimi-K2.5原生模型仅需小幅修改即可适配;低版本vllm_ascend需修改较多配置文件,推荐优先使用vllm与vllm_ascend 0.18版本。

6.适配补丁kimi_k25.py

针对Ascend+X86环境,vllm/vllm-ascend 0.18已适配kimi-k2.5-w4a8量化版本,而对于kimi-k2.5或kimi-k2.6原生版本,启动会出现如下报错: NotImplementedError: No compressed-tensors compatible scheme was found for quant_type=W4A16, layer_type=linear. 其原因是模型config.json中没有包含针对kimi-k2.5或kimi-k2.6原生版本的相应配置。

需要针对vllm 0.18版本的Kimi-K2.5和Kimi-K2.6模型对配置文件kimi_k25.py进行修改适配,增加对应的W4A16的定义和声明。

在统一镜像3.5中已默认安装vllm0.18版本,路径为/workspace/vllm, 需按照以下方案修改/workspace/vllm/vllm/model_executor/models/kimi_k25.py文件, 请注意文件名均为小写。

方案一:手工按以下两步修改

1. 添加导入(在第29行后)

# Import Ascend-specific quantization config if available
try:
    from vllm_ascend.quantization.compressed_tensors_config import AscendCompressedTensorsConfig
    _HAS_ASCEND_QUANT = True
except ImportError:
    AscendCompressedTensorsConfig = None
    _HAS_ASCEND_QUANT = False

2. 修改_maybe_ignore_quant_config方法(第405-408行)

def _maybe_ignore_quant_config(self, quant_config: QuantizationConfig):
    if isinstance(quant_config, CompressedTensorsConfig):
        return None
    # Handle Ascend-specific compressed tensors config
    try:
        from vllm_ascend.quantization.compressed_tensors_config import AscendCompressedTensorsConfig
        if isinstance(quant_config, AscendCompressedTensorsConfig):
            return None
    except ImportError:
        pass
    return quant_config

方案二:用以下代码完整替换kimi_k25.py

# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
# ruff: noqa: E501
"""
Kimi-K2.5 Model Implementation for vLLM.

Kimi-K2.5 extends Kimi-K2 with vision support

This module defines:
- KimiK25ProcessingInfo/KimiK25MultiModalProcessor: Processing logic
- KimiK25ForConditionalGeneration: Main model class
"""

from collections.abc import Iterable, Mapping, Sequence
from dataclasses import dataclass
from typing import Annotated, Any, Literal

import torch
from torch import nn
from transformers import BatchFeature
from transformers.processing_utils import ProcessorMixin

from vllm.config import VllmConfig
from vllm.config.multimodal import BaseDummyOptions
from vllm.logger import init_logger
from vllm.model_executor.layers.quantization import QuantizationConfig
from vllm.model_executor.layers.quantization.compressed_tensors.compressed_tensors import (
    CompressedTensorsConfig,
)
# Import Ascend-specific quantization config if available
try:
    from vllm_ascend.quantization.compressed_tensors_config import AscendCompressedTensorsConfig
    _HAS_ASCEND_QUANT = True
except ImportError:
    AscendCompressedTensorsConfig = None
    _HAS_ASCEND_QUANT = False

from vllm.model_executor.models.interfaces import (
    SupportsEagle,
    SupportsEagle3,
    SupportsMultiModal,
    SupportsPP,
    SupportsQuant,
)
from vllm.model_executor.models.kimi_k25_vit import (
    KimiK25MultiModalProjector,
    MoonViT3dPretrainedModel,
    vision_tower_forward,
)
from vllm.multimodal import MULTIMODAL_REGISTRY
from vllm.multimodal.inputs import (
    MultiModalDataDict,
    MultiModalFieldConfig,
    MultiModalKwargsItems,
    NestedTensors,
    VisionChunk,
    VisionChunkImage,
    VisionChunkVideo,
)
from vllm.multimodal.parse import MultiModalDataItems, VisionChunkProcessorItems
from vllm.multimodal.processing import (
    BaseDummyInputsBuilder,
    BaseMultiModalProcessor,
    BaseProcessingInfo,
    InputProcessingContext,
    PromptReplacement,
    PromptUpdate,
)
from vllm.platforms import current_platform
from vllm.sequence import IntermediateTensors
from vllm.transformers_utils.configs import KimiK25Config
from vllm.transformers_utils.processor import cached_get_image_processor
from vllm.utils.tensor_schema import TensorSchema, TensorShape

from .utils import (
    AutoWeightsLoader,
    WeightsMapper,
    init_vllm_registered_model,
    maybe_prefix,
)

logger = init_logger(__name__)


# Dummy input dimensions for profiling.
@dataclass
class MaxImageTokenMeta:
    width: int = 3000
    height: int = 3000


class KimiK25MediaPixelInputs(TensorSchema):
    """
    Media input schema for K2-VL model.

    Dimensions:
        - np: Number of patches (flattened from all media items)
        - ps: Patch size
        - nm: Number of media items
    """

    type: Literal["pixel_values"] = "pixel_values"

    pixel_values: Annotated[
        torch.Tensor | list[torch.Tensor],
        TensorShape("np", 3, "ps", "ps"),
    ]

    grid_thws: Annotated[torch.Tensor, TensorShape("nm", 3)]


class MoonshotKimiVAutoProcessor(ProcessorMixin):
    attributes = ["tokenizer"]
    tokenizer_class = "AutoTokenizer"

    def __init__(
        self, media_processor=None, tokenizer=None, media_token_id: int | None = None
    ):
        super().__init__(tokenizer)
        self.media_processor = media_processor
        self.media_token_id = media_token_id
        assert self.media_token_id is not None

    # We do not support str input for text here
    def __call__(
        self,
        vision_chunks: list[VisionChunk] | None = None,
        *,
        text: list[int] | str,
        **kwargs,
    ) -> BatchFeature:
        """
        Args:
            vision_chunks: List of VisionChunk items to be processed.
                For image: VisionChunkImage with type='image', image=PIL.Image
                For video_chunk: VisionChunkVideo with type='video_chunk', video_chunk=list[PIL.Image]
            text: The token ids to be fed to a model (required).
        Returns:
            [`BatchFeature`]: A [`BatchFeature`] with the following fields:

            - **input_ids** -- list of token ids to be fed to a model.
            - **pixel_values** -- Pixel values to be fed to a model. Returned when `vision_chunks` is not `None`.
            - **grid_thws** -- list of image 3D grid in LLM. Returned when `vision_chunks` is not `None`.
        """
        mm_inputs = {}
        input_ids = self.tokenizer.encode(text) if isinstance(text, str) else text
        if vision_chunks is not None:
            assert isinstance(vision_chunks, list)
            mm_inputs = self.media_processor.preprocess(vision_chunks)

            num_tokens_per_chunk = [
                self.media_processor.media_tokens_calculator(chunk)
                for chunk in vision_chunks
            ]

            new_input_ids = []
            for token in input_ids:
                if token == self.media_token_id:
                    new_input_ids.extend(
                        [self.media_token_id] * num_tokens_per_chunk.pop(0)
                    )
                else:
                    new_input_ids.append(token)
            input_ids = new_input_ids

        # XXX: _apply_hf_processor_text_mm will call tolist() on input_ids
        return BatchFeature(
            data={
                "input_ids": torch.tensor([input_ids]),
                **mm_inputs,
            }
        )


class KimiK25ProcessingInfo(BaseProcessingInfo):
    """Processing information for Kimi-K2.5 model.

    Provides configuration and utilities for processing both
    images and video-chunks.
    """

    def __init__(self, ctx: InputProcessingContext) -> None:
        super().__init__(ctx)
        self.hf_config = self.get_hf_config()
        self.media_token_id = self.hf_config.media_placeholder_token_id
        media_processor = cached_get_image_processor(
            self.ctx.model_config.model,
            trust_remote_code=self.ctx.model_config.trust_remote_code,
        )
        self.media_processor = media_processor
        self.hf_processor = MoonshotKimiVAutoProcessor(
            media_processor=self.media_processor,
            tokenizer=self.get_tokenizer(),
            media_token_id=self.media_token_id,
        )
        self.media_tokens_calculator = self.media_processor.media_tokens_calculator

    def get_hf_processor(self):
        return self.hf_processor

    def get_hf_config(self):
        return self.ctx.get_hf_config(KimiK25Config)

    def get_supported_mm_limits(self) -> Mapping[str, int | None]:
        # None means unlimited
        return {"vision_chunk": None}


class KimiK25DummyInputsBuilder(BaseDummyInputsBuilder[KimiK25ProcessingInfo]):
    """Builds dummy inputs for Kimi-K2.5 model profiling."""

    def __init__(self, info: KimiK25ProcessingInfo) -> None:
        super().__init__(info)
        self.media_token_id = self.info.media_token_id
        self.frame_per_chunk = self.info.media_processor.num_frames_per_chunk

    def get_dummy_text(self, mm_counts: Mapping[str, int]) -> str:
        num_media = mm_counts.get("vision_chunk", 0)
        return "<|media_pad|>" * num_media

    def get_dummy_mm_items(self):
        dummy_videos = self._get_dummy_images(
            height=MaxImageTokenMeta.height,
            width=MaxImageTokenMeta.width,
            num_images=self.frame_per_chunk,
        )

        video_chunk_dummy_item = VisionChunkVideo(
            type="video_chunk", video_chunk=dummy_videos
        )
        video_chunk_num_tokens = self.info.media_tokens_calculator(
            video_chunk_dummy_item
        )

        image_dummy_item = VisionChunkImage(
            type="image",
            image=self._get_dummy_images(
                height=MaxImageTokenMeta.height,
                width=MaxImageTokenMeta.width,
                num_images=1,
            )[0],
        )
        image_num_tokens = self.info.media_tokens_calculator(image_dummy_item)
        # return the larger one
        if video_chunk_num_tokens >= image_num_tokens:
            return [video_chunk_dummy_item]
        else:
            return [image_dummy_item]

    def get_dummy_mm_data(
        self,
        seq_len: int,
        mm_counts: Mapping[str, int],
        mm_options: Mapping[str, BaseDummyOptions],
    ) -> MultiModalDataDict:
        # TODO: Support mm_options for vision_chunk to allow user configuration
        dummy_items = self.get_dummy_mm_items()
        return {"vision_chunk": dummy_items}


class KimiK25MultiModalProcessor(BaseMultiModalProcessor[KimiK25ProcessingInfo]):
    """Multi-modal processor for Kimi-K2.5.

    Handles both image and video-chunk modalities.
    """

    def _get_mm_fields_config(
        self,
        hf_inputs: BatchFeature,
        hf_processor_mm_kwargs: Mapping[str, object],
    ) -> Mapping[str, MultiModalFieldConfig]:
        """Indicates how to slice media input into multiple items.

        pixel_values: [N, 3, patch_size, patch_size], all patches collected from B medias
        grid_thws: [B,3], each item: [N_t, N_h ,N_w], indicates the grid size in time/height/width direction
                    for current item.

        by multiplying [N_t, N_h ,N_w], we get the number of patches for each media item, thus we can slice
        pixel_values by pixel_values[start:start + N_t*N_h*N_w] to get patches of one item.

        """
        grid_thws = hf_inputs.get("grid_thws", torch.empty((0, 3)))
        grid_sizes = grid_thws.prod(-1)

        return dict(
            pixel_values=MultiModalFieldConfig.flat_from_sizes(
                "vision_chunk", grid_sizes
            ),
            grid_thws=MultiModalFieldConfig.batched("vision_chunk"),
        )

    def _get_prompt_updates(
        self,
        mm_items: MultiModalDataItems,
        hf_processor_mm_kwargs: Mapping[str, Any],
        out_mm_kwargs: MultiModalKwargsItems,
    ) -> Sequence[PromptUpdate]:
        hf_config = self.info.get_hf_config()
        media_token_id = hf_config.media_placeholder_token_id

        def get_replacement(item_idx: int):
            media = mm_items.get_items("vision_chunk", (VisionChunkProcessorItems,))
            num_media_token = self.info.media_tokens_calculator(media[item_idx])
            return [media_token_id] * num_media_token

        return [
            PromptReplacement(
                modality="vision_chunk",
                target=[media_token_id],
                replacement=get_replacement,
            ),
        ]

    def split_video_chunks(self, video):
        return self.info.media_processor.split_video_chunks(video)


@MULTIMODAL_REGISTRY.register_processor(
    KimiK25MultiModalProcessor,
    info=KimiK25ProcessingInfo,
    dummy_inputs=KimiK25DummyInputsBuilder,
)
class KimiK25ForConditionalGeneration(
    nn.Module,
    SupportsMultiModal,
    SupportsPP,
    SupportsQuant,
    SupportsEagle,
    SupportsEagle3,
):
    """Kimi-K2.5 model for conditional generation.

    Supports both image and video-chunk modalities.
    Video-chunks are temporal segments (typically 4 frames) that are
    processed with temporal pooling.
    """

    supports_encoder_tp_data = True

    hf_to_vllm_mapper = WeightsMapper(
        orig_to_new_prefix={
            # For legacy NVFP4 checkpoint compatibility:
            # see https://github.com/vllm-project/vllm/pull/33346#issuecomment-3851475033
            "language_model.layers.": "language_model.model.layers.",
            # mm projector
            "mm_projector.proj.0": "mm_projector.linear_1",
            "mm_projector.proj.2": "mm_projector.linear_2",
        }
    )

    @classmethod
    def get_placeholder_str(cls, modality: str, i: int) -> str | None:
        # Kimi-K2.5 uses video_chunk for all media types
        if modality == "image":
            return "<|media_begin|>image<|media_content|><|media_pad|><|media_end|>"
        elif modality == "video":
            # return a placeholder, to be replaced in the future.
            return "<|kimi_k25_video_placeholder|>"

        raise ValueError(f"Unsupported modality: {modality}")

    def __init__(
        self,
        vllm_config: VllmConfig,
        prefix: str = "",
    ) -> None:
        super().__init__()
        model_config = vllm_config.model_config
        config: KimiK25Config = model_config.hf_config
        self.config = config
        quant_config = vllm_config.quant_config

        # Check for MoonViT config compatibility
        self.use_data_parallel = (
            model_config.multimodal_config.mm_encoder_tp_mode == "data"
        )
        self.hidden_size = config.text_config.hidden_size
        self.device = current_platform.current_device()
        # Build vision tower directly with KimiK25VisionConfig
        with self._mark_tower_model(vllm_config, "vision_chunk"):
            self.vision_tower = MoonViT3dPretrainedModel(
                config.vision_config,
                quant_config=self._maybe_ignore_quant_config(quant_config),
                prefix=maybe_prefix(prefix, "vision_tower"),
            )
            self.vision_tower = self.vision_tower.to(
                device=self.device, dtype=model_config.dtype
            )

            self.mm_projector = KimiK25MultiModalProjector(
                config=config.vision_config,
                use_data_parallel=self.use_data_parallel,
                quant_config=self._maybe_ignore_quant_config(quant_config),
                prefix=maybe_prefix(prefix, "mm_projector"),
            )
            self.mm_projector = self.mm_projector.to(
                device=self.device, dtype=model_config.dtype
            )

        self.quant_config = quant_config
        with self._mark_language_model(vllm_config):
            self.language_model = init_vllm_registered_model(
                vllm_config=vllm_config,
                hf_config=config.text_config,
                prefix=maybe_prefix(prefix, "language_model"),
                architectures=["DeepseekV2ForCausalLM"],
            )
        self.make_empty_intermediate_tensors = (
            self.language_model.make_empty_intermediate_tensors
        )
        self.media_placeholder: int = self.config.media_placeholder_token_id

    def _maybe_ignore_quant_config(self, quant_config: QuantizationConfig):
        if isinstance(quant_config, CompressedTensorsConfig):
            return None
        # Handle Ascend-specific compressed tensors config
        try:
            from vllm_ascend.quantization.compressed_tensors_config import AscendCompressedTensorsConfig
            if isinstance(quant_config, AscendCompressedTensorsConfig):
                return None
        except ImportError:
            pass
        return quant_config

    def _parse_and_validate_media_input(
        self, **kwargs: object
    ) -> KimiK25MediaPixelInputs | None:
        pixel_values = kwargs.pop("pixel_values", None)
        grid_thws = kwargs.pop("grid_thws", None)
        if pixel_values is None:
            return None

        if isinstance(pixel_values, list):
            pixel_values = torch.cat(pixel_values, dim=0)

        if len(pixel_values.shape) == 5 or len(pixel_values.shape) == 3:
            pixel_values = pixel_values.reshape(
                pixel_values.shape[0] * pixel_values.shape[1], *pixel_values.shape[2:]
            )

        # The batch dimension of pixel_values has been flattened into shape[0]
        target_dtype = next(self.vision_tower.parameters()).dtype
        pixel_values = pixel_values.to(target_dtype)
        assert isinstance(grid_thws, torch.Tensor), (
            f"expect grid_thws to be a tensor, get {type(grid_thws)}"
        )
        # In some cases (e.g. with merger), grid_thws has an extra middle dimension
        grid_thws = grid_thws.reshape(-1, grid_thws.shape[-1])
        assert grid_thws.ndim == 2 and grid_thws.size(1) == 3, (
            f"unexpected shape for grid_thws: {grid_thws.shape}"
        )

        return KimiK25MediaPixelInputs(
            type="pixel_values",
            pixel_values=pixel_values,
            grid_thws=grid_thws,
        )

    def _process_media_input(
        self, media_input: KimiK25MediaPixelInputs
    ) -> list[torch.Tensor]:
        # NOTE(moyan): This forward will automatically batch the forward pass internally
        media_features = vision_tower_forward(
            self.vision_tower,
            media_input["pixel_values"],
            media_input["grid_thws"],
            mm_projector=self.mm_projector,
            use_data_parallel=self.use_data_parallel,
        )
        return media_features

    def embed_multimodal(self, **kwargs: object) -> NestedTensors | None:
        # Validate the multimodal input keyword arguments
        media_input = self._parse_and_validate_media_input(**kwargs)
        if media_input is None:
            return None

        # Run multimodal inputs through encoder and projector
        vision_embeddings = self._process_media_input(media_input)
        return vision_embeddings

    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
        intermediate_tensors: IntermediateTensors | None = None,
        inputs_embeds: torch.Tensor | None = None,
        **kwargs: object,
    ) -> IntermediateTensors:
        if intermediate_tensors is not None:
            inputs_embeds = None
        hidden_states = self.language_model(
            input_ids=input_ids,
            positions=positions,
            intermediate_tensors=intermediate_tensors,
            inputs_embeds=inputs_embeds,
        )

        return hidden_states

    def compute_logits(self, hidden_states: torch.Tensor, **kwargs) -> torch.Tensor:
        logits = self.language_model.compute_logits(hidden_states)
        return logits

    def set_aux_hidden_state_layers(self, layers: tuple[int, ...]) -> None:
        self.language_model.set_aux_hidden_state_layers(layers)

    def get_eagle3_aux_hidden_state_layers(self) -> tuple[int, ...]:
        return self.language_model.get_eagle3_aux_hidden_state_layers()

    def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]):
        loader = AutoWeightsLoader(self)
        return loader.load_weights(weights, mapper=self.hf_to_vllm_mapper)

方案三:从本项目文件中下载补丁文件并替换到环境中vllm对应文件

从本项目文件中下载kimi_k25 (for vllm18).py补丁文件,并更名为kimi_k25.py,然后替换原有vllm安装目录/vllm/model_executor/models/下的kimi_k25.py即可

7.推理执行

创建推理脚本

vi编辑infer_kimi-k2.6.sh推理脚本

export VLLM_USE_V1=1 
export PYTORCH_NPU_ALLOC_CONF="expandable_segments:True"   
export TASK_QUEUE_ENABLE=1   
export HCCL_BUFFSIZE=1024
export HCCL_OP_EXPANSION_MODE="AIV"
export VLLM_ASCEND_ENABLE_FLASHCOMM=0   

export VLLM_TORCH_PROFILER_WITH_STACK=0    
export VLLM_ASCEND_ENABLE_FUSED_MC2=1  
export VLLM_ASCEND_ENABLE_TOPK_OPTIMIZE=1
export VLLM_ASCEND_FUSION_OP_TRANSPOSE_KV_CACHE_BY_BLOCK=1

export HCCL_INTRA_ROCE_ENABLE=1

export CUDA_DEVICE_MAX_CONNECTIONS=1
export OMP_PROC_BIND=false
export OMP_NUM_THREADS=8

export ATB_LLM_HCCL_ENABLE=1
export ATB_OPERATION_EXECUTE_ASYNC=2
vllm serve 模型权重所在目录 \
--served-model-name Kimi-K2.6 \
--host 0.0.0.0 \
--port 8016 \
--enable-auto-tool-choice \
--tool-call-parser kimi_k2 \
--tensor-parallel-size 16 \
--enable-expert-parallel \
--max-num-seqs 16 \
--max-model-len 16384 \
--max-num-batched-tokens 1024 \
--trust-remote-code \
--gpu-memory-utilization 0.85 \
--enable-prefix-caching \
--additional-config '{"ascend_scheduler_config":{"enabled":true,"enable_chunked_prefill":true},"torchair_graph_config":{"enabled":true}}' \
--compilation-config  '{"cudagraph_capture_sizes":[1,2,4,8,16,32,64],"cudagraph_mode":"FULL_DECODE_ONLY"}' \
--kv-cache-dtype auto \
--seed 1024

执行推理脚本

conda activate ascend-infer
bash  infer_kimi-k2.6.sh

8.测试验证

1) 简单对话测试

curl http://0.0.0.0:8016/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "kimi",
"messages": [
        {"role": "system", "content": "You are Kimi, an AI assistant created by Moonshot AI."},
        {"role": "user", "content": [{"type": "text", "text": "请介绍一下你自己"}]}
    ],
"temperature": 0.1,
"max_tokens": 256
}'

2) 工具调用测试

curl http://0.0.0.0:8016/v1/chat/completions \  
-H "Content-Type: application/json" \ 
-d '{
    "model": "Kimi-K2.6",
    "messages": [
        {
            "role": "user",
            "content": "请帮我查询今天北京的天气"
        }
    ],
    "tools": [
        {
            "type": "function",
            "function": {
                "name": "get_weather",
                "description": "获取指定城市的天气",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "city": {
                            "type": "string"
                        }
                    },
                    "required": [
                        "city"
                    ]
                }
            }
        }
    ]
}'

3) 图像识别推理测试

import openai
import base64
import requests

def chat_with_image(client: openai.OpenAI, model_name: str):
    # 图片加载
    url = 'https://huggingface.co/moonshotai/Kimi-K2.5/resolve/main/figures/kimi-logo.png'
    image_base64 = base64.b64encode(requests.get(url).content).decode()

    messages = [
        {
            'role': 'user',
            'content': [
                {'type': 'image_url', 'image_url': {'url': f'data:image/png;base64,{image_base64}'}},
                {'type': 'text', 'text': '请详细描述图片的内容'},
            ]
        }
    ]

    print('====== 本地服务调用======')
    response = client.chat.completions.create(
        model=model_name,
        messages=messages,
        stream=False,
        max_tokens=8192
    )
    print(response.choices[0].message.content)

    return response.choices[0].message.content


if __name__ == "__main__":
    client = openai.OpenAI(
        api_key="dummy",  # 本地服务无需真实密钥,任意字符串即可
        base_url="http://127.0.0.1:8016/v1"  # 你的本地服务地址+端口+/v1
    )
    chat_with_image(client, "Kimi-K2.6")

4) 视频识别推理测试

from openai import OpenAI
import base64
import requests
import cv2
import numpy as np
import tempfile

# ===================== 【本地 vLLM 服务】=====================
client = OpenAI(
    api_key="dummy",  # 本地服务无需真实密钥,任意字符串即可
    base_url="http://127.0.0.1:8016/v1"  # 你的本地服务地址+端口+/v1
)
MODEL_NAME = "Kimi-K2.6"

# ===================== 在线视频推理函数 =====================
def chat_with_video(client: OpenAI, model_name: str):
    # 在线视频地址
    url = 'https://huggingface.co/moonshotai/Kimi-K2.5/resolve/main/figures/demo_video.mp4'
    print("正在下载在线视频...")

    # 1. 下载视频
    video_content = requests.get(url).content

    # 2. 写入临时文件
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_file:
        tmp_file.write(video_content)
        tmp_path = tmp_file.name

    # 3. 读取临时视频文件并抽帧
    print("正在抽取视频关键帧...")
    cap = cv2.VideoCapture(tmp_path)
    frames = []
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    sample_count = 8
    step = max(1, total_frames // sample_count)

    for i in range(sample_count):
        cap.set(cv2.CAP_PROP_POS_FRAMES, i * step)
        ret, frame = cap.read()
        if ret:
            _, buffer = cv2.imencode('.jpg', frame)
            base64_frame = base64.b64encode(buffer).decode()
            frames.append({
                "type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{base64_frame}"}
            })
    cap.release()

    # 4. 删除临时文件
    import os
    os.unlink(tmp_path)

    # 5. 构造请求
    messages = [
        {
            "role": "user",
            "content": [
                *frames,
                {"type": "text", "text": "Describe the video in detail."},
            ],
        }
    ]

    try:
        # ===================== Thinking Mode =====================
        print("\n====== Thinking Mode 推理中 ======")
        response = client.chat.completions.create(
            model=model_name,
            messages=messages,
            stream=False,
            max_tokens=4096,
            extra_body={"chat_template_kwargs": {"thinking": True}}
        )
        print('====== reasoning_content ======')
        reasoning = getattr(response.choices[0].message, 'reasoning_content', '')
        print(f'reasoning content: {reasoning}')
        print('====== response ======')
        print(f'response: {response.choices[0].message.content}')

        # ===================== Instant Mode =====================
        print("\n====== Instant Mode 推理中 ======")
        response = client.chat.completions.create(
            model=model_name,
            messages=messages,
            stream=False,
            max_tokens=4096,
            extra_body={"chat_template_kwargs": {"thinking": False}}
        )
        print('====== response ======')
        print(f'response: {response.choices[0].message.content}')

        return response.choices[0].message.content

    except Exception as e:
        print("\n请求失败:", str(e))

# ===================== 运行 =====================
if __name__ == "__main__":
    chat_with_video(client, MODEL_NAME)

5) 思考模式推理测试

import openai

def simple_chat(client: openai.OpenAI, model_name: str):
    messages = [
        {'role': 'system', 'content': 'You are Kimi, an AI assistant created by Moonshot AI.'},
        {
            'role': 'user',
            'content': [
                {'type': 'text', 'text': '那个数字更大一些? 9.11 还是 9.9? 认真考虑一下.'}
            ],
        },
    ]

    # 本地vLLM服务 思考模式
    print('====== 调用本地服务:思考模式 ======')
    response = client.chat.completions.create(
        model=model_name, 
        messages=messages, 
        stream=False, 
        max_tokens=4096
    )

    print(f'response: {response.choices[0].message.content}')

    print('\n====== 调用本地服务:即时模式 ======')
    response = client.chat.completions.create(
        model=model_name,
        messages=messages,
        stream=False,
        max_tokens=4096,
        extra_body={'chat_template_kwargs': {"thinking": False}}
    )
    print(f'response: {response.choices[0].message.content}')


if __name__ == "__main__":
    client = openai.OpenAI(
        api_key="dummy_key",  # 本地服务无需真实密钥,任意字符串即可
        base_url="http://127.0.0.1:8016/v1"  # 你的本地服务地址+端口+/v1
    )

    # 模型名称:与启动脚本中 --served-model-name kimi 保持一致
    MODEL_NAME = "Kimi-K2.6"

    # 执行测试
    simple_chat(client, MODEL_NAME)

9.注意事项

1)启动报错 W4A16 不支持,关键是修改 kimi_k25.py 适配昇腾量化配置,解决Kimi-K2.5/K2.6原生模型在昇腾 vLLM 上的适配问题。 2)严格遵循版本对应原则:vllm=0.18.0+empty、vllm_ascend=0.18.0rc1 是适配基础 3)启动参数kv-cache-dtype要设为 auto,不能设置为FP8,避免核心报错 4) 如果需要开启工具调用能力,需要增加--enable-auto-tool-choice 和 --tool-call-parser kimi_k2 两个启动参数