Toy Generatively Pretrained Transformer (GPT)
Yet another attempt to closely replicate the popular PyTorch code of close on the heels of the exploration of basic
I have ported the code to TensorFlow and may have caused a bug or two which are not evident. But the code works as I expected. But this is just the beginning stage. At this point there is no GPT.
- The flow of the article mirrors the way the code is developed iteratively
- I haven’t coded this to use batches at this time hoping to add it later. Nor have I split the dataset to get a validation set.
- In many cases I had to execute the PyTorch code to understand the shapes. Yet some shape here or there may be wrong but the code executes without errors.
- The final code is here and further improvements will be committed.
- I haven’t specifically trained using a GPU but with some simple code changes it can be.
Stage 1
The dataset used here is not cleaned. There are characters and numbers in it. So newlines are removed but data can be properly fed at a later stage.
import tensorflow as tf
import tensorflow_probability as tfp
from keras.layers import Embedding
# Load the corpus, clean it, and derive the character vocabulary.
# NOTE(review): the next line ends in an unmatched ')' — the call that wrapped
# this path (presumably a file read) appears to be missing; confirm.
input ="/Users/anu/PycharmProjects/TensorFlow2/shakespeare.txt")
# Trim leading and trailing whitespace.
input = tf.strings.strip(input)
# Delete every run of spaces, then every newline.
input = tf.strings.regex_replace(input,' +', '')
input = tf.strings.regex_replace(input,'\n', '')
# Length of the cleaned text; used below to bound the random sample offsets.
length = int(tf.strings.length(input))
# Split into characters; element 0 of the result is indexed below as the characters.
vocab = tf.strings.unicode_split_with_offsets(input, 'UTF-8')
# elem: the distinct characters; idx: for every character, its position in elem.
elem,idx = tf.unique(vocab[0])
vocab_size = len(elem)
print(f'Size of vocabulary={vocab_size}')
# Lookup table whose values are the integer ids 0..len(elem)-1.
# NOTE(review): the remaining arguments and the closing ')' are missing here —
# presumably the keys are the characters in elem; confirm.
table = tf.lookup.StaticHashTable(
values=tf.constant([idx for idx, inp in enumerate(elem)]),
# Reverse lookup table keyed by the ids rendered as strings (see decode()).
# NOTE(review): the values/default arguments and the closing ')' are missing here.
indextoelem = tf.lookup.StaticHashTable(
keys=tf.strings.as_string([idx for idx, inp in enumerate(elem)]),
def random_sample(text):
    """Return a 201-byte slice of ``text`` starting at a random offset.

    Args:
        text: String tensor holding the corpus.

    Returns:
        The substring of ``text`` that begins at the sampled byte offset and
        is 201 bytes long.
    """
    # Offsets are drawn from [1, length - 201), where ``length`` is the
    # module-level size of the corpus, and truncated to an integer.
    start = int(tf.random.uniform(shape=[], minval=1, maxval=length - 201))
    return tf.strings.substr(text, start, 201, unit='BYTE')
# Module-level buffers. draw_random_sample() stacks the contents of
# ``samplelist`` and then resets it. A ``global`` statement has no effect at
# module scope, so plain assignments are enough.
samplelist = []
reversesamplelist = []


def reverse_map_fn(bytes):
    """Per-element callback handed to ``tf.map_fn`` by reverse_map().

    Returns its argument unchanged.
    """
    return bytes


def map_fn(bytes):
    """Per-element callback handed to ``tf.map_fn`` by draw_random_sample().

    Returns its argument unchanged.
    """
    # NOTE(review): draw_random_sample() stacks ``samplelist`` right after
    # mapping this callback, so it presumably should also append an encoded
    # element to that list — confirm against the original code.
    return bytes
def draw_random_sample(block_size):
    """Build an (input, target) pair from the first ``block_size`` bytes.

    Args:
        block_size: Number of bytes taken from the start of the corpus.

    Returns:
        ``(X, y)`` where ``X`` stacks every entry of ``samplelist`` except the
        last and ``y`` stacks every entry except the first, i.e. ``y`` is
        ``X`` shifted by one position.
    """
    global samplelist
    sample = tf.strings.substr(input, 0, block_size, unit='BYTE')
    split_sample = tf.strings.bytes_split(sample)
    # The mapped result is discarded; the call is made for whatever map_fn
    # records while it visits each element.
    tf.map_fn(map_fn, tf.strings.bytes_split(split_sample))
    X, y = tf.stack(samplelist[:-1]), tf.stack(samplelist[1:])
    # Reset the buffer so the next draw starts empty.
    samplelist = []
    return X, y
def reverse_map(X):
    """Run reverse_map_fn over every element of ``X`` with ``tf.map_fn``.

    The mapped tensor is discarded and the function returns ``None``.
    """
    tf.map_fn(reverse_map_fn, X)
# Draw one 9-byte sample up front: X holds the inputs, y the shifted targets.
X, y = draw_random_sample(9)
vocab_size = len(elem)


def decode(idx):
    """Look token ids up in ``indextoelem``.

    Args:
        idx: Iterable of token ids.

    Returns:
        ``(idx, looked_up)`` — the ids unchanged, and the result of looking up
        each id (rendered as a string) in ``indextoelem``.
    """
    # The earlier comprehension bound one name to both the position and the
    # element of enumerate(), so only the element survived; iterate directly.
    return idx, indextoelem.lookup(tf.strings.as_string(list(idx)))
The model sub-class is simple and accepts the entire sequence of characters every time. But we are predicting the next character only based on the previous character. This will be addressed later.
tf.Tensor([0], shape=(1,), dtype=int64)
tf.Tensor([0 8], shape=(2,), dtype=int64)
tf.Tensor([ 0 8 43], shape=(3,), dtype=int64)
tf.Tensor([ 0 8 43 67], shape=(4,), dtype=int64)
tf.Tensor([ 0 8 43 67 5], shape=(5,), dtype=int64)
tf.Tensor([ 0 8 43 67 5 31], shape=(6,), dtype=int64)
tf.Tensor([ 0 8 43 67 5 31 31], shape=(7,), dtype=int64)
tf.Tensor([ 0 8 43 67 5 31 31 21], shape=(8,), dtype=int64)
tf.Tensor([ 0 8 43 67 5 31 31 21 51], shape=(9,), dtype=int64)
tf.Tensor([ 0 8 43 67 5 31 31 21 51 23], shape=(10,), dtype=int64)
tf.Tensor([ 0 8 43 67 5 31 31 21 51 23 2], shape=(11,), dtype=int64)
tf.Tensor([ 0 8 43 67 5 31 31 21 51 23 2 2], shape=(12,), dtype=int64)
tf.Tensor([ 0 8 43 67 5 31 31 21 51 23 2 2 56], shape=(13,), dtype=int64)
tf.Tensor([ 0 8 43 67 5 31 31 21 51 23 2 2 56 1], shape=(14,), dtype=int64)
tf.Tensor([ 0 8 43 67 5 31 31 21 51 23 2 2 56 1 55], shape=(15,), dtype=int64)
tf.Tensor([ 0 8 43 67 5 31 31 21 51 23 2 2 56 1 55 36], shape=(16,), dtype=int64)
tf.Tensor([ 0 8 43 67 5 31 31 21 51 23 2 2 56 1 55 36 12], shape=(17,), dtype=int64)
tf.Tensor([ 0 8 43 67 5 31 31 21 51 23 2 2 56 1 55 36 12 27], shape=(18,), dtype=int64)
tf.Tensor([ 0 8 43 67 5 31 31 21 51 23 2 2 56 1 55 36 12 27 31], shape=(19,), dtype=int64)
tf.Tensor([ 0 8 43 67 5 31 31 21 51 23 2 2 56 1 55 36 12 27 31 38], shape=(20,), dtype=int64)
This is the TensorFlow code that should produce the same result as the original PyTorch code. The loop structures are all different though.
class BigramModel(tf.keras.Model):
    """Bigram language model: each token id indexes a row of next-token logits."""

    def __init__(self, vocab_size):
        # Keras requires the base initialiser to run before layers are
        # assigned as attributes.
        super().__init__()
        self.token_embedding_table = Embedding(vocab_size, vocab_size)

    def call(self, idx, targets=None):
        """Return ``(logits, loss)`` for the token ids in ``idx``.

        ``loss`` is ``None`` when no ``targets`` are given; otherwise it is the
        sparse categorical cross-entropy between ``targets`` and the logits,
        converted to a NumPy value.
        """
        logits = self.token_embedding_table(idx)
        if targets is None:
            loss = None
        else:
            bce = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
            loss = bce(targets, tf.squeeze(logits)).numpy()
        return logits, loss

    def generate(self, idx, max_new_tokens):
        """Append ``max_new_tokens`` sampled token ids to ``idx`` and return it."""

        def keep_going(i, _):
            return tf.less(i, max_new_tokens)

        def step(i, idx):
            logits, _ = self(idx)
            # Only the last position is used to predict the next token.
            logits = logits[-1]
            probs = tf.nn.softmax(logits)
            idx_next = tfp.distributions.Multinomial(total_count=1, probs=probs)
            # NOTE(review): the expression appended to idx was cut off in the
            # source; here the index of the one-hot draw is appended — confirm.
            sampled = tf.argmax(idx_next.sample(1), axis=-1)
            idx = tf.concat([idx, sampled], 0)
            return tf.add(i, 1), idx

        _, idx1 = tf.while_loop(keep_going, step, loop_vars=[tf.constant(0), idx])
        return idx1
Last time step
Only the last dimension in this output is considered for predicting the next character at this time.
So in this example the last dimension is highlighted.
x = tf.random.uniform((4,4))
[[0.6044643 0.9598156 0.84220576 0.6529906 ]
[0.03485656 0.1756084 0.9860773 0.8582853 ]
[0.45344257 0.6370505 0.9529482 0.4074465 ]
[0.27584124 0.44224763 0.7260096 0.16439259]], shape=(4, 4), dtype=float32)
This generates these 20 characters.
# Build the model with one embedding row per vocabulary entry.
m = BigramModel(len(elem))
# One forward pass on the sample drawn earlier, reshaped to a single batch of 8 positions.
out,loss = m(tf.reshape(X,(1,8)),tf.reshape(y,(1,8)))
# Generate 20 new tokens starting from a single token id 0, then decode them.
idx, generation = decode(m.generate(tf.zeros((1,),tf.int64),20))
print(["".join(i) for i in generation.numpy()[:].astype(str)])
[‘1’, ‘T’, ‘0’, ‘V’, ‘S’, ‘.’, “’”, ‘.’, ‘n’, ‘U’, ‘t’, ‘8’, ‘l’, ‘M’, “’”, ‘T’, ‘g’, ‘b’, ‘N’, ‘i’, ‘h’]
Training Stage 1
This is the entire code again as small changes have been made to train.
import tensorflow as tf
import tensorflow_probability as tfp
from keras.layers import Embedding
# Load the corpus, clean it, and derive the character vocabulary.
# NOTE(review): the next line ends in an unmatched ')' — the call that wrapped
# this path (presumably a file read) appears to be missing; confirm.
input ="/Users/anu/PycharmProjects/TensorFlow2/shakespeare.txt")
# Trim leading and trailing whitespace.
input = tf.strings.strip(input)
# Delete every run of spaces, then every newline.
input = tf.strings.regex_replace(input,' +', '')
input = tf.strings.regex_replace(input,'\n', '')
# Length of the cleaned text; used below to bound the random sample offsets.
length = int(tf.strings.length(input))
# Split into characters; element 0 of the result is indexed below as the characters.
vocab = tf.strings.unicode_split_with_offsets(input, 'UTF-8')
# elem: the distinct characters; idx: for every character, its position in elem.
elem,idx = tf.unique(vocab[0])
vocab_size = len(elem)
print(f'Size of vocabulary={vocab_size}')
# Number of bytes drawn per training sample (yields block_size - 1 input/target pairs).
block_size = 9
# Lookup table whose values are the integer ids 0..len(elem)-1.
# NOTE(review): the remaining arguments and the closing ')' are missing here —
# presumably the keys are the characters in elem; confirm.
table = tf.lookup.StaticHashTable(
values=tf.constant([idx for idx, inp in enumerate(elem)]),
# Reverse lookup table keyed by the ids rendered as strings (see decode()).
# NOTE(review): the values/default arguments and the closing ')' are missing here.
indextoelem = tf.lookup.StaticHashTable(
keys=tf.strings.as_string([idx for idx, inp in enumerate(elem)]),
def random_sample(text):
    """Return a 201-byte slice of ``text`` starting at a random offset.

    NOTE: a two-argument ``random_sample(text, block_size)`` is defined later
    in this listing and replaces this definition once the module has loaded.

    Args:
        text: String tensor holding the corpus.

    Returns:
        The substring of ``text`` that begins at the sampled byte offset and
        is 201 bytes long.
    """
    # Offsets are drawn from [1, length - 201), where ``length`` is the
    # module-level size of the corpus, and truncated to an integer.
    start = int(tf.random.uniform(shape=[], minval=1, maxval=length - 201))
    return tf.strings.substr(text, start, 201, unit='BYTE')
# Module-level buffers. draw_random_sample() stacks the contents of
# ``samplelist`` and then resets it. A ``global`` statement has no effect at
# module scope, so plain assignments are enough.
samplelist = []
reversesamplelist = []


def reverse_map_fn(bytes):
    """Per-element callback handed to ``tf.map_fn`` by reverse_map().

    Returns its argument unchanged.
    """
    return bytes


def map_fn(bytes):
    """Per-element callback handed to ``tf.map_fn`` by draw_random_sample().

    Returns its argument unchanged.
    """
    # NOTE(review): draw_random_sample() stacks ``samplelist`` right after
    # mapping this callback, so it presumably should also append an encoded
    # element to that list — confirm against the original code.
    return bytes
def random_sample(text, block_size):
    """Return a ``block_size``-byte slice of ``text`` at a random offset.

    Args:
        text: String tensor holding the corpus.
        block_size: Number of bytes to return.

    Returns:
        The substring of ``text`` that begins at the sampled byte offset and
        is ``block_size`` bytes long.
    """
    # Offsets are drawn from [1, length - (block_size + 1)), where ``length``
    # is the module-level size of the corpus, and truncated to an integer.
    upper = length - (block_size + 1)
    start = int(tf.random.uniform(shape=[], minval=1, maxval=upper))
    return tf.strings.substr(text, start, block_size, unit='BYTE')
def draw_random_sample(block_size):
    """Build an (input, target) pair from a random ``block_size``-byte slice.

    Args:
        block_size: Number of bytes drawn from the corpus.

    Returns:
        ``(X, y)`` where ``X`` stacks every entry of ``samplelist`` except the
        last and ``y`` stacks every entry except the first, i.e. ``y`` is
        ``X`` shifted by one position.
    """
    global samplelist
    sample = random_sample(input, block_size)
    split_sample = tf.strings.bytes_split(sample)
    # The mapped result is discarded; the call is made for whatever map_fn
    # records while it visits each element.
    tf.map_fn(map_fn, tf.strings.bytes_split(split_sample))
    X, y = tf.stack(samplelist[:-1]), tf.stack(samplelist[1:])
    # Reset the buffer so the next draw starts empty.
    samplelist = []
    return X, y
def reverse_map(X):
    """Run reverse_map_fn over every element of ``X`` with ``tf.map_fn``.

    The mapped tensor is discarded and the function returns ``None``.
    """
    tf.map_fn(reverse_map_fn, X)
# Draw one sample up front: X holds the inputs, y the shifted targets.
X, y = draw_random_sample(block_size)
vocab_size = len(elem)


def decode(idx):
    """Look token ids up in ``indextoelem``.

    Args:
        idx: Iterable of token ids.

    Returns:
        ``(idx, looked_up)`` — the ids unchanged, and the result of looking up
        each id (rendered as a string) in ``indextoelem``.
    """
    # The earlier comprehension bound one name to both the position and the
    # element of enumerate(), so only the element survived; iterate directly.
    return idx, indextoelem.lookup(tf.strings.as_string(list(idx)))
class BigramModel(tf.keras.Model):
    """Bigram language model: each token id indexes a row of next-token logits."""

    def __init__(self, vocab_size):
        # Keras requires the base initialiser to run before layers are
        # assigned as attributes.
        super().__init__()
        self.token_embedding_table = Embedding(vocab_size, vocab_size)

    def call(self, idx, targets=None):
        """Return ``(logits, loss)`` for the token ids in ``idx``.

        ``loss`` is ``None`` when no ``targets`` are given; otherwise it is the
        sparse categorical cross-entropy between ``targets`` and the logits,
        kept as a tensor so gradients can flow through it during training.
        """
        logits = self.token_embedding_table(idx)
        if targets is None:
            loss = None
        else:
            bce = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
            loss = bce(targets, tf.squeeze(logits))
        return logits, loss

    def generate(self, idx, max_new_tokens):
        """Append ``max_new_tokens`` sampled token ids to ``idx`` and return it."""

        def keep_going(i, _):
            return tf.less(i, max_new_tokens)

        def step(i, idx):
            logits, _ = self(idx)
            # Only the last position is used to predict the next token.
            logits = logits[-1]
            probs = tf.nn.softmax(logits)
            idx_next = tfp.distributions.Multinomial(total_count=1, probs=probs)
            # NOTE(review): the expression appended to idx was cut off in the
            # source; here the index of the one-hot draw is appended — confirm.
            sampled = tf.argmax(idx_next.sample(1), axis=-1)
            idx = tf.concat([idx, sampled], 0)
            return tf.add(i, 1), idx

        _, idx1 = tf.while_loop(keep_going, step, loop_vars=[tf.constant(0), idx])
        return idx1
# Build the model with one embedding row per vocabulary entry.
m = BigramModel(len(elem))
# One forward pass on the sample drawn earlier; a block_size sample yields block_size - 1 pairs.
out,loss = m(tf.reshape(X,(1,block_size -1)),tf.reshape(y,(1,block_size-1)))
# Generate 20 new tokens from the untrained model, starting from a single token id 0.
idx, generation = decode(m.generate(tf.zeros((1,),tf.int64),20))
print(["".join(i) for i in generation.numpy()[:].astype(str)])
# Train the model one random sample at a time with Adam.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))
    for step in range(20000):
        # Record the forward pass so the loss can be differentiated.
        with tf.GradientTape() as tape:
            x, y = draw_random_sample(block_size)
            logits, loss = m(tf.reshape(x, (1, block_size - 1)), tf.reshape(y, (1, block_size - 1)))
        grads = tape.gradient(loss, m.trainable_weights)
        # Run one step of gradient descent by updating
        # the value of the variables to minimize the loss.
        optimizer.apply_gradients(zip(grads, m.trainable_weights))
        # Log every 200 batches.
        if step % 200 == 0:
            # The loss message had lost its print() call and was a bare
            # expression; print it like the line that follows.
            print(
                "Training loss at step %d: %.4f"
                % (step, float(loss))
            )
            print("Seen so far: %s samples" % ((step + 1)))

# NOTE(review): indentation was lost in the source; generating once after
# training is assumed here — confirm the intended placement.
_, generation = decode(m.generate(tf.zeros((1,),tf.int64),20))
print(["".join(i) for i in generation.numpy()[:].astype(str)])
Matrix multiplication trick using triangular matrix
I am reusing this code I contributed to Stackoverflow years back with appropriate changes introduced by TensorFlow 2 API. This code can be refactored and improved but at this time it works.
What does the resulting matrix look like? The logic of the code now creates a lower triangular matrix (or an upper triangular matrix, if we want it) like this.
# Ten ones: one value for each entry strictly below the diagonal of a 5x5 matrix.
x = tf.constant(tf.ones(10,))
ones = tf.ones((5,5),dtype=tf.int64) # All-ones matrix with the size of the output matrix
mask_a = tf.linalg.band_part(ones, -1, 0) # Lower triangular matrix of 0s and 1s (diagonal included)
mask_b = tf.linalg.band_part(ones, 0, 0) # Diagonal matrix of 0s and 1s
mask = tf.subtract(mask_a, mask_b) # Mask of the lower triangle strictly below the diagonal
zero = tf.constant(0, dtype=tf.int64)
non_zero = tf.not_equal(mask, zero) # Conversion of mask to Boolean matrix
indices = tf.where(non_zero) # Extracting the indices of the lower-triangle elements
# Place the ones in x at those indices of a sparse 5x5 matrix.
out = tf.SparseTensor(indices,x,dense_shape=tf.cast((5,5),dtype=tf.int64))
# Keep rows 1-3 and columns 0-2: a 3x3 lower triangular block of ones.
dense = tf.slice(tf.sparse.to_dense(out), [1, 0], [3, 3])
# Divide each row by its sum so the non-zero entries of a row become equal weights.
dense = dense / tf.reduce_sum(dense,1,keepdims=True)
# Random 3x2 matrix (minval 1, maxval 5) used to demonstrate the averaging.
random_ints = tf.random.uniform(shape=(3,2), minval=1., maxval=5.)
The output is this.
[[1 0 0]
[1 1 0]
[1 1 1]]
[[1. 0. 0. ]
[0.5 0.5 0. ]
[0.33333334 0.33333334 0.33333334]], shape=(3, 3), dtype=float32)
[[2.1591725 2.7532902]
[3.748231 4.6269817]
[2.0407896 2.2444978]], shape=(3, 2), dtype=float32)
[[2.1591725 2.7532902]
[2.9537017 3.690136 ]
[2.6493979 3.2082565]], shape=(3, 2), dtype=float32)
The trick gives us a matrix whose rows are weighted averages, like this.
Explanation will be added in due course as this code implements the first view of self-attention. This is the test code executed separately.
Please note that the elaborate example shown above can be shortened for our purposes to
wei = tf.linalg.band_part(wei, -1, 0)
Test code is this.
# Stand-alone check of one causal self-attention head.
head_size = 16
B, T, C = 1, 8, 32  # batch, time, channels
x = tf.random.normal((B, T, C))

