5.12 [Machine Learning] Building a Convolutional Model
When the model emits softmax outputs directly, it is impossible to provide an exact and numerically stable loss calculation for all models; this is why the model below ends with an un-activated Dense(10) layer that returns raw logits.
import tensorflow as tf

model = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(10)  # logits, no softmax
])
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0
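Since the last layer returns logits, the numerically stable route is to let the loss apply the softmax internally via from_logits=True. A minimal sketch of compiling and fitting this model (the epoch count here is an arbitrary choice):
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer='adam', loss=loss_fn, metrics=['accuracy'])
model.fit(x_train, y_train, epochs=5)        # 5 epochs chosen only for illustration
model.evaluate(x_test, y_test, verbose=2)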
Open a for loop that iterates over the epochs.
For each epoch, open an inner for loop that iterates over the dataset in batches, as sketched below.
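A minimal sketch of that loop structure, assuming the MNIST arrays loaded above (batch size and epoch count are illustrative):
train_data = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(10000).batch(32)
for epoch in range(5):                        # outer loop over epochs
    for batch_x, batch_y in train_data:       # inner loop over batches
        pass                                  # one training step per batch (see main() further below)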
glob returns a list of all file paths that match a pattern. It takes a single argument, the pattern string (which may be an absolute or a relative path); the returned names only cover files in that directory, not files inside its subfolders.
glob.glob(r'c:*.txt')
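For example (the drive and file names here are hypothetical):
import glob

txt_files = glob.glob(r'c:*.txt')   # all .txt files in the current directory on drive C:
print(txt_files)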
Creating variables in a layer's build() method lets variable creation wait until the shape of the inputs the layer will operate on is known,
whereas creating them in __init__ means the shapes needed to create the variables must be specified up front.
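A minimal sketch of a custom layer that creates its weight in build() (the name MyDenseLayer is only for illustration):
class MyDenseLayer(tf.keras.layers.Layer):
    def __init__(self, num_outputs):
        super(MyDenseLayer, self).__init__()
        self.num_outputs = num_outputs

    def build(self, input_shape):
        # The kernel shape depends on the last dimension of the input,
        # which is only known the first time the layer is called.
        self.kernel = self.add_weight(
            name="kernel", shape=[int(input_shape[-1]), self.num_outputs])

    def call(self, inputs):
        return tf.matmul(inputs, self.kernel)

layer = MyDenseLayer(10)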
_ = layer(tf.zeros([10, 5]))  # Calling the layer builds it.
The ResNet identity block below is a combination of convolutions, batch normalization, and a shortcut connection:
class ResnetIdentityBlock(tf.keras.Model):
    def __init__(self, kernel_size, filters):
        super(ResnetIdentityBlock, self).__init__(name='')
        filters1, filters2, filters3 = filters
        self.conv2a = tf.keras.layers.Conv2D(filters1, (1, 1))
        self.bn2a = tf.keras.layers.BatchNormalization()
        self.conv2b = tf.keras.layers.Conv2D(filters2, kernel_size, padding='same')
        self.bn2b = tf.keras.layers.BatchNormalization()
        self.conv2c = tf.keras.layers.Conv2D(filters3, (1, 1))
        self.bn2c = tf.keras.layers.BatchNormalization()

    def call(self, input_tensor, training=False):
        x = self.conv2a(input_tensor)
        x = self.bn2a(x, training=training)
        x = tf.nn.relu(x)

        x = self.conv2b(x)
        x = self.bn2b(x, training=training)
        x = tf.nn.relu(x)

        x = self.conv2c(x)
        x = self.bn2c(x, training=training)

        x += input_tensor
        return tf.nn.relu(x)
block = ResnetIdentityBlock(1, [1, 2, 3])
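Calling the block builds it; note that the input's channel count must equal the last filter count (3 here) for the identity shortcut to add up. A quick smoke test:
_ = block(tf.zeros([1, 2, 3, 3]))   # dummy input: batch 1, 2x3 spatial, 3 channels
block.summary()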
A custom training loop breaks down into three steps: iterate over a Python generator or a tf.data.Dataset to get batches of samples;
use tf.GradientTape to collect the gradients;
and use one of the tf.keras.optimizers to apply the weight updates to the model.
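A compressed sketch of one such step, assuming a model, an optimizer, and a batch (x, y) are already in hand (the main() function further below carries out the same steps in full):
with tf.GradientTape() as tape:
    logits = model(x, training=True)
    loss = tf.keras.losses.sparse_categorical_crossentropy(y, logits, from_logits=True)
    loss = tf.reduce_mean(loss)
grads = tape.gradient(loss, model.trainable_variables)            # step 2: collect gradients
optimizer.apply_gradients(zip(grads, model.trainable_variables))  # step 3: apply weight updates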
import datetime

tf.random.set_seed(2345)
current_time = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
log_dir = 'logs/' + current_time
summary_writer = tf.summary.create_file_writer(log_dir)
def preprocess(x, y):
    # scale pixel values from [0, 255] to [-1, 1]
    x = 2 * tf.cast(x, dtype=tf.float32) / 255. - 1
    y = tf.cast(y, dtype=tf.int32)
    return x, y
data_dir = 'D:\\MachineLearning\\exp3\\flowers'
batch_size = 32
img_height = 32
img_width = 32
# Load the images from disk and split them into training and validation sets
train_ds = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=123,
    image_size=(img_height, img_width),
    batch_size=batch_size)
val_ds = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=123,
    image_size=(img_height, img_width),
    batch_size=batch_size)
class_names = train_ds.class_names
print(class_names)
# # Set `num_parallel_calls` so multiple images are loaded/processed in parallel.
# train_ds = train_ds.map(process_path, num_parallel_calls=AUTOTUNE)
# val_ds = val_ds.map(process_path, num_parallel_calls=AUTOTUNE)
# for image, label in train_ds.take(1):
# print("Image shape: ", image.numpy().shape)
# print("Label: ", label.numpy())
# for image_batch, labels_batch in train_ds:
# print(image_batch.shape)
# print(labels_batch.shape)
# break
AUTOTUNE = tf.data.AUTOTUNE

def configure_for_performance(ds):
    ds = ds.cache()
    ds = ds.shuffle(buffer_size=1000)
    ds = ds.batch(batch_size)
    ds = ds.prefetch(buffer_size=AUTOTUNE)
    return ds
# train_ds = configure_for_performance(train_ds)
# val_ds = configure_for_performance(val_ds)
# train_ds= tf.squeeze(train_ds, axis=1)
# val_ds= tf.squeeze(val_ds, axis=1)
# (x, y), (x_test, y_test) = datasets.cifar10.load_data()
# y = tf.squeeze(y, axis=1)
# y_test = tf.squeeze(y_test, axis=1)
# print(x.shape, y.shape, x_test.shape, y_test.shape)
#
# train_db = tf.data.Dataset.from_tensor_slices((x, y))
# train_db = train_db.shuffle(1000).map(preprocess).batch(256)
#
# test_db = tf.data.Dataset.from_tensor_slices((x_test, y_test))
# test_db = test_db.map(preprocess).batch(256)
#
# sample = next(iter(train_db))
# print('sample:', sample[0].shape, sample[1].shape,
# tf.reduce_min(sample[0]), tf.reduce_max(sample[0]))
def main():
    # [b, 32, 32, 3] => [b, 1, 1, 512]
    model = ResNetmodel()   # ResNetmodel is assumed to be defined elsewhere in these notes
    model.build([None, 32, 32, 3])
    # model.summary()  # print the parameter counts
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
    variables = model.trainable_variables

    for epoch in range(100):
        for step, (x, y) in enumerate(train_ds):
            with tf.GradientTape() as tape:
                # [b, 32, 32, 3] => [b, 1, 1, 512]
                out = model(x)
                # [b] => [b, 5]
                y_onehot = tf.one_hot(y, depth=5)
                # compute loss
                loss = tf.losses.categorical_crossentropy(y_onehot, out, from_logits=True)
                loss = tf.reduce_mean(loss)

            grads = tape.gradient(loss, variables)
            optimizer.apply_gradients(zip(grads, variables))

            if step % 100 == 0:
                with summary_writer.as_default():
                    tf.summary.scalar('loss', loss, step=step)

        # evaluate on the validation set after each epoch
        total_num = 0
        total_correct = 0
        for x, y in val_ds:
            out = model(x)
            prob = tf.nn.softmax(out, axis=1)
            pred = tf.argmax(prob, axis=1)
            pred = tf.cast(pred, dtype=tf.int32)

            correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)
            correct = tf.reduce_sum(correct)

            total_num += x.shape[0]
            total_correct += int(correct)

        acc = total_correct / total_num
        with summary_writer.as_default():
            tf.summary.scalar('acc', float(acc), step=epoch)


if __name__ == '__main__':
    main()
Note that the metrics' accumulated state needs to be cleared between epochs, e.g. by calling reset_states() on each metric.
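A short sketch of that pattern, assuming the model and train_ds from these notes and an accuracy metric added for illustration:
accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
for epoch in range(NUM_EPOCHS):
    for inputs, labels in train_ds:
        predictions = model(inputs, training=False)
        accuracy.update_state(labels, predictions)   # accumulate over the epoch
    print("epoch", epoch, "accuracy:", float(accuracy.result()))
    accuracy.reset_states()                          # clear the state before the next epoch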
optimizer = tf.keras.optimizers.Adam(0.001)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

@tf.function
def train_step(inputs, labels):
    with tf.GradientTape() as tape:
        predictions = model(inputs, training=True)
        regularization_loss = tf.math.add_n(model.losses)
        pred_loss = loss_fn(labels, predictions)
        total_loss = pred_loss + regularization_loss
    gradients = tape.gradient(total_loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
for epoch in range(NUM_EPOCHS):
    for inputs, labels in train_ds:
        train_step(inputs, labels)
    print("Finished epoch", epoch)