# Copyright 2023 Cheng Li
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import json
import logging
import os
from dataclasses import dataclass
from pathlib import Path
import fire
from llm_analysis.constant import (
DTYPE_CONFIG_DIR_NAME,
GPU_CONFIG_DIR_NAME,
MODEL_CONFIG_DIR_NAME,
)
from llm_analysis.logger import logger
# `transformers` is an optional dependency. When it is missing, `AutoConfig`
# is set to None and model configs cannot be retrieved from HuggingFace.
try:
    from transformers import AutoConfig
except ImportError:
    AutoConfig = None
    logger.warning(
        "cannot import AutoConfig from transformers, `transformers` is not installed, HuggingFace will not be available to use for model config retrieval"
    )
class EnhancedJSONEncoder(json.JSONEncoder):
    """JSON encoder that serializes dataclasses as plain dicts."""

    def default(self, o):
        """Return a JSON-serializable stand-in for `o`.

        Dataclasses are converted with `dataclasses.asdict`; every other
        object is handed to `json.JSONEncoder.default`.
        """
        if not dataclasses.is_dataclass(o):
            return super().default(o)
        return dataclasses.asdict(o)
@dataclass
class ModelConfig:
    """Architecture hyper-parameters of a transformer model.

    Fields left as None are filled in by `__post_init__`:

    * `ffn_embed_dim` / `expansion_ratio`: if neither is given, the ratio is 4
      and `ffn_embed_dim = 4 * hidden_dim`; if only one is given, the other is
      derived from it and `hidden_dim`.
    * `num_key_value_heads`: defaults to `n_head`.
    * `num_key_value_groups`: always recomputed as
      `n_head // num_key_value_heads`; any value passed in is overwritten.
    """

    name: str  # model config name
    num_layers: int  # number of transformer layers (blocks)
    n_head: int  # number of attention heads
    hidden_dim: int  # hidden dimension
    vocab_size: int  # vocabulary size
    max_seq_len: int = None  # max sequence length
    # The number of key value heads implementing Grouped Query Attention (GQA).
    # If it is not specified, it defaults to n_head. If
    # `num_key_value_heads=num_attention_heads`, the model uses Multi Head
    # Attention (MHA); if `num_key_value_heads=1`, the model uses Multi Query
    # Attention (MQA); otherwise GQA is used. See
    # https://github.com/huggingface/transformers/blob/main/src/transformers/models/llama/configuration_llama.py
    # for details.
    num_key_value_heads: int = None
    num_key_value_groups: int = None  # number of key value groups for GQA
    ffn_embed_dim: int = None  # hidden dimension of FFN, default to 4 * hidden_dim
    expansion_ratio: int = None  # ratio of ffn_embed_dim to hidden_dim
    model_type: str = None  # model type as tagged on Hugging Face (e.g., gpt2, opt, llama.)
    moe_num_experts: int = 1  # number of experts for mixture of experts model
    moe_top_k: int = 1  # top k experts for mixture of experts model

    def __post_init__(self):
        """Derive the FFN size / expansion ratio and the GQA head grouping."""
        if self.ffn_embed_dim is None and self.expansion_ratio is None:
            self.ffn_embed_dim = self.hidden_dim * 4
            self.expansion_ratio = 4
        elif self.ffn_embed_dim is None:
            self.ffn_embed_dim = self.hidden_dim * self.expansion_ratio
        elif self.expansion_ratio is None:
            assert self.ffn_embed_dim % self.hidden_dim == 0, f"ffn_embed_dim ({self.ffn_embed_dim}) must be divisible by hidden_dim ({self.hidden_dim})"
            # Divisibility was just checked, so floor division is exact and
            # keeps the field an int as annotated (true division gave a float).
            self.expansion_ratio = self.ffn_embed_dim // self.hidden_dim
        if self.num_key_value_heads is None:
            self.num_key_value_heads = self.n_head
        assert self.n_head % self.num_key_value_heads == 0, f"n_head ({self.n_head}) must be divisible by num_key_value_heads ({self.num_key_value_heads})"
        # Exact for the same reason as above; stays an int.
        self.num_key_value_groups = self.n_head // self.num_key_value_heads

    def __str__(self):
        """Return the string form of the config's field dict."""
        return str(dataclasses.asdict(self))
@dataclass
class GPUConfig:
    """Hardware characteristics of one GPU type.

    INT8 and INT4 peak throughput default to 2x and 4x the FP16 peak when
    they are not given explicitly.
    """

    name: str  # GPU config name
    mem_per_GPU_in_GB: float  # memory per GPU in GB
    hbm_bandwidth_in_GB_per_sec: float  # GPU HBM bandwidth in GB/s
    intra_node_bandwidth_in_GB_per_sec: float  # intra node GPU bandwidth in GB/s
    intra_node_min_message_latency: float  # minimum intra node message latency in seconds
    peak_fp16_TFLOPS: float  # peak Tensor TFLOPS for FP16
    peak_i8_TFLOPS: float = None  # peak Tensor TFLOPS for INT8
    peak_i4_TFLOPS: float = None  # peak Tensor TFLOPS for INT4
    # inter node bandwidth in GB/s, assuming Mellanox 200Gbps HDR Infiniband
    inter_node_bandwidth_in_GB_per_sec: float = 200

    def __post_init__(self):
        """Fill in missing low-precision peaks from the FP16 peak."""
        for field_name, speedup in (("peak_i8_TFLOPS", 2),
                                    ("peak_i4_TFLOPS", 4)):
            if getattr(self, field_name) is None:
                setattr(self, field_name, speedup * self.peak_fp16_TFLOPS)
@dataclass
class DtypeConfig:
    """Bit widths used for weights, activations and embeddings.

    The defaults describe a 16-bit setup for all three.
    """

    # dtype config name
    name: str = "w16a16e16"
    # number of bits for weight
    weight_bits: int = 16
    # number of bits for activation
    activation_bits: int = 16
    # number of bits for the embedding
    embedding_bits: int = 16
@dataclass
class ParallelismConfig:
    """Degrees of parallelism used to shard a model across devices.

    `sp_size` falls back to `tp_size` when it is not given.
    """

    # tensor parallelism size, Megatron-LM tensor parallelism implementation
    tp_size: int = 1
    # pipeline parallelism size, Megatron-LM pipeline parallelism implementation
    pp_size: int = 1
    # data parallelism size, DeepSpeed Zero parallelism implementation
    dp_size: int = 1
    # expert parallelism size
    ep_size: int = 1
    # sequence parallelism size, Megatron-LM sequence parallelism implementation
    sp_size: int = None

    def __post_init__(self):
        """Default the sequence parallelism size to the tensor parallelism size."""
        self.sp_size = self.tp_size if self.sp_size is None else self.sp_size
# model name -> ModelConfig mapping, populated from MODEL_CONFIG_DIR_NAME
model_configs = {}
# gpu name -> GPUConfig mapping, populated from GPU_CONFIG_DIR_NAME
# https://gist.github.com/joshlk/bbb1aca6e70b11d251886baee6423dcb
gpu_configs = {}
# dtype name -> DtypeConfig mapping, populated from DTYPE_CONFIG_DIR_NAME
dtype_configs = {}
def canonical_model_name(name: str) -> str:
    """Return `name` with every "/" replaced by "_".

    Used to turn a HuggingFace model id into a config name.
    """
    return "_".join(name.split("/"))
def dump_configs(configs: dict, config_dir_name: str) -> None:
    """Write each config to its own JSON file under `config_dir_name`.

    Every entry is written to `<config_dir_name>/<key>.json`, with the
    directory resolved relative to the folder containing this module.

    Args:
        configs (dict): mapping from config name to config object
        config_dir_name (str): the name of the output directory
    """
    output_dir = Path(__file__).parent / Path(config_dir_name)
    for config_name, config in configs.items():
        target = output_dir / f"{config_name}.json"
        with open(target, "w") as out_file:
            json.dump(config, out_file, cls=EnhancedJSONEncoder, indent=4)
    logger.info(f"dumped {len(configs)} configs to {config_dir_name}")
def get_model_config_from_hf(name: str) -> ModelConfig:
    """Build a `ModelConfig` from a HuggingFace `AutoConfig`.

    If the model cannot be found, updating the `transformers` library may help.

    Args:
        name (str): the model id of a pretrained model configuration hosted inside a model repo on huggingface.co

    Returns:
        ModelConfig: a dataclass for llm-analysis model config, or None when
        `transformers` is not installed

    Raises:
        Exception: if the HuggingFace config lacks the layer count, head
            count or hidden size under both attribute names that are tried
    """
    if AutoConfig is None:
        logger.warning(
            "cannot import AutoConfig from transformers, `transformers` is not installed, HuggingFace will not be available to use for model config retrieval"
        )
        return None

    # NOTE(review): trust_remote_code=True allows the model repo to supply its
    # own configuration code; only use this with repos you trust.
    hf_config = AutoConfig.from_pretrained(name, trust_remote_code=True)

    def first_available(attr_names, error_message):
        # Different model families name the same quantity differently; take
        # the first attribute that exists on the config.
        for attr_name in attr_names:
            if hasattr(hf_config, attr_name):
                return getattr(hf_config, attr_name)
        raise Exception(error_message)

    num_layers = first_available(
        ("num_hidden_layers", "n_layers"),
        "hf config does not have num_hidden_layers or n_layers, check the config.json file",
    )
    n_head = first_available(
        ("num_attention_heads", "n_heads"),
        "hf config does not have num_attention_heads or n_heads, check the config.json file",
    )
    hidden_dim = first_available(
        ("hidden_size", "d_model"),
        "hf config does not have hidden_size or d_model, check the config.json file",
    )

    return ModelConfig(
        name=canonical_model_name(name),
        max_seq_len=getattr(hf_config, "max_position_embeddings", None),
        num_layers=num_layers,
        n_head=n_head,
        hidden_dim=hidden_dim,
        vocab_size=hf_config.vocab_size,
        model_type=getattr(hf_config, "model_type", None),
        num_key_value_heads=getattr(hf_config, "num_key_value_heads", None),
    )
def read_configs(config_dir_name: str, type="model") -> dict:
    """Read every JSON config file in a directory into config dataclasses.

    Args:
        config_dir_name (str): directory holding one JSON file per config
        type (str, optional): which dataclass to build, one of "model", "gpu" or "dtype". Defaults to "model".

    Returns:
        dict: mapping from config name to config object; when two files
        declare the same name, the first one in sorted filename order is kept

    Raises:
        AssertionError: if `type` is not one of the supported config types
    """
    # `type` shadows the builtin but is part of the public signature, so the
    # parameter name is kept.
    config_classes = {
        "model": ModelConfig,
        "gpu": GPUConfig,
        "dtype": DtypeConfig,
    }
    if type not in config_classes:
        # Raised explicitly instead of `assert False` so the check survives
        # `python -O` and also fires for an empty directory. AssertionError is
        # kept as the exception type for backward compatibility.
        raise AssertionError(f"unknown config type when reading: {type}")
    config_cls = config_classes[type]

    configs = {}
    # Sorted so that which duplicate wins does not depend on the arbitrary
    # order returned by os.listdir.
    for filename in sorted(os.listdir(config_dir_name)):
        filepath = os.path.join(config_dir_name, filename)
        # Skip sub-directories and stray non-JSON entries instead of crashing
        # the whole load on them.
        if not filename.endswith(".json") or not os.path.isfile(filepath):
            continue
        with open(filepath, "r") as f:
            config = config_cls(**json.load(f))
        # First occurrence of a name wins.
        configs.setdefault(config.name, config)
    logger.info(f"Loaded {len(configs)} configs from {config_dir_name}")
    return configs