# Bias-free linear projections from C channels to head_size.
key = tf.keras.layers.Dense(head_size, input_shape=(32,), activation=None, use_bias=False)
query = tf.keras.layers.Dense(head_size, input_shape=(32,), activation=None, use_bias=False)
value = tf.keras.layers.Dense(head_size, input_shape=(32,), activation=None, use_bias=False)

k = key(x)  # (B, T, 16)
q = query(x)  # (B, T, 16)
wei = tf.matmul(q, tf.transpose(k, perm=[0, 2, 1]))  # (B, T, T)

# Causal mask: position t may only attend to positions <= t. The mask is built
# from a matrix of ones rather than by testing the scores for zero, so a score
# that happens to be exactly 0 is not masked by accident.
tril = tf.linalg.band_part(tf.ones((T, T)), -1, 0)
wei = tf.where(tf.equal(tril, 0), tf.constant(float('-inf'), dtype=wei.dtype), wei)
# Softmax turns the -inf entries into zero weights.
wei = tf.nn.softmax(wei, -1)
v = value(x)
out = tf.matmul(wei, v)
Single-head self-attention
After the code shown in the previous section is understood we can integrate it with the main body like this. But from this point onwards I feel the shapes or matrices could have introduced a bug or two even though the code executed without errors. Further investigation is needed. Primarily the mechanism used to port the original Pytorch to TensorFlow is somewhat arduous.
Here I have shown only the relevant changes. I changed block_size to 32 and I also noticed I could generate only up to 32 characters after training.
# Hyperparameters read by the model classes below.
head_size = 16
# Rate passed to the Dropout layers.
dropout = 0.0
# Width of the token and position embeddings.
n_embd = 32
# Number of rows in the position embedding table.
block_size = 32
class BigramModel(tf.keras.Model):
    """Language model with token + position embeddings and one attention head."""

    def __init__(self, vocab_size):
        # Keras requires the base initialiser to run before layers are
        # assigned as attributes.
        super().__init__()
        self.token_embedding_table = Embedding(vocab_size, n_embd)
        self.position_embedding_table = Embedding(block_size, n_embd)
        self.sa_head = Head(n_embd)
        self.lm_head = tf.keras.layers.Dense(vocab_size, input_shape=(n_embd,), activation=None, use_bias=False)

    def call(self, idx, targets=None):
        """Return ``(logits, loss)``; ``loss`` is ``None`` without ``targets``."""
        # The sequence length is the only axis of a 1-D id tensor and axis 1
        # otherwise. Taking a scalar here (instead of the whole shape vector)
        # gives tf.range a proper scalar limit.
        if tf.size(tf.shape(idx)) == 1:
            T = tf.shape(idx)[0]
        else:
            T = tf.shape(idx)[1]
        tok_emb = self.token_embedding_table(idx)
        # NOTE: positions index a table with block_size rows, so T must not
        # exceed block_size.
        pos_emb = self.position_embedding_table(tf.range(T))
        x = tf.add(tok_emb, tf.expand_dims(pos_emb, axis=0))  # (B,T,C)
        x = self.sa_head(x)  # (B,T,C)
        logits = self.lm_head(x)  # (B,T,vocab_size)
        if targets is None:
            loss = None
        else:
            bce = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
            loss = bce(targets, tf.squeeze(logits))
        return logits, loss

    def generate(self, idx, max_new_tokens):
        """Append ``max_new_tokens`` sampled token ids to ``idx`` and return it."""

        def keep_going(i, _):
            return tf.less(i, max_new_tokens)

        def step(i, idx):
            logits, _ = self(idx)
            # Only the last time step is used to predict the next token.
            logits = logits[:, -1, :]
            probs = tf.nn.softmax(logits)
            idx_next = tfp.distributions.Multinomial(total_count=1, probs=probs)
            # NOTE(review): the expression appended to idx was cut off in the
            # source; here the index of the one-hot draw is appended — confirm.
            sampled = tf.reshape(tf.argmax(idx_next.sample(1), axis=-1), (1,))
            idx = tf.concat([idx, sampled], 0)
            return tf.add(i, 1), idx

        _, idx = tf.while_loop(keep_going, step, loop_vars=[tf.constant(0), idx])
        return idx
class Head(tf.keras.Model):
    """One head of causal (masked) self-attention."""

    def __init__(self, head_size):
        # Keras requires the base initialiser to run before layers are
        # assigned as attributes.
        super().__init__()
        self.key = tf.keras.layers.Dense(head_size, input_shape=(n_embd,), activation=None, use_bias=False)
        self.query = tf.keras.layers.Dense(head_size, input_shape=(n_embd,), activation=None, use_bias=False)
        self.value = tf.keras.layers.Dense(head_size, input_shape=(n_embd,), activation=None, use_bias=False)
        self.dropout = tf.keras.layers.Dropout(dropout)

    def call(self, x):
        """Attend over the time axis of ``x`` (indexed as ``(B, T, C)``).

        Returns a tensor whose last axis has ``head_size`` entries.
        """
        k = self.key(x)  # (B, T, head_size)
        q = self.query(x)  # (B, T, head_size)

        # Scale the scores DOWN by sqrt(key width). The earlier form divided by
        # 1/sqrt(C), which multiplied the scores by sqrt(C) instead, and used a
        # hard-coded C.
        scale = tf.math.rsqrt(tf.cast(tf.shape(k)[-1], tf.float32))
        wei = tf.matmul(q, k, transpose_b=True) * scale  # (B, T, T)

        # Causal mask taken from the actual sequence length: position t may
        # only attend to positions <= t. Built from ones so that a score that
        # is exactly 0 is not masked by accident.
        T = tf.shape(x)[1]
        tril = tf.linalg.band_part(tf.ones((T, T)), -1, 0)
        wei = tf.where(tf.equal(tril, 0), tf.constant(float('-inf'), dtype=wei.dtype), wei)
        # Softmax turns the -inf entries into zero weights.
        wei = tf.nn.softmax(wei, -1)

        v = self.value(x)
        out = tf.matmul(wei, v)
        return out
Even though there is no error and the loss goes down there is something missing here.
block_size value is the context that we want to keep track of. But if we want to generate more than the block_size,
pos_emb = self.position_embedding_table(tf.range(T))
does not have that in its scope. So I clip the stream of characters like this. This fixes the problem.
idx_cond = idx[-block_size:]
logits,loss = self(idx_cond)
Last 2 can be selected like this example shows.
Multi-head attention
At this stage I find that the TensorFlow code looks very similar to the original PyTorch code and it is easier to reason about. It also works without posing significant difficulties.
The change to the BigramModel is just one line when we introduce multiple heads of attention. A diagram or two will make this clear.
# Excerpt: only __init__ is shown; the attention layer is the one line that changes.
# NOTE(review): no super().__init__() call is visible here; Keras model
# subclasses need it before layers are assigned — confirm it exists in the full code.
class BigramModel(tf.keras.Model):
def __init__(self,vocab_size):
self.token_embedding_table = Embedding(vocab_size,n_embd)
self.position_embedding_table = Embedding(block_size, n_embd)
# Multi-head attention replaces the single Head used before.
self.sa_head = MultiHeadAttention(n_head, head_size) #Head(n_embd)
self.lm_head = tf.keras.layers.Dense(vocab_size, input_shape=(n_embd,), activation=None, use_bias=False)
The class that implements it is this. n_head is set to 4. I use my laptop’s CPU and till this stage it works. But if I tune these hyperparameters and make the network deeper I will need a GPU.
class MultiHeadAttention(tf.keras.Model):
    """ multiple heads of self-attention in parallel """

    def __init__(self, num_heads, head_size):
        # Keras requires the base initialiser to run before layers are
        # assigned as attributes.
        super().__init__()
        self.heads = [Head(head_size) for _ in range(num_heads)]
        # The projection receives the concatenated head outputs, whose width
        # is num_heads * head_size, and maps them back to n_embd.
        self.proj = tf.keras.layers.Dense(n_embd, input_shape=(num_heads * head_size,), activation=None, use_bias=False)
        self.dropout = tf.keras.layers.Dropout(dropout)

    def call(self, x):
        """Run every head on ``x``, concatenate on the last axis, then project."""
        out = tf.concat([h(x) for h in self.heads], -1)
        out = self.dropout(self.proj(out))
        return out
