Add files via upload

This commit is contained in:
sorosoo 2023-04-07 22:29:25 +08:00 committed by GitHub
parent 6422c0bfd5
commit 6cf33e4fd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 2885 additions and 0 deletions

8
arguments.py Normal file
View File

@ -0,0 +1,8 @@
import yaml
from pathlib import Path
# Load the model configuration and expose the class list to other modules.
_config_path = Path("config") / "model.yaml"
with _config_path.open("r", encoding="utf-8") as f:
    model_config = yaml.load(f, yaml.FullLoader)
# Segmentation class names (order matters).
classes = model_config["classes"]

310
bricks.py Normal file
View File

@ -0,0 +1,310 @@
import json
from abc import abstractmethod
import torch
import torch.nn as nn
import torch.nn.functional as F
class DropPath(nn.Module):
    """Stochastic depth: randomly drop whole samples of a residual branch.

    drop_prob: probability of zeroing a sample's path during training.
    """

    def __init__(self, drop_prob=0.):
        super(DropPath, self).__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        # Identity in eval mode or when dropping is disabled.
        if not self.training or self.drop_prob == 0.:
            return x
        keep_prob = 1 - self.drop_prob
        # One Bernoulli draw per sample, broadcast over all remaining dims.
        mask_shape = (x.shape[0],) + (1,) * (x.ndim - 1)
        mask = torch.rand(mask_shape, dtype=x.dtype, device=x.device).add_(keep_prob).floor_()
        # Scale kept paths so the expected activation is unchanged.
        return x.div(keep_prob) * mask
"""
逐层卷积
"""
class DepthwiseConv(nn.Module):
"""
in_channels: 输入通道数
out_channels: 输出通道数
kernel_size: 卷积核大小元组类型
padding: 补充
stride: 步长
"""
def __init__(self, in_channels, kernel_size=(3, 3), padding=(1, 1), stride=(1, 1), bias=False):
super(DepthwiseConv, self).__init__()
self.conv = nn.Conv2d(
in_channels=in_channels,
out_channels=in_channels,
kernel_size=kernel_size,
padding=padding,
stride=stride,
groups=in_channels,
bias=bias
)
def forward(self, x):
out = self.conv(x)
return out
"""
逐点卷积
"""
class PointwiseConv(nn.Module):
def __init__(self, in_channels, out_channels):
super(PointwiseConv, self).__init__()
self.conv = nn.Conv2d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=(1, 1),
stride=(1, 1),
padding=(0, 0)
)
def forward(self, x):
out = self.conv(x)
return out
"""
深度可分离卷积
"""
class DepthwiseSeparableConv(nn.Module):
def __init__(self, in_channels, out_channels, kernel_size=(3, 3), padding=(1, 1), stride=(1, 1)):
super(DepthwiseSeparableConv, self).__init__()
self.conv1 = DepthwiseConv(
in_channels=in_channels,
kernel_size=kernel_size,
padding=padding,
stride=stride
)
self.conv2 = PointwiseConv(
in_channels=in_channels,
out_channels=out_channels
)
def forward(self, x):
out = self.conv1(x)
out = self.conv2(out)
return out
"""
下采样
[batch_size, in_channels, height, width] -> [batch_size, out_channels, height // stride, width // stride]
"""
class DownSampling(nn.Module):
"""
in_channels: 输入通道数
out_channels: 输出通道数
kernel_size: 卷积核大小
stride: 步长
norm_layer: 正则化层如果为None使用BatchNorm
"""
def __init__(self, in_channels, out_channels, kernel_size, stride, norm_layer=None):
super(DownSampling, self).__init__()
self.conv = nn.Conv2d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
padding=(kernel_size[0] // 2, kernel_size[-1] // 2)
)
if norm_layer is None:
self.norm = nn.BatchNorm2d(num_features=out_channels)
else:
self.norm = norm_layer
def forward(self, x):
out = self.conv(x)
out = self.norm(out)
return out
class _MatrixDecomposition2DBase(nn.Module):
    """
    Base class for 2D matrix-decomposition modules (the "ham" of a Hamburger
    head).  A feature map is flattened to a matrix, approximated as
    bases @ coef^T by an iterative solver, and reassembled.

    args is a JSON string whose keys become attributes on the module:
      SPATIAL: factorize over the spatial axis (True) or the channel axis
      MD_S: number of independent decomposition groups
      MD_D: per-group feature size (recomputed in forward from the input)
      MD_R: rank of the factorization
      TRAIN_STEPS / EVAL_STEPS: solver iterations in train / eval mode
      INV_T: inverse temperature of the softmax over initial coefficients
      ETA: update rate (not read here; presumably for subclasses — TODO confirm)
      RAND_INIT: fresh random bases per batch; otherwise one cached basis
      return_bases: if True, forward also returns the bases
      device: device on which subclasses build their bases
    """
    def __init__(
        self,
        args=json.dumps(
            {
                "SPATIAL": True,
                "MD_S": 1,
                "MD_D": 512,
                "MD_R": 64,
                "TRAIN_STEPS": 6,
                "EVAL_STEPS": 7,
                "INV_T": 100,
                "ETA": 0.9,
                "RAND_INIT": True,
                "return_bases": False,
                "device": "cuda"
            }
        )
    ):
        super(_MatrixDecomposition2DBase, self).__init__()
        # Every config entry becomes an attribute (self.MD_S, self.INV_T, ...).
        args: dict = json.loads(args)
        for k, v in args.items():
            setattr(self, k, v)

    @abstractmethod
    def _build_bases(self, batch_size):
        # Subclasses return an initial (batch_size * MD_S, MD_D, MD_R) basis.
        pass

    @abstractmethod
    def local_step(self, x, bases, coef):
        # One solver iteration; returns updated (bases, coef).
        pass

    @torch.no_grad()
    def local_inference(self, x, bases):
        # (batch_size * MD_S, MD_D, N)^T @ (batch_size * MD_S, MD_D, MD_R) -> (batch_size * MD_S, N, MD_R)
        coef = torch.bmm(x.transpose(1, 2), bases)
        coef = F.softmax(self.INV_T * coef, dim=-1)
        steps = self.TRAIN_STEPS if self.training else self.EVAL_STEPS
        for _ in range(steps):
            bases, coef = self.local_step(x, bases, coef)
        return bases, coef

    @abstractmethod
    def compute_coef(self, x, bases, coef):
        # Final coefficient update once the bases are fixed.
        pass

    def forward(self, x):
        batch_size, channels, height, width = x.shape
        # Flatten (batch_size, channels, height, width) to (batch_size * MD_S, MD_D, N).
        # NOTE: self.MD_D is overwritten per call, so it tracks the last input seen.
        if self.SPATIAL:
            self.MD_D = channels // self.MD_S
            N = height * width
            x = x.view(batch_size * self.MD_S, self.MD_D, N)
        else:
            self.MD_D = height * width
            N = channels // self.MD_S
            x = x.view(batch_size * self.MD_S, N, self.MD_D).transpose(1, 2)
        # When RAND_INIT is off, build one shared basis on the first call and
        # cache it as a buffer.
        if not self.RAND_INIT and not hasattr(self, 'bases'):
            bases = self._build_bases(1)
            self.register_buffer('bases', bases)
        # (MD_S, MD_D, MD_R) -> (batch_size * MD_S, MD_D, MD_R)
        if self.RAND_INIT:
            bases = self._build_bases(batch_size)
        else:
            bases = self.bases.repeat(batch_size, 1, 1)
        bases, coef = self.local_inference(x, bases)
        # (batch_size * MD_S, N, MD_R)
        coef = self.compute_coef(x, bases, coef)
        # (batch_size * MD_S, MD_D, MD_R) @ (batch_size * MD_S, N, MD_R)^T -> (batch_size * MD_S, MD_D, N)
        x = torch.bmm(bases, coef.transpose(1, 2))
        # (batch_size * MD_S, MD_D, N) -> (batch_size, channels, height, width)
        if self.SPATIAL:
            x = x.view(batch_size, channels, height, width)
        else:
            x = x.transpose(1, 2).view(batch_size, channels, height, width)
        # (batch_size * MD_S, MD_D, MD_R) -> (batch_size, MD_S, MD_D, MD_R)
        bases = bases.view(batch_size, self.MD_S, self.MD_D, self.MD_R)
        if self.return_bases:
            return x, bases
        return x
class NMF2D(_MatrixDecomposition2DBase):
    """Non-negative matrix factorization solved with multiplicative updates."""

    def __init__(
        self,
        args=json.dumps(
            {
                "SPATIAL": True,
                "MD_S": 1,
                "MD_D": 512,
                "MD_R": 64,
                "TRAIN_STEPS": 6,
                "EVAL_STEPS": 7,
                "INV_T": 1,
                "ETA": 0.9,
                "RAND_INIT": True,
                "return_bases": False,
                "device": "cuda"
            }
        )
    ):
        super(NMF2D, self).__init__(args)

    def _build_bases(self, batch_size):
        # Random non-negative bases, L2-normalized along the feature axis.
        bases = torch.rand((batch_size * self.MD_S, self.MD_D, self.MD_R)).to(self.device)
        return F.normalize(bases, dim=1)

    def local_step(self, x, bases, coef):
        """One multiplicative-update step for both coef and bases."""
        # coef <- coef * (x^T @ bases) / (coef @ bases^T @ bases)
        num = torch.bmm(x.transpose(1, 2), bases)
        den = coef.bmm(bases.transpose(1, 2).bmm(bases))
        coef = coef * num / (den + 1e-6)
        # bases <- bases * (x @ coef) / (bases @ coef^T @ coef)
        num = torch.bmm(x, coef)
        den = bases.bmm(coef.transpose(1, 2).bmm(coef))
        bases = bases * num / (den + 1e-6)
        return bases, coef

    def compute_coef(self, x, bases, coef):
        """Final coefficient refresh with the bases held fixed."""
        num = torch.bmm(x.transpose(1, 2), bases)
        den = coef.bmm(bases.transpose(1, 2).bmm(bases))
        return coef * num / (den + 1e-6)
if __name__ == "__main__":
    # Smoke test: factorize a dummy batch and print the reconstruction shape.
    x = torch.ones(2, 3, 128, 128).to(device="cuda")
    ham = NMF2D(
        json.dumps(
            {
                "SPATIAL": True,
                "MD_S": 1,
                "MD_D": 512,
                "MD_R": 16,
                "TRAIN_STEPS": 6,
                "EVAL_STEPS": 7,
                "INV_T": 1,
                "ETA": 0.9,
                "RAND_INIT": True,
                "return_bases": False,
                "device": "cuda"
            }
        )
    )
    print(ham(x).shape)

442
data_utils.py Normal file
View File

