这是使用llama.cpp对ministral/Ministral-3b-instruct进行量化的版本。
注意:使用时需通过examples里的gguf_new_metadata.py(复制而来)修改gguf文件的元数据。
import importlib
import os
import time
import argparse
import torch
import numpy as np
import logging
from packaging import version
def set_logging(model_name):
log_filename = os.path.join(os.getcwd(), f"{model_name}_inference_{time.strftime('%Y%m%d_%H%M%S')}.log")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(log_filename),
logging.StreamHandler(),
],
)
def parse_args():
parser = argparse.ArgumentParser(description="NPU Inference for Text Generation Model")
parser.add_argument(
"--model_name_or_path",
"-m",
type=str,
help="Path to model",
default=".",
)
parser.add_argument(
"--inference_mode",
"-i",
type=str,
help="Inference mode",
default="gguf",
)
parser.add_argument(
"--debug",
action="store_true",
help="Debug mode",
)
parser.add_argument(
"--gguf_file",
"-g",
type=str,
help="Path to GGUF file",
default="Ministral-3b-instruct.Q4_0.gguf",
)
return parser.parse_args()
args = parse_args()
model_path = args.model_name_or_path
abs_model_path = os.path.abspath(model_path)
model_name = os.path.basename(abs_model_path)
set_logging(model_name)
# 获取当前文件位置
current_file_path = os.path.abspath(__file__)
# 运行gguf_new_metadata.py
gguf_new_metadata_path = os.path.join(os.path.dirname(current_file_path), "gguf_new_metadata.py")
gguf_file_path = os.path.join(model_path, args.gguf_file)
new_gguf_file = args.gguf_file+"_modified.gguf"
output_gguf_file_path = os.path.join(model_path, new_gguf_file)
os.system(f"python {gguf_new_metadata_path} {gguf_file_path} {output_gguf_file_path} --general-name ministral_3b")
args.gguf_file = new_gguf_file
if args.debug:
logging.info("Debug mode enabled, using transformers package from source.")
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, is_torch_npu_available
else:
logging.info("Debug mode disabled, using openmind package.")
from openmind import AutoTokenizer, AutoModelForCausalLM, pipeline, is_torch_npu_available
def load_model_from_gguf(model_path: str, device_map="auto"):
gguf_filename = args.gguf_file
tokenizer = AutoTokenizer.from_pretrained(model_path, gguf_file=gguf_filename)
tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(model_path, gguf_file=gguf_filename, device_map=device_map)
return tokenizer, model
def load_model_from_local(model_path: str, device_map="auto"):
tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(model_path, device_map=device_map, trust_remote_code=True)
return tokenizer, model
def load_model_from_pipeline(model_path: str, device_map="auto", task="text-generation"):
pipeline_pt = pipeline(
task=task,
model=model_path,
device_map=device_map,
framework="pt",
truncation=True,
trust_remote_code=True,
)
if not args.debug:
# 判断openmind版本是否大于0.9.0
if version.parse(importlib.metadata.version("openmind")) >= version.parse("0.9.0"):
pipeline_pt = pipeline_pt.pipeline
return pipeline_pt.tokenizer, pipeline_pt
def load_model(mode: str, *args, **kwargs):
if mode == "gguf":
return load_model_from_gguf(*args, **kwargs)
elif mode == "model":
return load_model_from_local(*args, **kwargs)
elif mode == "pipeline":
return load_model_from_pipeline(*args, **kwargs)
else:
raise ValueError(f"load_model Unknown mode: {mode}")
def generate_text_form_model(tokenizer, model, prompt, max_new_tokens=50):
inputs = tokenizer(prompt, return_tensors="pt", padding=True).to(model.device)
output = model.generate(
input_ids=inputs['input_ids'],
attention_mask=inputs['attention_mask'],
max_new_tokens=max_new_tokens,
)
return tokenizer.decode(output[0], skip_special_tokens=True)
def generate_text_from_pipeline(tokenizer, pipeline, prompt, max_new_tokens=50):
results = pipeline(
prompt,
max_new_tokens=max_new_tokens,
)
return results[0]["generated_text"]
def generate_text(mode: str, *args, **kwargs):
if mode == "model" or mode == "gguf":
return generate_text_form_model(*args, **kwargs)
elif mode == "pipeline":
return generate_text_from_pipeline(*args, **kwargs)
else:
raise ValueError(f"generate_text Unknown mode: {mode}")
def apply_chat_template(tokenizer, tokenize=False):
if tokenizer.chat_template is None:
print("Chat template is not defined, use default template.")
tokenizer.chat_template = "{% if not add_generation_prompt is defined %}{% set add_generation_prompt = false %}{% endif %}{% for message in messages %}{{'<|im_start|>' + message['role'] + '\n' + message['content'] + '<|im_end|>' + '\n'}}{% endfor %}{% if add_generation_prompt %}{{ '<|im_start|>assistant\n' }}{% endif %}"
chat = [
{
"role": "system",
"content": "You are a helpful assistant who always responds in a friendly manner",
},
{
"role": "user",
"content": "Why does the ocean appear blue?",
},
]
chat_input = tokenizer.apply_chat_template(chat, tokenize=tokenize)
return chat_input
def main():
model_path = args.model_name_or_path
abs_model_path = os.path.abspath(model_path)
model_name = os.path.basename(abs_model_path)
logging.info(f"测试模型: {model_name}")
logging.info(f"模型路径: {model_path}")
logging.info(f"绝对路径: {abs_model_path}")
inference_mode = args.inference_mode
logging.info(f"推理模式: {inference_mode}")
# 确保使用 NPU 设备
device_map = "npu" if is_torch_npu_available() else "cpu"
logging.info(f"NPU {'available' if device_map == 'npu' else 'not available'}, use device_map='{device_map}'.")
# 加载模型
tokenizer, task_pipeline = load_model(mode=inference_mode, model_path=model_path, device_map=device_map)
prompt = apply_chat_template(tokenizer, tokenize=False)
# 推理性能测试
inference_times = []
num_runs = 10
logging.info(f"\n=== NPU {model_name} 性能测试 ===")
for i in range(num_runs):
input_text = prompt
start_time = time.time()
results = generate_text(inference_mode, tokenizer, task_pipeline, input_text)
torch.npu.synchronize()
inference_time = time.time() - start_time
inference_times.append(inference_time)
if i == 0:
logging.info(f"输入文本: {input_text}")
logging.info("生成结果:")
logging.info(f" {results}")
avg_time = np.mean(inference_times)
std_time = np.std(inference_times)
logging.info("\n性能分析:")
logging.info(f"NPU平均推理时间: {avg_time:.4f} 秒")
logging.info(f"NPU推理时间标准差: {std_time:.4f} 秒")
logging.info(f"推理时间列表: {inference_times}")
if __name__ == "__main__":
main()
Ministral 是一系列语言模型,采用与知名的 Mistral 模型相同的架构,但模型规模更小。