Konten Overview
- Introduksi
- Setup
- Contoh sederhana pertama
- Menuju tingkat yang lebih rendah
- Mendukung sample_weight & class_weight
- Memberikan langkah evaluasi Anda sendiri
- Wrapping up: contoh GAN end-to-end
Introduksi
Ketika Anda melakukan pengawasan belajar, Anda dapat menggunakanfit()
Dan semuanya berjalan dengan lancar.
Ketika Anda perlu menulis lingkaran pelatihan Anda sendiri dari awal, Anda dapat menggunakanGradientTape
dan mengendalikan setiap detail kecil.
Tetapi bagaimana jika Anda membutuhkan algoritma pelatihan kustom, tetapi Anda masih ingin mendapatkan keuntungan dari fitur nyaman darifit()
, seperti callbacks, dukungan distribusi built-in, atau step fusion?
Prinsip utama dari keras adalahprogressive disclosure of complexityAnda harus selalu dapat masuk ke alur kerja tingkat bawah secara bertahap. Anda tidak harus jatuh dari tebing jika fungsionalitas tingkat tinggi tidak sesuai dengan kasus penggunaan Anda. Anda harus dapat mendapatkan lebih banyak kendali atas detail kecil sambil mempertahankan jumlah yang proporsional dari kenyamanan tingkat tinggi.
Ketika Anda perlu menyesuaikan apa yangfit()
Anda harus, Anda harusoverride the training step function of the Model
Ini adalah fungsi yang disebut denganfit()
untuk setiap batch data. maka Anda akan dapat menghubungifit()
seperti biasa - dan itu akan menjalankan algoritma belajar Anda sendiri.
Perhatikan bahwa pola ini tidak mencegah Anda membangun model dengan API Fungsi.Sequential
model, model API fungsional, atau model subklasifikasi.
Mari kita lihat bagaimana itu bekerja.
Setup
Diperlukan TensorFlow 2.8 atau lebih baru.
import tensorflow as tf
from tensorflow import keras
Contoh sederhana pertama
Mari kita mulai dengan contoh sederhana:
- Kami membuat kelas baru yang subklasifikasi keras.Model.
- Metode yang digunakan adalah train_step (self, data).
- Kami mengembalikan nama metrik (termasuk kerugian) ke nilai saat ini.
Argumentasi inputdata
adalah apa yang ditransmisikan untuk cocok sebagai data pelatihan:
- Jika Anda melewati array Numpy, dengan memanggil fit(x, y, ...), maka data akan menjadi tuple (x, y)
- Jika Anda melewati tf.data.Dataset, dengan memanggil fit(dataset, ...), maka data akan menjadi apa yang dihasilkan oleh dataset pada setiap batch.
Dalam tubuh daritrain_step
metode, kami menerapkan pembaruan pelatihan reguler, mirip dengan apa yang sudah Anda kenal.we compute the loss via self.compute_loss()
yang merugikan orang-orang yang telah merugikan orang-orang yang telah merugikancompile()
.
Demikian pula, kita memanggilmetric.update_state(y, y_pred)
Metrik dariself.metrics
, untuk memperbarui status metrik yang telah ditransmisikan dicompile()
Dan kami mencari hasil dariself.metrics
pada akhirnya untuk mengembalikan nilainya saat ini.
class CustomModel(keras.Model):
def train_step(self, data):
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute the loss value
# (the loss function is configured in `compile()`)
loss = self.compute_loss(y=y, y_pred=y_pred)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update metrics (includes the metric that tracks the loss)
for metric in self.metrics:
if metric.name == "loss":
metric.update_state(loss)
else:
metric.update_state(y, y_pred)
# Return a dict mapping metric names to current value
return {m.name: m.result() for m in self.metrics}
Mari kita coba ini:
import numpy as np
# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3
32/32 [==============================] - 3s 2ms/step - loss: 1.6446
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.7554
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.3924
<keras.src.callbacks.History at 0x7fef5c11ba30>
Menuju tingkat yang lebih rendah
Tentu saja, Anda hanya bisa melewatkan fungsi kehilangan dicompile()
Sebaliknya, lakukan segalanyasecara manualdalamtrain_step
Begitu juga dengan metrik.
Berikut adalah contoh yang lebih rendah, yang hanya menggunakancompile()
Cara Menggunakan Optimizer:
- Kami mulai dengan membuat instansi Metrik untuk melacak kerugian kami dan skor MAE (dalam __init__()).
- Kami menerapkan train_step() yang disesuaikan yang memperbarui status metrik ini (dengan memanggil update_state() di atasnya), kemudian menginterogasi mereka (melalui result()) untuk mengembalikan nilai rata-rata mereka saat ini, untuk ditampilkan oleh bar kemajuan dan untuk ditransfer ke panggilan balik.
- Perhatikan bahwa kita perlu memanggil reset_states() pada metrik kita antara setiap era! Jika tidak, memanggil result() akan mengembalikan rata-rata sejak awal pelatihan, sementara kita biasanya bekerja dengan rata-rata per era. Untungnya, framework dapat melakukannya untuk kami: hanya daftar metrik apa pun yang ingin Anda reset dalam properti metrik model. Model akan memanggil reset_states() pada obyek apa pun yang tercantum di sini pada awal setiap era fit() atau pada awal panggilan untuk mengevaluasi().
class CustomModel(keras.Model):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.loss_tracker = keras.metrics.Mean(name="loss")
self.mae_metric = keras.metrics.MeanAbsoluteError(name="mae")
def train_step(self, data):
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute our own loss
loss = keras.losses.mean_squared_error(y, y_pred)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Compute our own metrics
self.loss_tracker.update_state(loss)
self.mae_metric.update_state(y, y_pred)
return {"loss": self.loss_tracker.result(), "mae": self.mae_metric.result()}
@property
def metrics(self):
# We list our `Metric` objects here so that `reset_states()` can be
# called automatically at the start of each epoch
# or at the start of `evaluate()`.
# If you don't implement this property, you have to call
# `reset_states()` yourself at the time of your choosing.
return [self.loss_tracker, self.mae_metric]
# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
# We don't pass a loss or metrics here.
model.compile(optimizer="adam")
# Just use `fit` as usual -- you can use callbacks, etc.
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)
Epoch 1/5
32/32 [==============================] - 0s 2ms/step - loss: 0.3240 - mae: 0.4583
Epoch 2/5
32/32 [==============================] - 0s 2ms/step - loss: 0.2416 - mae: 0.3984
Epoch 3/5
32/32 [==============================] - 0s 2ms/step - loss: 0.2340 - mae: 0.3919
Epoch 4/5
32/32 [==============================] - 0s 2ms/step - loss: 0.2274 - mae: 0.3870
Epoch 5/5
32/32 [==============================] - 0s 2ms/step - loss: 0.2197 - mae: 0.3808
<keras.src.callbacks.History at 0x7fef3c130b20>
DukunganSample = berat
danclass_weight
Sample = berat
kelas / berat
Anda mungkin telah memperhatikan bahwa contoh dasar pertama kami tidak menyebutkan pengukuran sampel.fit()
Argumensample_weight
danclass_weight
Anda hanya akan melakukan hal berikut:
- Unpack sample_weight dari argumen data
- Kirim ke compute_loss & update_state (Tentu saja, Anda juga bisa menerapkannya secara manual jika Anda tidak mengandalkan compile() untuk kerugian & metrik)
- Itulah yang
class CustomModel(keras.Model):
def train_step(self, data):
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
if len(data) == 3:
x, y, sample_weight = data
else:
sample_weight = None
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute the loss value.
# The loss function is configured in `compile()`.
loss = self.compute_loss(
y=y,
y_pred=y_pred,
sample_weight=sample_weight,
)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update the metrics.
# Metrics are configured in `compile()`.
for metric in self.metrics:
if metric.name == "loss":
metric.update_state(loss)
else:
metric.update_state(y, y_pred, sample_weight=sample_weight)
# Return a dict mapping metric names to current value.
# Note that it will include the loss (tracked in self.metrics).
return {m.name: m.result() for m in self.metrics}
# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# You can now use sample_weight argument
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1298
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1179
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1121
<keras.src.callbacks.History at 0x7fef3c168100>
Memberikan langkah evaluasi Anda sendiri
Bagaimana jika Anda ingin melakukan hal yang sama untuk panggilanmodel.evaluate()
Maka engkau akan melampauitest_step
Begitu juga dengan yang lainnya, berikut ini penampilannya:
class CustomModel(keras.Model):
def test_step(self, data):
# Unpack the data
x, y = data
# Compute predictions
y_pred = self(x, training=False)
# Updates the metrics tracking the loss
self.compute_loss(y=y, y_pred=y_pred)
# Update the metrics.
for metric in self.metrics:
if metric.name != "loss":
metric.update_state(y, y_pred)
# Return a dict mapping metric names to current value.
# Note that it will include the loss (tracked in self.metrics).
return {m.name: m.result() for m in self.metrics}
# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])
# Evaluate with our custom test_step
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
32/32 [==============================] - 0s 1ms/step - loss: 0.9028
0.9028095006942749
Wrapping up: contoh GAN end-to-end
Mari kita berjalan melalui contoh end-to-end yang memanfaatkan semua yang baru saja Anda pelajari.
Mari kita pertimbangkan :
- Jaringan generator dimaksudkan untuk menghasilkan gambar 28x28x1.
- Sebuah jaringan diskriminator dimaksudkan untuk mengklasifikasikan gambar 28x28x1 ke dalam dua kelas ("palsu" dan "nyata").
- Optimisasi untuk masing-masing
- Fungsi kehilangan untuk melatih diskriminator.
from tensorflow.keras import layers
# Create the discriminator
discriminator = keras.Sequential(
[
keras.Input(shape=(28, 28, 1)),
layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.GlobalMaxPooling2D(),
layers.Dense(1),
],
name="discriminator",
)
# Create the generator
latent_dim = 128
generator = keras.Sequential(
[
keras.Input(shape=(latent_dim,)),
# We want to generate 128 coefficients to reshape into a 7x7x128 map
layers.Dense(7 * 7 * 128),
layers.LeakyReLU(alpha=0.2),
layers.Reshape((7, 7, 128)),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
],
name="generator",
)
Berikut ini adalah fitur lengkap GAN class, overridingcompile()
untuk menggunakan tanda tangan sendiri, dan menerapkan seluruh algoritma GAN dalam 17 baris dalamtrain_step
:
class GAN(keras.Model):
def __init__(self, discriminator, generator, latent_dim):
super().__init__()
self.discriminator = discriminator
self.generator = generator
self.latent_dim = latent_dim
self.d_loss_tracker = keras.metrics.Mean(name="d_loss")
self.g_loss_tracker = keras.metrics.Mean(name="g_loss")
def compile(self, d_optimizer, g_optimizer, loss_fn):
super().compile()
self.d_optimizer = d_optimizer
self.g_optimizer = g_optimizer
self.loss_fn = loss_fn
def train_step(self, real_images):
if isinstance(real_images, tuple):
real_images = real_images[0]
# Sample random points in the latent space
batch_size = tf.shape(real_images)[0]
random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
# Decode them to fake images
generated_images = self.generator(random_latent_vectors)
# Combine them with real images
combined_images = tf.concat([generated_images, real_images], axis=0)
# Assemble labels discriminating real from fake images
labels = tf.concat(
[tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
)
# Add random noise to the labels - important trick!
labels += 0.05 * tf.random.uniform(tf.shape(labels))
# Train the discriminator
with tf.GradientTape() as tape:
predictions = self.discriminator(combined_images)
d_loss = self.loss_fn(labels, predictions)
grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
self.d_optimizer.apply_gradients(
zip(grads, self.discriminator.trainable_weights)
)
# Sample random points in the latent space
random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
# Assemble labels that say "all real images"
misleading_labels = tf.zeros((batch_size, 1))
# Train the generator (note that we should *not* update the weights
# of the discriminator)!
with tf.GradientTape() as tape:
predictions = self.discriminator(self.generator(random_latent_vectors))
g_loss = self.loss_fn(misleading_labels, predictions)
grads = tape.gradient(g_loss, self.generator.trainable_weights)
self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
# Update metrics and return their value.
self.d_loss_tracker.update_state(d_loss)
self.g_loss_tracker.update_state(g_loss)
return {
"d_loss": self.d_loss_tracker.result(),
"g_loss": self.g_loss_tracker.result(),
}
Mari kita coba mengendalikannya:
# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)
gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)
# To limit the execution time, we only train on 100 batches. You can train on
# the entire dataset. You will need about 20 epochs to get nice results.
gan.fit(dataset.take(100), epochs=1)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11490434/11490434 [==============================] - 0s 0us/step
100/100 [==============================] - 8s 15ms/step - d_loss: 0.4372 - g_loss: 0.8775
<keras.src.callbacks.History at 0x7feee42ff190>
Ide di balik pembelajaran mendalam sederhana, jadi mengapa implementasinya harus menyakitkan?
Awalnya diterbitkan di situs web TensorFlow, artikel ini muncul di sini dengan judul baru dan berlisensi di bawah CC BY 4.0.
Awalnya diterbitkan di situs web TensorFlow, artikel ini muncul di sini dengan judul baru dan berlisensi di bawah CC BY 4.0.
TensorFlow