def get_hf_models_by_type_and_task(
    model_type: str = "opt",
    task: str = None,
    min_downloads: int = 10000,
    top_k: int = 6,
    full_info: bool = False,
) -> list:
    """Get a HuggingFace model name list by model type and task, filtered by popularity
    (minimal number of downloads)

    Args:
        model_type (str, optional): model type, e.g., gpt, llama, opt, bloom. Defaults to "opt".
        task (str, optional): model task, e.g., text-generation, fill-mask; when None, models are not filtered by task. Defaults to None.
        min_downloads (int, optional): minimal number of downloads to filter the models. Defaults to 10000.
        top_k (int, optional): maximum number of models to return, most downloaded first; values below 1 are treated as 1. Defaults to 6.
        full_info (bool, optional): whether to return full model information, if False, just return the list of model names. Defaults to False.

    Returns:
        list: a list of HuggingFace model information

    Raises:
        ImportError: if `huggingface_hub` is not installed
    """
    try:
        from huggingface_hub import HfApi
    except ImportError:
        logger.error(
            "cannot import HfApi from huggingface_hub, please install huggingface_hub first"
        )
        # Nothing below can work without HfApi; re-raise instead of falling
        # through to an unbound-name error.
        raise

    api = HfApi()
    # Materialize the result so it can be counted and sorted even when the
    # hub client returns a lazy iterable.
    models = list(api.list_models(filter=model_type))
    logger.info(f"found {len(models)} models of type {model_type}")

    def download_count(model) -> int:
        # A missing or None download count is treated as 0 so that sorting
        # and comparison never see None.
        return getattr(model, "downloads", None) or 0

    # sort by number of downloads
    ordered = sorted(models, key=download_count, reverse=True)

    ret = [m for m in ordered if download_count(m) > min_downloads]
    if task:
        ret = [m for m in ret if getattr(m, "pipeline_tag", None) == task]

    top_k = max(1, min(top_k, len(ret)))
    logger.info(f"take top {top_k} of the list of found models")
    if full_info:
        return ret[:top_k]
    return [r.modelId for r in ret][:top_k]
def populate_model_and_gpu_configs() -> None:
    """Load the model, gpu and dtype registries from the bundled JSON files.

    Rebinds the module-level `model_configs`, `gpu_configs` and
    `dtype_configs` mappings to what is read from the config directories
    located next to this module.
    """
    global model_configs, gpu_configs, dtype_configs

    module_dir = Path(__file__).parent
    model_configs = read_configs(module_dir / Path(MODEL_CONFIG_DIR_NAME),
                                 type="model")
    gpu_configs = read_configs(module_dir / Path(GPU_CONFIG_DIR_NAME),
                               type="gpu")
    dtype_configs = read_configs(module_dir / Path(DTYPE_CONFIG_DIR_NAME),
                                 type="dtype")

    logger.info(
        f"Populated {len(model_configs)} model configs, {len(gpu_configs)} gpu configs, {len(dtype_configs)} dtype configs"
    )
def list_model_configs() -> None:
    """Log the names of all model configs currently in `model_configs`."""
    available_names = model_configs.keys()
    logger.info(available_names)
def list_gpu_configs() -> None:
    """Log the names of all gpu configs currently in `gpu_configs`."""
    available_names = gpu_configs.keys()
    logger.info(available_names)
def list_dtype_configs() -> None:
    """Log the names of all data type configs currently in `dtype_configs`."""
    available_names = dtype_configs.keys()
    logger.info(available_names)
def get_model_config_by_name(name_or_path: str) -> ModelConfig:
    """Resolve a model config by name, JSON file path, or HuggingFace id.

    Lookup order:
      1. the populated `model_configs` mapping, keyed by name;
      2. a model config JSON file at `name_or_path` (the loaded config is
         also added to `model_configs` if its name is not already present);
      3. HuggingFace, via `get_model_config_from_hf`.

    Args:
        name_or_path (str): a config name, a path to a model config JSON file, or a HuggingFace model id

    Returns:
        ModelConfig: the resolved model config

    Raises:
        ValueError: if the JSON file cannot be loaded as a model config, or
            if no config can be found by any of the methods above
    """
    if name_or_path in model_configs:
        return model_configs[name_or_path]

    if os.path.isfile(name_or_path) and ".json" in name_or_path:
        try:
            with open(name_or_path, "r") as f:
                config = ModelConfig(**json.load(f))
        except Exception as e:
            # Chain the original error so the real cause stays visible.
            raise ValueError(
                f"cannot load model config from {name_or_path}: {e}") from e
        # Register the config without overriding an existing entry.
        model_configs.setdefault(config.name, config)
        return config

    model_config = get_model_config_from_hf(name_or_path)
    if model_config is None:
        # Raising a bare string is itself a TypeError; raise a real exception.
        raise ValueError(
            f"unknown model config name: {name_or_path}, and none is found on HuggingFace Hub"
        )
    return model_config
