-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathEncoderDecoder.py
More file actions
211 lines (179 loc) · 9.23 KB
/
EncoderDecoder.py
File metadata and controls
211 lines (179 loc) · 9.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import numpy as np
import h5py
import matplotlib.pyplot as plt
import tensorflow as tf
import os
import random
def encoder_layer(num_filters, apply_batchnorm=True,apply_dropout=False, dropout_prob=0.5):
    """Build one downsampling block.

    Layer order: Conv2D (3x3, stride 1, no bias) -> 2x2 max-pool ->
    optional BatchNormalization -> LeakyReLU -> optional Dropout.

    Args:
        num_filters: number of convolution filters.
        apply_batchnorm: insert BatchNormalization before the activation.
        apply_dropout: append a Dropout layer after the activation.
        dropout_prob: drop probability used when apply_dropout is True.

    Returns:
        A tf.keras.Sequential containing the block's layers.
    """
    weight_init = tf.random_normal_initializer(0., 0.02)
    block = tf.keras.Sequential()
    block.add(tf.keras.layers.Conv2D(
        num_filters, 3, strides=1, padding='same',
        kernel_initializer=weight_init, use_bias=False))
    # Spatial downsampling happens here rather than via conv stride.
    block.add(tf.keras.layers.MaxPooling2D(
        pool_size=(2, 2), strides=None, padding='valid'))
    if apply_batchnorm:
        block.add(tf.keras.layers.BatchNormalization())
    block.add(tf.keras.layers.LeakyReLU())
    if apply_dropout:
        block.add(tf.keras.layers.Dropout(dropout_prob))
    return block
def decoder_layer(num_filters, apply_batchnorm=True,apply_dropout=False, dropout_prob=0.5):
    """Build one upsampling block.

    Layer order: Conv2DTranspose (3x3, stride 2, no bias) ->
    optional BatchNormalization -> LeakyReLU -> optional Dropout.

    Args:
        num_filters: number of transpose-convolution filters.
        apply_batchnorm: insert BatchNormalization before the activation.
        apply_dropout: append a Dropout layer after the activation.
        dropout_prob: drop probability used when apply_dropout is True.

    Returns:
        A tf.keras.Sequential containing the block's layers.
    """
    weight_init = tf.random_normal_initializer(0., 0.02)
    block = tf.keras.Sequential()
    # Stride-2 transpose conv doubles the spatial resolution.
    block.add(tf.keras.layers.Conv2DTranspose(
        num_filters, 3, strides=2, padding='same',
        kernel_initializer=weight_init, use_bias=False))
    if apply_batchnorm:
        block.add(tf.keras.layers.BatchNormalization())
    block.add(tf.keras.layers.LeakyReLU())
    if apply_dropout:
        block.add(tf.keras.layers.Dropout(dropout_prob))
    return block
def EncoderP2E():
    """Photo-to-embedding encoder.

    Stacks seven encoder_layer blocks (2x256 filters without batchnorm,
    3x128, 2x64, all with dropout 0.2) over a 256x256x3 input, then a
    final sigmoid Conv2D head producing a single-channel embedding map.

    Returns:
        A tf.keras.Model named "EncoderP2E".
    """
    inputs = tf.keras.layers.Input(shape=[256, 256, 3])
    x = inputs
    # (num_filters, apply_batchnorm) per stage; the first two skip batchnorm.
    stages = [(256, False)] * 2 + [(128, True)] * 3 + [(64, True)] * 2
    for filters, use_bn in stages:
        stage = encoder_layer(filters, apply_batchnorm=use_bn,
                              apply_dropout=True, dropout_prob=0.2)
        x = stage(x)
    head = tf.keras.layers.Conv2D(
        1, 3, strides=2, padding='same',
        kernel_initializer=tf.random_normal_initializer(0., 0.02),
        activation='sigmoid')  # TODO: make fully connected
    return tf.keras.Model(inputs=inputs, outputs=head(x), name="EncoderP2E")