@ -0,0 +1,442 @@
import json
import math
from torch.utils.data import Dataset
from pathlib import Path
from torch.utils.data.dataset import T_co
from PIL import Image
import numpy as np
import torch
from torchvision import transforms
import arguments
import utils
import torch.nn.functional as F
"""
处理labels中1~224的像素即进行如下处理
224 -> 1
223 -> 2
...
labels: 标签集合/模型预测集合[batch_size, channels=1, height, width]
返回值
labels, [batch_size, channels=1, height, width]
"""
@torch.no_grad()
def converge_labels(labels: torch.Tensor, device=torch.device("cuda" if torch.cuda.is_available() else "cpu")):
assert len(labels.shape) == 4 and labels.shape[1] == 1
labels = labels.to(device)
for num in range(254, 127, -1):
labels[labels == num] = 255 - num
return labels
"""
对labels进行独热编码
classes_num: 编码的类别数量
labels: 标签集合, [batch_size, channels=1, height, width]
返回值独热编码后的矩阵, [batch_size, height * width, classes_num]
"""
@torch.no_grad()
def one_hot(
classes_num: int,
labels: torch.Tensor,
device=torch.device("cuda" if torch.cuda.is_available() else "cpu")):
assert len(labels.shape) == 4 and labels.shape[1] == 1
labels = labels.to(device)
# (batch_size, channels, height, width) -> (batch_size, channels, height * width)
labels = torch.flatten(labels, start_dim=-2)
# (batch_size, channels, height * width) -> (batch_size, height * width, channels)
labels = torch.transpose(labels, -2, -1)
assert labels.shape[-1] == 1
# (batch_size, height * width, channels) -> (batch_size, height * width)
labels = torch.squeeze(labels, dim=-1).long()
# (batch_size, height * width, classes_num)
one_hot_labels = torch.zeros(*labels.shape, classes_num).to(device)
return torch.scatter(input=one_hot_labels, dim=-1, index=torch.unsqueeze(labels, -1), value=1.)
"""
将模型的输出反独热编码
outputs: [batch_size, classes_num, height, width]
返回值
反独热编码后的张量, [batch_size, 1, height, width]
"""
@torch.no_grad()
def inv_one_hot_of_outputs(
outputs,
device=torch.device("cuda" if torch.cuda.is_available() else "cpu")
):
assert len(outputs.shape) == 4
result = torch.argmax(
F.log_softmax(
input=outputs.to(device).permute(0, 2, 3, 1),
dim=-1
),
dim=-1,
keepdim=True
).permute(0, 3, 1, 2)
return result
"""
将PIL读取格式的图片或np转换为tensor格式同时将维度顺序和数量进行转换
返回值[channels, height, width]
"""
@torch.no_grad()
def pil2tensor(pil, device=torch.device("cuda" if torch.cuda.is_available() else "cpu")):
to_tensor = transforms.ToTensor()
return to_tensor(pil).to(device)
class Pic2PicDataset(Dataset):
    """
    Image-to-label-map dataset.

    root: dataset directory containing the inputs (x) and their labels (y)
    x_dir_name: sub-directory of root holding the input images
    y_dir_name: sub-directory of root holding the label images
    device: device the returned tensors are placed on
    """
    def __init__(self, root: str, x_dir_name="images", y_dir_name="labels", device=torch.device("cuda" if torch.cuda.is_available() else "cpu")):
        super(Pic2PicDataset, self).__init__()
        self.device = device
        # Sort both listings: glob() order is filesystem-dependent, so zipping
        # unsorted listings could silently pair an image with the wrong label.
        x_paths = sorted((Path(root) / x_dir_name).glob(pattern="*"))
        y_paths = sorted((Path(root) / y_dir_name).glob(pattern="*"))
        self.x2y_paths = list(zip(x_paths, y_paths))

    def __len__(self):
        return len(self.x2y_paths)

    def __getitem__(self, index) -> T_co:
        x_path, y_path = self.x2y_paths[index]
        x = Image.open(x_path)
        y = Image.open(y_path)
        y_np = np.array(y)
        y.close()
        # Fold high label values down to class ids (see converge_labels).
        y = converge_labels(torch.from_numpy(y_np).unsqueeze(0).unsqueeze(0), device=self.device)
        return pil2tensor(x, self.device), y.squeeze(0)
class ConfusionMatrix:
    """
    Streaming confusion matrix for semantic segmentation metrics.

    Rows are ground-truth class ids, columns are predicted class ids.
    classes_num: number of classes (valid ids are 0 .. classes_num - 1,
    where 0 is the background label).
    """

    def __init__(self, classes_num):
        self.classes_num = classes_num
        # Lazily created [classes_num, classes_num] float32 tensor.
        self.matrix = None

    @torch.no_grad()
    def update(self, labels, predictions, device=torch.device("cuda" if torch.cuda.is_available() else "cpu")):
        """
        Accumulate one batch into the matrix.

        labels: ground truth, [batch_size, channels=1, height, width]; pixel
            values are class ids already folded by converge_labels().
        predictions: predicted ids, same shape and convention as labels.
        """
        assert len(labels.shape) == 4 and len(predictions.shape) == 4 and labels.shape[1] == 1 and predictions.shape[1] == 1
        # [B, 1, H, W] -> [B, H, W]
        labels = torch.squeeze(labels.to(device), dim=1)
        predictions = torch.squeeze(predictions.to(device), dim=1)
        # Count only pixels where both ids are valid classes; pairs with an
        # out-of-range id never contributed to the matrix in the original
        # per-class loop either.
        valid = (labels < self.classes_num) & (predictions < self.classes_num)
        labels_masked = labels[valid].long()
        predictions_masked = predictions[valid].long()
        assert labels_masked.shape == predictions_masked.shape
        if self.matrix is None:
            self.matrix = torch.zeros(self.classes_num, self.classes_num, dtype=torch.float32, device=device)
        # One bincount pass instead of a classes_num^2 Python loop, and the
        # counts are ACCUMULATED: previously every update() after the first
        # was silently ignored, so only the first batch was ever measured.
        flat = labels_masked * self.classes_num + predictions_masked
        counts = torch.bincount(flat, minlength=self.classes_num ** 2)
        self.matrix += counts.view(self.classes_num, self.classes_num).to(self.matrix)

    def reset(self):
        """Clear the accumulated matrix."""
        self.matrix = None

    def get_confusion_matrix(self):
        """Return the accumulated confusion matrix (requires >= 1 update)."""
        assert self.matrix is not None
        return self.matrix

    @torch.no_grad()
    def adjust_cls_precision(self, label_of_cls):
        """
        Precision of the class with id label_of_cls.
        Returns (cls_name, precision); precision is 0. when undefined.
        """
        assert self.matrix is not None and 0 <= label_of_cls < self.classes_num
        cls_name = utils.get_cls_of_label(arguments.classes, label_of_cls)
        precision = (self.matrix[label_of_cls, label_of_cls] / torch.sum(self.matrix[:, label_of_cls])).item()
        return (cls_name, precision) if not np.isnan(precision) else (cls_name, 0.)

    @torch.no_grad()
    def adjust_classes_precision(self):
        """Precision of every class: [(cls_name, precision), ...]."""
        return [self.adjust_cls_precision(label_of_cls) for label_of_cls in range(0, self.classes_num)]

    @torch.no_grad()
    def adjust_avg_precision(self):
        """Macro-averaged precision over all classes."""
        assert self.matrix is not None
        try:
            return math.fsum([tp[-1] for tp in self.adjust_classes_precision()]) / self.classes_num
        except ZeroDivisionError:
            return 0.

    @torch.no_grad()
    def adjust_cls_recall(self, label_of_cls):
        """
        Recall of the class with id label_of_cls.
        Returns (cls_name, recall); recall is 0. when undefined.
        """
        assert self.matrix is not None and 0 <= label_of_cls < self.classes_num
        cls_name = utils.get_cls_of_label(arguments.classes, label_of_cls)
        recall = (self.matrix[label_of_cls, label_of_cls] / torch.sum(self.matrix[label_of_cls, :])).item()
        return (cls_name, recall) if not np.isnan(recall) else (cls_name, 0.)

    @torch.no_grad()
    def adjust_classes_recall(self):
        """Recall of every class: [(cls_name, recall), ...]."""
        return [self.adjust_cls_recall(label_of_cls) for label_of_cls in range(0, self.classes_num)]

    @torch.no_grad()
    def adjust_avg_recall(self):
        """Macro-averaged recall over all classes."""
        assert self.matrix is not None
        try:
            return math.fsum([tp[-1] for tp in self.adjust_classes_recall()]) / self.classes_num
        except ZeroDivisionError:
            return 0.

    @torch.no_grad()
    def adjust_accuracy(self):
        """Overall pixel accuracy: trace / total count."""
        assert self.matrix is not None
        try:
            return (torch.sum(torch.diag(self.matrix)) / torch.sum(self.matrix)).item()
        except ZeroDivisionError:
            return 0.

    @torch.no_grad()
    def adjust_cls_iou(self, label_of_cls):
        """
        IoU of the class with id label_of_cls.
        Returns (cls_name, iou); iou is 0. when undefined.
        """
        assert self.matrix is not None and 0 <= label_of_cls < self.classes_num
        cls_name = utils.get_cls_of_label(arguments.classes, label_of_cls)
        tp = self.matrix[label_of_cls, label_of_cls]
        # union = row total + column total - intersection
        union = torch.sum(self.matrix[label_of_cls, :]) + torch.sum(self.matrix[:, label_of_cls]) - tp
        iou = (tp / union).item()
        return (cls_name, iou) if not np.isnan(iou) else (cls_name, 0.)

    @torch.no_grad()
    def adjust_classes_iou(self):
        """IoU of every class: [(cls_name, iou), ...]."""
        return [self.adjust_cls_iou(label_of_cls) for label_of_cls in range(0, self.classes_num)]

    @torch.no_grad()
    def adjust_avg_iou(self):
        """Macro-averaged IoU (mIoU) over all classes."""
        assert self.matrix is not None
        try:
            return math.fsum([tp[-1] for tp in self.adjust_classes_iou()]) / self.classes_num
        except ZeroDivisionError:
            return 0.

    @torch.no_grad()
    def get_scores(self):
        """
        All metrics in one call:
        {
            "classes_precision": [(cls_name, precision), ...],
            "avg_precision": precision,
            "classes_recall": [(cls_name, recall), ...],
            "avg_recall": recall,
            "classes_iou": [(cls_name, iou), ...],
            "avg_iou": iou,
            "accuracy": accuracy
        }
        """
        return {
            "classes_precision": self.adjust_classes_precision(),
            "avg_precision": self.adjust_avg_precision(),
            "classes_recall": self.adjust_classes_recall(),
            "avg_recall": self.adjust_avg_recall(),
            "classes_iou": self.adjust_classes_iou(),
            "avg_iou": self.adjust_avg_iou(),
            "accuracy": self.adjust_accuracy()
        }
"""
对图片的每个通道进行标准化
result = (pixel_value - mean) / std
images: 输入的图像, [batch_size, channels, height, width]
返回值
标准化后的张量, std: [batch_size=1, channels, height, width], mean: [batch_size=1, channels, height, width]
"""
@torch.no_grad()
def normalize_channels(images):
assert len(images.shape) == 4
std_mean_tuple = torch.std_mean(
input=images,
dim=0
)
images = (images - std_mean_tuple[0]) / std_mean_tuple[1]
return images, *std_mean_tuple
if __name__ == "__main__":
    # Removed a large block of commented-out scratch code (ConfusionMatrix,
    # normalize_channels and pil2tensor experiments) that was dead weight;
    # the real entry point lives in main.py.
    pass

317
learning_rate_scheduler.py Normal file
View File

@ -0,0 +1,317 @@
import math
from enum import Enum
import numpy as np
import torch
import torch.optim as optim
class SchedulerType(Enum):
    """Identifiers for the learning-rate scheduler factory (get_lr_scheduler)."""
    # The original declaration had trailing commas after every member but the
    # last, which silently turned each value into a one-element tuple such as
    # ("step",) instead of "step".
    STEP_SCHEDULER = "step"
    MULTI_STEP_SCHEDULER = "multi_step"
    EXPONENTIAL_SCHEDULER = "exponential"
    COSINE_ANNEALING_SCHEDULER = "cosine_annealing"
    LINEAR_WARMUP_THEN_POLY_SCHEDULER = "linear_warmup_then_poly"
class StepScheduler:
    """
    Thin wrapper over torch.optim.lr_scheduler.StepLR.

    optimizer: the wrapped optimizer
    step_size: number of step() calls between learning-rate decays
    gamma: decay factor, lr_(t+1) = lr_(t) * gamma
    verbose: print the learning rate whenever it changes (default False)
    """

    def __init__(self, optimizer, step_size=30, gamma=0.1, verbose=False):
        self.optimizer = optimizer
        self.step_size = step_size
        self.gamma = gamma
        self.verbose = verbose
        self.lr_scheduler = optim.lr_scheduler.StepLR(
            optimizer=optimizer,
            step_size=step_size,
            gamma=gamma,
            last_epoch=-1,
            verbose=verbose,
        )

    def step(self):
        """Advance the schedule by one step."""
        self.lr_scheduler.step()

    def get_state_dict(self):
        """Return the underlying scheduler's state dict."""
        return self.lr_scheduler.state_dict()

    def load_state_dict(self, state_dict: dict):
        """Restore the underlying scheduler from a state dict."""
        self.lr_scheduler.load_state_dict(state_dict)
class MultiStepScheduler:
    """
    Thin wrapper over torch.optim.lr_scheduler.MultiStepLR.

    optimizer: the wrapped optimizer
    milestones: strictly increasing list of step counts at which the
        learning rate is decayed
    gamma: decay factor, lr_(t+1) = lr_(t) * gamma
    verbose: print the learning rate whenever it changes (default False)
    """

    def __init__(self, optimizer, milestones, gamma, verbose=False):
        self.optimizer = optimizer
        self.milestones = milestones
        self.gamma = gamma
        self.verbose = verbose
        self.lr_scheduler = optim.lr_scheduler.MultiStepLR(
            optimizer=optimizer,
            milestones=milestones,
            gamma=gamma,
            last_epoch=-1,
            verbose=verbose,
        )

    def step(self):
        """Advance the schedule by one step."""
        self.lr_scheduler.step()

    def get_state_dict(self):
        """Return the underlying scheduler's state dict."""
        return self.lr_scheduler.state_dict()

    def load_state_dict(self, state_dict: dict):
        """Restore the underlying scheduler from a state dict."""
        self.lr_scheduler.load_state_dict(state_dict)
