# Convolutional Neural Networks

In [None]:
# standard imports
import matplotlib.pyplot as plt
import numpy as np
import random

# sklearn data
from sklearn.datasets import make_blobs
from sklearn.datasets import make_circles
from sklearn.datasets import make_classification
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split

# sklearn models
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier

# sklearn metrics
from sklearn.metrics import accuracy_score
from sklearn.metrics import ConfusionMatrixDisplay

# pytorch imports
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

## MNIST with Random Forest

- [**Wikipedia**: MNIST](https://en.wikipedia.org/wiki/MNIST_database)
- [**Chris Albon**: MNIST Dataset](https://chrisalbon.com/Data/MNIST+Dataset)
- [**Yann Lecun**: MNIST](http://yann.lecun.com/exdb/mnist/index.html)
- [`sklearn.datasets.load_digits`](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_digits.html#sklearn.datasets.load_digits)

In [None]:
# load 8x8 digits data from sklearn, not MNIST, but similar
# smaller images will allow for faster training
digits = load_digits()
# list the attributes available on the loaded object (e.g. images, target)
dir(digits)

In [None]:
# check an example digit: display the raw pixel array of the first image
digits.images[0]

In [None]:
# check target data: the label array that is index-aligned with digits.images
digits.target

In [None]:
# plot an example digit
# (plt.subplots defaults to a single Axes, so only the figure size is given)
fig, ax = plt.subplots(figsize=(5, 5))
ax.set_axis_off()
ax.imshow(digits.images[0], cmap=plt.cm.gray_r)
plt.show()

In [None]:
# plot 20 examples of each possible digit
# one row of the grid per digit class (0-9), 20 columns per row
fig, axes = plt.subplots(nrows=10, ncols=20, figsize=(10, 5))
for digit, row_axes in enumerate(axes):
    # select every image whose label equals the current digit
    examples = digits.images[digits.target == digit]
    for col, ax in enumerate(row_axes):
        ax.set_axis_off()
        ax.imshow(examples[col], cmap=plt.cm.gray_r)
plt.show()

In [None]:
# flatten the images: one row per sample, all pixels of an image in that row
n_samples = digits.images.shape[0]
data = digits.images.reshape(n_samples, -1)
data

In [None]:
# create a classifier: a random forest classifier
# (default hyperparameters; no random_state is passed here)
clf = RandomForestClassifier()

In [None]:
# hold out half of the samples for testing; the fixed random_state keeps
# the split identical from run to run
X_train, X_test, y_train, y_test = train_test_split(
    data, digits.target, test_size=0.5, random_state=1
)

# learn the digits on the train subset, then predict the digit for every
# test sample (fit returns the estimator itself, so the calls can be chained)
predicted = clf.fit(X_train, y_train).predict(X_test)

In [None]:
# get indexes for a random subset of the test data (one per subplot)
n_test = len(predicted)
idx = random.sample(range(n_test), 4)

# plot digits for test data sample with predicted and actual labels
_, axes = plt.subplots(
    nrows=1,
    ncols=4,
    figsize=(10, 3),
)
for ax, image, true_label, predicted_label in zip(
    axes,
    X_test[idx],
    y_test[idx],
    predicted[idx],
):
    ax.set_axis_off()
    # the features were flattened for the classifier; restore 8x8 for display
    ax.imshow(image.reshape(8, 8), cmap=plt.cm.gray_r)
    ax.set_title(f"True: {true_label}, Predicted: {predicted_label}")
# render the figure explicitly, as every other plotting cell does
plt.show()

In [None]:
# calculate and display (as array and plot) a test confusion matrix
disp = ConfusionMatrixDisplay.from_predictions(y_true=y_test, y_pred=predicted)
# text version first, then the titled figure
print(f"Confusion matrix:\n{disp.confusion_matrix}")
disp.figure_.suptitle("Confusion Matrix")
plt.show()

In [None]:
# calculate test accuracy: compare true test labels with the predictions
accuracy = accuracy_score(y_true=y_test, y_pred=predicted)
print(f"Test Accuracy: {accuracy}")

## MNIST with a Neural Network

In [None]:
# options shared by both splits: cache under ./data, download when needed,
# and apply the ToTensor transform to every image
mnist_kwargs = dict(root="data", download=True, transform=ToTensor())

# download training data
training_data = datasets.MNIST(train=True, **mnist_kwargs)

# download test data
test_data = datasets.MNIST(train=False, **mnist_kwargs)

In [None]:
# check train data: display the dataset object created above
training_data

In [None]:
# define batch size
batch_size = 64

# create train and test data loaders (same batch size for both)
train_dataloader, test_dataloader = (
    DataLoader(dataset, batch_size=batch_size)
    for dataset in (training_data, test_data)
)

# check data shapes using only the first test batch
first_batch = next(iter(test_dataloader), None)
if first_batch is not None:
    X, y = first_batch
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")

In [None]:
# get cpu, gpu or mps device for training
# preference order: CUDA GPU, then Apple MPS, then plain CPU
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)

print(f"Using {device} device for training!")

In [None]:
# define model
class NeuralNetwork(nn.Module):
    """Fully connected classifier: flattened 28x28 input -> 10 logits.

    Two hidden layers of 512 units, each followed by a ReLU.
    """

    def __init__(self):
        super().__init__()
        n_inputs, n_hidden, n_classes = 28 * 28, 512, 10
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(n_inputs, n_hidden),
            nn.ReLU(),
            nn.Linear(n_hidden, n_hidden),
            nn.ReLU(),
            nn.Linear(n_hidden, n_classes),
        )

    def forward(self, x):
        """Flatten each sample and return the unnormalized class scores."""
        return self.linear_relu_stack(self.flatten(x))


# build the network, then send it to the compute device
model = NeuralNetwork()
model = model.to(device)

# check model structure
print(model)

In [None]:
# define loss function (applied to the raw logits and integer class labels)
loss_fn = nn.CrossEntropyLoss()

# define optimizer: stochastic gradient descent over all model parameters
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

In [None]:
# define train loop
def train(dataloader, model, loss_fn, optimizer):
    """Run one pass over `dataloader`, updating `model` after every batch.

    Batches are moved to the module-level `device` before the forward pass.
    The loss and a running sample count are printed every 100 batches.
    """
    n_examples = len(dataloader.dataset)
    model.train()
    for step, (inputs, targets) in enumerate(dataloader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # forward pass and prediction error
        batch_loss = loss_fn(model(inputs), targets)

        # backpropagation, parameter update, then clear the gradients so
        # they do not accumulate into the next batch
        batch_loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        # progress report only on every 100th batch
        if step % 100:
            continue
        seen = (step + 1) * len(inputs)
        print(f"loss: {batch_loss.item():>7f}  [{seen:>5d}/{n_examples:>5d}]")

In [None]:
# define test loop
def test(dataloader, model, loss_fn):
    """Evaluate `model` on `dataloader` and print accuracy and mean loss.

    Runs in evaluation mode with gradient tracking disabled. The loss is
    averaged over batches; accuracy is the fraction of correct samples.
    Batches are moved to the module-level `device`.
    """
    n_examples = len(dataloader.dataset)
    n_batches = len(dataloader)
    model.eval()
    total_loss = 0
    n_correct = 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            logits = model(inputs)
            total_loss += loss_fn(logits, targets).item()
            # predicted class = index of the largest logit in each row
            hits = (logits.argmax(1) == targets).type(torch.float)
            n_correct += hits.sum().item()
    mean_loss = total_loss / n_batches
    accuracy = n_correct / n_examples
    print(f"Test Error: \n Accuracy: {(100*accuracy):>0.1f}%, Avg loss: {mean_loss:>8f} \n")

In [None]:
# train model: each epoch is one full training pass followed by an
# evaluation on the test set
epochs = 15
for epoch in range(1, epochs + 1):
    print(f"Epoch {epoch}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)
print("Done!")

In [None]:
# convert the trained model to TorchScript
model_scripted = torch.jit.script(model)
# write the scripted model to disk as model.pt (relative to the working directory)
model_scripted.save("model.pt")

In [None]:
# get a batch of training data (the first one the loader yields)
batch = next(iter(train_dataloader))
images, labels = batch

# plot the first three images in the batch, each titled with its label
fig, axs = plt.subplots(nrows=1, ncols=3, figsize=(10, 5))
for i, ax in enumerate(axs):
    ax.set_axis_off()
    ax.imshow(images[i].squeeze(), cmap=plt.cm.gray_r)
    ax.set_title(f"Label: {labels[i]}")
plt.show()

In [None]:
# get indexes for a random subset of the test data, one per subplot
n_plots = 3
n_test = len(test_data)
idx = random.sample(range(n_test), n_plots)

# switch to evaluation mode once, before any predictions are made
model.eval()

# plot digits for test data sample with predicted and actual labels
_, axes = plt.subplots(
    nrows=1,
    ncols=n_plots,
    figsize=(10, 3),
)
# distinct loop names keep the `idx` list and the earlier `predicted`
# array intact, so other cells can still be re-run afterwards
for ax, sample_idx in zip(axes, idx):
    image, label = test_data[sample_idx]
    with torch.no_grad():
        # the model expects a batch, so add a leading batch dimension
        output = model(image.unsqueeze(0).to(device))
    predicted_label = output.argmax(1).item()
    ax.set_axis_off()
    ax.imshow(image.squeeze(), cmap=plt.cm.gray_r)
    ax.set_title(f"True: {label}, Predicted: {predicted_label}")
plt.show()

## KMNIST

- [`rois-codh/kmnist`](https://github.com/rois-codh/kmnist)
- [`torchvision.datasets.KMNIST`](https://pytorch.org/vision/main/generated/torchvision.datasets.KMNIST.html)


In [None]:
# options shared by both splits: cache under ./data, download when needed,
# and apply the ToTensor transform to every image
kmnist_kwargs = dict(root="data", download=True, transform=ToTensor())

# download training data
training_data = datasets.KMNIST(train=True, **kmnist_kwargs)

# download test data
test_data = datasets.KMNIST(train=False, **kmnist_kwargs)

In [None]:
# define batch size
batch_size = 64

# create train and test data loaders (same batch size for both)
train_dataloader, test_dataloader = (
    DataLoader(dataset, batch_size=batch_size)
    for dataset in (training_data, test_data)
)

In [None]:
# check data shapes using only the first test batch
first_batch = next(iter(test_dataloader), None)
if first_batch is not None:
    X, y = first_batch
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")

In [None]:
# get a batch of training data (the first one the loader yields)
batch = next(iter(train_dataloader))
images, labels = batch

# plot the first batch of images on an 8x8 grid, filled row by row
fig, axs = plt.subplots(nrows=8, ncols=8, figsize=(10, 10))
for k, ax in enumerate(axs.flat):
    ax.set_axis_off()
    ax.imshow(images[k].squeeze(), cmap=plt.cm.gray_r)
plt.show()

In [None]:
# get cpu, gpu or mps device for training
# preference order: CUDA GPU, then Apple MPS, then plain CPU
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)

print(f"Using {device} device for training!")

In [None]:
# define nn model
class NeuralNetwork(nn.Module):
    """Fully connected classifier: flattened 28x28 input -> 10 logits.

    Two hidden layers of 512 units, each followed by a ReLU.
    """

    def __init__(self):
        super().__init__()
        n_inputs, n_hidden, n_classes = 28 * 28, 512, 10
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(n_inputs, n_hidden),
            nn.ReLU(),
            nn.Linear(n_hidden, n_hidden),
            nn.ReLU(),
            nn.Linear(n_hidden, n_classes),
        )

    def forward(self, x):
        """Flatten each sample and return the unnormalized class scores."""
        return self.linear_relu_stack(self.flatten(x))


# instantiate the fully connected network on the selected device and show its layers
model_nn = NeuralNetwork().to(device)
print(model_nn)

In [None]:
# define cnn model
class ConvNet(nn.Module):
    """Small convolutional classifier for single-channel images.

    Two conv -> ReLU -> max-pool stages with 64 filters each, followed by
    a linear layer producing 10 logits. The 3x3 convolutions are padded to
    keep the spatial size, and each pooling step halves it, so a 28x28
    input reaches the linear layer as 64 feature maps of 7x7.
    """

    def __init__(self):
        super().__init__()
        n_filters = 64
        self.conv_stack = nn.Sequential(
            nn.Conv2d(1, n_filters, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(n_filters, n_filters, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Flatten(),
            nn.Linear(n_filters * 7 * 7, 10),
        )

    def forward(self, x):
        """Return the unnormalized class scores for a batch of images."""
        return self.conv_stack(x)


# instantiate the convolutional network on the selected device and show its layers
model_cnn = ConvNet().to(device)
print(model_cnn)

In [None]:
# define loss function (shared by both models)
loss_fn = nn.CrossEntropyLoss()

# one optimizer per model, both with a learning rate of 1e-3:
# plain SGD for the fully connected net, Adam for the CNN
optimizer_nn = torch.optim.SGD(model_nn.parameters(), lr=1e-3)
optimizer_cnn = torch.optim.Adam(model_cnn.parameters(), lr=1e-3)

In [None]:
# define train loop
def train(dataloader, model, loss_fn, optimizer):
    """Run one pass over `dataloader`, updating `model` after every batch.

    Batches are moved to the module-level `device` before the forward pass.
    The loss and a running sample count are printed every 100 batches.
    """
    n_examples = len(dataloader.dataset)
    model.train()
    for step, (inputs, targets) in enumerate(dataloader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # forward pass and prediction error
        batch_loss = loss_fn(model(inputs), targets)

        # backpropagation, parameter update, then clear the gradients so
        # they do not accumulate into the next batch
        batch_loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        # progress report only on every 100th batch
        if step % 100:
            continue
        seen = (step + 1) * len(inputs)
        print(f"loss: {batch_loss.item():>7f}  [{seen:>5d}/{n_examples:>5d}]")

In [None]:
# define test loop
def test(dataloader, model, loss_fn):
    """Evaluate `model` on `dataloader` and print accuracy and mean loss.

    Runs in evaluation mode with gradient tracking disabled. The loss is
    averaged over batches; accuracy is the fraction of correct samples.
    Batches are moved to the module-level `device`.
    """
    n_examples = len(dataloader.dataset)
    n_batches = len(dataloader)
    model.eval()
    total_loss = 0
    n_correct = 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            logits = model(inputs)
            total_loss += loss_fn(logits, targets).item()
            # predicted class = index of the largest logit in each row
            hits = (logits.argmax(1) == targets).type(torch.float)
            n_correct += hits.sum().item()
    mean_loss = total_loss / n_batches
    accuracy = n_correct / n_examples
    print(f"Test Error: \n Accuracy: {(100*accuracy):>0.1f}%, Avg loss: {mean_loss:>8f} \n")

In [None]:
# train the fully connected model: each epoch is one full training pass
# followed by an evaluation on the test set
epochs = 5
for epoch in range(1, epochs + 1):
    print(f"Epoch {epoch}\n-------------------------------")
    train(train_dataloader, model_nn, loss_fn, optimizer_nn)
    test(test_dataloader, model_nn, loss_fn)
print("Done!")

In [None]:
# train the convolutional model: each epoch is one full training pass
# followed by an evaluation on the test set
epochs = 15
for epoch in range(1, epochs + 1):
    print(f"Epoch {epoch}\n-------------------------------")
    train(train_dataloader, model_cnn, loss_fn, optimizer_cnn)
    test(test_dataloader, model_cnn, loss_fn)
print("Done!")

In [None]:
# convert each trained network to TorchScript and write it to disk:
# the fully connected model first, then the convolutional one
for trained_model, out_path in (
    (model_nn, "model_nn.pt"),
    (model_cnn, "model_cnn.pt"),
):
    model_scripted = torch.jit.script(trained_model)
    model_scripted.save(out_path)