def EncoderD2E():
    """Doodle-to-embedding encoder.

    Architecturally identical to EncoderP2E (separate weights): seven
    encoder_layer blocks (2x256 without batchnorm, 3x128, 2x64, dropout
    0.2) over a 256x256x3 input, then a sigmoid Conv2D head yielding a
    single-channel embedding map.

    Returns:
        A tf.keras.Model named "EncoderD2E".
    """
    inputs = tf.keras.layers.Input(shape=[256, 256, 3])
    x = inputs
    # (num_filters, apply_batchnorm) per stage; the first two skip batchnorm.
    stages = [(256, False)] * 2 + [(128, True)] * 3 + [(64, True)] * 2
    for filters, use_bn in stages:
        stage = encoder_layer(filters, apply_batchnorm=use_bn,
                              apply_dropout=True, dropout_prob=0.2)
        x = stage(x)
    head = tf.keras.layers.Conv2D(
        1, 3, strides=2, padding='same',
        kernel_initializer=tf.random_normal_initializer(0., 0.02),
        activation='sigmoid')  # TODO: make fully connected
    return tf.keras.Model(inputs=inputs, outputs=head(x), name="EncoderD2E")
def load_minibatch(num=1000, start=-1, n=5):
    """Load a minibatch of photo/sketch pairs from output.hdf5.

    Reads `num` photos, their matching sketches (positive pairs, label 1)
    and `num` mismatched "bad" sketches (negative pairs, label 0), and
    returns the photos duplicated so each photo appears once with its
    true sketch and once with a bad one. Assumes pairs in output.hdf5
    are already shuffled.

    Args:
        num: number of positive pairs to read.
        start: first index to read; if negative a random start in
            [0, 2000-n] is drawn.
        n: used only to bound the random start offset.

    Returns:
        (photos, sketches, labels) with pixel values scaled to [0, 1];
        photos/sketches have 2*num rows, labels is (2*num, 1) float32.
    """
    train_dataset = h5py.File('output.hdf5', "r")
    if start < 0:
        start = random.randint(0, 2000 - n)
    stop = start + num
    photos = np.array(train_dataset["image_dataset"][start:stop], dtype='float32')
    sketches = np.array(train_dataset["sketch_dataset"][start:stop], dtype='float32')
    bad_sketches = np.array(train_dataset["bad_sketch_dataset"][start:stop], dtype='float32')
    # Positive pairs first (label 1), then negatives (label 0).
    train_set_P = tf.concat([photos, photos], 0)
    train_set_D = tf.concat([sketches, bad_sketches], 0)
    y_train = tf.concat([np.ones([num, 1], dtype='float32'),
                         np.zeros([num, 1], dtype='float32')], axis=0)
    print("y_train:", y_train.shape)
    return train_set_P / 255, train_set_D / 255, y_train
def EmbeddingCost(inputs):
    """Contrastive loss between photo and doodle embeddings.

    Args:
        inputs: tuple (E_P, E_D, y_train, margin, n) where E_P/E_D are the
            photo and doodle encoder outputs, y_train marks positive (1) /
            negative (0) pairs, margin is the triplet margin, and n is the
            number of negative examples per positive example.

    Returns:
        Scalar loss tensor: sum over pairs of
        relu(label * ||E_D - E_P|| + margin), where label is +1 for
        positive pairs and -1/n for negative pairs.
    """
    E_P, E_D, y_train, margin, n = inputs
    # Euclidean distance between embeddings, reduced along axis 2.
    # NOTE(review): assumes embeddings are rank >= 3 so axis=2 is valid — confirm
    # against the encoder output shape.
    distance_norm = tf.keras.backend.sqrt(
        tf.keras.backend.sum(tf.keras.backend.square(E_D - E_P), axis=2))
    # Maps y=1 -> +1 and y=0 -> -1/n, down-weighting negative pairs by 1/n.
    labels = (1 + 1/n) * y_train - 1/n
    # NOTE(review): negative pairs contribute relu(margin - dist/n), i.e. are
    # pushed apart up to n*margin; positive pairs contribute dist + margin.
    # The original comment claimed 1/n*norm for y=0, which does not match —
    # verify the intended sign/weighting.
    loss_func = tf.keras.backend.sum(
        tf.keras.backend.relu(tf.multiply(distance_norm, labels) + margin))
    return loss_func