class ExponentialScheduler:
    """
    Thin wrapper over torch.optim.lr_scheduler.ExponentialLR.

    optimizer: the wrapped optimizer
    gamma: per-step decay factor, lr_(t+1) = lr_(t) * gamma
    verbose: print the learning rate whenever it changes (default False)
    """

    def __init__(self, optimizer, gamma=0.95, verbose=False):
        self.optimizer = optimizer
        self.gamma = gamma
        self.verbose = verbose
        self.lr_scheduler = optim.lr_scheduler.ExponentialLR(
            optimizer=optimizer,
            gamma=gamma,
            last_epoch=-1,
            verbose=verbose,
        )

    def step(self):
        """Advance the schedule by one step."""
        self.lr_scheduler.step()

    def get_state_dict(self):
        """Return the underlying scheduler's state dict."""
        return self.lr_scheduler.state_dict()

    def load_state_dict(self, state_dict: dict):
        """Restore the underlying scheduler from a state dict."""
        self.lr_scheduler.load_state_dict(state_dict)
class CosineAnnealingScheduler:
    """
    Thin wrapper over torch.optim.lr_scheduler.CosineAnnealingLR.

    optimizer: the wrapped optimizer; its initial lr is the maximum lr of
        the cosine cycle
    t_max: half-period — over 2 * t_max steps the lr goes
        max_lr -> min_lr -> max_lr
    min_lr: minimum learning rate of the cycle
    verbose: print the learning rate whenever it changes (default False)
    """

    def __init__(self, optimizer, t_max=5, min_lr=0, verbose=False):
        self.optimizer = optimizer
        self.t_max = t_max
        self.min_lr = min_lr
        self.verbose = verbose
        self.lr_scheduler = optim.lr_scheduler.CosineAnnealingLR(
            optimizer=optimizer,
            T_max=t_max,
            eta_min=min_lr,
            last_epoch=-1,
            verbose=verbose,
        )

    def step(self):
        """Advance the schedule by one step."""
        self.lr_scheduler.step()

    def get_state_dict(self):
        """Return the underlying scheduler's state dict."""
        return self.lr_scheduler.state_dict()

    def load_state_dict(self, state_dict: dict):
        """Restore the underlying scheduler from a state dict."""
        self.lr_scheduler.load_state_dict(state_dict)
class LinearWarmupThenPolyScheduler:
    """
    Linear warm-up followed by polynomial ("poly") decay.

    optimizer: the wrapped optimizer
    warmup_iters: number of warm-up steps
    total_iters: total number of training steps
    warmup_ratio: fraction of the regular lr applied at the very first step
    min_lr: floor learning rate of the poly phase
    power: exponent of the poly decay
    """
    def __init__(self, optimizer, warmup_iters=1500, total_iters=2000, warmup_ratio=1e-6, min_lr=0., power=1.):
        self.optimizer = optimizer
        # Number of step() calls made so far.
        self.current_iters = 0
        self.warmup_iters = warmup_iters
        self.total_iters = total_iters
        # NOTE(review): attribute name has a typo ("ration" vs "ratio");
        # kept as-is because external code may already read it.
        self.warmup_ration = warmup_ratio
        self.min_lr = min_lr
        self.power = power
        # Per-param-group arrays, filled on the first update().
        self.base_lr = None
        self.regular_lr = None
        self.warmup_lr = None

    def get_base_lr(self):
        # Remember each group's starting lr the first time it is seen.
        return np.array([param_group.setdefault("initial_lr", param_group["lr"]) for param_group in self.optimizer.param_groups])

    def get_lr(self):
        # Poly decay from base_lr down to min_lr over total_iters steps.
        coeff = (1 - self.current_iters / self.total_iters) ** self.power
        return (self.base_lr - np.full_like(self.base_lr, self.min_lr)) * coeff + np.full_like(self.base_lr, self.min_lr)

    def get_regular_lr(self):
        return self.get_lr()

    def get_warmup_lr(self):
        # Linear ramp: ~warmup_ratio * regular_lr near step 0, reaching
        # regular_lr when current_iters == warmup_iters.
        k = (1 - self.current_iters / self.warmup_iters) * (1 - self.warmup_ration)
        return (1 - k) * self.regular_lr

    def update(self):
        # NOTE(review): this assert makes the (total_iters + 1)-th call raise;
        # callers must not step past total_iters.
        assert 0 <= self.current_iters < self.total_iters
        self.current_iters = self.current_iters + 1
        self.base_lr = self.get_base_lr()
        self.regular_lr = self.get_regular_lr()
        self.warmup_lr = self.get_warmup_lr()

    def set_lr(self):
        # Warm-up lr while within warmup_iters, poly-decayed lr afterwards.
        if self.current_iters <= self.warmup_iters:
            for idx, param_group in enumerate(self.optimizer.param_groups):
                param_group["lr"] = self.warmup_lr[idx]
        elif self.current_iters <= self.total_iters:
            for idx, param_group in enumerate(self.optimizer.param_groups):
                param_group["lr"] = self.regular_lr[idx]

    def step(self):
        """Advance one iteration and write the new lrs into the optimizer."""
        self.update()
        self.set_lr()
"""
获取学习率调度器
optimizer: 使用学习率调度器的优化器
scheduler_type: 要获取的调度器的类型
kwargs: 参数字典作用于调度器
需要改变优化器的参数在该方法中调整
"""
def get_lr_scheduler(optimizer: optim, scheduler_type: SchedulerType, kwargs=None):
if kwargs is None:
# 返回默认设置的调度器
if scheduler_type == SchedulerType.STEP_SCHEDULER:
return StepScheduler(
optimizer=optimizer,
step_size=30,
gamma=0.1,
verbose=False
)
elif scheduler_type == SchedulerType.MULTI_STEP_SCHEDULER:
return MultiStepScheduler(
optimizer=optimizer,
milestones=[30, 60, 90],
gamma=0.1,
verbose=False
)
elif scheduler_type == SchedulerType.EXPONENTIAL_SCHEDULER:
return ExponentialScheduler(
optimizer=optimizer,
gamma=0.95,
verbose=False
)
elif scheduler_type == SchedulerType.COSINE_ANNEALING_SCHEDULER:
return CosineAnnealingScheduler(
optimizer=optimizer,
t_max=5,
min_lr=0,
verbose=False
)
elif scheduler_type == SchedulerType.LINEAR_WARMUP_THEN_POLY_SCHEDULER:
return LinearWarmupThenPolyScheduler(
optimizer=optimizer,
warmup_iters=1500,
total_iters=2000,
warmup_ratio=1e-6,
min_lr=0.,
power=1.
)
else:
# 返回自定义设置的调度器
if scheduler_type == SchedulerType.STEP_SCHEDULER:
return StepScheduler(
optimizer=optimizer,
**kwargs
)
elif scheduler_type == SchedulerType.MULTI_STEP_SCHEDULER:
return MultiStepScheduler(
optimizer=optimizer,
**kwargs
)
elif scheduler_type == SchedulerType.EXPONENTIAL_SCHEDULER:
return ExponentialScheduler(
optimizer=optimizer,
**kwargs
)
elif scheduler_type == SchedulerType.COSINE_ANNEALING_SCHEDULER:
return CosineAnnealingScheduler(
optimizer=optimizer,
**kwargs
)
elif scheduler_type == SchedulerType.LINEAR_WARMUP_THEN_POLY_SCHEDULER:
return LinearWarmupThenPolyScheduler(
optimizer=optimizer,
**kwargs
)

28
losses.py Normal file
View File

@ -0,0 +1,28 @@
import torch
import torch.nn as nn
class FocalLoss(nn.Module):
    """
    Focal loss built on top of cross entropy.

    weight: per-class weights, [weight_1, weight_2, ...] with
        len(weight) == classes_num; larger means more important.
    reduction: reduction passed through to CrossEntropyLoss.
    gamma: focusing parameter; 0 disables focusing, useful range (0.5, 10.0).
    eps: kept for interface compatibility (not read by this implementation).
    """

    def __init__(self, weight=None, reduction='mean', gamma=0, eps=1e-7):
        super(FocalLoss, self).__init__()
        self.gamma = gamma
        self.eps = eps
        self.ce = torch.nn.CrossEntropyLoss(weight=weight, reduction=reduction)

    def forward(self, x, y):
        ce_loss = self.ce(x, y)
        pt = torch.exp(-ce_loss)
        # Down-weight easy examples (pt close to 1) by (1 - pt)^gamma.
        focal = (1 - pt) ** self.gamma * ce_loss
        return focal.mean()


if __name__ == "__main__":
    pass

23
main.py Normal file
View File

@ -0,0 +1,23 @@
import yaml
from pathlib import Path
import utils
import torch
if __name__ == "__main__":
    def _load_yaml(path):
        # Read a UTF-8 YAML file into a Python object.
        with path.open("r", encoding="utf-8") as f:
            return yaml.load(f, yaml.FullLoader)

    model_config = _load_yaml(Path("config") / "model.yaml")
    # Segmentation class names.
    classes = model_config["classes"]
    # One semantic color per class, ordered to match `classes`.
    colors = utils.get_colors(len(classes))
    train_config = _load_yaml(Path("config") / "train.yaml")
    # Per-class loss weights; a single-element list means "uniform weights".
    if len(train_config["weight"]) != 1:
        weight = torch.tensor(train_config["weight"])
    else:
        weight = torch.ones(len(classes))

534
model.py Normal file
View File

@ -0,0 +1,534 @@
import json
import math
import torch.nn as nn
import torch
import bricks
import torch.nn.functional as F
from abc import *
import utils
"""
[batch_size, in_channels, height, width] -> [batch_size, out_channels, height // 4, width // 4]
"""
class StemConv(nn.Module):
def __init__(self, in_channels, out_channels, norm_layer=None):
super(StemConv, self).__init__()
self.proj = nn.Sequential(
bricks.DownSampling(
in_channels=in_channels,
out_channels=out_channels // 2,
kernel_size=(3, 3),
stride=(2, 2),
norm_layer=norm_layer
),
bricks.DownSampling(
in_channels=out_channels // 2,
out_channels=out_channels,
kernel_size=(3, 3),
stride=(2, 2),
norm_layer=norm_layer
),
)
def forward(self, x):
out = self.proj(x)
return out
class MSCA(nn.Module):
    """
    Multi-Scale Convolutional Attention: a 5x5 depthwise aggregation plus
    decomposed strip convolutions at several lengths, summed and turned into
    a multiplicative attention map.
    """

    def __init__(self, in_channels):
        super(MSCA, self).__init__()
        # 5x5 local aggregation.
        self.conv = bricks.DepthwiseConv(
            in_channels=in_channels,
            kernel_size=(5, 5),
            padding=(2, 2),
            bias=True
        )
        # Decomposed (1xk then kx1) depthwise strip branches, k in {7, 11, 21}.
        self.conv7 = self._strip_branch(in_channels, 7)
        self.conv11 = self._strip_branch(in_channels, 11)
        self.conv21 = self._strip_branch(in_channels, 21)
        # Channel mixing for the attention map.
        self.fc = nn.Conv2d(
            in_channels=in_channels,
            out_channels=in_channels,
            kernel_size=(1, 1)
        )

    @staticmethod
    def _strip_branch(channels, k):
        # A horizontal then a vertical depthwise strip convolution.
        pad = k // 2
        return nn.Sequential(
            bricks.DepthwiseConv(
                in_channels=channels,
                kernel_size=(1, k),
                padding=(0, pad),
                bias=True
            ),
            bricks.DepthwiseConv(
                in_channels=channels,
                kernel_size=(k, 1),
                padding=(pad, 0),
                bias=True
            )
        )

    def forward(self, x):
        identity = x
        attn = self.conv(x)
        attn = self.fc(attn + self.conv7(attn) + self.conv11(attn) + self.conv21(attn))
        # Attention is applied multiplicatively to the input.
        return attn * identity
class Attention(nn.Module):
    """1x1 conv -> GELU -> MSCA -> 1x1 conv attention block."""

    def __init__(self, in_channels):
        super(Attention, self).__init__()
        self.fc1 = nn.Conv2d(
            in_channels=in_channels,
            out_channels=in_channels,
            kernel_size=(1, 1)
        )
        self.msca = MSCA(in_channels=in_channels)
        self.fc2 = nn.Conv2d(
            in_channels=in_channels,
            out_channels=in_channels,
            kernel_size=(1, 1)
        )

    def forward(self, x):
        return self.fc2(self.msca(F.gelu(self.fc1(x))))
class FFN(nn.Module):
    """
    Convolutional feed-forward block:
    1x1 expand -> depthwise 3x3 + GELU -> 1x1 project -> dropout.
    """

    def __init__(self, in_features, hidden_features, out_features, drop_prob=0.):
        super(FFN, self).__init__()
        self.fc1 = nn.Conv2d(
            in_channels=in_features,
            out_channels=hidden_features,
            kernel_size=(1, 1)
        )
        self.dw = bricks.DepthwiseConv(
            in_channels=hidden_features,
            kernel_size=(3, 3),
            bias=True
        )
        self.fc2 = nn.Conv2d(
            in_channels=hidden_features,
            out_channels=out_features,
            kernel_size=(1, 1)
        )
        self.dropout = nn.Dropout(drop_prob)

    def forward(self, x):
        hidden = F.gelu(self.dw(self.fc1(x)))
        return self.dropout(self.fc2(hidden))
class Block(nn.Module):
    """
    MSCAN transformer-style block: attention + FFN, each with BatchNorm,
    a residual connection, per-channel layer scale and stochastic depth.

    in_channels: channel count (kept through the block)
    expand_ratio: hidden expansion factor of the FFN
    drop_prob: dropout probability inside the FFN
    drop_path_prob: stochastic-depth probability for both residual branches
    """
    def __init__(self, in_channels, expand_ratio, drop_prob=0., drop_path_prob=0.):
        super(Block, self).__init__()
        self.norm1 = nn.BatchNorm2d(num_features=in_channels)
        self.attention = Attention(in_channels=in_channels)
        # Fix: the original wrote
        #   DropPath(drop_prob=drop_path_prob if drop_path_prob >= 0 else nn.Identity)
        # so the conditional chose DropPath's *probability* (and could pick the
        # nn.Identity class as a probability); the Identity fallback could
        # never be selected.  The intent is an Identity module when there is
        # no drop-path.
        self.drop_path = bricks.DropPath(drop_prob=drop_path_prob) if drop_path_prob > 0. else nn.Identity()
        self.norm2 = nn.BatchNorm2d(num_features=in_channels)
        self.ffn = FFN(
            in_features=in_channels,
            hidden_features=int(expand_ratio * in_channels),
            out_features=in_channels,
            drop_prob=drop_prob
        )
        # Learnable per-channel residual scaling.
        layer_scale_init_value = 1e-2
        self.layer_scale1 = nn.Parameter(
            layer_scale_init_value * torch.ones(in_channels),
            requires_grad=True
        )
        self.layer_scale2 = nn.Parameter(
            layer_scale_init_value * torch.ones(in_channels),
            requires_grad=True
        )

    def forward(self, x):
        # Attention branch with scaled residual.
        out = self.attention(self.norm1(x))
        x = x + self.drop_path(self.layer_scale1.unsqueeze(-1).unsqueeze(-1) * out)
        # FFN branch with scaled residual.
        out = self.ffn(self.norm2(x))
        return x + self.drop_path(self.layer_scale2.unsqueeze(-1).unsqueeze(-1) * out)
class Stage(nn.Module):
    """
    One MSCAN stage: downsampling, then `blocks_num` Blocks, then LayerNorm
    applied over the channel dimension.
    """

    def __init__(
        self,
        stage_id,
        in_channels,
        out_channels,
        expand_ratio,
        blocks_num,
        drop_prob=0.,
        drop_path_prob=[0.]
    ):
        super(Stage, self).__init__()
        assert blocks_num == len(drop_path_prob)
        # Stage 0 uses the 4x stem; later stages halve the resolution.
        if stage_id == 0:
            self.down_sampling = StemConv(
                in_channels=in_channels,
                out_channels=out_channels
            )
        else:
            self.down_sampling = bricks.DownSampling(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=(3, 3),
                stride=(2, 2)
            )
        self.blocks = nn.Sequential(
            *[
                Block(
                    in_channels=out_channels,
                    expand_ratio=expand_ratio,
                    drop_prob=drop_prob,
                    drop_path_prob=p
                ) for p in drop_path_prob
            ]
        )
        self.norm = nn.LayerNorm(out_channels)

    def forward(self, x):
        out = self.blocks(self.down_sampling(x))
        batch_size, channels, height, width = out.shape
        # LayerNorm over channels: [B, C, H, W] -> [B, H*W, C] -> norm -> back.
        out = out.flatten(start_dim=2).transpose(-2, -1)
        out = self.norm(out)
        out = out.transpose(-2, -1).view(batch_size, -1, height, width)
        return out
class MSCAN(nn.Module):
    """
    MSCAN backbone: four stages whose stochastic-depth probabilities increase
    linearly across all blocks.
    """

    def __init__(
        self,
        embed_dims=[3, 32, 64, 160, 256],
        expand_ratios=[8, 8, 4, 4],
        depths=[3, 3, 5, 2],
        drop_prob=0.1,
        drop_path_prob=0.1
    ):
        super(MSCAN, self).__init__()
        # Linearly spaced drop-path rates, one per block across all stages.
        rates = [r.item() for r in torch.linspace(0, drop_path_prob, sum(depths))]
        stages = []
        offset = 0
        for stage_id, depth in enumerate(depths):
            stages.append(
                Stage(
                    stage_id=stage_id,
                    in_channels=embed_dims[stage_id],
                    out_channels=embed_dims[stage_id + 1],
                    expand_ratio=expand_ratios[stage_id],
                    blocks_num=depth,
                    drop_prob=drop_prob,
                    drop_path_prob=rates[offset: offset + depth]
                )
            )
            offset += depth
        self.stages = nn.Sequential(*stages)

    def forward(self, x):
        out = x
        outputs = []
        for idx, stage in enumerate(self.stages):
            out = stage(out)
            # The stage-0 output is not used by the decode head.
            if idx != 0:
                outputs.append(out)
        # [input, stage1 out (H/8), stage2 out (H/16), stage3 out (H/32)]
        return [x, *outputs]
class Hamburger(nn.Module):
    """
    Hamburger module: 1x1 "lower bread", NMF2D matrix-decomposition "ham",
    1x1 conv + GroupNorm "upper bread", finished with a ReLU residual.
    """

    def __init__(
        self,
        hamburger_channels=256,
        nmf2d_config=json.dumps(
            {
                "SPATIAL": True,
                "MD_S": 1,
                "MD_D": 512,
                "MD_R": 64,
                "TRAIN_STEPS": 6,
                "EVAL_STEPS": 7,
                "INV_T": 1,
                "ETA": 0.9,
                "RAND_INIT": True,
                "return_bases": False,
                "device": "cuda"
            }
        )
    ):
        super(Hamburger, self).__init__()
        self.ham_in = nn.Sequential(
            nn.Conv2d(
                in_channels=hamburger_channels,
                out_channels=hamburger_channels,
                kernel_size=(1, 1)
            )
        )
        self.ham = bricks.NMF2D(args=nmf2d_config)
        self.ham_out = nn.Sequential(
            nn.Conv2d(
                in_channels=hamburger_channels,
                out_channels=hamburger_channels,
                kernel_size=(1, 1),
                bias=False
            ),
            nn.GroupNorm(
                num_groups=32,
                num_channels=hamburger_channels
            )
        )

    def forward(self, x):
        out = self.ham_out(self.ham(self.ham_in(x)))
        # Residual connection around the decomposition.
        return F.relu(x + out)
class LightHamHead(nn.Module):
    """Lightweight decoder head (SegNeXt): resizes and concatenates the
    multi-scale encoder features, squeezes them to hidden_channels with a 1x1
    conv, applies the Hamburger attention module, and predicts per-pixel class
    logits up-sampled to the original input size."""
    def __init__(
            self,
            in_channels_list=[64, 160, 256],
            hidden_channels=256,
            out_channels=256,
            classes_num=150,
            drop_prob=0.1,
            nmf2d_config=json.dumps(
                {
                    "SPATIAL": True,
                    "MD_S": 1,
                    "MD_D": 512,
                    "MD_R": 64,
                    "TRAIN_STEPS": 6,
                    "EVAL_STEPS": 7,
                    "INV_T": 1,
                    "ETA": 0.9,
                    "RAND_INIT": True,
                    "return_bases": False,
                    "device": "cuda"
                }
            )
    ):
        super(LightHamHead, self).__init__()
        # Final classifier: dropout + 1x1 conv to classes_num channels.
        self.cls_seg = nn.Sequential(
            nn.Dropout2d(drop_prob),
            nn.Conv2d(
                in_channels=out_channels,
                out_channels=classes_num,
                kernel_size=(1, 1)
            )
        )
        # Squeeze the concatenated encoder features down to hidden_channels.
        self.squeeze = nn.Sequential(
            nn.Conv2d(
                in_channels=sum(in_channels_list),
                out_channels=hidden_channels,
                kernel_size=(1, 1),
                bias=False
            ),
            nn.GroupNorm(
                num_groups=32,
                num_channels=hidden_channels,
            ),
            nn.ReLU()
        )
        self.hamburger = Hamburger(
            hamburger_channels=hidden_channels,
            nmf2d_config=nmf2d_config
        )
        # Project from hidden_channels to out_channels before classification.
        self.align = nn.Sequential(
            nn.Conv2d(
                in_channels=hidden_channels,
                out_channels=out_channels,
                kernel_size=(1, 1),
                bias=False
            ),
            nn.GroupNorm(
                num_groups=32,
                num_channels=out_channels
            ),
            nn.ReLU()
        )

    # inputs: [x, x_1, x_2, x_3]
    # x: [batch_size, channels, height, width] (the original network input,
    # used only for its spatial size); x_1..x_3 are the encoder stage outputs.
    def forward(self, inputs):
        assert len(inputs) >= 2
        o = inputs[0]
        # Use the largest feature map's spatial size as the common size.
        batch_size, _, standard_height, standard_width = inputs[1].shape
        standard_shape = (standard_height, standard_width)
        inputs = [
            F.interpolate(
                input=x,
                size=standard_shape,
                mode="bilinear",
                align_corners=False
            )
            for x in inputs[1:]
        ]
        # x: [batch_size, channels_1 + channels_2 + channels_3, standard_height, standard_width]
        x = torch.cat(inputs, dim=1)
        # out: [batch_size, hidden_channels, standard_height, standard_width]
        out = self.squeeze(x)
        out = self.hamburger(out)
        out = self.align(out)
        # out: [batch_size, classes_num, standard_height, standard_width]
        out = self.cls_seg(out)
        _, _, original_height, original_width = o.shape
        # Up-sample logits back to the original input resolution.
        out = F.interpolate(
            input=out,
            size=(original_height, original_width),
            mode="bilinear",
            align_corners=False
        )
        # out: [batch_size, original_height * original_width, classes_num]
        out = torch.transpose(out.view(batch_size, -1, original_height * original_width), -2, -1)
        return out
class SegNeXt(nn.Module):
    """SegNeXt semantic segmentation model: MSCAN encoder feeding a
    LightHamHead decoder. Output shape: [batch, H * W, classes_num]."""

    def __init__(
            self,
            embed_dims=[3, 32, 64, 160, 256],
            expand_rations=[8, 8, 4, 4],
            depths=[3, 3, 5, 2],
            drop_prob_of_encoder=0.1,
            drop_path_prob=0.1,
            hidden_channels=256,
            out_channels=256,
            classes_num=150,
            drop_prob_of_decoder=0.1,
            nmf2d_config=json.dumps(
                {
                    "SPATIAL": True,
                    "MD_S": 1,
                    "MD_D": 512,
                    "MD_R": 64,
                    "TRAIN_STEPS": 6,
                    "EVAL_STEPS": 7,
                    "INV_T": 1,
                    "ETA": 0.9,
                    "RAND_INIT": False,
                    "return_bases": False,
                    "device": "cuda"
                }
            )
    ):
        super(SegNeXt, self).__init__()
        # The decoder consumes the last three encoder feature maps.
        self.decoder = LightHamHead(
            in_channels_list=embed_dims[-3:],
            hidden_channels=hidden_channels,
            out_channels=out_channels,
            classes_num=classes_num,
            drop_prob=drop_prob_of_decoder,
            nmf2d_config=nmf2d_config
        )
        self.encoder = MSCAN(
            embed_dims=embed_dims,
            expand_ratios=expand_rations,
            depths=depths,
            drop_prob=drop_prob_of_encoder,
            drop_path_prob=drop_path_prob
        )

    def forward(self, x):
        features = self.encoder(x)
        return self.decoder(features)

291
model_utils.py Normal file
View File

@ -0,0 +1,291 @@
import copy
import math
import os.path
from pathlib import Path
import torch.nn as nn
import torch
import yaml
import model
import json
import re
import torch.optim as optim
import learning_rate_scheduler
"""
获取模型
@:param train: 是否获取模型进行训练
如果为True使用模型进行训练
如果为False使用模型进行预测
@:param model_config: 模型配置文件路径
@:param train_config: 训练配置文件路径
@:param predict_config: 预测配置文件路径
@:return 实例化模型
"""
def get_model(
        train: bool,
        model_config=Path("config") / "model.yaml",
        train_config=Path("config") / "train.yaml",
        predict_config=Path("config") / "predict.yaml"
):
    """Instantiate a SegNeXt from the YAML configuration files.

    train=True reads the target device from train.yaml, otherwise from
    predict.yaml; the device is also injected into the NMF2D config.
    Returns the model already moved to that device.
    """
    with model_config.open("r", encoding="utf-8") as mcf:
        cfg = yaml.load(mcf, Loader=yaml.FullLoader)
    nmf2d_config = cfg["nmf2d_config"]
    # Pick whichever runtime config applies and read the device from it.
    runtime_config = train_config if train else predict_config
    with runtime_config.open("r", encoding="utf-8") as rcf:
        device = yaml.load(rcf, Loader=yaml.FullLoader)["device"]
    nmf2d_config["device"] = device
    net = model.SegNeXt(
        embed_dims=cfg["embed_dims"],
        expand_rations=cfg["expand_rations"],
        depths=cfg["depths"],
        drop_prob_of_encoder=cfg["drop_prob_of_encoder"],
        drop_path_prob=cfg["drop_path_prob"],
        hidden_channels=cfg["channels_of_hamburger"],
        out_channels=cfg["channels_of_hamburger"],
        classes_num=len(cfg["classes"]),
        drop_prob_of_decoder=cfg["drop_prob_of_decoder"],
        nmf2d_config=json.dumps(nmf2d_config)
    )
    return net.to(device=device)
"""
分割模型中的参数
named_parameters: 带名称的参数
regex_expr: 正则表达式(r"")
返回值
target, left
target: 表示符合正则表达式的参数
left: 表示不符合正则表达式的参数
"""
def split_parameters(named_parameters, regex_expr):
    """Partition named parameters by full-match against a regex.

    named_parameters: iterable of (name, param) pairs.
    regex_expr: raw regex string; a parameter belongs to *target* when its
        full name matches, otherwise to *left*.
    Returns (target, left) as two lists of (name, param) pairs.
    """
    pattern = re.compile(regex_expr)
    target, left = [], []
    for name, param in named_parameters:
        bucket = target if pattern.fullmatch(name) else left
        bucket.append((name, param))
    return target, left
"""
获取优化器
@:param net: 网络模型
@:param optimizer_config: 优化器配置文件路径
@:return 优化器
"""
def get_optimizer(
        net,
        optimizer_config=Path("config") / "optimizer.yaml"
):
    """Build an optimizer whose parameter groups get per-group lr / weight-decay
    multipliers read from optimizer.yaml.

    net: the model whose parameters are partitioned into groups by regex.
    Returns the instantiated torch.optim optimizer.
    """
    with optimizer_config.open("r", encoding="utf-8") as f:
        optimizer_config = yaml.load(f, Loader=yaml.FullLoader)
    base_config = optimizer_config["base_config"]
    # "kwargs" is a Python-expression string run through eval(); safe only if
    # the config file is trusted.
    lr = eval(base_config["kwargs"])["lr"]
    weight_decay = eval(base_config["kwargs"])["weight_decay"]
    # Drops the first entry — presumably the default group; verify against
    # the optimizer.yaml schema.
    parameters_config = optimizer_config["parameters"][1:]
    left = net.named_parameters()
    parameters = []
    # NOTE(review): parameters_config was already sliced with [1:] above and is
    # sliced again here, so one group entry never gets its own regex split (its
    # parameters stay in `left`) — confirm this double skip is intended.
    for params_config in parameters_config[1:]:
        params, left = split_parameters(
            named_parameters=left,
            regex_expr=r'' + next(iter(params_config.values()))["regex_expr"]
        )
        params = list(
            map(
                lambda tp: tp[-1], params
            )
        )
        parameters.append(params)
    # Parameters matching no regex form the first (default) group.
    parameters = [
        list(
            map(
                lambda tp: tp[-1], left
            )
        ),
        *parameters
    ]
    # One dict per parameter; lr and weight_decay are scaled by the group's
    # 'lr_mult' / 'weight_decay' multipliers (defaulting to 1.0 / 0.0).
    params = [
        {
            'params': param,
            'lr': lr * next(iter(params_config.values())).setdefault('lr_mult', 1.0),
            'weight_decay': weight_decay * next(iter(params_config.values())).setdefault('weight_decay', 0.)
        }
        for idx, params_config in enumerate(parameters_config) for param in parameters[idx]
    ]
    # optim_type names a class in torch.optim (eval'd); kwargs supply lr etc.
    optimizer = eval(f"optim.{base_config['optim_type']}")(params, **eval(base_config["kwargs"]))
    return optimizer
"""
获取学习率调度器
@:param optimizer: 优化器
@:param lr_scheduler_config: 学习率调度器配置文件路径
@:return 学习率调度器
"""
def get_lr_scheduler(
        optimizer,
        lr_scheduler_config=Path("config") / "lr_scheduler.yaml"
):
    """Build a learning-rate scheduler from lr_scheduler.yaml.

    optimizer: the optimizer the scheduler will drive.
    Returns the scheduler created by learning_rate_scheduler.get_lr_scheduler.
    """
    lr_scheduler = None
    with lr_scheduler_config.open("r", encoding="utf-8") as f:
        lr_scheduler_config = yaml.load(f, yaml.FullLoader)
    # Both scheduler_type and kwargs are strings from the config evaluated via
    # eval(); the config file must be trusted.
    lr_scheduler = learning_rate_scheduler.get_lr_scheduler(
        optimizer=optimizer,
        scheduler_type=eval(f"learning_rate_scheduler.SchedulerType.{lr_scheduler_config['scheduler_type']}"),
        kwargs=eval(lr_scheduler_config["kwargs"])
    )
    return lr_scheduler
"""
搜寻模型权重文件和自己创建的模型中第一个不同的参数
left: 元组("模型名称": state_dict)
right: 元组("模型名称": state_dict)
ignore_counts: 忽略不同的数目
列表
{
"row_num": 0,
"模型名称1": "name1",
"模型名称2": "name2"
}
"""
def first_diff(left: tuple, right: tuple, ignore_counts=0):
    """Find the first position where two state_dicts' key sequences diverge.

    left / right: tuples of (model_name, state_dict).
    ignore_counts: number of mismatches to skip before reporting one.
    Returns {"row_num": idx, left_name: key_or_marker, right_name: key_or_marker};
    row_num == -1 with values "same" means no difference was found.

    Keys are compared only by their last dotted component (e.g. "weight",
    "bias"), not by the full parameter path.
    """
    left = copy.deepcopy(left)
    left_name, left_state = left
    left_state = list(left_state.keys())
    left_ord = 0
    right = copy.deepcopy(right)
    right_name, right_state = right
    right_state = list(right_state.keys())
    right_ord = 0
    response = None
    # Walk both key lists in lockstep, logging each pair.
    while left_ord < len(left_state) and right_ord < len(right_state):
        left_sign = left_state[left_ord].split(".")[-1]
        right_sign = right_state[right_ord].split(".")[-1]
        print(f"{left_ord}: {left_state[left_ord]} --> {right_state[right_ord]}")
        if left_sign != right_sign:
            # Consume one allowed mismatch instead of reporting it.
            if ignore_counts != 0:
                ignore_counts -= 1
                left_ord += 1
                right_ord += 1
                continue
            assert left_ord == right_ord
            response = {
                "row_num": left_ord,
                left_name: left_state[left_ord],
                right_name: right_state[right_ord]
            }
            return response
        left_ord += 1
        right_ord += 1
    # Burn any remaining ignore budget by advancing both cursors.
    while ignore_counts:
        left_ord += 1
        right_ord += 1
        ignore_counts -= 1
    # One side exhausted: the shorter dict "runs out" first.
    if left_ord < len(left_state) and right_ord >= len(right_state):
        response = {
            "row_num": left_ord,
            left_name: left_state[left_ord],
            right_name: "None"
        }
    if left_ord >= len(left_state) and right_ord < len(right_state):
        response = {
            "row_num": right_ord,
            left_name: "None",
            right_name: right_state[right_ord]
        }
    if left_ord >= len(left_state) and right_ord >= len(right_state):
        response = {
            "row_num": -1,
            left_name: "same",
            right_name: "same"
        }
    print(f"{response['row_num']}: {response[left_name]} --> {response[right_name]}")
    return response
"""
初始化模型
@:param train:
True表示初始化用来训练的网络
False表示初始化用来预测的网络.
net: 网络模型
optimizer: 优化器
pretrained: 是否加载预训练权重
@:param train_config: 训练配置文件路径
"""
def init_model(
        train,
        net,
        optimizer=None,
        train_config=Path("config") / "train.yaml",
        predict_config=Path("config") / "predict.yaml"
):
    """Initialize model weights, then optionally load a checkpoint.

    train: True -> read train.yaml (and restore optimizer state);
           False -> read predict.yaml.
    net: the model to initialize in place.
    optimizer: required when train=True and a full checkpoint is loaded.

    Checkpoint "mode" in the config: -1 = no checkpoint; 0 = partial load,
    keeping only state entries whose names match the configured regexes.
    """
    # Weight initialization.
    for m in net.modules():
        if isinstance(m, nn.Linear):
            if m.weight is not None:
                nn.init.trunc_normal_(m.weight, std=.02)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0.)
        elif isinstance(m, nn.LayerNorm):
            if m.weight is not None:
                nn.init.constant_(m.weight, 1.0)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0.)
        elif isinstance(m, nn.Conv2d):
            fan_out = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
            fan_out //= m.groups
            if m.weight is not None:
                # Kaiming-normal: mean 0, std = sqrt(2 / fan_out).
                # BUG FIX: the original passed sqrt(2/fan_out) as the *mean*
                # (nn.init.normal_'s 2nd positional arg), leaving std at 1.0.
                nn.init.normal_(m.weight, mean=0.0, std=math.sqrt(2.0 / fan_out))
            if m.bias is not None:
                # BUG FIX: biases were drawn from N(0, 1) via normal_; zero
                # them like the other branches.
                nn.init.constant_(m.bias, 0.)
    if train:
        with train_config.open("r", encoding="utf-8") as tcf:
            config = yaml.load(tcf, yaml.FullLoader)
    else:
        with predict_config.open("r", encoding="utf-8") as pcf:
            config = yaml.load(pcf, yaml.FullLoader)
    mode = config["mode"]
    if mode == -1:
        return
    checkpoint = torch.load(os.path.sep.join(config["checkpoint"]))
    if mode == 0:
        # Keep only checkpoint entries whose (renamed) keys match a regex;
        # drop the optimizer state since the parameter groups no longer align.
        for regex_expr in config["regex_expr"]:
            checkpoint["state_dict"] = {
                tp[0]: tp[-1]
                for tp in zip(net.state_dict().keys(), checkpoint["state_dict"].values())
                if re.compile(r"" + regex_expr).fullmatch(tp[0])
            }
            checkpoint["optimizer"]["state"] = dict()
    net.load_state_dict(checkpoint["state_dict"], strict=False)
    if train:
        optimizer.load_state_dict(checkpoint["optimizer"])

115
predict.py Normal file
View File

@ -0,0 +1,115 @@
import os
import numpy as np
import yaml
from PIL import Image
import data_utils
import torch
from pathlib import Path
import model_utils
import utils
from matplotlib import pyplot as plt
"""
预测
@:param net: 网络模型
@:param image: 图像
@:param cls_name: 类别名
@:param predict_config: 预测配置文件路径
@:param model_config: 模型配置文件路径
@:return mask: [image_height, image_width]元素类型为bool
"""
def predict(
        net,
        image: Image,
        cls_name,
        predict_config=Path("config") / "predict.yaml",
        model_config=Path("config") / "model.yaml"
):
    """Run the network on a single image and return the mask for one class.

    net: trained SegNeXt model.
    image: PIL image.
    cls_name: class name whose mask is wanted.
    Returns a [image_height, image_width] bool tensor marking pixels whose
    predicted label equals cls_name's label.
    """
    with model_config.open("r", encoding="utf-8") as mcf:
        model_config = yaml.load(mcf, Loader=yaml.FullLoader)
    classes = model_config["classes"]
    with predict_config.open("r", encoding="utf-8") as pcf:
        predict_config = yaml.load(pcf, yaml.FullLoader)
    device = predict_config["device"]
    image = data_utils.pil2tensor(image, device)
    # Add a batch dimension for a single image.
    if len(image.shape) == 3:
        image = torch.unsqueeze(image, dim=0)
    batch_size, _, image_height, image_width = image.shape
    # net(image) returns [batch, H*W, classes]; transpose/reshape back to
    # [batch, classes, H, W] before decoding to per-pixel labels.
    # NOTE(review): inv_one_hot_of_outputs presumably argmaxes over the class
    # channel — confirm against data_utils.
    prediction = data_utils.inv_one_hot_of_outputs(
        torch.transpose(
            net(image),
            -2,
            -1
        ).reshape(batch_size, len(classes), image_height, image_width),
        device
    )
    # Compare against the class's low label (first element of the pair).
    mask = torch.squeeze(
        prediction == utils.get_label_of_cls(classes, cls_name)[0]
    )
    return mask
"""
将预测结果与原图混合
@:param net: 神经网络模型
@:param image: 原图
@:param mask: predict的对应某一类别的mask
@:param mask: 神经网络的预测结果
@:param classes: 所有类别
@:param cls_name: 类别
@:param colors: 所有类别对应的颜色列表
@:return 混合后的图像
"""
def blend(
        image: Image,
        mask,
        classes,
        cls_name,
        colors
):
    """Overlay cls_name's mask on the original image at 50% opacity.

    image: original PIL image.
    mask: boolean mask tensor from predict().
    classes / colors: class names and their palette colors.
    Returns the blended PIL image.
    """
    mask_np = mask.to(device="cpu").numpy()
    # Paint masked pixels with the class color on a black canvas.
    overlay = np.zeros((*mask_np.shape, 3), dtype=np.uint8)
    overlay[mask_np] = utils.get_color_of_cls(classes, colors, cls_name)
    return Image.blend(image, Image.fromarray(overlay), 0.5)
"""
展示图像
@:param 需要进行展示的图像图像尺寸应为[height, width, channels=3]
"""
def show_image(image):
    """Display an image ([height, width, 3]) in a matplotlib window."""
    plt.imshow(image)
    plt.show()
if __name__ == "__main__":
    # Demo: predict the "leaf" mask for one test image and display the blend.
    with Path(os.path.sep.join(["config", "model.yaml"])).open("r", encoding="utf-8") as f:
        model_config = yaml.load(f, Loader=yaml.FullLoader)
    classes = model_config["classes"]
    colors = utils.get_colors(len(classes))
    image_path = os.path.sep.join([
        "dataset", "test", "biomass_image_train_0233_8.jpg"
    ])
    cls_name = "leaf"
    # Build the model in predict mode and load weights per predict.yaml.
    net = model_utils.get_model(False)
    model_utils.init_model(False, net)
    image = Image.open(image_path)
    mask = predict(net, image, cls_name)
    show_image(blend(image, mask, classes, cls_name, colors))

280
train.py Normal file
View File

@ -0,0 +1,280 @@
import math
import os.path
import numpy as np
import torch
import yaml
from pathlib import Path
from tqdm import tqdm
import utils
import data_utils
import model_utils
from torch.utils.data import DataLoader
import losses
from datetime import datetime
"""
1 epoch train
@:param epochs: 总共的epoch数
@:param epoch: 当前epoch
@:param net: 神经网络模型
@:param train_data_loader: 训练数据加载器
@:param image_size: 图片大小
@:param classes_num: 类别数
@:param loss_fn: 损失函数
@:param lr_scheduler: 学习率调度器
@:param optimizer: 优化器
@:param device: 运行场地
@:return 1 epoch train avg loss, 1 epoch train avg scores
"""
def fit(
        epochs,
        epoch,
        net,
        train_data_loader,
        image_size,
        classes_num,
        loss_fn,
        lr_scheduler,
        optimizer,
        device="cuda"
):
    """Train for one epoch.

    epochs / epoch: total epoch count and current epoch index (for logging).
    image_size: (height, width) used to reshape the flat network output.
    Returns (avg_loss, avg_confusion_matrix_scores) over the epoch.
    """
    matrix = data_utils.ConfusionMatrix(classes_num)
    scores_list = []
    loss_list = []
    progress_bar = tqdm(train_data_loader)
    for idx, data in enumerate(progress_bar):
        images, labels = data
        # NOTE(review): the scheduler steps once per batch and *before*
        # optimizer.step() — confirm this per-iteration schedule is intended.
        lr_scheduler.step()
        optimizer.zero_grad()
        # Network emits [batch, H*W, classes]; reshape to [batch, classes, H, W].
        predictions = torch.transpose(net(images), -2, -1).view(-1, classes_num, *image_size)
        matrix.update(labels, data_utils.inv_one_hot_of_outputs(predictions, device), device)
        scores = matrix.get_scores()
        matrix.reset()  # per-batch scores; the matrix is not accumulated
        scores_list.append(scores)
        loss = loss_fn(
            predictions,
            torch.squeeze(labels, dim=1).to(dtype=torch.long)
        )
        loss_value = loss.item()
        # Guard against NaN losses so the running average stays finite.
        if np.isnan(loss_value):
            loss_value = max(loss_list) if len(loss_list) != 0 else 1.0
        loss_list.append(loss_value)
        loss.backward()
        optimizer.step()
        progress_bar.set_description(
            f"train --> Epoch {epoch + 1} / {epochs}, batch_loss: {loss_value:.3f}, batch_iou: {scores['avg_iou']:.3f}, batch_accuracy: {scores['accuracy']:.3f}"
        )
    progress_bar.close()
    return sum(loss_list) / len(loss_list), utils.avg_confusion_matrix_scores_list(scores_list)
"""
1 epoch train
@:param epochs: 总共的epoch数
@:param epoch: 当前epoch
@:param net: 神经网络模型
@:param train_data_loader: 验证数据加载器
@:param image_size: 图片大小
@:param classes_num: 类别数
@:param loss_fn: 损失函数
@:param device: 运行场地
@:return val avg loss, val avg scores
"""
@torch.no_grad()
def val(
        epochs,
        epoch,
        net,
        val_data_loader,
        image_size,
        classes_num,
        loss_fn,
        device="cuda"
):
    """Validate for one epoch (no gradients).

    Mirrors fit() without the backward/optimizer steps.
    Returns (avg_loss, avg_confusion_matrix_scores) over the validation set.
    """
    matrix = data_utils.ConfusionMatrix(classes_num)
    scores_list = []
    loss_list = []
    progress_bar = tqdm(val_data_loader)
    for idx, data in enumerate(progress_bar):
        images, labels = data
        # Network emits [batch, H*W, classes]; reshape to [batch, classes, H, W].
        predictions = torch.transpose(net(images), -2, -1).view(-1, classes_num, *image_size)
        matrix.update(labels, data_utils.inv_one_hot_of_outputs(predictions, device), device)
        scores = matrix.get_scores()
        matrix.reset()  # per-batch scores; the matrix is not accumulated
        scores_list.append(scores)
        loss = loss_fn(
            predictions,
            torch.squeeze(labels, dim=1).to(dtype=torch.long)
        )
        loss_value = loss.item()
        # Guard against NaN losses so the running average stays finite.
        if np.isnan(loss_value):
            loss_value = max(loss_list) if len(loss_list) != 0 else 1.0
        loss_list.append(loss_value)
        progress_bar.set_description(
            f"val ---> Epoch {epoch + 1} / {epochs}, batch_loss: {loss_value:.3f}, batch_iou: {scores['avg_iou']:.3f}, batch_accuracy: {scores['accuracy']:.3f}"
        )
    progress_bar.close()
    return sum(loss_list) / len(loss_list), utils.avg_confusion_matrix_scores_list(scores_list)
"""
模型训练
net: 网络模型
optimizer: 优化器,
lr_scheduler: 学习率调度器,
weight: 每一类的权重
root_path: 存储训练数据和验证数据的根目录
train_dir_names: 存储训练数据的目录元组形式(images_path, labels_path)
val_dir_names: 存储验证数据的目录, 元组形式(images_path, labels_path)
classes_num: 类别数量
yaml_path: 配置文件路径
"""
def train(
        net,
        optimizer,
        lr_scheduler,
        train_config=Path("config") / "train.yaml",
        model_config=Path("config") / "model.yaml"
):
    """Full training loop.

    net / optimizer / lr_scheduler: model and its training companions.
    train_config / model_config: YAML config paths (datasets, epochs, device,
    class list, save paths, ...).

    Trains for the configured number of epochs, validating every
    eval_every_n_epoch epochs; saves a checkpoint after each validation and
    the best train/val models at the end.
    """
    with model_config.open("r", encoding="utf-8") as mcf:
        model_config = yaml.load(mcf, yaml.FullLoader)
    classes_num = len(model_config["classes"])
    with train_config.open("r", encoding="utf-8") as tcf:
        train_config = yaml.load(tcf, Loader=yaml.Loader)
    device = train_config["device"]
    epochs = train_config["epochs"]
    train_images_dataset = data_utils.Pic2PicDataset(
        root=os.path.sep.join(train_config["root"]),
        x_dir_name=Path(os.path.sep.join(train_config["train_dir_name"])) / train_config["images_dir_name"],
        y_dir_name=Path(os.path.sep.join(train_config["train_dir_name"])) / train_config["labels_dir_name"]
    )
    train_data_loader = DataLoader(
        dataset=train_images_dataset,
        batch_size=train_config["batch_size"],
        shuffle=True,
        num_workers=train_config["workers"]
    )
    val_images_dataset = data_utils.Pic2PicDataset(
        root=os.path.sep.join(train_config["root"]),
        x_dir_name=Path(os.path.sep.join(train_config["val_dir_name"])) / train_config["images_dir_name"],
        y_dir_name=Path(os.path.sep.join(train_config["val_dir_name"])) / train_config["labels_dir_name"]
    )
    val_data_loader = DataLoader(
        dataset=val_images_dataset,
        batch_size=train_config["batch_size"],
        shuffle=False,
        num_workers=train_config["workers"]
    )
    image_height, image_width = train_config["image_height"], train_config["image_width"]
    # A single-element "weight" list means "no class weighting".
    weight = torch.tensor(train_config["weight"]) if len(train_config["weight"]) != 1 else torch.ones(classes_num)
    loss_fn = losses.FocalLoss(
        weight=weight.to(device)
    )
    max_train_iou, max_val_iou = -np.inf, -np.inf
    best_train_model, best_val_model = None, None
    for epoch in range(0, epochs):
        # Train.
        net.train()
        train_avg_loss, train_avg_scores = fit(
            epochs=epochs,
            epoch=epoch,
            net=net,
            train_data_loader=train_data_loader,
            image_size=(image_height, image_width),
            classes_num=classes_num,
            loss_fn=loss_fn,
            lr_scheduler=lr_scheduler,
            optimizer=optimizer,
            device=device
        )
        print()
        print(utils.confusion_matrix_scores2table(train_avg_scores))
        print(f"train_avg_loss: {train_avg_loss:.3f}")
        if max_train_iou < train_avg_scores["avg_iou"]:
            max_train_iou = train_avg_scores["avg_iou"]
            best_train_model = {
                "state_dict": net.state_dict(),
                "optimizer": optimizer.state_dict(),
                "avg_iou": max_train_iou
            }
        # Validate.
        if (epoch + 1) % train_config["eval_every_n_epoch"] == 0:
            net.eval()
            val_avg_loss, val_avg_scores = val(
                epochs=epochs,
                epoch=epoch,
                net=net,
                val_data_loader=val_data_loader,
                image_size=(image_height, image_width),
                classes_num=classes_num,
                loss_fn=loss_fn,
                device=device
            )
            print()
            print(utils.confusion_matrix_scores2table(val_avg_scores))
            print(f"val_avg_loss: {val_avg_loss:.3f}")
            if max_val_iou < val_avg_scores["avg_iou"]:
                max_val_iou = val_avg_scores["avg_iou"]
                best_val_model = {
                    "state_dict": net.state_dict(),
                    "optimizer": optimizer.state_dict(),
                    "avg_iou": max_val_iou
                }
            # Periodic checkpoint of the current model.
            m = {
                "state_dict": net.state_dict(),
                "optimizer": optimizer.state_dict(),
                "avg_iou": val_avg_scores["avg_iou"]
            }
            torch.save(
                obj=m,
                f=f"{os.path.sep.join(train_config['save_path'])}_Iou{100 * best_val_model['avg_iou']:.3f}_{datetime.strftime(datetime.now(), '%Y%m%d%H%M%S')}.pth"
            )
    # Guards avoid a crash when no epoch (or no validation pass) ever ran.
    if best_train_model is not None:
        torch.save(
            obj=best_train_model,
            f=f"{os.path.sep.join(train_config['save_path'])}_train_Iou{100 * best_train_model['avg_iou']:.3f}_{datetime.strftime(datetime.now(), '%Y%m%d%H%M%S')}.pth"
        )
    if best_val_model is not None:
        # BUG FIX: the original saved best_train_model under the "_val_" name.
        torch.save(
            obj=best_val_model,
            f=f"{os.path.sep.join(train_config['save_path'])}_val_Iou{100 * best_val_model['avg_iou']:.3f}_{datetime.strftime(datetime.now(), '%Y%m%d%H%M%S')}.pth"
        )
if __name__ == "__main__":
    # Entry point: build model/optimizer/scheduler from the config files,
    # optionally restore a checkpoint, then run the training loop.
    net = model_utils.get_model(True)
    optimizer = model_utils.get_optimizer(net)
    lr_scheduler = model_utils.get_lr_scheduler(optimizer=optimizer)
    model_utils.init_model(
        train=True,
        net=net,
        optimizer=optimizer
    )
    train(
        net=net,
        optimizer=optimizer,
        lr_scheduler=lr_scheduler
    )

537
utils.py Normal file
View File

@ -0,0 +1,537 @@
import colorsys
import copy
import json
import math
import os
from pathlib import Path
import numpy as np
import torch
from PIL import Image, ImageDraw
from tabulate import tabulate
from torchvision.transforms import transforms, InterpolationMode
"""
生成num种颜色
返回值: color list返回的color list的第一个数值永远是(0, 0, 0)
"""
def get_colors(num: int):
    """Generate a color palette whose first entry is always black (0, 0, 0).

    For num <= 21 a fixed 22-entry VOC-style palette is returned (regardless
    of num); for larger num, num colors are sampled around the HSV wheel.
    """
    assert num >= 1
    if num <= 21:
        return [
            (0, 0, 0),
            (128, 0, 0),
            (0, 128, 0),
            (128, 128, 0),
            (0, 0, 128),
            (128, 0, 128),
            (0, 128, 128),
            (128, 128, 128),
            (64, 0, 0),
            (192, 0, 0),
            (64, 128, 0),
            (192, 128, 0),
            (64, 0, 128),
            (192, 0, 128),
            (64, 128, 128),
            (192, 128, 128),
            (0, 64, 0),
            (128, 64, 0),
            (0, 192, 0),
            (128, 192, 0),
            (0, 64, 128),
            (128, 64, 12)
        ]
    # num - 1 evenly spaced hues at full saturation/value, scaled to 0-255.
    hsv_tuples = [(i / num, 1., 1.) for i in range(0, num - 1)]
    palette = [
        tuple(int(channel * 255) for channel in colorsys.hsv_to_rgb(h, s, v))
        for h, s, v in hsv_tuples
    ]
    # Ensure black appears exactly once, and always first.
    if (0, 0, 0) in palette:
        palette.remove((0, 0, 0))
    return [(0, 0, 0), *palette]
"""
获取某种颜色对应的标签
返回值标签值
"""
def get_label_of_color(colors, color):
    """Return (label, 255 - label) for the palette index of *color*."""
    idx = colors.index(color)
    return idx, 255 - idx
"""
获取某个标签值对应的颜色
返回值元组(r, g, b)
"""
def get_color_of_label(colors, label):
    """Map a label (possibly the inverted form 255 - idx) back to its color."""
    return colors[min(label, 255 - label)]
"""
获取某种类别对应的标签
返回值标签值
"""
def get_label_of_cls(classes, cls):
    """Return (label, 255 - label) for the class-list index of *cls*."""
    idx = classes.index(cls)
    return idx, 255 - idx
"""
获取某个标签值对应的类别
返回值类别
"""
def get_cls_of_label(classes, label):
    """Map a label (possibly the inverted form 255 - idx) to its class name."""
    return classes[min(label, 255 - label)]
"""
获取某种颜色对应的类别
返回值类别
color: (r, g, b)
"""
def get_cls_of_color(classes, colors, color):
    """Return the class name for a palette color (r, g, b)."""
    label = colors.index(color)
    # Same normalization as get_cls_of_label, inlined here.
    return classes[min(label, 255 - label)]
"""
获取某种类别对应的颜色
返回值颜色(r, g, b)
"""
def get_color_of_cls(classes, colors, cls):
    """Return the palette color (r, g, b) for a class name."""
    label = classes.index(cls)
    # Same normalization as get_color_of_label, inlined here.
    return colors[min(label, 255 - label)]
def draw_mask(draw, points, shape_type, label, out_line_value, line_width=10, point_width=5):
    """Rasterize one labelme shape onto a mask via a PIL ImageDraw handle.

    draw: PIL ImageDraw bound to the mask image.
    points: list of [x, y] vertices from the labelme annotation.
    shape_type: 'circle' | 'rectangle' | 'line' | 'linestrip' | 'point' |
        anything else is treated as a polygon.
    label: fill value; out_line_value: outline/stroke value (255 - label).
    """
    points = [tuple(point) for point in points]
    if shape_type == 'circle':
        # Two points: center and a point on the circumference.
        assert len(points) == 2, 'Shape of shape_type=circle must have 2 points'
        (cx, cy), (px, py) = points
        d = math.sqrt((cx - px) ** 2 + (cy - py) ** 2)
        draw.ellipse([cx - d, cy - d, cx + d, cy + d], outline=out_line_value, fill=label)
    elif shape_type == 'rectangle':
        assert len(points) == 2, 'Shape of shape_type=rectangle must have 2 points'
        draw.rectangle(points, outline=out_line_value, fill=label)
    elif shape_type == 'line':
        assert len(points) == 2, 'Shape of shape_type=line must have 2 points'
        # Lines are drawn with the outline value, not the fill label.
        greater_label = out_line_value
        draw.line(xy=points, fill=greater_label, width=line_width)
    elif shape_type == 'linestrip':
        greater_label = out_line_value
        draw.line(xy=points, fill=greater_label, width=line_width)
    elif shape_type == 'point':
        assert len(points) == 1, 'Shape of shape_type=point must have 1 points'
        cx, cy = points[0]
        r = point_width
        draw.ellipse([cx - r, cy - r, cx + r, cy + r], outline=out_line_value, fill=label)
    else:
        # Default: closed polygon.
        assert len(points) > 2, 'Polygon must have points more than 2'
        draw.polygon(xy=points, outline=out_line_value, fill=label)
"""
负责将labelme的标记转换成(mask)图像
classes: 类别列表
"""
def labelme_json2mask(classes, json_path: str, mask_saved_path: str):
    """Convert one labelme JSON annotation into a grayscale mask PNG.

    classes: class list; index 0 must be "background".
    json_path: path to the labelme JSON file (deleted after conversion).
    mask_saved_path: directory where the PNG mask is written.
    """
    assert classes is not None and classes[0] == "background"
    json_path = Path(json_path)
    if json_path.exists() and json_path.is_file():
        with json_path.open(mode="r") as f:
            json_data = json.load(f)
        image_height = json_data["imageHeight"]
        image_width = json_data["imageWidth"]
        image_path = json_data["imagePath"]
        shapes = json_data["shapes"]
        # Collect label/geometry info for every annotated shape.
        cls_info_list = []
        for shape in shapes:
            cls_name_in_json = shape["label"]
            assert cls_name_in_json in classes
            points = shape["points"]
            shape_type = shape["shape_type"]
            label_of_cls = classes.index(cls_name_in_json)
            cls_info_list.append(
                {
                    "cls_name": cls_name_in_json,
                    "label": label_of_cls,
                    "points": points,
                    "shape_type": shape_type
                }
            )
        # Start from an all-background (0) mask and paint each shape.
        mask = np.zeros(shape=(image_height, image_width), dtype=np.uint8)
        mask = Image.fromarray(mask)
        draw = ImageDraw.Draw(mask)
        for cls_info in cls_info_list:
            points = cls_info["points"]
            shape_type = cls_info["shape_type"]
            label = cls_info["label"]
            # Outlines get the inverted label 255 - label.
            draw_mask(draw, points, shape_type, label, 255 - label)
        mask = np.array(mask)
        mask = Image.fromarray(mask)
        # NOTE(review): split(".")[0] breaks for paths containing extra dots —
        # consider os.path.splitext; confirm against the dataset's file names.
        mask.save(str(Path(mask_saved_path) / (str(image_path).split(".")[0] + ".png")))
        # The JSON is consumed: remove it once the mask exists.
        os.remove(json_path)
"""
将root_path下labelme生成的json文件全部进行处理
1. 有原图匹配的json文件会转换成mask存储到mask_saved_path路径下
2. 没有原图但是有json文件的直接删除该json文件
3. 有原图但是没有json文件的会在mask_saved_path下生成一个纯黑背景图片
root_path: 存储着原图和json文件原图后缀名尽量为.jpg
"""
def convert_labelme_jsons2masks(classes, root_path: str, mask_saved_path: str, original_image_suffix=".jpg"):
    """Convert every labelme JSON under root_path into a mask.

    Rules:
    1. JSON with a matching original image -> converted to a mask in
       mask_saved_path (the JSON is deleted by labelme_json2mask).
    2. JSON without a matching image -> the JSON is deleted.
    3. Image without a JSON -> an all-black (background) mask is generated.

    classes: class list (at most 128 classes so label and 255 - label
    never collide).
    """
    assert 0 < len(classes) <= 128
    # Image paths keyed by their path without extension.
    original_images = set(
        map(
            lambda name: str(name).split(".")[0],
            Path(root_path).glob(pattern=f"*{original_image_suffix}")
        )
    )
    json_files = Path(root_path).glob(pattern="*.json")
    for json_file in json_files:
        filename = str(json_file).split(".")[0]
        if filename in original_images:
            labelme_json2mask(classes, str(json_file), mask_saved_path)
            original_images.remove(filename)
        else:
            # Orphan annotation: no image to pair with.
            os.remove(json_file)
    # Remaining images were never annotated: give them background-only masks.
    if len(original_images) != 0:
        for image_filename in original_images:
            image_path = image_filename + f"{original_image_suffix}"
            image = Image.open(image_path)
            height, width = image.height, image.width
            image.close()
            mask = np.zeros((height, width), dtype=np.uint8)
            mask = Image.fromarray(mask)
            mask.save(str(Path(mask_saved_path) / (os.path.basename(image_filename) + ".png")))
"""
将混淆矩阵得到的尺度(scores)组合成表格形式输出到控制台
scores: 混淆矩阵的尺度(scores)
"""
def confusion_matrix_scores2table(scores):
    """Render confusion-matrix scores as a nested text table.

    scores: dict with per-class ("classes_precision"/"classes_recall"/
    "classes_iou" as lists of (class_name, value)) and overall
    ("avg_precision"/"avg_recall"/"avg_iou"/"accuracy") entries.
    Returns the formatted table string (per-class table + overall table).
    """
    assert scores is not None and isinstance(scores, dict)
    classes = [tp[0] for tp in scores["classes_precision"]]
    cls_precision_list = [tp[-1] for tp in scores["classes_precision"]]
    cls_recall_list = [tp[-1] for tp in scores["classes_recall"]]
    cls_iou_list = [tp[-1] for tp in scores["classes_iou"]]
    # Per-class table: one row per class.
    table1 = tabulate(
        tabular_data=np.concatenate(
            (
                np.asarray(classes).reshape(-1, 1),
                np.asarray(cls_precision_list).reshape(-1, 1),
                np.asarray(cls_recall_list).reshape(-1, 1),
                np.asarray(cls_iou_list).reshape(-1, 1)
            ), 1
        ),
        headers=["classes", "precision", "recall", "iou"],
        tablefmt="grid"
    )
    avg_precision = scores["avg_precision"]
    avg_recall = scores["avg_recall"]
    avg_iou = scores["avg_iou"]
    accuracy = scores["accuracy"]
    # Overall table: single row of averages.
    table2 = tabulate(
        tabular_data=[(avg_precision, avg_recall, avg_iou, accuracy)],
        headers=["avg_precision", "avg_recall", "avg_iou", "accuracy"],
        tablefmt="grid"
    )
    # Outer table embedding both sub-tables.
    table = tabulate(
        tabular_data=np.concatenate(
            (
                np.asarray(["single", "overall"]).reshape(-1, 1),
                np.asarray([table1, table2]).reshape(-1, 1)
            ), 1
        ),
        headers=["table type", "table"],
        tablefmt="grid"
    )
    return table
