该模型是在 sardinelab/MT-pref 数据集上对 martimfasantos/gemma-2-2b-it-MT-SFT 进行微调得到的版本。
import importlib.metadata
import os
import time
import argparse
import torch
import numpy as np
import logging
from packaging import version
def set_logging(model_name):
log_filename = os.path.join(os.getcwd(), f"{model_name}_inference_{time.strftime('%Y%m%d_%H%M%S')}.log")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(log_filename),
logging.StreamHandler(),
],
)
def parse_args():
parser = argparse.ArgumentParser(description="NPU Inference for Text Generation Model")
parser.add_argument("-m", "--model_name_or_path", type=str, help="Path to model", default=".")
parser.add_argument("-i", "--inference_mode", type=str, help="Inference mode ['pipeline', 'auto', 'gguf']", default="pipeline")
parser.add_argument("--debug", action="store_true", help="Debug mode for transformers package", default=False)
parser.add_argument("-g", "--gguf_file", type=str, help="GGUF file", default=None)
parser.add_argument("-t", "--task_type", type=str, help="Pipeline task type", default="text-generation")
parser.add_argument("-p", "--prompt_type", type=str, help="Prompt type ['chat', 'simple', 'translate']", default="translate")
parser.add_argument("-c", "--custom_config", action="store_true", help="Use custom config", default=False)
return parser.parse_args()
args = parse_args()
model_path = args.model_name_or_path
abs_model_path = os.path.abspath(model_path)
model_name = os.path.basename(abs_model_path)
set_logging(model_name)
if args.custom_config:
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'
logging.info("Set HF_ENDPOINT to 'https://hf-mirror.com' for download config file.")
if args.debug:
logging.info("Debug mode enabled, using transformers package from source.")
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, is_torch_npu_available
if args.custom_config:
from transformers import AutoConfig
else:
logging.info("Debug mode disabled, using openmind package.")
from openmind import AutoTokenizer, AutoModelForCausalLM, pipeline, is_torch_npu_available
if args.custom_config:
from openmind import AutoConfig
model_config = None
if args.custom_config:
# 获取当前 Python 文件所在的文件夹路径
file_dir = os.path.dirname(os.path.abspath(__file__))
config_path = os.path.join(file_dir, 'config.json')
if os.path.exists(config_path):
logging.info(f"Custom config file: {config_path}")
model_config = AutoConfig.from_pretrained(config_path)
else:
logging.warning("Custom config file not found, use default config.")
def load_model_from_gguf(model_path: str, device_map="auto", **kwargs):
gguf_filename = args.gguf_file
tokenizer = AutoTokenizer.from_pretrained(model_path, gguf_file=gguf_filename)
tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(model_path, gguf_file=gguf_filename, device_map=device_map)
return tokenizer, model
def load_model_from_auto(model_path: str, device_map="auto", **kwargs):
tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True, config=model_config)
tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(model_path, device_map=device_map, trust_remote_code=True, config=model_config)
return tokenizer, model
def load_model_from_pipeline(model_path: str, device_map="auto", task="text-generation", **kwargs):
pipeline_pt = pipeline(
task=task,
model=model_path,
device_map=device_map,
framework="pt",
truncation=True,
trust_remote_code=True,
config=model_config,
)
if not args.debug:
# 判断openmind版本是否大于0.9.0
if version.parse(importlib.metadata.version("openmind")) >= version.parse("0.9.0"):
pipeline_pt = pipeline_pt.pipeline
return pipeline_pt.tokenizer, pipeline_pt
def load_model(mode: str, *args, **kwargs):
if mode == "gguf":
return load_model_from_gguf(*args, **kwargs)
elif mode == "auto":
return load_model_from_auto(*args, **kwargs)
elif mode == "pipeline":
return load_model_from_pipeline(*args, **kwargs)
else:
raise ValueError(f"load_model Unknown mode: {mode}, should be one of ['gguf', 'auto', 'pipeline']")
def generate_text_form_model(tokenizer, model, prompt, max_new_tokens=50, **kwargs):
inputs = tokenizer(prompt, return_tensors="pt", padding=True).to(model.device)
output = model.generate(
input_ids=inputs['input_ids'],
attention_mask=inputs['attention_mask'],
max_new_tokens=max_new_tokens,
)
return tokenizer.decode(output[0], skip_special_tokens=True)
def generate_text_from_pipeline(tokenizer, pipeline, prompt, max_new_tokens=50, **kwargs):
results = pipeline(
prompt,
max_new_tokens=max_new_tokens,
)
return results[0]["generated_text"]
def generate_text(mode: str, **kwargs):
if mode == "auto" or mode == "gguf":
model = kwargs.pop("model_or_pipeline")
return generate_text_form_model(model=model, **kwargs)
elif mode == "pipeline":
pipeline = kwargs.pop("model_or_pipeline")
return generate_text_from_pipeline(pipeline=pipeline, **kwargs)
else:
raise ValueError(f"generate_text Unknown mode: {mode}")
def apply_template(tokenizer, tokenize=False, prompt_type="chat"):
if hasattr(tokenizer, 'chat_template'):
if tokenizer.chat_template is None:
logging.warning("Chat template is not defined, use default template.")
tokenizer.chat_template = "{% if not add_generation_prompt is defined %}{% set add_generation_prompt = false %}{% endif %}{% for message in messages %}{{'<|im_start|>' + message['role'] + '\n' + message['content'] + '<|im_end|>' + '\n'}}{% endfor %}{% if add_generation_prompt %}{{ '<|im_start|>assistant\n' }}{% endif %}"
if prompt_type == "chat":
chat = [
{
"role": "system",
"content": "You are a helpful assistant who always responds in a friendly manner",
},
{
"role": "user",
"content": "Why does the ocean appear blue?",
},
]
chat_input = tokenizer.apply_chat_template(chat, tokenize=tokenize)
elif prompt_type == "simple":
chat = [
{
"role": "user",
"content": "Why does the ocean appear blue?",
},
]
chat_input = tokenizer.apply_chat_template(chat, tokenize=tokenize)
elif prompt_type == "translate":
chat = [
{
"role": "user",
"content": "Translate English to Chinese: Hello, how are you?",
},
]
chat_input = tokenizer.apply_chat_template(chat, tokenize=tokenize)
else:
logging.info(f"Unknown prompt type, use prompt type as input: {prompt_type}")
chat_input = prompt_type
else:
chat_input = "Why does the ocean appear blue?"
return chat_input
def main():
model_path = args.model_name_or_path
abs_model_path = os.path.abspath(model_path)
model_name = os.path.basename(abs_model_path)
logging.info(f"测试模型: {model_name}")
logging.info(f"模型路径: {model_path}")
logging.info(f"绝对路径: {abs_model_path}")
inference_mode = args.inference_mode
logging.info(f"推理模式: {inference_mode}")
if args.gguf_file is not None:
logging.info(f"GGUF文件: {args.gguf_file}")
logging.info(f"任务模式: {args.task_type}")
logging.info(f"文本类型: {args.prompt_type}")
# 确保使用 NPU 设备
device_map = "npu" if is_torch_npu_available() else "cpu"
logging.info(f"NPU {'available' if device_map == 'npu' else 'not available'}, use device_map='{device_map}'.")
# 加载模型
tokenizer, model_or_pipeline = load_model(mode=inference_mode, model_path=model_path, device_map=device_map, task=args.task_type)
prompt = apply_template(tokenizer, tokenize=False, prompt_type=args.prompt_type)
# 推理性能测试
inference_times = []
num_runs = 10
logging.info(f"\n=== NPU {model_name} 性能测试 ===")
for i in range(num_runs):
input_text = prompt
start_time = time.time()
results = generate_text(mode=inference_mode, tokenizer=tokenizer, model_or_pipeline=model_or_pipeline, prompt=input_text)
if device_map in ["npu", "auto"]:
torch.npu.synchronize()
else:
pass
inference_time = time.time() - start_time
inference_times.append(inference_time)
if i == 0:
logging.info(f"输入文本: {input_text}")
logging.info("生成结果:")
logging.info(f"{results}")
avg_time = np.mean(inference_times)
std_time = np.std(inference_times)
logging.info("\n性能分析:")
logging.info(f"NPU平均推理时间: {avg_time:.4f} 秒")
logging.info(f"NPU推理时间标准差: {std_time:.4f} 秒")
logging.info(f"推理时间列表: {inference_times}")
if __name__ == "__main__":
main()需要更多信息
需要更多信息
需要更多信息
训练过程中使用了以下超参数: