
Lecture 17: Generative adversarial networks implementation

In this lecture, we are going to implement a version of generative adversarial training.

Prepare the codebase

To get started, please clone a version of the needle repo from GitHub. You should be able to use the needle repo after finishing HW2.

# Code to set up the assignment
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/MyDrive/
!mkdir -p 10714f22
%cd /content/drive/MyDrive/10714f22
# comment out the following line if you run it for the second time
# as you already have a local copy of code
# !git clone https://github.com/myrepo/needle lecture17
!ln -s /content/drive/MyDrive/10714f22/lecture17 /content/needle
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/MyDrive
/content/drive/MyDrive/10714f22
Cloning into 'lecture17'...
remote: Enumerating objects: 917, done.
remote: Counting objects: 100% (184/184), done.
remote: Compressing objects: 100% (115/115), done.
remote: Total 917 (delta 104), reused 122 (delta 68), pack-reused 733
Receiving objects: 100% (917/917), 265.21 KiB | 1.99 MiB/s, done.
Resolving deltas: 100% (531/531), done.
!python3 -m pip install pybind11
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pybind11
  Downloading pybind11-2.10.1-py3-none-any.whl (216 kB)
     |████████████████████████████████| 216 kB 5.3 MB/s
Installing collected packages: pybind11
Successfully installed pybind11-2.10.1

We can then run the following commands to make the path to the package available in Colab's environment (via PYTHONPATH) as well as in the current Python session.

%set_env PYTHONPATH /content/needle/python:/env/python
import sys
sys.path.append("/content/needle/python")
env: PYTHONPATH=/content/needle/python:/env/python

Components of a generative adversarial network

There are two main components in a generative adversarial network:

  • A generator $G$ that takes a random vector $z$ and maps it to generated (fake) data $G(z)$.

  • A discriminator $D$ that attempts to tell the difference between the real data and the fake data.

image.png

import needle as ndl
import numpy as np
from needle import nn
from matplotlib import pyplot as plt

Prepare the training dataset

For demonstration purposes, we create our "real" dataset from a two-dimensional Gaussian distribution.

\begin{equation}
X \sim \mathcal{N}(\mu, \Sigma), \quad \Sigma = A^T A
\end{equation}
A = np.array([[1, 2], [-0.2, 0.5]])
mu = np.array([2, 1])
# total number of data points to generate
num_sample = 3200
data = np.random.normal(0, 1, (num_sample, 2)) @ A + mu
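As a quick sanity check (not in the original notebook; numpy only), the empirical mean and covariance of the sampled data should be close to $\mu$ and $A^T A$:

# sanity check: the empirical statistics of `data` should roughly match the sampling parameters
print(data.mean(axis=0))           # close to mu = [2, 1]
print(np.cov(data, rowvar=False))  # close to A.T @ A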
plt.scatter(data[:,0], data[:,1], color="blue", label="real data")
plt.legend()
<matplotlib.legend.Legend at 0x7fc63ddf0990>
Image in a Jupyter notebook

Our goal is to train a generator whose output distribution matches this data distribution.

Generator network $G$

Now we are ready to build our generator network $G$. To keep things simple, we make the generator a one-layer linear neural network.

model_G = nn.Sequential(nn.Linear(2, 2))
def sample_G(model_G, num_samples):
    Z = ndl.Tensor(np.random.normal(0, 1, (num_samples, 2)))
    fake_X = model_G(Z)
    return fake_X.numpy()
fake_data_init = sample_G(model_G, 3200)
plt.scatter(data[:,0], data[:,1], color="blue", label="real data")
plt.scatter(fake_data_init[:,0], fake_data_init[:,1], color="red", label="G(z) at init")
plt.legend()
<matplotlib.legend.Legend at 0x7fb60d4aed50>
Image in a Jupyter notebook

At initialization, the weights of $G$ are random, so its output certainly does not match the training data. Our goal is to set up generative adversarial training to bring the generated distribution close to the training data.

Discriminator $D$

Now let us build a discriminator network $D$ that distinguishes real data from fake data. Here we use a three-layer neural network, and we use the softmax loss to measure the classification likelihood. Because we are only classifying two classes, the softmax function reduces to the sigmoid function for prediction.

\begin{equation}
\frac{\exp(x)}{\exp(x) + \exp(y)} = \frac{1}{1 + \exp(y - x)}
\end{equation}

We simply reuse SoftmaxLoss here since it is readily available from our current set of homework implementations. Most implementations would use a binary classification loss (BCELoss) instead.
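As a quick numerical illustration of the identity above (a small numpy sketch, not part of the original notebook): for two logits, the softmax probability of the first class equals the sigmoid of the logit difference, so a two-class softmax classifier is exactly a sigmoid classifier.

x, y = 1.3, -0.4                                 # two arbitrary logits
softmax_x = np.exp(x) / (np.exp(x) + np.exp(y))  # two-class softmax probability of class x
sigmoid_x = 1 / (1 + np.exp(y - x))              # sigmoid of the logit difference
print(softmax_x, sigmoid_x)                      # the two values agree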

model_D = nn.Sequential(
    nn.Linear(2, 20), nn.ReLU(),
    nn.Linear(20, 10), nn.ReLU(),
    nn.Linear(10, 2)
)
loss_D = nn.SoftmaxLoss()

Generative adversarial training

The generative adversarial training process iteratively updates the generator $G$ and discriminator $D$ to play a "minimax" game.

\begin{equation}
\min_D \max_G \left\{ -E_{x \sim Data} \log D(x) - E_{z \sim Noise} \log\left(1 - D(G(z))\right) \right\}
\end{equation}

Note, however, that in practice the $G$ update step usually uses an alternative objective function:

\begin{equation}
\min_G \left\{ -E_{z \sim Noise} \log D(G(z)) \right\}
\end{equation}
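The reason for this substitution is the standard gradient argument: early in training the discriminator easily rejects fakes, so $D(G(z))$ is close to 0, where $\log(1 - D(G(z)))$ is nearly flat while $-\log D(G(z))$ is steep. A small numpy sketch (not in the original notebook) of the gradient magnitudes with respect to $p = D(G(z))$:

p = np.array([0.01, 0.1, 0.5, 0.9])  # p = D(G(z)): discriminator's "real" probability for fakes
grad_minimax     = 1.0 / (1.0 - p)   # |d/dp log(1 - p)|: roughly 1 when p is small
grad_alternative = 1.0 / p           # |d/dp (-log p)|: large when p is small
print(grad_minimax)
print(grad_alternative)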

Generator update

Now we are ready to set up the generator update. In the generator update step, we need to optimize the following objective:

\begin{equation}
\min_G \left\{ -E_{z \sim Noise} \log D(G(z)) \right\}
\end{equation}

Let us first set up an optimizer for $G$'s parameters.

opt_G = ndl.optim.Adam(model_G.parameters(), lr=0.01)

image.png

To optimize the above loss function, we just need to generate fake data $G(z)$, send it through the discriminator $D$, and compute the negative log-likelihood that the fake data is categorized as real. In other words, we feed in $y = 1$ as the label here.
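Written as an average over a batch of size $m$ (assuming, as in the homework implementation, that SoftmaxLoss averages over the batch), the loss computed in update_G below is:

\begin{equation}
L_G = -\frac{1}{m} \sum_{i=1}^{m} \log D(G(z_i))
\end{equation}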

def update_G(Z, model_G, model_D, loss_D, opt_G):
    fake_X = model_G(Z)
    fake_Y = model_D(fake_X)
    batch_size = Z.shape[0]
    # label the fake batch as real (y = 1) to compute -log D(G(z))
    ones = ndl.ones(batch_size, dtype="int32")
    loss = loss_D(fake_Y, ones)
    loss.backward()
    opt_G.step()

Discriminator update

Now, let us also set up the discriminator update step. The discriminator step optimizes the following objective:

\begin{equation}
\min_D \left\{ -E_{x \sim Data} \log D(x) - E_{z \sim Noise} \log\left(1 - D(G(z))\right) \right\}
\end{equation}

Let us first set up an optimizer to learn $D$'s parameters.

opt_D = ndl.optim.Adam(model_D.parameters(), lr=0.01)

image.png

The discriminator loss is just a normal classification loss, obtained by labeling the generated data as $y = 0$ (fake) and the real data as $y = 1$ (real). Importantly, we do not need to propagate gradients back to the generator during the discriminator update, so we use the detach function to stop gradient propagation.
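In the same batch notation (again assuming SoftmaxLoss averages over the batch), the quantity computed by loss_D(real_Y, ones) + loss_D(fake_Y, zeros) below is:

\begin{equation}
L_D = -\frac{1}{m} \sum_{i=1}^{m} \log D(x_i) - \frac{1}{m} \sum_{i=1}^{m} \log\left(1 - D(G(z_i))\right)
\end{equation}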

def update_D(X, Z, model_G, model_D, loss_D, opt_D):
    # detach so that no gradient flows back into the generator
    fake_X = model_G(Z).detach()
    fake_Y = model_D(fake_X)
    real_Y = model_D(X)
    assert X.shape[0] == Z.shape[0]
    batch_size = X.shape[0]
    ones = ndl.ones(batch_size, dtype="int32")
    zeros = ndl.zeros(batch_size, dtype="int32")
    loss = loss_D(real_Y, ones) + loss_D(fake_Y, zeros)
    loss.backward()
    opt_D.step()

Putting it together

Now we can put everything together. To summarize, generative adversarial training cycles through the following steps:

  • The discriminator update step

  • The generator update step

def train_gan(data, batch_size, num_epochs):
    assert data.shape[0] % batch_size == 0
    for epoch in range(num_epochs):
        # each "epoch" here processes one batch of real data, cycling through the dataset
        begin = (batch_size * epoch) % data.shape[0]
        X = data[begin: begin+batch_size, :]
        Z = np.random.normal(0, 1, (batch_size, 2))
        X = ndl.Tensor(X)
        Z = ndl.Tensor(Z)
        update_D(X, Z, model_G, model_D, loss_D, opt_D)
        update_G(Z, model_G, model_D, loss_D, opt_G)

train_gan(data, 32, 2000)

We can plot the data produced by the generator after a number of training iterations. As we can see, the generated dataset $G(z)$ gets much closer to the real data after training.

fake_data_trained = sample_G(model_G, 3200)
plt.scatter(data[:,0], data[:,1], color="blue", label="real data")
plt.scatter(fake_data_init[:,0], fake_data_init[:,1], color="red", label="G(z) at init")
plt.scatter(fake_data_trained[:,0], fake_data_trained[:,1], color="pink", label="G(z) trained")
plt.legend()
<matplotlib.legend.Legend at 0x7fc6354e5b90>
Image in a Jupyter notebook

Inspect the trained generator

We can compare the weight/bias of the trained generator $G$ to the parameters we used to generate the dataset. Importantly, we need to compare the covariance $\Sigma = A^T A$ here instead of the transformation matrix itself.
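The reason is that the transformation matrix is only identified up to an orthogonal factor: left-multiplying $A$ by any rotation $Q$ leaves the covariance, and hence the generated distribution, unchanged. A small numpy check (not in the original notebook):

theta = 0.7  # an arbitrary rotation angle
Q = np.array([[np.cos(theta), -np.sin(theta)],
              [np.sin(theta),  np.cos(theta)]])
A_rotated = Q @ A
print(np.allclose(A.T @ A, A_rotated.T @ A_rotated))  # True: same covariance, different matrix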

gA, gmu = model_G.parameters()
A.T @ A
array([[1.04, 1.9 ],
       [1.9 , 4.25]])
gA = gA.numpy()
gA.T @ gA
array([[0.80765074, 1.8812442 ],
       [1.8812442 , 5.0232944 ]], dtype=float32)
A
array([[ 1. ,  2. ],
       [-0.2,  0.5]])
gA
array([[ 0.23406495, -0.22800203],
       [-0.86767757, -2.2296433 ]], dtype=float32)

We can also compare the mean of the generated distribution (the bias of $G$) to $\mu$.

gmu, mu
(needle.Tensor([[2.00028 1.2296444]]), array([2, 1]))

Modularizing GAN "Loss"

We can modularize the GAN update step in a way similar to a loss function. The following code block shows one way to do so.

class GANLoss:
    def __init__(self, model_D, opt_D):
        self.model_D = model_D
        self.opt_D = opt_D
        self.loss_D = nn.SoftmaxLoss()

    def _update_D(self, real_X, fake_X):
        real_Y = self.model_D(real_X)
        fake_Y = self.model_D(fake_X.detach())
        batch_size = real_X.shape[0]
        ones = ndl.ones(batch_size, dtype="int32")
        zeros = ndl.zeros(batch_size, dtype="int32")
        loss = self.loss_D(real_Y, ones) + self.loss_D(fake_Y, zeros)
        loss.backward()
        self.opt_D.step()

    def forward(self, fake_X, real_X):
        self._update_D(real_X, fake_X)
        fake_Y = self.model_D(fake_X)
        batch_size = real_X.shape[0]
        ones = ndl.ones(batch_size, dtype="int32")
        loss = self.loss_D(fake_Y, ones)
        return loss
model_G = nn.Sequential(nn.Linear(2, 2))
opt_G = ndl.optim.Adam(model_G.parameters(), lr=0.01)

model_D = nn.Sequential(
    nn.Linear(2, 20), nn.ReLU(),
    nn.Linear(20, 10), nn.ReLU(),
    nn.Linear(10, 2)
)
opt_D = ndl.optim.Adam(model_D.parameters(), lr=0.01)

gan_loss = GANLoss(model_D, opt_D)

def train_gan(data, batch_size, num_epochs):
    assert data.shape[0] % batch_size == 0
    for epoch in range(num_epochs):
        begin = (batch_size * epoch) % data.shape[0]
        X = data[begin: begin+batch_size, :]
        Z = np.random.normal(0, 1, (batch_size, 2))
        X = ndl.Tensor(X)
        Z = ndl.Tensor(Z)
        fake_X = model_G(Z)
        loss = gan_loss.forward(fake_X, X)
        loss.backward()
        opt_G.step()

train_gan(data, 32, 2000)
fake_data_trained = sample_G(model_G, 3200)
plt.scatter(data[:,0], data[:,1], color="blue", label="real data")
plt.scatter(fake_data_init[:,0], fake_data_init[:,1], color="red", label="G(z) at init")
plt.scatter(fake_data_trained[:,0], fake_data_trained[:,1], color="pink", label="G(z) trained")
plt.legend()
<matplotlib.legend.Legend at 0x7fc6354da210>
Image in a Jupyter notebook