I also changed the dataset, and it worked seamlessly after that.
# Point the loader at the new dataset file.
# NOTE(review): the next line ends in an unmatched ')' — the call that wrapped
# this path (presumably a file read) appears to be missing; confirm.
input ="/Users/anu/PycharmProjects/TensorFlow2/input.txt")
# Recompute the text length that bounds the random sample offsets.
length = int(tf.strings.length(input))
The code prints properly now.
# Generate 300 new tokens starting from a single token id 0 and decode them.
_, generation = decode(m.generate(tf.zeros((1,),tf.int64),300))
# Convert each decoded element to str and collect the results in an array.
# NOTE(review): no `import numpy as np` is visible in the listings — confirm it exists.
array = np.array(["".join(i) for i in generation.numpy()[:].astype(str)])
# Join the characters into one string.
s = ''.join(array)
The output is this. It is still not trained sufficiently but the loss goes down further than before.
FERENIO: Phath athill bof: aser EA E: Mureadest whhere De Ave Po. YiH,Ud Pre SO S: Dos Ton Kosed I I I Hond I LO EcVAwe RURE: I H Kun Id Rhatlt I. Pal Gasnt I, VU HE VISNE
KRING E ITIRIRCGOBEUUSESIMSIOFURI: Lop E: Arse, LOLOS: Khas Ame I I RI: SO Ar-Bu’teo Ofshup Otm 3 Sunscand Reten, Cutxy Tou fl
Further architectural changes
I followed the original Pytorch code closely and this is the final piece of the puzzle. A few changes are introduced to fine-tune the networks. But we have to remember that I am training this using a GPU. So the loss is lower than before now but the text is still gibberish.
Changes to the BigramModel
# Excerpt: __init__ and the first part of call() only; the listing stops after
# the logits are computed, with no loss computation or return statement.
# NOTE(review): no super().__init__() call is visible here; Keras model
# subclasses need it before layers are assigned — confirm it exists in the full code.
class BigramModel(tf.keras.Model):
def __init__(self,vocab_size):
self.token_embedding_table = Embedding(vocab_size,n_embd)
self.position_embedding_table = Embedding(block_size, n_embd)
# self.sa_head = MultiHeadAttention(n_head, head_size) #Head(n_embd)
# A single Transformer block replaces the bare attention layer.
self.blocks = Block(n_embd, n_head=n_head)
self.ln_f = tf.keras.layers.LayerNormalization() # final layer norm
self.lm_head = tf.keras.layers.Dense(vocab_size, input_shape=(n_embd,), activation=None, use_bias=False)
def call(self,idx,targets=None):
# print(f'idx in call is {idx} and shape is {tf.shape(idx)}')
B = 1
# NOTE(review): the two assignments to T below look like the branches of an
# if/else whose `else:` line is missing; as written the second always wins.
if tf.size(tf.shape(idx)) == 1:
T = tf.shape(idx)
T = tf.shape(idx)[1]
tok_emb = self.token_embedding_table(idx)
# One position embedding per time step 0..T-1.
pos_emb = self.position_embedding_table(tf.range(T))
x = tf.add(tok_emb, tf.expand_dims(pos_emb,axis=0)) # (B,T,C)
# print(f'Shape of tf.add(tok_emb, pos_emb) is {tf.shape(x)}')
x = self.blocks(x) # (B,T,C)
x = self.ln_f(x) # (B,T,C)
logits = self.lm_head(x) # (B,T,vocab_size)
New Model classes
class FeedFoward(tf.keras.Model):
    """ a simple linear layer followed by a non-linearity """

    def __init__(self, n_embd):
        # The base initialiser and the Sequential assignment were fused onto
        # one line in the source; they are two separate statements.
        super().__init__()
        # NOTE(review): the layer list was truncated in the source. The ReLU
        # follows from the class docstring ("followed by a non-linearity");
        # the trailing Dropout mirrors the other layers in this file — confirm.
        self.net = tf.keras.Sequential([
            tf.keras.layers.Dense(4 * n_embd, input_shape=(None, n_embd), activation=None, use_bias=False),
            tf.keras.layers.ReLU(),
            tf.keras.layers.Dense(n_embd, input_shape=(4 * n_embd,), activation=None, use_bias=False),
            tf.keras.layers.Dropout(dropout),
        ])

    def call(self, x):
        """Apply the feed-forward stack to ``x``."""
        return self.net(x)
class Block(tf.keras.Model):
    """ Transformer block: communication followed by computation """

    def __init__(self, n_embd, n_head):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        # These were fused onto one line in the source; the head width and the
        # attention layer are two separate assignments.
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = tf.keras.layers.LayerNormalization()
        self.ln2 = tf.keras.layers.LayerNormalization()

    def call(self, x):
        """Apply attention and feed-forward, each as a residual on ``x``."""
        # Layer norm is applied before each sub-layer; the sub-layer output is
        # added back onto its input. The attention line was truncated in the
        # source and is completed to mirror the feed-forward line.
        x = tf.add(x, self.sa(self.ln1(x)))
        x = tf.add(x, self.ffwd(self.ln2(x)))
        return x
This exercise helped me understand most of the Transformer architecture. But I couldn’t code this myself after reading research papers directly. I have to watch some videos or port code to TensorFlow. I can only work with an existing knowledge base. I will push the code to Git and look at other architectures as the field is advancing rapidly.