def get_gpu_config_by_name(name: str) -> GPUConfig:
    """Look up a gpu config in the populated `gpu_configs` mapping.

    Raises:
        ValueError: if `name` is not a key of `gpu_configs`
    """
    if name in gpu_configs:
        return gpu_configs[name]
    raise ValueError(f"unknown gpu config name: {name}")
def get_dtype_config_by_name(name: str) -> DtypeConfig:
    """Look up a data type config in the populated `dtype_configs` mapping.

    Raises:
        ValueError: if `name` is not a key of `dtype_configs`
    """
    if name in dtype_configs:
        return dtype_configs[name]
    raise ValueError(f"unknown quant config name: {name}")
def dump_model_config_by_name(name: str,
                              config_dir_name: str = MODEL_CONFIG_DIR_NAME
                              ) -> None:
    """Resolve a model config by name and write it as JSON to `config_dir_name`.

    The config is resolved with `get_model_config_by_name` and written with
    `dump_configs` under the config's own name.

    Args:
        name (str): model name, e.g., gpt2, facebook/opt-1.3b, decapoda-research/llama-7b-hf, etc.
        config_dir_name (str, optional): the name of the output directory. Defaults to MODEL_CONFIG_DIR_NAME.
    """
    resolved = get_model_config_by_name(name)
    single_entry = {resolved.name: resolved}
    dump_configs(single_entry, config_dir_name)
    logger.info(f"dumped model config {resolved} to {config_dir_name}")
def dump_hf_model_configs_by_type_and_task(
    model_type: str = "opt",
    task: str = None,
    min_downloads: int = 10000,
    top_k: int = 6,
    config_dir_name: str = MODEL_CONFIG_DIR_NAME,
) -> None:
    """Dump the configs of popular HuggingFace models of one type to `config_dir_name`.

    Model names come from `get_hf_models_by_type_and_task`; each one is then
    written with `dump_model_config_by_name`.

    Args:
        model_type (str, optional): model type, e.g., gpt, llama, opt, bloom. Defaults to "opt".
        task (str, optional): model task, e.g., text-generation, fill-mask. Defaults to None.
        min_downloads (int, optional): minimal number of downloads to filter the models. Defaults to 10000.
        top_k (int, optional): number of models requested from `get_hf_models_by_type_and_task`. Defaults to 6.
        config_dir_name (str, optional): the name of the output directory. Defaults to MODEL_CONFIG_DIR_NAME.
    """
    selected_names = get_hf_models_by_type_and_task(
        model_type=model_type,
        task=task,
        min_downloads=min_downloads,
        top_k=top_k,
        full_info=False,
    )
    for model_name in selected_names:
        dump_model_config_by_name(model_name, config_dir_name)
    logger.info(
        f"In total, dumped {len(selected_names)} model configs of model_type={model_type}, task={task}, to {config_dir_name}"
    )
# Load the bundled JSON configs at import time so the lookup helpers can be
# used right away.
populate_model_and_gpu_configs()

if __name__ == "__main__":
    logger.setLevel(logging.getLevelName("INFO"))

    def _serialize(result):
        # Pretty-print dataclass results as JSON; pass anything else through.
        if dataclasses.is_dataclass(result):
            return json.dumps(result, cls=EnhancedJSONEncoder, indent=4)
        return result

    # Each function is exposed as a CLI command under its own name.
    cli_commands = (
        list_model_configs,
        list_gpu_configs,
        list_dtype_configs,
        get_model_config_by_name,
        get_gpu_config_by_name,
        get_dtype_config_by_name,
        get_hf_models_by_type_and_task,
        dump_model_config_by_name,
        dump_hf_model_configs_by_type_and_task,
    )
    fire.Fire(
        {command.__name__: command for command in cli_commands},
        serialize=_serialize,
    )