我们将从零开始,使用 The Pile 数据集训练一个 20 亿参数的 LLM。最终结果是,我们得到的 LLM 在生成的回答中语法和标点完美无缺,短篇上下文能通顺连贯,但整个回答可能不完全合理。
之前,我写过一篇关于使用 Tiny Shakespeare 数据集创建一个 230 万参数 LLM 的文章,但其输出并不通顺。
# 2.3 Million Parameter LLM Output
ZELBETH:
Sey solmenter! tis tonguerered if
Vurint as steolated have loven OID the queend refore
Are been, good plmp:
Proforne, wiftes swleen, was no blunderesd a a quain beath!
Tybell is my gateer stalk smend as be matious dazest
我突然想到,如果我将 Transformer 架构做得更小、更简单,并且训练数据更加多样化,那么一个人使用几乎已经报废的 GPU,能创建出一个多大参数的模型,这个模型能说出语法正确、内容有些逻辑的文本呢?
这就是我们训练后模型的输出,具体请参见本文:
我发现,1300 万参数的模型就足以在语法和标点方面开始产生合理的输出,这是一个积极的信号。这意味着,我们可以使用一个非常具体的数据集,进一步对之前训练过的模型进行微调,以适应更小的任务。最终我们可能会得到一个参数量低于 10 亿,甚至接近 5 亿参数的模型,完美适用于我们的特定用例,尤其是可以安全地在私有数据上运行。
我建议你首先使用我在 GitHub 上提供的脚本训练一个 1300 万参数的模型。你将在一天内得到结果,而不必等待更长时间,或者如果你的本地 GPU 不足以训练一个十亿参数的模型时,这是一个很好的选择。(代码资源见文末)
确保你对面向对象编程(OOP)和神经网络(NN)有基本的理解。熟悉 PyTorch 会对编码有所帮助。
你将需要一个 GPU 来训练模型。Colab 或 Kaggle T4 适用于训练一个 1300 万以上参数的模型,但它们无法训练十亿参数的模型。下面是它们的比较:
确保你的环境中已安装 Git。首先,你需要克隆仓库:
git clone https://github.com/FareedKhan-dev/train-llm-from-scratch.git
cd train-llm-from-scratch
然后你可以安装所需的依赖项:
pip install -r requirements.txt
让我们导入在本文中将使用的必需库:
# PyTorch for deep learning functions and tensors
import torch
import torch.nn as nn
import torch.nn.functional as F
# Numerical operations and arrays handling
import numpy as np
# Handling HDF5 files
import h5py
# Operating system and file management
import os
# Command-line argument parsing
import argparse
# HTTP requests and interactions
import requests
# Progress bar for loops
from tqdm import tqdm
# JSON handling
import json
# Zstandard compression library
import zstandard as zstd
# Tokenization library for large language models
import tiktoken
# Math operations (used for advanced math functions)
import math
我们的训练数据集需要具有多样性,包含不同领域的信息,而 The Pile 正好适合这个需求。尽管它的大小为 825GB,但我们只使用其中的一小部分,即 5%-10%。首先,我们下载数据集并查看它是如何工作的。我将下载 HuggingFace 上提供的版本。
# Download validation dataset
!wget https://huggingface.co/datasets/monology/pile-uncopyrighted/resolve/main/val.jsonl.zst
# Download the first part of the training dataset
!wget https://huggingface.co/datasets/monology/pile-uncopyrighted/resolve/main/train/00.jsonl.zst
# Download the second part of the training dataset
!wget https://huggingface.co/datasets/monology/pile-uncopyrighted/resolve/main/train/01.jsonl.zst
# Download the third part of the training dataset
!wget https://huggingface.co/datasets/monology/pile-uncopyrighted/resolve/main/train/02.jsonl.zst
下载会花费一些时间,但你也可以将训练数据集限制为仅一个文件,00.jsonl.zst
,而不是三个。数据已经分为 train/val/test 。下载完成后,确保将文件正确放置到各自的目录中。
import os
import shutil
import glob
# Define directory structure
train_dir = "data/train"
val_dir = "data/val"
# Create directories if they don't exist
os.makedirs(train_dir, exist_ok=True)
os.makedirs(val_dir, exist_ok=True)
# Move all train files (e.g., 00.jsonl.zst, 01.jsonl.zst, ...)
train_files = glob.glob("*.jsonl.zst")
for file in train_files:
if file.startswith("val"):
# Move validation file
dest = os.path.join(val_dir, file)
else:
# Move training file
dest = os.path.join(train_dir, file)
shutil.move(file, dest)
我们的数据集是以.jsonl.zst
格式存储的,这是一种常用于存储大数据集的压缩文件格式。它结合了 JSON 行 (.jsonl) ,每一行代表一个有效的 JSON 对象,以及 Zstandard (.zst) 压缩。让我们读取其中一个下载文件的样本,看看它是什么样的。
in_file = "data/val/val.jsonl.zst" # Path to our validation file
with zstd.open(in_file, 'r') as in_f:
for i, line in tqdm(enumerate(in_f)): # Read first 5 lines
data = json.loads(line)
print(f"Line {i}: {data}") # Print the raw data for inspection
if i == 2:
break
#### OUTPUT ####
Line: 0
{
"text": "Effect of sleep quality ... epilepsy.",
"meta": {
"pile_set_name": "PubMed Abstracts"
}
}
Line: 1
{
"text": "LLMops a new GitHub Repository ...",
"meta": {
"pile_set_name": "Github"
}
}
接下来,我们需要对数据集进行编码(分词)。我们的目标是让 LLM 至少能够输出正确的单词。为此,我们需要使用一个已经存在的分词器。我们将使用 OpenAI 提供的开源分词器 tiktoken。我们将使用r50k_base
分词器,这个分词器是 ChatGPT(GPT-3)模型使用的,用来对我们的数据集进行分词。
为了避免重复,我们需要为此创建一个函数,因为我们将对训练集和验证集都进行分词处理。
def process_files(input_dir, output_file):
"""
Process all .zst files in the specified input directory and save encoded tokens to an HDF5 file.
Args:
input_dir (str): Directory containing input .zst files.
output_file (str): Path to the output HDF5 file.
"""
with h5py.File(output_file, 'w') as out_f:
# Create an expandable dataset named 'tokens' in the HDF5 file
dataset = out_f.create_dataset('tokens', (0,), maxshape=(None,), dtype='i')
start_index = 0
# Iterate through all .zst files in the input directory
for filename in sorted(os.listdir(input_dir)):
if filename.endswith(".jsonl.zst"):
in_file = os.path.join(input_dir, filename)
print(f"Processing: {in_file}")
# Open the .zst file for reading
with zstd.open(in_file, 'r') as in_f:
# Iterate through each line in the compressed file
for line in tqdm(in_f, desc=f"Processing {filename}"):
# Load the line as JSON
data = json.loads(line)
# Append the end-of-text token to the text and encode it
text = data['text'] + "<|endoftext|>"
encoded = enc.encode(text, allowed_special={'<|endoftext|>'})
encoded_len = len(encoded)
# Calculate the end index for the new tokens
end_index = start_index + encoded_len
# Expand the dataset size and store the encoded tokens
dataset.resize(dataset.shape[0] + encoded_len, axis=0)
dataset[start_index:end_index] = encoded
# Update the start index for the next batch of tokens
start_index = end_index
这个函数有两个重要的要点:
<|endoftext|>
标记表示每个文本序列的结束,向模型发出信号,表明已到达一个有意义的上下文的结尾,这有助于生成连贯的输出。现在,我们可以简单地使用以下方式对训练集和验证集进行编码:
# Define tokenized data output directories
out_train_file = "data/train/pile_train.h5"
out_val_file = "data/val/pile_dev.h5"
# Loading tokenizer of (GPT-3/GPT-2 Model)
enc = tiktoken.get_encoding('r50k_base')
# Process training data
process_files(train_dir, out_train_file)
# Process validation data
process_files(val_dir, out_val_file)
让我们先看看分词后的数据样本:
with h5py.File(out_val_file, 'r') as file:
# Access the 'tokens' dataset
tokens_dataset = file['tokens']
# Print the dtype of the dataset
print(f"Dtype of 'tokens' dataset: {tokens_dataset.dtype}")
# load and print the first few elements of the dataset
print("First few elements of the 'tokens' dataset:")
print(tokens_dataset[:10]) # First 10 token
#### OUTPUT ####
Dtype of 'tokens' dataset: int32
First few elements of the 'tokens' dataset:
[ 2725 6557 83 23105 157 119 229 77 5846 2429]
我们已经准备好了训练数据集,现在我们将编写 Transformer 架构,并相应地探讨其理论。
让我们快速了解一下 Transformer 架构如何处理和理解文本。它通过将文本分解成更小的单元,称为 token,然后预测序列中下一个 token。一个 Transformer 包含许多层,这些层被称为 Transformer 块,它们堆叠在一起,最终一层用于进行预测。
每个 Transformer 块有两个主要组件:
综合来看,注意力头负责“关注什么”,而 MLP 则负责“如何去思考”。通过堆叠许多 Transformer 块,模型能够理解文本中的复杂模式和关系,但这并不总是能够得到保证。
与其查看原论文中的图示,我们不如来看看一个更简单、更易理解的架构图,这就是我们将要编写代码的内容。
Transformer 架构,作者手绘
我们一起来阅读一下我们将要编写代码的架构流程:
MLP 是 Transformer 前馈网络中的一个基本构建块。它的作用是引入非线性并学习嵌入表示中的复杂关系。在定义 MLP 模块时,一个重要的参数是n_embed
,它定义了输入嵌入的维度。
MLP 通常包含一个隐藏的线性层,该层将输入维度扩展一个倍数(通常为 4,我们也使用这个倍数),接着是一个非线性激活函数,通常使用 ReLU。这种结构使得我们的网络能够学习更复杂的特征。最后,投影线性层将扩展后的表示映射回原始嵌入维度。这一系列的变换使得 MLP 能够精炼注意力机制所学到的表示。
MLP,作者手绘
# --- MLP (Multi-Layer Perceptron) Class ---
class MLP(nn.Module):
"""
A simple Multi-Layer Perceptron with one hidden layer.
This module is used within the Transformer block for feed-forward processing.
It expands the input embedding size, applies a ReLU activation, and then projects it back
to the original embedding size.
"""
def __init__(self, n_embed):
super().__init__()
self.hidden = nn.Linear(n_embed, 4 * n_embed) # Linear layer to expand embedding size
self.relu = nn.ReLU() # ReLU activation function
self.proj = nn.Linear(4 * n_embed, n_embed) # Linear layer to project back to original size
def forward(self, x):
"""
Forward pass through the MLP.
Args:
x (torch.Tensor): Input tensor of shape (B, T, C), where B is batch size,
T is sequence length, and C is embedding size.
Returns:
torch.Tensor: Output tensor of the same shape as the input.
"""
x = self.forward_embedding(x)
x = self.project_embedding(x)
return x
def forward_embedding(self, x):
"""
Applies the hidden linear layer followed by ReLU activation.
Args:
x (torch.Tensor): Input tensor.
Returns:
torch.Tensor: Output after the hidden layer and ReLU.
"""
x = self.relu(self.hidden(x))
return x
def project_embedding(self, x):
"""
Applies the projection linear layer.
Args:
x (torch.Tensor): Input tensor.
Returns:
torch.Tensor: Output after the projection layer.
"""
x = self.proj(x)
return x
我们刚刚编写了 MLP 部分,其中__init__
方法初始化了一个隐藏的线性层,用于扩展输入嵌入大小(n_embed
),以及一个投影层,用于将其缩小回原始维度。ReLU 激活函数在隐藏层后应用。forward
方法定义了数据流经这些层的过程,通过forward_embedding
应用隐藏层和 ReLU,通过project_embedding
应用投影层。
注意力头是我们模型的核心部分。它的作用是聚焦于输入序列中相关的部分。在定义一个 Head 模块时,几个重要的参数是head_size
、n_embed
和context_length
。head_size
参数决定了 key、query 和 value 投影的维度,进而影响注意力机制的表示能力。
输入嵌入维度n_embed
定义了这些投影层的输入大小。context_length
用于创建因果掩码,确保模型只关注前面的 token。
在 Head 模块内,key、query 和 value 的线性层(nn.Linear
)是没有偏置初始化的。一个大小为context_length x context_length
的下三角矩阵(tril
)作为缓冲区进行注册,以实现因果掩码,防止注意力机制关注未来的 token。
单头注意力机制,作者手绘
# --- Attention Head Class ---
class Head(nn.Module):
"""
A single attention head.
This module calculates attention scores and applies them to the values.
It includes key, query, and value projections, and uses causal masking
to prevent attending to future tokens.
"""
def __init__(self, head_size, n_embed, context_length):
super().__init__()
self.key = nn.Linear(n_embed, head_size, bias=False) # Key projection
self.query = nn.Linear(n_embed, head_size, bias=False) # Query projection
self.value = nn.Linear(n_embed, head_size, bias=False) # Value projection
# Lower triangular matrix for causal masking
self.register_buffer('tril', torch.tril(torch.ones(context_length, context_length)))
def forward(self, x):
"""
Forward pass through the attention head.
Args:
x (torch.Tensor): Input tensor of shape (B, T, C).
Returns:
torch.Tensor: Output tensor after applying attention.
"""
B, T, C = x.shape
k = self.key(x) # (B, T, head_size)
q = self.query(x) # (B, T, head_size)
scale_factor = 1 / math.sqrt(C)
# Calculate attention weights: (B, T, head_size) @ (B, head_size, T) -> (B, T, T)
attn_weights = q @ k.transpose(-2, -1) * scale_factor
# Apply causal masking
attn_weights = attn_weights.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
attn_weights = F.softmax(attn_weights, dim=-1)
v = self.value(x) # (B, T, head_size)
# Apply attention weights to values
out = attn_weights @ v # (B, T, T) @ (B, T, head_size) -> (B, T, head_size)
return out
我们的注意力头类的__init__
方法初始化了 key、query 和 value 投影的线性层,每个投影将输入嵌入(n_embed
)映射到head_size
。基于context_length
的下三角矩阵用于因果掩码。forward
方法通过对 query 和 key 的点积进行缩放,计算注意力权重,应用因果掩码,使用 softmax 进行归一化,并计算值的加权和,生成注意力输出。
为了捕捉输入序列中多样的关系,我们将使用多头注意力的概念。MultiHeadAttention
模块管理多个独立的注意力头,它们并行工作。
这里的关键参数是n_head
,它决定了并行注意力头的数量。输入嵌入维度(n_embed
)和context_length
也是实例化单个注意力头所需的参数。每个头独立处理输入,并将其投影到一个更低维的子空间,大小为n_embed // n_head
。通过使用多个头,模型可以同时关注输入的不同方面。
多头注意力机制,作者手绘
# --- Multi-Head Attention Class ---
class MultiHeadAttention(nn.Module):
"""
Multi-Head Attention module.
This module combines multiple attention heads in parallel. The outputs of each head
are concatenated to form the final output.
"""
def __init__(self, n_head, n_embed, context_length):
super().__init__()
self.heads = nn.ModuleList([Head(n_embed // n_head, n_embed, context_length) for _ in range(n_head)])
def forward(self, x):
"""
Forward pass through the multi-head attention.
Args:
x (torch.Tensor): Input tensor of shape (B, T, C).
Returns:
torch.Tensor: Output tensor after concatenating the outputs of all heads.
"""
# Concatenate the output of each head along the last dimension (C)
x = torch.cat([h(x) for h in self.heads], dim=-1)
return x
现在我们已经定义了MultiHeadAttention
类,它将多个注意力头结合在一起,__init__
方法初始化了一个Head
实例的列表(总共有n_head
个实例),每个实例的head_size
为n_embed // n_head
。forward
方法将每个注意力头应用于输入 x,并将它们的输出沿最后一个维度拼接在一起,合并每个头学习到的信息。
要构建一个百亿级参数的模型,必须设计一个深度架构。为此,需要实现 Transformer 块并进行堆叠。块的关键参数包括n_head
(注意力头的数量)、n_embed
(嵌入维度)和context_length
(上下文长度)。每个块由一个多头注意力层和一个前馈神经网络(MLP)组成,在每个模块前应用层归一化(Layer Normalization),并在计算后使用残差连接以优化梯度流。
层归一化由n_embed
设定,确保训练过程的稳定性。多头注意力机制依赖n_head
、n_embed
和context_length
进行计算,从不同角度捕捉序列中的信息。MLP 作为前馈神经网络,同样基于n_embed
进行处理,以进一步提取和转换特征。这些组件协同工作,使模型能够学习复杂的数据模式,提高文本理解和生成能力。
Transformer 块,作者手绘
# --- Transformer Block Class ---
class Block(nn.Module):
"""
A single Transformer block.
This block consists of a multi-head attention layer followed by an MLP,
with layer normalization and residual connections.
"""
def __init__(self, n_head, n_embed, context_length):
super().__init__()
self.ln1 = nn.LayerNorm(n_embed)
self.attn = MultiHeadAttention(n_head, n_embed, context_length)
self.ln2 = nn.LayerNorm(n_embed)
self.mlp = MLP(n_embed)
def forward(self, x):
"""
Forward pass through the Transformer block.
Args:
x (torch.Tensor): Input tensor.
Returns:
torch.Tensor: Output tensor after the block.
"""
# Apply multi-head attention with residual connection
x = x + self.attn(self.ln1(x))
# Apply MLP with residual connection
x = x + self.mlp(self.ln2(x))
return x
def forward_embedding(self, x):
"""
Forward pass focusing on the embedding and attention parts.
Args:
x (torch.Tensor): Input tensor.
Returns:
tuple: A tuple containing the output after MLP embedding and the residual.
"""
res = x + self.attn(self.ln1(x))
x = self.mlp.forward_embedding(self.ln2(res))
return x, res
我们的Block
类表示一个单独的 Transformer 块。__init__
方法初始化了层归一化层(ln1
,ln2
)、MultiHeadAttention
模块和 MLP 模块,所有这些都由n_head
、n_embed
和context_length
参数化。
forward
方法实现了块的前向传递,首先应用层归一化和多头注意力(带残差连接),然后是另一个层归一化和 MLP,依然带有残差连接。forward_embedding
方法提供了一种聚焦于注意力和初始 MLP 嵌入阶段的替代前向传递。
到目前为止,我们已经编写了 Transformer 模型的小组件。接下来,我们将 token 和 position embedding 与一系列 Transformer 块集成,以执行序列到序列的任务。为此,我们需要编写几个关键参数:n_head
、n_embed
、context_length
、vocab_size
和N_BLOCKS
。
vocab_size
决定了 token 嵌入层的大小,将每个 token 映射到大小为n_embed
的密集向量。context_length
参数对位置嵌入层至关重要,位置嵌入层对输入序列中每个 token 的位置进行编码,维度也为n_embed
。注意力头的数量(n_head
)和块的数量(N_BLOCKS
)决定了网络的深度和复杂度。
这些参数共同定义了 Transformer 模型的架构和容量,因此我们需要编写它。
Transformer 类,作者手绘
# --- Transformer Model Class ---
class Transformer(nn.Module):
"""
The main Transformer model.
This class combines token and position embeddings with a sequence of Transformer blocks
and a final linear layer for language modeling.
"""
def __init__(self, n_head, n_embed, context_length, vocab_size, N_BLOCKS):
super().__init__()
self.context_length = context_length
self.N_BLOCKS = N_BLOCKS
self.token_embed = nn.Embedding(vocab_size, n_embed)
self.position_embed = nn.Embedding(context_length, n_embed)
self.attn_blocks = nn.ModuleList([Block(n_head, n_embed, context_length) for _ in range(N_BLOCKS)])
self.layer_norm = nn.LayerNorm(n_embed)
self.lm_head = nn.Linear(n_embed, vocab_size)
self.register_buffer('pos_idxs', torch.arange(context_length))
def _pre_attn_pass(self, idx):
"""
Combines token and position embeddings.
Args:
idx (torch.Tensor): Input token indices.
Returns:
torch.Tensor: Sum of token and position embeddings.
"""
B, T = idx.shape
tok_embedding = self.token_embed(idx)
pos_embedding = self.position_embed(self.pos_idxs[:T])
return tok_embedding + pos_embedding
def forward(self, idx, targets=None):
"""
Forward pass through the Transformer.
Args:
idx (torch.Tensor): Input token indices.
targets (torch.Tensor, optional): Target token indices for loss calculation. Defaults to None.
Returns:
tuple: Logits and loss (if targets are provided).
"""
x = self._pre_attn_pass(idx)
for block in self.attn_blocks:
x = block(x)
x = self.layer_norm(x)
logits = self.lm_head(x)
loss = None
if targets is not None:
B, T, C = logits.shape
flat_logits = logits.view(B * T, C)
targets = targets.view(B * T).long()
loss = F.cross_entropy(flat_logits, targets)
return logits, loss
def forward_embedding(self, idx):
"""
Forward pass focusing on the embedding and attention blocks.
Args:
idx (torch.Tensor): Input token indices.
Returns:
tuple: Output after attention blocks and the residual.
"""
x = self._pre_attn_pass(idx)
residual = x
for block in self.attn_blocks:
x, residual = block.forward_embedding(x)
return x, residual
def generate(self, idx, max_new_tokens):
"""
Generates new tokens given a starting sequence.
Args:
idx (torch.Tensor): Initial sequence of token indices.
max_new_tokens (int): Number of tokens to generate.
Returns:
torch.Tensor: The extended sequence of tokens.
"""
for _ in range(max_new_tokens):
idx_cond = idx[:, -self.context_length:]
logits, _ = self(idx_cond)
logits = logits[:, -1, :]
probs = F.softmax(logits, dim=-1)
idx_next = torch.multinomial(probs, num_samples=1)
idx = torch.cat((idx, idx_next), dim=1)
return idx
我们的Transformer
类的__init__
方法初始化了 token 和位置嵌入层(token_embed
,position_embed
),一系列Block
模块(attn_blocks
),最终的层归一化层(layer_norm
),以及用于语言建模的线性层(lm_head
)。
_pre_attn_pass
方法将 token 嵌入和位置嵌入结合起来。forward
方法将输入序列通过嵌入层和一系列 Transformer 块进行处理,应用最终的层归一化,并生成 logits。如果提供了目标数据,它还会计算损失。forward_embedding
方法提供了一个中间的前向传递过程,直到注意力块的输出,而generate
方法实现了 token 生成。
当我们在大数据上训练深度学习模型时,由于 GPU 限制,我们会将数据分批处理。因此,我们需要创建一个get_batch_iterator
函数,该函数接收数据路径(指向 HDF5 文件)、所需的batch_size
、每个序列的context_length
以及数据加载的设备。
batch_size
决定了在训练过程中每次处理多少个序列,而context_length
指定了每个输入序列的长度。data_path
指向训练数据所在的位置。
# --- Data Loading Utility ---
def get_batch_iterator(data_path, batch_size, context_length, device="gpu"):
"""
Creates an iterator for generating batches of data from an HDF5 file.
Args:
data_path (str): Path to the HDF5 file containing tokenized data.
batch_size (int): Number of sequences in each batch.
context_length (int): Length of each sequence.
device (str, optional): Device to load the data onto ('cpu' or 'cuda'). Defaults to "cpu".
Yields:
tuple: A tuple containing input sequences (xb) and target sequences (yb).
"""
# Open the HDF5 file in read mode
with h5py.File(data_path, 'r') as hdf5_file:
# Extract the dataset of tokenized sequences
dataset = hdf5_file['tokens']
# Get the total size of the dataset
dataset_size = dataset.shape[0]
# Calculate the number of examples (sequences) that can be made from the data
n_examples = (dataset_size - 1) // context_length
# Create an array of indices for examples and shuffle them for randomness
example_idxs = np.arange(n_examples)
np.random.shuffle(example_idxs)
# Initialize epoch counter and example counter
epochs = 0
counter = 0
while True:
# Check if the current batch exceeds the number of available examples
if counter + batch_size > n_examples:
# Shuffle the indices again and reset the counter to 0
np.random.shuffle(example_idxs)
counter = 0
print(f"Finished epoch {epochs}") # Print epoch number when an epoch finishes
epochs += 1 # Increment the epoch counter
# Select a batch of random indices to generate sequences
random_indices = example_idxs[counter:counter+batch_size] * context_length
# Retrieve sequences from the dataset based on the random indices
random_samples = torch.tensor(np.array([dataset[idx:idx+context_length+1] for idx in random_indices]))
# Separate the input sequences (xb) and target sequences (yb)
xb = random_samples[:, :context_length].to(device) # Input sequence (first half of the random sample)
yb = random_samples[:, 1:context_length+1].to(device) # Target sequence (second half of the random sample)
# Increment the counter to move to the next batch
counter += batch_size
# Yield the input and target sequences as a tuple for the current batch
yield xb, yb
我们的get_batch_iterator
函数处理训练数据的加载和批处理。它接收data_path
、batch_size
、context_length
和device
作为输入。该函数打开 HDF5 文件,打乱数据,然后进入无限循环以生成批次。在每次迭代中,它会随机选择数据的一个子集,形成一个输入序列(xb
)及其相应的目标序列(yb
)批次。
现在我们已经编写了模型代码,接下来需要定义训练参数,例如头的数量、块的数量等,以及数据路径。
# --- Configuration ---
# Define vocabulary size and transformer configuration
VOCAB_SIZE = 50304 # Number of unique tokens in the vocabulary
CONTEXT_LENGTH = 512 # Maximum sequence length for the model
N_EMBED = 2048 # Dimension of the embedding space
N_HEAD = 16 # Number of attention heads in each transformer block
N_BLOCKS = 64 # Number of transformer blocks in the model
# Paths to training and development datasets
TRAIN_PATH = "data/train/pile_val.h5"# File path for the training dataset
DEV_PATH = "data/val/pile_val.h5" # File path for the validation dataset
# Transformer training parameters
T_BATCH_SIZE = 32 # Number of samples per training batch
T_CONTEXT_LENGTH = 16 # Context length for training batches
T_TRAIN_STEPS = 200000 # Total number of training steps
T_EVAL_STEPS = 1000 # Frequency (in steps) to perform evaluation
T_EVAL_ITERS = 250 # Number of iterations to evaluate the model
T_LR_DECAY_STEP = 50000 # Step at which to decay the learning rate
T_LR = 5e-4 # Initial learning rate for training
T_LR_DECAYED = 5e-5 # Learning rate after decay
T_OUT_PATH = "models/transformer_B.pt"# Path to save the trained model
# Device configuration
DEVICE = 'cuda'
# Store all configurations in a dictionary for easy access and modification
default_config = {
'vocab_size': VOCAB_SIZE,
'context_length': CONTEXT_LENGTH,
'n_embed': N_EMBED,
'n_head': N_HEAD,
'n_blocks': N_BLOCKS,
'train_path': TRAIN_PATH,
'dev_path': DEV_PATH,
't_batch_size': T_BATCH_SIZE,
't_context_length': T_CONTEXT_LENGTH,
't_train_steps': T_TRAIN_STEPS,
't_eval_steps': T_EVAL_STEPS,
't_eval_iters': T_EVAL_ITERS,
't_lr_decay_step': T_LR_DECAY_STEP,
't_lr': T_LR,
't_lr_decayed': T_LR_DECAYED,
't_out_path': T_OUT_PATH,
'device': DEVICE,
}
对于大多数参数,我使用了最常见的值,并将它们存储在字典中以便于访问。在这里,这些参数适用于一个十亿参数的模型。如果你想训练一个参数量在百万级的模型,你可以减少主要参数,包括CONTEXT_LENGTH
、N_EMBED
、N_HEAD
和N_BLOCKS
。不过,你也可以运行我 GitHub 仓库中的百万参数模型脚本。
让我们初始化我们的 Transformer 模型,并检查它的总参数数量。
# --- Initialize the Model and Print Parameters ---
model = Transformer(
n_head=config['n_head'],
n_embed=config['n_embed'],
context_length=config['context_length'],
vocab_size=config['vocab_size'],
N_BLOCKS=config['n_blocks']
).to(config['device'])
# Print the total number of parameters
total_params = sum(p.numel() for p in model.parameters())
print(f"Total number of parameters in the model: {total_params:,}")
#### OUTPUT ####
2,141,346,251
现在我们已经有了一个 20 亿参数的模型,需要定义 Adam 优化器和损失跟踪函数,以便在整个训练过程中跟踪模型的训练进度。
# --- Optimizer Setup and Loss Tracking ---
# Set up the AdamW optimizer with the specified learning rate.
optimizer = torch.optim.AdamW(model.parameters(), lr=config['t_lr'])
# List to track loss values during training.
losses = []
# Define a window size for averaging recent losses in the training loop.
AVG_WINDOW = 64
# Helper function to estimate the average loss for training and development data.
@torch.no_grad()
def estimate_loss(steps):
"""
Evaluate the model on training and development datasets and calculate average loss.
Args:
steps (int): Number of steps to evaluate.
Returns:
dict: Dictionary containing average losses for 'train' and 'dev' splits.
"""
out = {}
model.eval() # Set the model to evaluation mode.
for split in ['train', 'dev']:
# Select the appropriate data path for the current split.
data_path = config['train_path'] if split == 'train'else config['dev_path']
# Create a batch iterator for evaluation.
batch_iterator_eval = get_batch_iterator(
data_path, config['t_batch_size'], config['t_context_length'], device=config['device']
)
# Initialize a tensor to track loss values for each evaluation step.
losses_eval = torch.zeros(steps)
for k in range(steps):
try:
# Fetch a batch and calculate the loss.
xb, yb = next(batch_iterator_eval)
_, loss = model(xb, yb)
losses_eval[k] = loss.item()
except StopIteration:
# Handle the case where the data iterator ends early.
print(f"Warning: Iterator for {split} ended early.")
break
# Compute the mean loss for the current split.
out[split] = losses_eval[:k + 1].mean()
model.train() # Restore the model to training mode.
return out
现在,我们将初始化批处理函数和训练循环,以启动模型训练。
# --- Training Loop ---
# Create a batch iterator for the training data.
batch_iterator = get_batch_iterator(
config['train_path'],
config['t_batch_size'],
config['t_context_length'],
device=config['device']
)
# Create a progress bar to monitor training progress.
pbar = tqdm(range(config['t_train_steps']))
for step in pbar:
try:
# Fetch a batch of input and target data.
xb, yb = next(batch_iterator)
# Perform a forward pass and compute the loss.
_, loss = model(xb, yb)
# Record the loss for tracking.
losses.append(loss.item())
pbar.set_description(f"Train loss: {np.mean(losses[-AVG_WINDOW:]):.4f}")
# Backpropagate the loss and update the model parameters.
optimizer.zero_grad(set_to_none=True)
loss.backward()
optimizer.step()
# Periodically evaluate the model on training and development data.
if step % config['t_eval_steps'] == 0:
train_loss, dev_loss = estimate_loss(config['t_eval_iters']).values()
print(f"Step: {step}, Train loss: {train_loss:.4f}, Dev loss: {dev_loss:.4f}")
# Decay the learning rate at the specified step.
if step == config['t_lr_decay_step']:
print('Decaying learning rate')
for g in optimizer.param_groups:
g['lr'] = config['t_lr_decayed']
except StopIteration:
# Handle the case where the training data iterator ends early.
print("Training data iterator finished early.")
break
由于我们的训练循环具有错误处理能力,如果训练过程中出现错误,部分训练完成的模型将被保存,以避免数据丢失。训练完成后,我们可以保存已训练的模型,以便后续推理使用。
# --- Save Model and Final Evaluation ---
# Perform a final evaluation of the model on training and development datasets.
train_loss, dev_loss = estimate_loss(200).values()
# Ensure unique model save path in case the file already exists.
modified_model_out_path = config['t_out_path']
save_tries = 0
while os.path.exists(modified_model_out_path):
save_tries += 1
model_out_name = os.path.splitext(config['t_out_path'])[0]
modified_model_out_path = model_out_name + f"_{save_tries}" + ".pt"
# Save the model's state dictionary, optimizer state, and training metadata.
torch.save(
{
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'losses': losses,
'train_loss': train_loss,
'dev_loss': dev_loss,
'steps': len(losses),
},
modified_model_out_path
)
print(f"Saved model to {modified_model_out_path}")
print(f"Finished training. Train loss: {train_loss:.4f}, Dev loss: {dev_loss:.4f}")
对于这款拥有十亿参数的模型,最终训练损失(training loss)为 0.2314,验证损失(dev loss)为 0.643。
当我绘制百万参数模型与十亿参数模型的损失曲线时,可以明显看到二者的差异。
训练损失对比
十亿参数模型在训练初期的损失较高,并且波动较大。尽管最初下降较快,但随后出现震荡,直到逐渐趋于平稳。这表明更大的模型在学习初期更难找到合适的优化方向,可能需要更多的数据以及更精细的超参数设置。当降低学习率(红色曲线)时,损失下降得更加稳定,这表明调整学习率有助于模型优化。
相比之下,百万参数模型的损失从一开始就下降得较为顺利,并且波动较小。当降低学习率时,对损失曲线的影响较小。这可能是因为较小的模型结构更简单,更容易训练并快速收敛。这个对比表明,大型模型的训练复杂度更高,可能需要不同的训练策略和更长的训练时间才能获得较好的效果。
现在,我们已经成功保存了训练完成的模型,终于可以进行推理,看看它能生成怎样的文本。
让我们创建一个文本生成函数,该函数接收已保存的模型路径和编码器作为输入,并返回生成的文本。
def generate_text(model_path, input_text, max_length=512, device="gpu"):
"""
Generate text using a pre-trained model based on the given input text.
Args:
- model_path (str): Path to the model checkpoint.
- device (torch.device): Device to load the model on (e.g., 'cpu' or 'cuda').
- input_text (str): The input text to seed the generation.
- max_length (int, optional): Maximum length of generated text. Defaults to 512.
Returns:
- str: The generated text.
"""
# Load the model checkpoint
checkpoint = torch.load(model_path)
# Initialize the model (you should ensure that the Transformer class is defined elsewhere)
model = Transformer().to(device)
# Load the model's state dictionary
model.load_state_dict(checkpoint['model_state_dict'])
# Load the tokenizer for the GPT model (we use 'r50k_base' for GPT models)
enc = tiktoken.get_encoding('r50k_base')
# Encode the input text along with the end-of-text token
input_ids = torch.tensor(
enc.encode(input_text, allowed_special={'<|endoftext|>'}),
dtype=torch.long
)[None, :].to(device) # Add batch dimension and move to the specified device
# Generate text with the model using the encoded input
with torch.no_grad():
# Generate up to 'max_length' tokens of text
generated_output = model.generate(input_ids, max_length)
# Decode the generated tokens back into text
generated_text = enc.decode(generated_output[0].tolist())
return generated_text
我们需要调用之前定义的 Transformer 结构来加载模型架构,并将已保存的模型作为该架构的状态进行加载。
让我们先不提供任何输入,分别观察百万参数模型和十亿参数模型的随机生成结果,看看它们能输出什么内容。
# Defining the file paths for the pre-trained models
Billion_model_path = 'models/transformer_B.pt'# Path to the Billion model
Million_model_path = 'models/transformer_M.pt'# Path to the Million model
# Using '<|endoftext|>' as input to the models (acts as a prompt that allows the models to generate text freely)
input_text = "<|endoftext|>"
# Call the function to generate text based on the input text using the Billion model
B_output = generate_text(Billion_model_path, input_text)
# Call the function to generate text based on the input text using the Million model
M_output = generate_text(Million_model_path, input_text)
# Print the output generated by both models
print(B_output) # Output from the Billion model
print(M_output) # Output from the Million model
两个 LLM 在处理短小且简单的上下文时,都能够生成清晰准确的文本。例如,在百万参数模型的输出中,句子“The villages were directly linked to cities in China”(这些村庄直接与中国的城市相连)逻辑清晰,表达明确,能够很好地传达信息,并将村庄与城市的联系展现出来。
然而,随着上下文变长、变复杂,文本的连贯性开始下降。例如,在十亿参数模型的输出中,句子“There are two miles east coast from 1037 and 73 million refugees (hypotetus)”以及“blacksmith, musician and boutique hospitality and inspire the strain delivered Canadians”变得难以理解,句子结构不够流畅,信息表达也显得割裂。尽管其中的单词可能都是正确的,但整体意思变得混乱且缺乏逻辑性。
值得注意的是,1300 万以上参数的 LLM 也开始能够生成一定程度上有意义的内容,并且拼写正确。例如,当我输入主题文本时,它会尝试为我生成一封电子邮件。尽管在更广泛的文本生成任务上仍然难以提供连贯的结果,但可以观察到它的初步成效,如下所示:
# Input text
input_text "Subject: "
# Call the Million parameter Mod
m_output = generate_text(Million_model_path, input_text)
print(m_output) # Output from the Million model
Good morning, I hope this message finds you well, as the sun gently peeks through the clouds, ... |
我们的百万参数模型证明了一个可行性,即可以构建一个规模小于 10 亿参数的、目标明确的 LLM。而我们的 10 亿参数模型则表明,在这种规模下,模型架构需要经过深思熟虑的设计,否则训练效果和性能可能不会比百万参数模型更好,甚至可能出现过拟合的情况。除非模型架构足够深入,否则仅仅增加参数规模并不能带来真正的提升。
我建议先训练一个 1300 万以上参数的模型,然后逐步扩展,在此基础上增加 100 个参数,以提升其处理短上下文的能力。具体要训练多少参数取决于你的应用需求。在 10 亿参数以内的范围内,可以尝试在特定领域的数据上进行微调,例如电子邮件写作或论文生成,并观察模型的文本生成效果。
本文所有代码都可以在我的 GitHub 仓库中找到:https://github.com/FareedKhan-dev/train-llm-from-scratch
代码库的组织结构如下:
train-llm-from-scratch/
├── src/
│ ├── models/
│ │ ├── mlp.py # Definition of the Multi-Layer Perceptron (MLP) module
│ │ ├── attention.py # Definitions for attention mechanisms (single-head, multi-head)
│ │ ├── transformer_block.py # Definition of a single Transformer block
│ │ ├── transformer.py # Definition of the main Transformer model
├── config/
│ └── config.py # Contains default configurations (model parameters, file paths, etc.)
├── data_loader/
│ └── data_loader.py # Contains functions for creating data loaders/iterators
├── scripts/
│ ├── train_transformer.py # Script for training the Transformer model
│ ├── data_download.py # Script for downloading the dataset
│ ├── data_preprocess.py # Script for preprocessing the downloaded data
│ ├── generate_text.py # Script for generating text using a trained model
├── data/ # Directory to store the dataset
│ ├── train/ # Contains training data
│ └── val/ # Contains validation data
├── models/ # Directory where trained models are saved