HuggingFace镜像/VideoGameBunny-V1-4B
模型介绍文件和版本分析
下载使用量0

VideoGameBunny V1.0-4B

[网站]

在 openmind 中的使用

import importlib
from packaging import version
import os
import time
import argparse
import torch
import numpy as np
import logging

# 设置HF_ENDPOINT=https://hf-mirror.com
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'

# 获取当前 Python 文件所在的文件夹路径
current_dir = os.path.dirname(os.path.abspath(__file__))
config_path = os.path.join(current_dir, 'config.json')

def set_logging(model_name):
    log_filename = os.path.join(os.getcwd(), f"{model_name}_inference_{time.strftime('%Y%m%d_%H%M%S')}.log")
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=[
            logging.FileHandler(log_filename),
            logging.StreamHandler(),
        ],
    )
def parse_args():
    parser = argparse.ArgumentParser(description="NPU Inference for Text Generation Model")
    parser.add_argument(
        "--model_name_or_path",
        "-m",
        type=str,
        help="Path to model",
        default=".",
    )
    parser.add_argument(
        "--inference_mode",
        "-i",
        type=str,
        help="Inference mode",
        default="model",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Debug mode",
    )
    parser.add_argument(
        "--gguf_file",
        "-g",
        type=str,
        help="Path to GGUF file",
        default=None,
    )
    return parser.parse_args()

args = parse_args()
model_path = args.model_name_or_path
abs_model_path = os.path.abspath(model_path)
model_name = os.path.basename(abs_model_path)
set_logging(model_name)

if args.debug:
    logging.info("Debug mode enabled, using transformers package from source.")
    from transformers import AutoTokenizer, AutoModelForCausalLM, AutoConfig, pipeline, is_torch_npu_available
else:
    logging.info("Debug mode disabled, using openmind package.")
    from openmind import AutoTokenizer, AutoModelForCausalLM, AutoConfig, pipeline, is_torch_npu_available

def load_model_from_gguf(model_path: str, device_map="auto"):
    gguf_filename = args.gguf_file
    tokenizer = AutoTokenizer.from_pretrained(model_path, gguf_file=gguf_filename)
    tokenizer.pad_token = tokenizer.eos_token
    model = AutoModelForCausalLM.from_pretrained(model_path, gguf_file=gguf_filename, device_map=device_map)
    return tokenizer, model

def load_model_from_local(model_path: str, device_map="auto"):
    config = AutoConfig.from_pretrained(config_path, trust_remote_code=True)
    tokenizer = AutoTokenizer.from_pretrained(model_path, config=config, trust_remote_code=True)
    tokenizer.pad_token = tokenizer.eos_token
    model = AutoModelForCausalLM.from_pretrained(model_path, config=config, device_map=device_map, trust_remote_code=True)
    return tokenizer, model

def load_model_from_pipeline(model_path: str, device_map="auto", task="text-generation"):
    config = AutoConfig.from_pretrained(config_path, trust_remote_code=True)
    pipeline_pt = pipeline(
        task=task,
        model=model_path,
        device_map=device_map,
        framework="pt",
        truncation=True,
        trust_remote_code=True,
        config=config,
    )
    if not args.debug:
        # 判断openmind版本是否大于0.9.0
        if version.parse(importlib.metadata.version("openmind")) >= version.parse("0.9.0"):
            pipeline_pt = pipeline_pt.pipeline
    return pipeline_pt.tokenizer, pipeline_pt

def load_model(mode: str, *args, **kwargs):
    if mode == "gguf":
        return load_model_from_gguf(*args, **kwargs)
    elif mode == "model":
        return load_model_from_local(*args, **kwargs)
    elif mode == "pipeline":
        return load_model_from_pipeline(*args, **kwargs)
    else:
        raise ValueError(f"load_model Unknown mode: {mode}")
    
def generate_text_form_model(tokenizer, model, prompt, max_new_tokens=50):
    inputs = tokenizer(prompt, return_tensors="pt", padding=True).to(model.device)
    output = model.generate(
        input_ids=inputs['input_ids'], 
        attention_mask=inputs['attention_mask'],
        max_new_tokens=max_new_tokens,
    )
    return tokenizer.decode(output[0], skip_special_tokens=True)

def generate_text_from_pipeline(tokenizer, pipeline, prompt, max_new_tokens=50):
    results = pipeline(
        prompt,
        max_new_tokens=max_new_tokens,
    )
    return results[0]["generated_text"]

def generate_text(mode: str, *args, **kwargs):
    if mode == "model" or mode == "gguf":
        return generate_text_form_model(*args, **kwargs)
    elif mode == "pipeline":
        return generate_text_from_pipeline(*args, **kwargs)
    else:
        raise ValueError(f"generate_text Unknown mode: {mode}")

def apply_chat_template(tokenizer, tokenize=False):
    if tokenizer.chat_template is None:
        print("Chat template is not defined, use default template.")
        tokenizer.chat_template = "{% if not add_generation_prompt is defined %}{% set add_generation_prompt = false %}{% endif %}{% for message in messages %}{{'<|im_start|>' + message['role'] + '\n' + message['content'] + '<|im_end|>' + '\n'}}{% endfor %}{% if add_generation_prompt %}{{ '<|im_start|>assistant\n' }}{% endif %}"
    chat = [
        {
            "role": "system",
            "content": "You are a helpful assistant who always responds in a friendly manner",
        },
        {
            "role": "user",
            "content": "Why does the ocean appear blue?",
        },
    ]
    chat_input = tokenizer.apply_chat_template(chat, tokenize=tokenize)
    return chat_input

def main():
    model_path = args.model_name_or_path
    abs_model_path = os.path.abspath(model_path)
    model_name = os.path.basename(abs_model_path)
    logging.info(f"测试模型: {model_name}")
    logging.info(f"模型路径: {model_path}")
    logging.info(f"绝对路径: {abs_model_path}")
    inference_mode = args.inference_mode
    logging.info(f"推理模式: {inference_mode}")
    
    # 确保使用 NPU 设备
    device_map = "auto" if is_torch_npu_available() else "cpu"
    logging.info(f"NPU {'available' if device_map == 'auto' else 'not available'}, use device_map='{device_map}'.")

    # 加载模型
    tokenizer, task_pipeline = load_model(mode=inference_mode, model_path=model_path, device_map=device_map)
    prompt = apply_chat_template(tokenizer, tokenize=False)

    # 推理性能测试
    inference_times = []
    num_runs = 10

    logging.info(f"\n=== NPU {model_name} 性能测试 ===")

    for i in range(num_runs):
        input_text = prompt

        start_time = time.time()
        
        results = generate_text(inference_mode, tokenizer, task_pipeline, input_text)
        torch.npu.synchronize()

        inference_time = time.time() - start_time
        inference_times.append(inference_time)

        if i == 0:
            logging.info(f"输入文本: {input_text}")
            logging.info("生成结果:")
            logging.info(f"  {results}")

    avg_time = np.mean(inference_times)
    std_time = np.std(inference_times)

    logging.info("\n性能分析:")
    logging.info(f"NPU平均推理时间: {avg_time:.4f} 秒")
    logging.info(f"NPU推理时间标准差: {std_time:.4f} 秒")
    logging.info(f"推理时间列表: {inference_times}")


if __name__ == "__main__":
    main()