"""
相加混淆矩阵得到的两个scores
返回值
相加后的混淆矩阵
"""
def sum_2_confusion_matrix_scores(scores_left: dict, scores_right: dict):
    """Element-wise add scores_right into scores_left.

    Mutates and returns scores_left. Per-class entries are (name, value)
    pairs added pairwise; scalar averages are summed directly.
    """
    for key in ("classes_precision", "classes_recall", "classes_iou"):
        scores_left[key] = [
            (lhs[0], lhs[-1] + rhs[-1])
            for lhs, rhs in zip(scores_left[key], scores_right[key])
        ]
    for key in ("avg_precision", "avg_recall", "avg_iou", "accuracy"):
        scores_left[key] = scores_left[key] + scores_right[key]
    return scores_left
"""
将混淆矩阵列表内的scores进行相加
@:param scores_list: 得分列表
@:return 相加后的得分
"""
def sum_confusion_matrix_scores_list(scores_list):
    """Fold a list of confusion-matrix score dicts into one by summation.

    The first element is used as the accumulator (and is mutated).
    """
    total = scores_list[0]
    for scores in scores_list[1:]:
        total = sum_2_confusion_matrix_scores(total, scores)
    return total
"""
对混淆矩阵得出的scores_list相加后求平均
返回值
相加后求平均的scores
"""
def avg_confusion_matrix_scores_list(scores_list):
    """Average a non-empty list of confusion-matrix score dicts.

    Sums all entries (mutating the first dict) and divides every per-class
    value and scalar average by the list length.
    """
    assert scores_list is not None and len(scores_list) >= 1
    n = len(scores_list)
    total = sum_confusion_matrix_scores_list(scores_list)
    for key in ("classes_precision", "classes_recall", "classes_iou"):
        total[key] = [(name, value / n) for name, value in total[key]]
    for key in ("avg_precision", "avg_recall", "avg_iou", "accuracy"):
        total[key] = total[key] / n
    return total
"""
对原始作为x的输入图像进行增强预处理产生相同大小的图片(旋转翻转亮度调整)
ts是pytorch工具包经过该工具包处理后图像如果和原本的不同
就会保存在磁盘上以达到增强数据的目的请先执行该函数之后再对原始数
据图像进行人工标注
root_path目录下的数据只有图片且图片后缀名一致
root_path: 作为x的原始输入图像所在目录
ts: 预处理策略
"""
def augment_raw_images2(
        root_path,
        ts=transforms.Compose(
            [
                transforms.RandomHorizontalFlip(),
                transforms.RandomVerticalFlip(),
                transforms.RandomRotation(degrees=30),
                transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.3)
            ]
        )
):
    """Augment raw input images in place (same size: flips/rotation/color).

    Each transform in *ts* is applied independently to the original image;
    a result is saved (as <name>_<k><suffix>) only when it actually differs
    from the original. Run this BEFORE manual annotation. root_path must
    contain images only, all with the same suffix.
    """
    image_paths = Path(root_path).glob(pattern="*")
    for image_path in image_paths:
        counter = 0
        image_filename, image_suffix = os.path.splitext(image_path)
        image = Image.open(image_path)
        image_np = np.array(image)
        for transform in ts.transforms:
            # Apply each transform to a fresh copy of the original pixels.
            new_image = transform(Image.fromarray(image_np))
            new_image_np = np.array(new_image)
            # Random transforms may be identity; only keep real changes.
            if not np.array_equal(image_np, new_image_np):
                new_image_copy = Image.fromarray(new_image_np)
                new_image_copy.save(str(Path(f"{image_filename}_{counter}{image_suffix}")))
                new_image_copy.close()
                counter += 1
            new_image.close()
        image.close()
"""
对原始作为x的输入图像进行增强预处理产生image_cropped_shape大小的图片
现将图像resize为image_resized_shape大小然后进行1次裁剪和1次随机裁剪裁剪的图像保留下来原始图像放入to_path中
ts是pytorch工具包经过该工具包处理后图像如果和原本的不同
就会保存在磁盘上以达到增强数据的目的请先执行该函数之后再对原始数
据图像进行人工标注
from_path目录下的数据只有图片且图片后缀名一致
from_path: 作为x的原始输入图像所在目录
to_path: 处理后的原始图像放入哪里如果为None就删除原始图像
image_resized_shape: 图像resize之后的大小, image_cropped_shape每个维度必须小于image_resized_shape
image_cropped_shape: 图像裁剪后的大小image_cropped_shape每个维度必须小于image_resized_shape
ts: 预处理策略
"""
def augment_raw_images(
        from_path,
        to_path="to/path",
        image_resized_shape=(256, 256),
        image_cropped_shape=(224, 224),
        ts=None
):
    """Augment raw input images by resize + crops of image_cropped_shape size.

    Images are resized to image_resized_shape, then cropped (one RandomCrop
    and one RandomResizedCrop); crops that differ from the resized original
    are saved as <name>_<k><suffix>. The original image is moved to to_path
    (or deleted when to_path is falsy). Run this BEFORE manual annotation.
    from_path must contain images only, all with the same suffix; each
    dimension of image_cropped_shape must be smaller than image_resized_shape.
    """
    if ts is None:
        ts = transforms.Compose(
            [
                transforms.Resize(image_resized_shape, interpolation=InterpolationMode.BILINEAR),
                transforms.RandomCrop(image_cropped_shape),
                transforms.RandomResizedCrop(image_cropped_shape)
            ]
        )
    image_paths = Path(from_path).glob("*")
    for image_path in image_paths:
        counter = 0
        image_filename, image_suffix = os.path.splitext(image_path)
        with Image.open(image_path) as image:
            # Resize once up front; keep a pixel copy as the comparison baseline.
            image = ts.transforms[0](image)
            image_copy_np = copy.deepcopy(np.array(image))
            # NOTE(review): [0:] re-applies the Resize a second time before the
            # crops — this looks like it was meant to be [1:]; confirm.
            for transform in ts.transforms[0:]:
                image = transform(image)
                image_np = np.array(image)
                if not np.array_equal(image_np, image_copy_np):
                    image.save(str(Path(f"{image_filename}_{counter}{image_suffix}")))
                    counter = counter + 1
                image.close()
                # Restart each transform from the resized original.
                image = Image.fromarray(image_copy_np)
        if to_path:
            Path(image_path).rename(Path(to_path) / f"{os.path.basename(image_path)}")
        else:
            Path(image_path).unlink()
"""
对验证数据集中的图片进行大小的统一以便其拥有统一的大小可以进行批次训练
from_path: 验证数据集所在的目录
to_path: 原始数据应该转移到哪里
resized_shape: (height, width), resize后的大小
"""
def resize_val_images(from_path, to_path, resized_shape):
    """
    Resize every image under ``from_path`` to one uniform size so the
    validation set can be batched.

    from_path: directory containing the validation images
    to_path: directory the originals are moved to; falsy deletes them instead
    resized_shape: (height, width) target size

    BUGFIX: ``PIL.Image.resize`` expects ``(width, height)``; the original
    passed the (height, width) tuple straight through, swapping the axes for
    non-square targets. The tuple is reversed here so the documented
    (height, width) contract actually holds.
    """
    for image_path in Path(from_path).glob(pattern="*"):
        with Image.open(image_path) as original_image:
            # Round-trip through numpy so the resized image owns its pixels
            # and does not keep the source file handle open.
            resized_image = Image.fromarray(np.array(original_image)).resize(resized_shape[::-1])
        if not to_path:
            Path(image_path).unlink(missing_ok=True)
        else:
            Path(image_path).rename(Path(to_path) / os.path.basename(image_path))
        # Write the resized image back under the original path.
        resized_image.save(image_path)
        resized_image.close()
"""
将一张图片按照尺寸裁剪为多张图片
@:param image: 图片
@:param crop_size: 裁剪尺寸为tuple(image_height, image_width)
@:return 裁剪之后的图片列表
"""
def crop_image2images(image, crop_size):
    """
    Split one image into non-overlapping tiles of ``crop_size``.

    image: a PIL-style image exposing ``size`` -> (width, height) and
        ``crop(box)``; the PIL annotation was dropped since only these two
        members are used
    crop_size: (crop_height, crop_width) tile size
    return: list of tiles in row-major order (left-to-right, top-to-bottom);
        any border remainder smaller than a full tile is discarded

    BUGFIX: the original read the size via ``np.array(image).shape[:-1]``,
    which crashes on single-channel (2-D) images; ``image.size`` works for
    any mode. The cursor-style loop is replaced by an equivalent grid walk
    that produces the exact same tiles in the same order.
    """
    image_width, image_height = image.size
    crop_height, crop_width = crop_size
    tiles = []
    for top in range(0, image_height - crop_height + 1, crop_height):
        for left in range(0, image_width - crop_width + 1, crop_width):
            # box is (left, upper, right, lower) in pixel coordinates
            tiles.append(image.crop((left, top, left + crop_width, top + crop_height)))
    return tiles
"""
将目录下的所有图片进行裁剪
@:param root_path: 图片的目录
@:param to: 原图片应该转移到哪里
@:param crop_size: 裁剪大小, tuple(crop_height, crop_width)
"""
def crop_images2small_images(root_path, to, crop_size):
    """
    Crop every image under ``root_path`` into tiles of ``crop_size``.

    Each tile is written next to its source as ``<name>_<index><suffix>``,
    then the source image is moved to ``to`` (or deleted when ``to`` is None).

    root_path: directory holding the source images
    to: destination directory for the originals; None deletes them
    crop_size: (crop_height, crop_width) tile size
    """
    for source_path in Path(root_path).glob(pattern="*"):
        stem, suffix = os.path.splitext(source_path)
        with Image.open(source_path) as big_image:
            for index, tile in enumerate(crop_image2images(big_image, crop_size)):
                tile.save(f"{stem}_{index}{suffix}")
                tile.close()
        # Source handle is closed by the with-block before moving/removing it.
        if to is None:
            Path(source_path).unlink(missing_ok=True)
        else:
            Path(source_path).rename(
                str(
                    Path(to) / os.path.basename(source_path)
                )
            )
"""
判断是否能够多gpu分布式并行运算
"""
def distributed_enabled():
    """
    Return True when multi-GPU distributed training is possible: CUDA is
    available, more than one device is visible, and torch is at least 0.4.0.

    BUGFIX: the original compared ``torch.__version__ >= "0.4.0"`` as strings,
    which orders lexicographically (e.g. "0.10.0" < "0.4.0"); the version is
    now compared numerically.
    """
    import re  # local import: keeps the fix self-contained
    # Pull the leading numeric components; tolerates suffixes like "+cu117".
    numbers = re.findall(r"\d+", torch.__version__)
    version_ok = tuple(int(n) for n in numbers[:3]) >= (0, 4, 0)
    return torch.cuda.is_available() and torch.cuda.device_count() > 1 and version_ok
if __name__ == "__main__":
    # Ad-hoc dataset-preparation entry point. The commented-out calls below are
    # alternative preprocessing steps that were run manually at different times;
    # only the crop step over dataset/test is currently active.
    # crop_images2small_images(
    #     root_path="dataset/train/images",
    #     to=None,
    #     crop_size=(512, 512)
    # )
    # augment_raw_images2(root_path="dataset/train/images")
    # Split each test image into 512x512 tiles and delete the originals (to=None).
    crop_images2small_images(
        root_path="dataset/test",
        to=None,
        crop_size=(512, 512)
    )
    # augment_raw_images2(root_path="dataset/val/images")
    # resize_val_images(
    #     from_path="dataset/test",
    #     to_path=None,
    #     resized_shape=(1024, 1024)
    # )
    # convert_labelme_jsons2masks(
    #     classes=[
    #         "background",
    #         "leaf"
    #     ],
    #     root_path="dataset/train/images",
    #     mask_saved_path="dataset/train/labels"
    # )