「ラベル画像のトリミング(1)」ではLabelMeを使用してラベル画像に対するアノテーションを実施しました。
このような画像とアノテーション結果の組み合わせ(データセット)を多数用意することで、セマンティックセグメンテーションを実施するAIモデルの学習が可能になります。
と言うことで、AIモデルの構築と学習にチャレンジしてみました。
結果から先に言っておくと、現時点では、あまり期待通りの成果は得られていません。
試行錯誤の過程における失敗例も後々何かの参考になる可能性がありますので、記録に残しておこうかと。
なお、以下で提示するPythonの実装内容は全般的にGemini先生作です。
本来であれば内容を十分に理解した上で利用すべきかと思いますが、まずはそれっぽい処理が最後まで動作することを今回の目標としていますので、実装および動作確認を優先しました。
内容の理解やブラッシュアップは今後の課題としておきます。
アノテーション
まずは、学習用データを生成します。
既存ラベル画像100点に対して、LabelMeを使用してアノテーションを実施し、結果(JSON)を生成します。
ラベル画像は「./input/image」に、JSONファイルは「./input/json」に格納しておきます。
JSONファイルの名称は元画像の拡張子(「.jpg」等)を「.json」に変えたものです。
なお、モデルの学習の都合上、対象画像のサイズは一辺が2のn乗ピクセルの正方形であった方が良いようなので、とりあえず既存画像のアスペクト比を無視して、512×512にリサイズしたものを使用しました。
データセット生成
アノテーション結果(JSON)には、ラベル部分を示すPolygon(多角形)を構成する各ポイントの座標が記録されています。
この状態のままではモデルの学習には使用できないため、これを画像化します。
画像化では、ラベル部分を255(白)その他の部分を0(黒)とする二値画像(マスク画像)を生成します。
マスク画像の名称はラベル画像と同じ名前にし、ラベル画像は「./output/images」に、マスク画像は「./output/masks」に格納します。
以下が同処理の実装結果です。
import json
import numpy as np
from PIL import Image, ImageDraw
import os
import cv2
def create_mask_from_json(json_path, image_size):
with open(json_path, 'r') as f:
data = json.load(f)
mask = Image.new('L', image_size, 0)
draw = ImageDraw.Draw(mask)
for shape in data['shapes']:
points = shape['points']
points = [(int(x), int(y)) for x, y in shape['points']]
draw.polygon(points, fill=255)
return np.array(mask) / 255.0
def create_dataset(image_dir, json_dir, output_dir, image_size):
os.makedirs(os.path.join(output_dir, 'images'), exist_ok=True)
os.makedirs(os.path.join(output_dir, 'masks'), exist_ok=True)
image_files = os.listdir(image_dir)
for image_file in image_files:
if image_file.endswith(('.jpg', '.jpeg')):
image_path = os.path.join(image_dir, image_file)
json_path = os.path.join(json_dir, image_file.replace(os.path.splitext(image_file)[1], '.json'))
if os.path.exists(json_path):
image = cv2.imread(image_path)
image = cv2.resize(image, image_size)
mask = create_mask_from_json(json_path, image_size)
cv2.imwrite(os.path.join(output_dir, 'images', image_file), image)
cv2.imwrite(os.path.join(output_dir, 'masks', image_file), mask * 255)
image_dir = "./input/image"
json_dir = "./input/json"
output_dir = "./output"
image_size = (512, 512)
create_dataset(image_dir, json_dir, output_dir, image_size)
上記処理を実行し、outputディレクトリ配下にラベル画像とマスク画像のセットを生成しておきます。
TensorFlowのDataset生成
今回のモデル構築では例によってTensorFlow/Kerasを使用します。
モデルの学習時には前述のoutputディレクトリ配下の画像群を使用するのですが、学習処理の入力とするためにはTensorflow(tf).data.Datasetオブジェクトとして画像データを読み込んでおく必要があります。
また、画像の一部は評価用として使用するよう、学習用とは別のDatasetとしてオブジェクト化します。
上記機能を関数「create_tf_dataset」として実装します。
学習用Dataset「train_dataset」、評価用Dataset「val_dataset」をそれぞれ生成し、関数値として返す仕様になっています。
import tensorflow as tf
import os
import random
def create_file_paths(image_dir, mask_dir):
image_files = sorted(os.listdir(image_dir))
mask_files = sorted(os.listdir(mask_dir))
image_paths = [os.path.join(image_dir, file) for file in image_files]
mask_paths = [os.path.join(mask_dir, file) for file in mask_files]
return image_paths, mask_paths
def load_image_and_mask(image_path, mask_path, image_size):
image = tf.io.read_file(image_path)
image = tf.image.decode_jpeg(image, channels=3)
image = tf.image.resize(image, image_size)
image = tf.cast(image, tf.float32) / 255.0
mask = tf.io.read_file(mask_path)
mask = tf.image.decode_png(mask, channels=1)
mask = tf.image.resize(mask, image_size, method='nearest')
mask = tf.cast(mask, tf.float32) / 255.0
return image, mask
def create_tf_dataset(image_dir, mask_dir, image_size, batch_size, validation_split=0.2, seed=None):
image_paths, mask_paths = create_file_paths(image_dir, mask_dir)
combined = list(zip(image_paths, mask_paths))
if seed is not None:
random.seed(seed)
random.shuffle(combined)
image_paths_tuple, mask_paths_tuple = zip(*combined)
image_paths = list(image_paths_tuple)
mask_paths = list(mask_paths_tuple)
dataset_size = len(image_paths)
validation_size = int(validation_split * dataset_size)
train_image_paths = image_paths[validation_size:]
train_mask_paths = mask_paths[validation_size:]
val_image_paths = image_paths[:validation_size]
val_mask_paths = mask_paths[:validation_size]
train_dataset = tf.data.Dataset.from_tensor_slices((train_image_paths, train_mask_paths))
train_dataset = train_dataset.map(
lambda image_path,
mask_path: load_image_and_mask(image_path, mask_path, image_size)
)
train_dataset = train_dataset.batch(batch_size)
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)
val_dataset = tf.data.Dataset.from_tensor_slices((val_image_paths, val_mask_paths))
val_dataset = val_dataset.map(
lambda image_path,
mask_path: load_image_and_mask(image_path, mask_path, image_size)
)
val_dataset = val_dataset.batch(batch_size)
val_dataset = val_dataset.prefetch(tf.data.AUTOTUNE)
return train_dataset, val_dataset
セグメンテーションモデルの構築(U-Netモデル)
学習の準備ができたのは良いとして、そもそもどのようなAIモデルを構築するかと言う点が最も重要になるのですが、セマンティックセグメンテーション用の代表的なモデルの一つである「U-Net」が良さげであるようなので(Gemini先生談)、今回はこのモデルを構築したいと思います。
と言うことで、U-Netモデルを実装してみます。
import tensorflow as tf
from tensorflow.keras import layers
def encoder(inputs, filters, max_pooling=True):
conv = layers.Conv2D(filters, 3, activation='relu', padding='same',
kernel_regularizer=tf.keras.regularizers.l2(1e-4))(inputs)
conv = layers.BatchNormalization()(conv)
conv = layers.Conv2D(filters, 3, activation='relu', padding='same',
kernel_regularizer=tf.keras.regularizers.l2(1e-4))(conv)
conv = layers.BatchNormalization()(conv)
conv = layers.Dropout(0.1)(conv)
if max_pooling:
next = layers.MaxPooling2D(pool_size=(2, 2))(conv)
else:
next = conv
return (next, conv)
def decoder(inputs, skip_connection, filters):
up = layers.Conv2DTranspose(filters, 2, strides=(2, 2), padding='same')(inputs)
merge = layers.concatenate([skip_connection, up], axis=3)
conv = layers.Conv2D(filters, 3, activation='relu', padding='same',
kernel_regularizer=tf.keras.regularizers.l2(1e-4))(merge)
conv = layers.BatchNormalization()(conv)
conv = layers.Conv2D(filters, 3, activation='relu', padding='same',
kernel_regularizer=tf.keras.regularizers.l2(1e-4))(conv)
conv = layers.BatchNormalization()(conv)
conv = layers.Dropout(0.1)(conv)
return conv
def build_unet(input_shape, num_filters=64):
inputs = layers.Input(input_shape)
# エンコーダー
enc1, skip1 = encoder(inputs, num_filters)
enc2, skip2 = encoder(enc1, num_filters * 2)
enc3, skip3 = encoder(enc2, num_filters * 4)
enc4, skip4 = encoder(enc3, num_filters * 8)
conv, _ = encoder(enc4, num_filters * 16, max_pooling=False)
# デコーダー
dec4 = decoder(conv, skip4, num_filters * 8)
dec3 = decoder(dec4, skip3, num_filters * 4)
dec2 = decoder(dec3, skip2, num_filters * 2)
dec1 = decoder(dec2, skip1, num_filters)
outputs = layers.Conv2D(1, 1, activation='sigmoid')(dec1)
return tf.keras.Model(inputs=inputs, outputs=outputs)
内容に関しては、ほぼチンプンカンプンです。
とりあえず、上記「build_unet」関数を呼び出すことで、U-Netモデルが構築される(関数値として返される)ので、それに対して学習を実施することになります。
モデルの学習
上記までで、学習用のデータの準備とモデルの生成ができそうなので、いよいよモデルの学習処理を実装します。
import tensorflow as tf
from tensorflow.keras import backend as K
def dice_coef(y_true, y_pred, smooth=1):
intersection = K.sum(y_true * y_pred, axis=[1, 2, 3])
union = K.sum(y_true, axis=[1, 2, 3]) + K.sum(y_pred, axis=[1, 2, 3])
dice = K.mean((2. * intersection + smooth) / (union + smooth), axis=0)
return dice
def dice_loss(y_true, y_pred):
return 1 - dice_coef(y_true, y_pred)
def train_model(model, dataset, epochs, batch_size, validation_data=None):
model.compile(optimizer='adam', loss=dice_loss, metrics=[dice_coef])
if validation_data is not None:
model.fit(dataset, epochs=epochs, batch_size=batch_size, validation_data=validation_data)
else:
model.fit(dataset, epochs=epochs, batch_size=batch_size)
学習処理は実は単純で、以下の2つの処理のみで構成されます。
- model.compile : 学習時に使用する各種処理(関数)を設定する
- model.fit : 学習を実行する
compileの目的は実装内容にあるようにオプティマイザ(optimizer)、損失関数(loss)、評価指標(metrics)を設定することのようです。
オプティマイザではモデルの重みを更新するためのアルゴリズムを指定するようで、「Adam (Adaptive Moment Estimation)」は「勾配の履歴に基づいて学習率を適応的に調整する高度なオプティマイザ」とのことです。
要はモデルの精度が向上するようにパラメータを更新する、まさに学習の中核となる機能と言えます(多分)。
損失関数はモデルの予測と期待された値(正解)との間の誤差を定量的に評価するための関数で、学習においては、この損失関数の値を小さくすることが目標になります。
なお、セマンティックセグメンテーションの学習においては上記にあるようなDice損失(Dice Loss)を使用すると良いようです(Gemini先生談)。
評価指標は、学習中や評価中にモデルの性能を追跡および評価するために使用される関数で、損失関数とは異なり直接的にモデルの重みを更新するために使用されるわけではないようです。
学習過程では損失関数の値も表示されますし、前述のように学習の目標は損失関数の値を小さくすることなので、そちらの変化だけを見ていけば良いのでは?と思ったりしますが、その辺は今後確認していきたいと思います。
一方、fitの目的は実際に学習を実行することであり、事前に準備した学習用Dataset、評価用Datasetを指定することで、それらを使用した学習が行われます。
また、引数ではepochs(学習回数)とbatch_size(バッチサイズ)も指定しています。
epochsの方は一回のfitの呼び出しで何回の学習を実行するかと言う分かりやすいものです。
batch_sizeに関しては…良く分かりません。宿題として残しておいて、先に進みましょう。
上記までで、学習用Datasetの生成(create_tf_dataset)、モデルの構築(build_unet)、学習の実行(train_model)に関する関数が準備できましたので、これらを呼び出して実際に学習を実施する処理を実装します。
import tensorflow as tf
from create_tf_dataset import create_tf_dataset
from build_unet import build_unet
from train_model import train_model
from train_model import dice_loss
from train_model import dice_coef
import os
IMAGE_DIR = "./output/images"
MASK_DIR = "./output/masks"
IMAGE_SIZE = (512, 512)
BATCH_SIZE = 32
INPUT_SHAPE = (512, 512, 3)
EPOCHS = 10
MODEL_PATH = "./saved_model/unet.keras"
VALIDATION_SPLIT = 0.2
RANDOM_SEED = 42
if os.path.exists(MODEL_PATH):
model = tf.keras.models.load_model(
MODEL_PATH,
custom_objects={'dice_loss': dice_loss, 'dice_coef': dice_coef}
)
else:
model = build_unet(INPUT_SHAPE)
train_dataset, val_dataset = create_tf_dataset(
IMAGE_DIR, MASK_DIR, IMAGE_SIZE, BATCH_SIZE,
validation_split=VALIDATION_SPLIT, seed=RANDOM_SEED
)
train_model(model, train_dataset, epochs=EPOCHS, batch_size=BATCH_SIZE, validation_data=val_dataset)
model.save(MODEL_PATH)
なお、モデルの学習を実施したとして、その結果は基本的にはメモリ上にしか存在しないため、プロセスが終了してしまえば消えてしまいます。
それでは意味がないため、「model.save」により学習結果をMODEL_PATHで指定したファイルに保存するようにしています。
また、保存されたモデルを呼び出して追加学習できるよう、MODEL_PATHで指定したファイルが存在すれば、それを読み込んだ上で再度学習を実行するようにしています。
セグメンテーションの実行
前述の処理で生成した学習済みモデルを使用して、実際にセグメンテーションを実行してみましょう。
具体的には以下のような処理を実行します。
import cv2
import numpy as np
import tensorflow as tf
import os
from train_model import dice_loss
from train_model import dice_coef
def preprocess_image(image_path, image_size):
try:
image = cv2.imread(image_path)
if image is None:
raise ValueError(f"Failed to read image: {image_path}")
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
image = cv2.resize(image, image_size)
image = image / 255.0
return image.astype(np.float32)
except Exception as e:
print(f"Error preprocessing image {image_path}: {e}")
return None
def infer_mask(model, image):
try:
image = np.expand_dims(image, axis=0)
prediction = model.predict(image)
mask = prediction.squeeze()
return mask
except Exception as e:
print(f"Error inferring mask: {e}")
return None
def postprocess_equalization(mask):
mask_scaled = (mask * 255).astype(np.uint8)
equalized_mask = cv2.equalizeHist(mask_scaled)
normalized_mask = equalized_mask / 255.0
return normalized_mask
def postprocess_mask(mask, threshold=0.5):
return (mask > threshold).astype(np.uint8)
def save_mask(mask, save_path, format='png'):
try:
if format == 'png':
cv2.imwrite(save_path, mask * 255)
elif format == 'jpeg':
cv2.imwrite(save_path, mask * 255)
else:
raise ValueError(f"Unsupported format: {format}")
except Exception as e:
print(f"Error saving mask: {e}")
IMAGE_DIR = "./output/images"
IMAGE_SIZE = (512, 512)
OUTPUT_DIR = "./predict"
THRESHOLD = 0.5
MODEL_PATH = "./saved_model/unet.keras"
os.makedirs(OUTPUT_DIR, exist_ok=True)
try:
loaded_model = tf.keras.models.load_model(
MODEL_PATH,
custom_objects={'dice_loss': dice_loss, 'dice_coef': dice_coef}
)
except Exception as e:
print(f"Error loading model: {e}")
exit()
image_files = os.listdir(IMAGE_DIR)
for image_file in image_files:
image_path = os.path.join(IMAGE_DIR, image_file)
save_path = os.path.join(OUTPUT_DIR, os.path.splitext(image_file)[0] + '.png')
image = preprocess_image(image_path, IMAGE_SIZE)
if image is None:
continue
mask = infer_mask(loaded_model, image)
if mask is None:
continue
mask = postprocess_equalization(mask)
#mask = postprocess_mask(mask, THRESHOLD)
save_mask(mask, save_path)
モデルをロードし、画像を読み込み、その画像に対するラベル部分の予測(model.predict)を実施し、結果をpngファイルとして保存すると言うのが大まかな流れです。
本来であれば「postprocess_mask」で二値化するのですが、モデルの素の予測結果(各ピクセルをラベル部分である可能性の高さに準じて255から0までの値で表現した画像)を見てみたかったので、現在はコメントアウトしています。
また、「postprocess_equalization」では予測結果に対してヒストグラム平坦化を行なっています。
これは予測結果が極端に白(ラベルっぽい)寄りであったり、逆に黒(ラベルっぽくない)寄りになり、そのまま特定の閾値(上記実装では0.5)で二値化しても単なる真っ白(真っ黒)な画像になるだけというケースが多かったためです。
強制的に色の範囲を白から黒に幅広く分散するようにして、閾値0.5でもそれっぽく二値化されるようにした訳ですが、そもそもそのような加工をしなければならない時点で予測の精度にかなりの不安がありますが…
まとめ
とりあえず、関連処理をひたすら実装してみたと言うのが今回の内容です。
上記実装内容は実際に動作し、予測画像の出力もできるようになりました。
ただ、結果が今ひとつなんですよね。
その辺に関して、次回以降で触れていきたいと思います。