GitHub Repository: ai-forever/sber-swap
Path: apex/tests/L0/run_mlp/test_mlp.py
"""Tests for the C++ MLP (apex.mlp.MLP) against a reference PyTorch implementation."""
import unittest
from time import time
import numpy as np

import torch
from torch import nn

from apex.mlp import MLP

batch_size = 1024
# Input feature size followed by the output size of each layer.
mlp_sizes = [480, 1024, 1024, 512, 256, 1]
num_iters = 10

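# Note: every test below assumes a CUDA-capable GPU and an apex install built
# with its C++/CUDA extensions, so that `from apex.mlp import MLP` and the
# `.cuda()` calls above and below succeed.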
class TestMLP(unittest.TestCase):

    def test_creation(self):
        MLP(mlp_sizes)

    def test_numeric(self):
        mlp = MLP(mlp_sizes).cuda()

        # Build an equivalent PyTorch reference model and copy its parameters
        # into the C++ MLP so both start from identical weights and biases.
        mlp_layers = []
        for i in range(mlp.num_layers):
            linear = nn.Linear(mlp_sizes[i], mlp_sizes[i + 1])
            mlp.weights[i].data.copy_(linear.weight)
            mlp.biases[i].data.copy_(linear.bias)
            mlp_layers.append(linear)
            mlp_layers.append(nn.ReLU(inplace=True))

        ref_mlp = nn.Sequential(*mlp_layers).cuda()

        test_input = torch.empty(batch_size, mlp_sizes[0], device="cuda").uniform_(-1., 1.).requires_grad_()
        ref_input = test_input.clone().detach().requires_grad_()
        mlp_out = mlp(test_input)
        ref_out = ref_mlp(ref_input)
        np.testing.assert_allclose(
            mlp_out.detach().cpu().numpy(),
            ref_out.detach().cpu().numpy(),
            atol=1e-7, rtol=1e-5)

        # Use the mean as the scalar loss. Multiply by 10 so it is large enough not to zero out.
        mlp_out.mean().mul(10.).backward()
        ref_out.mean().mul(10.).backward()
        np.testing.assert_allclose(
            test_input.grad.detach().cpu().numpy(),
            ref_input.grad.detach().cpu().numpy(),
            atol=0, rtol=1e-5)
        np.testing.assert_allclose(
            mlp.biases[0].grad.detach().cpu().numpy(),
            ref_mlp[0].bias.grad.detach().cpu().numpy(),
            atol=1e-7, rtol=1e-5)

    def test_no_bias(self):
        for use_activation in ['none', 'relu', 'sigmoid']:
            mlp = MLP(mlp_sizes, bias=False, activation=use_activation).cuda()

            mlp_layers = []
            for i in range(mlp.num_layers):
                linear = nn.Linear(mlp_sizes[i], mlp_sizes[i + 1], bias=False)
                mlp.weights[i].data.copy_(linear.weight)
                mlp_layers.append(linear)
                if use_activation == 'relu':
                    mlp_layers.append(nn.ReLU(inplace=True))
                if use_activation == 'sigmoid':
                    mlp_layers.append(nn.Sigmoid())

            ref_mlp = nn.Sequential(*mlp_layers).cuda()

            test_input = torch.empty(batch_size, mlp_sizes[0], device="cuda").uniform_(-1., 1.).requires_grad_()
            ref_input = test_input.clone().detach().requires_grad_()
            mlp_out = mlp(test_input)
            ref_out = ref_mlp(ref_input)
            np.testing.assert_allclose(
                mlp_out.detach().cpu().numpy(),
                ref_out.detach().cpu().numpy(),
                atol=1e-7, rtol=1e-5)

            # Use the mean as the scalar loss. Multiply by 10 so it is large enough not to zero out.
            mlp_out.mean().mul(10.).backward()
            ref_out.mean().mul(10.).backward()
            np.testing.assert_allclose(
                test_input.grad.detach().cpu().numpy(),
                ref_input.grad.detach().cpu().numpy(),
                atol=0, rtol=100)
            np.testing.assert_allclose(
                mlp.weights[0].grad.detach().cpu().numpy(),
                ref_mlp[0].weight.grad.detach().cpu().numpy(),
                atol=1e-7, rtol=100)

    def test_with_bias(self):
        for use_activation in ['none', 'relu', 'sigmoid']:
            mlp = MLP(mlp_sizes, bias=True, activation=use_activation).cuda()

            mlp_layers = []
            for i in range(mlp.num_layers):
                linear = nn.Linear(mlp_sizes[i], mlp_sizes[i + 1], bias=True)
                mlp.weights[i].data.copy_(linear.weight)
                mlp.biases[i].data.copy_(linear.bias)
                mlp_layers.append(linear)
                if use_activation == 'relu':
                    mlp_layers.append(nn.ReLU(inplace=True))
                if use_activation == 'sigmoid':
                    mlp_layers.append(nn.Sigmoid())

            ref_mlp = nn.Sequential(*mlp_layers).cuda()

            test_input = torch.empty(batch_size, mlp_sizes[0], device="cuda").uniform_(-1., 1.).requires_grad_()
            ref_input = test_input.clone().detach().requires_grad_()
            mlp_out = mlp(test_input)
            ref_out = ref_mlp(ref_input)
            np.testing.assert_allclose(
                mlp_out.detach().cpu().numpy(),
                ref_out.detach().cpu().numpy(),
                atol=1e-7, rtol=1e-5)

            # Use the mean as the scalar loss. Multiply by 10 so it is large enough not to zero out.
            mlp_out.mean().mul(10.).backward()
            ref_out.mean().mul(10.).backward()
            np.testing.assert_allclose(
                test_input.grad.detach().cpu().numpy(),
                ref_input.grad.detach().cpu().numpy(),
                atol=0, rtol=1)
            np.testing.assert_allclose(
                mlp.weights[0].grad.detach().cpu().numpy(),
                ref_mlp[0].weight.grad.detach().cpu().numpy(),
                atol=1e-7, rtol=1)
            np.testing.assert_allclose(
                mlp.biases[0].grad.detach().cpu().numpy(),
                ref_mlp[0].bias.grad.detach().cpu().numpy(),
                atol=1e-7, rtol=1e-5)

    def test_no_grad(self):
        mlp = MLP(mlp_sizes).cuda()

        mlp_layers = []
        for i in range(mlp.num_layers):
            linear = nn.Linear(mlp_sizes[i], mlp_sizes[i + 1])
            mlp.weights[i].data.copy_(linear.weight)
            mlp.biases[i].data.copy_(linear.bias)
            mlp_layers.append(linear)
            mlp_layers.append(nn.ReLU(inplace=True))

        ref_mlp = nn.Sequential(*mlp_layers).cuda()

        # Inputs do not require grad here; only parameter gradients are compared.
        test_input = torch.empty(batch_size, mlp_sizes[0], device="cuda").uniform_(-1., 1.)
        ref_input = test_input.clone().detach()
        mlp_out = mlp(test_input)
        ref_out = ref_mlp(ref_input)
        np.testing.assert_allclose(
            mlp_out.detach().cpu().numpy(),
            ref_out.detach().cpu().numpy(),
            atol=1e-7, rtol=1e-5)

        # Use the mean as the scalar loss. Multiply by 10 so it is large enough not to zero out.
        mlp_out.mean().mul(10.).backward()
        ref_out.mean().mul(10.).backward()
        np.testing.assert_allclose(
            mlp.weights[0].grad.detach().cpu().numpy(),
            ref_mlp[0].weight.grad.detach().cpu().numpy(),
            atol=1e-7, rtol=1e-5)


    def test_performance_half(self):
        mlp = MLP(mlp_sizes).cuda().half()

        mlp_layers = []
        for i in range(mlp.num_layers):
            linear = nn.Linear(mlp_sizes[i], mlp_sizes[i + 1])
            mlp.weights[i].data.copy_(linear.weight)
            mlp.biases[i].data.copy_(linear.bias)
            mlp_layers.append(linear)
            mlp_layers.append(nn.ReLU(inplace=True))

        ref_mlp = nn.Sequential(*mlp_layers).cuda().half()

        test_input = torch.empty(
            batch_size, mlp_sizes[0], device="cuda", dtype=torch.half).fill_(10.).requires_grad_()
        ref_input = torch.empty(
            batch_size, mlp_sizes[0], device="cuda", dtype=torch.half).fill_(10.).requires_grad_()

        # Warm up GPU
        for _ in range(100):
            ref_out = ref_mlp(ref_input)
            ref_loss = ref_out.mean()
            ref_mlp.zero_grad()
            ref_loss.backward()
            mlp_out = mlp(test_input)
            test_loss = mlp_out.mean()
            mlp.zero_grad()
            test_loss.backward()

        # Time the reference PyTorch MLP, then the C++ MLP, over num_iters iterations each.
        torch.cuda.profiler.start()
        torch.cuda.synchronize()
        start_time = time()
        for _ in range(num_iters):
            ref_out = ref_mlp(ref_input)
            ref_loss = ref_out.mean()
            ref_mlp.zero_grad()
            ref_loss.backward()
        torch.cuda.synchronize()
        stop_time = time()
        print(f"\nPyTorch MLP time {(stop_time - start_time) * 1000. / num_iters:.4f} ms")

        torch.cuda.synchronize()
        start_time = time()
        for _ in range(num_iters):
            mlp_out = mlp(test_input)
            test_loss = mlp_out.mean()
            mlp.zero_grad()
            test_loss.backward()
        torch.cuda.synchronize()
        stop_time = time()
        print(f"C++ MLP time {(stop_time - start_time) * 1000. / num_iters:.4f} ms")
        torch.cuda.profiler.stop()


if __name__ == '__main__':
    unittest.main()