def main():
    """Train the photo (P2E) and doodle (D2E) encoders jointly with the
    contrastive embedding loss, one optimizer per encoder.
    """
    batch_size = 2  # normally 1000
    batch_num = 1   # normally 100 (currently unused)
    n = tf.constant(1.0)        # negative examples per positive example
    margin = tf.constant(20.0)  # triplet margin

    img_rows, img_cols, nc = 256, 256, 3
    input_shape = (img_rows, img_cols, nc)
    P = tf.keras.layers.Input(input_shape, name="P")
    D = tf.keras.layers.Input(input_shape, name="D")
    y_true = tf.keras.layers.Input([None, 1], name="y_true")

    Model_EncD2E = EncoderD2E()
    Model_EncP2E = EncoderP2E()
    D2E_optimizer = tf.keras.optimizers.Adam(0.0001, beta_1=0.9)
    P2E_optimizer = tf.keras.optimizers.Adam(0.0001, beta_1=0.9)

    # Wire both encoders plus the loss Lambda into one trainable graph that
    # outputs the scalar embedding loss directly.
    P_codes = Model_EncP2E(P)
    D_codes = Model_EncD2E(D)
    Loss_func = tf.keras.layers.Lambda(EmbeddingCost)([P_codes, D_codes, y_true, margin, n])
    network_train = tf.keras.Model(inputs=[P, D, y_true], outputs=Loss_func)

    # Keep track of losses across epochs.
    Encoding_losses = []
    for epoch in range(3000):
        train_P, train_D, y_train = load_minibatch(batch_size, start=-1, n=5)
        print("epoch: ", epoch)
        average_encoding_cost = 0
        counter = 0
        # NOTE(review): the -1 bounds drop the final sample of each minibatch,
        # and y_train is passed whole while P/D are sliced — confirm intended.
        for image in range(0, train_P.shape[0] - 1, batch_size):
            print("image: ", image)
            with tf.GradientTape() as P2E_tape, tf.GradientTape() as D2E_tape:
                P_inp = train_P[image:min(image + batch_size, train_P.shape[0] - 1)]
                D_inp = train_D[image:min(image + batch_size, train_D.shape[0] - 1)]
                encodingLoss = network_train([P_inp, D_inp, y_train])
                average_encoding_cost += encodingLoss
                counter += 1
            ### Gradient descent ###
            # P2E encoder
            gradients_P2E = P2E_tape.gradient(encodingLoss, Model_EncP2E.trainable_variables)
            P2E_optimizer.apply_gradients(zip(gradients_P2E, Model_EncP2E.trainable_variables))
            # D2E encoder
            gradients_D2E = D2E_tape.gradient(encodingLoss, Model_EncD2E.trainable_variables)
            D2E_optimizer.apply_gradients(zip(gradients_D2E, Model_EncD2E.trainable_variables))
        # BUG FIX: previously printed average_disc1_cost, an undefined name
        # (NameError at the end of every epoch).
        print("Embedding Cost: ", average_encoding_cost / counter)
        Encoding_losses.append(np.mean(average_encoding_cost) / counter)
        # TODO: restore loss plotting (plt.plot/savefig of Encoding_losses)
        # and periodic checkpoint saving once a checkpoint object is set up.
# Script entry point: run training only when executed directly, not on import.
if __name__ == "__main__":
    main()