From 6cf33e4fd6c652e1356e68b539ed489548c50535 Mon Sep 17 00:00:00 2001 From: sorosoo <115395471+sorosoo@users.noreply.github.com> Date: Fri, 7 Apr 2023 22:29:25 +0800 Subject: [PATCH] Add files via upload --- arguments.py | 8 + bricks.py | 310 +++++++++++++++++++++ data_utils.py | 442 ++++++++++++++++++++++++++++++ learning_rate_scheduler.py | 317 ++++++++++++++++++++++ losses.py | 28 ++ main.py | 23 ++ model.py | 534 ++++++++++++++++++++++++++++++++++++ model_utils.py | 291 ++++++++++++++++++++ predict.py | 115 ++++++++ train.py | 280 +++++++++++++++++++ utils.py | 537 +++++++++++++++++++++++++++++++++++++ 11 files changed, 2885 insertions(+) create mode 100644 arguments.py create mode 100644 bricks.py create mode 100644 data_utils.py create mode 100644 learning_rate_scheduler.py create mode 100644 losses.py create mode 100644 main.py create mode 100644 model.py create mode 100644 model_utils.py create mode 100644 predict.py create mode 100644 train.py create mode 100644 utils.py diff --git a/arguments.py b/arguments.py new file mode 100644 index 0000000..3d6566e --- /dev/null +++ b/arguments.py @@ -0,0 +1,8 @@ +import yaml +from pathlib import Path + +model_config = Path("config") / "model.yaml" +with model_config.open("r", encoding="utf-8") as f: + model_config = yaml.load(f, yaml.FullLoader) + # 类别 + classes = model_config["classes"] \ No newline at end of file diff --git a/bricks.py b/bricks.py new file mode 100644 index 0000000..5347121 --- /dev/null +++ b/bricks.py @@ -0,0 +1,310 @@ +import json +from abc import abstractmethod + +import torch +import torch.nn as nn +import torch.nn.functional as F + + +class DropPath(nn.Module): + def __init__(self, drop_prob=0.): + super(DropPath, self).__init__() + self.drop_prob = drop_prob + + def forward(self, x): + if not self.training or self.drop_prob == 0.: + return x + keep_prob = 1 - self.drop_prob + shape = (x.shape[0],) + (1,) * (x.ndim - 1) + random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device) + random_tensor.floor_() + output = x.div(keep_prob) * random_tensor + return output + +""" + 逐层卷积 +""" +class DepthwiseConv(nn.Module): + + """ + in_channels: 输入通道数 + out_channels: 输出通道数 + kernel_size: 卷积核大小,元组类型 + padding: 补充 + stride: 步长 + """ + def __init__(self, in_channels, kernel_size=(3, 3), padding=(1, 1), stride=(1, 1), bias=False): + super(DepthwiseConv, self).__init__() + + self.conv = nn.Conv2d( + in_channels=in_channels, + out_channels=in_channels, + kernel_size=kernel_size, + padding=padding, + stride=stride, + groups=in_channels, + bias=bias + ) + + def forward(self, x): + out = self.conv(x) + return out + +""" + 逐点卷积 +""" +class PointwiseConv(nn.Module): + + def __init__(self, in_channels, out_channels): + super(PointwiseConv, self).__init__() + + self.conv = nn.Conv2d( + in_channels=in_channels, + out_channels=out_channels, + kernel_size=(1, 1), + stride=(1, 1), + padding=(0, 0) + ) + + def forward(self, x): + out = self.conv(x) + return out + + +""" + 深度可分离卷积 +""" +class DepthwiseSeparableConv(nn.Module): + + def __init__(self, in_channels, out_channels, kernel_size=(3, 3), padding=(1, 1), stride=(1, 1)): + super(DepthwiseSeparableConv, self).__init__() + + self.conv1 = DepthwiseConv( + in_channels=in_channels, + kernel_size=kernel_size, + padding=padding, + stride=stride + ) + + self.conv2 = PointwiseConv( + in_channels=in_channels, + out_channels=out_channels + ) + + def forward(self, x): + out = self.conv1(x) + out = self.conv2(out) + return out + + + +""" + 下采样 + [batch_size, in_channels, height, 
width] -> [batch_size, out_channels, height // stride, width // stride] +""" +class DownSampling(nn.Module): + + """ + in_channels: 输入通道数 + out_channels: 输出通道数 + kernel_size: 卷积核大小 + stride: 步长 + norm_layer: 正则化层,如果为None,使用BatchNorm + """ + def __init__(self, in_channels, out_channels, kernel_size, stride, norm_layer=None): + super(DownSampling, self).__init__() + + self.conv = nn.Conv2d( + in_channels=in_channels, + out_channels=out_channels, + kernel_size=kernel_size, + stride=stride, + padding=(kernel_size[0] // 2, kernel_size[-1] // 2) + ) + + if norm_layer is None: + self.norm = nn.BatchNorm2d(num_features=out_channels) + else: + self.norm = norm_layer + + def forward(self, x): + out = self.conv(x) + out = self.norm(out) + return out + +class _MatrixDecomposition2DBase(nn.Module): + def __init__( + self, + args=json.dumps( + { + "SPATIAL": True, + "MD_S": 1, + "MD_D": 512, + "MD_R": 64, + "TRAIN_STEPS": 6, + "EVAL_STEPS": 7, + "INV_T": 100, + "ETA": 0.9, + "RAND_INIT": True, + "return_bases": False, + "device": "cuda" + } + ) + ): + super(_MatrixDecomposition2DBase, self).__init__() + args: dict = json.loads(args) + for k, v in args.items(): + setattr(self, k, v) + + + @abstractmethod + def _build_bases(self, batch_size): + pass + + @abstractmethod + def local_step(self, x, bases, coef): + pass + + @torch.no_grad() + def local_inference(self, x, bases): + # (batch_size * MD_S, MD_D, N)^T @ (batch_size * MD_S, MD_D, MD_R) -> (batchszie * MD_S, N, MD_R) + coef = torch.bmm(x.transpose(1, 2), bases) + coef = F.softmax(self.INV_T * coef, dim=-1) + + steps = self.TRAIN_STEPS if self.training else self.EVAL_STEPS + for _ in range(steps): + bases, coef = self.local_step(x, bases, coef) + + return bases, coef + + @abstractmethod + def compute_coef(self, x, bases, coef): + pass + + def forward(self, x): + + batch_size, channels, height, width = x.shape + + # (batch_size, channels, height, width) -> (batch_size * MD_S, MD_D, N) + if self.SPATIAL: + self.MD_D = channels // self.MD_S + N = height * width + x = x.view(batch_size * self.MD_S, self.MD_D, N) + else: + self.MD_D = height * width + N = channels // self.MD_S + x = x.view(batch_size * self.MD_S, N, self.MD_D).transpose(1, 2) + + if not self.RAND_INIT and not hasattr(self, 'bases'): + bases = self._build_bases(1) + self.register_buffer('bases', bases) + + # (MD_S, MD_D, MD_R) -> (batch_size * MD_S, MD_D, MD_R) + if self.RAND_INIT: + bases = self._build_bases(batch_size) + else: + bases = self.bases.repeat(batch_size, 1, 1) + + bases, coef = self.local_inference(x, bases) + + # (batch_size * MD_S, N, MD_R) + coef = self.compute_coef(x, bases, coef) + + # (batch_size * MD_S, MD_D, MD_R) @ (batch_size * MD_S, N, MD_R)^T -> (batch_size * MD_S, MD_D, N) + x = torch.bmm(bases, coef.transpose(1, 2)) + + # (batch_size * MD_S, MD_D, N) -> (batch_size, channels, height, width) + if self.SPATIAL: + x = x.view(batch_size, channels, height, width) + else: + x = x.transpose(1, 2).view(batch_size, channels, height, width) + + # (batch_size * height, MD_D, MD_R) -> (batch_size, height, N, MD_D) + bases = bases.view(batch_size, self.MD_S, self.MD_D, self.MD_R) + + if self.return_bases: + return x, bases + return x + + +class NMF2D(_MatrixDecomposition2DBase): + def __init__( + self, + args=json.dumps( + { + "SPATIAL": True, + "MD_S": 1, + "MD_D": 512, + "MD_R": 64, + "TRAIN_STEPS": 6, + "EVAL_STEPS": 7, + "INV_T": 1, + "ETA": 0.9, + "RAND_INIT": True, + "return_bases": False, + "device": "cuda" + } + ) + ): + super(NMF2D, self).__init__(args) + + def 
_build_bases(self, batch_size): + + bases = torch.rand((batch_size * self.MD_S, self.MD_D, self.MD_R)).to(self.device) + bases = F.normalize(bases, dim=1) + + return bases + + # @torch.no_grad() + def local_step(self, x, bases, coef): + # (batch_size * MD_S, MD_D, N)^T @ (batch_size * MD_S, MD_D, MD_R) -> (batch_size * MD_S, N, MD_R) + numerator = torch.bmm(x.transpose(1, 2), bases) + # (batch_size * MD_S, N, MD_R) @ [(batch_size * MD_S, MD_D, MD_R)^T @ (batch_size * MD_S, MD_D, MD_R)] + # -> (batch_size * MD_S, N, MD_R) + denominator = coef.bmm(bases.transpose(1, 2).bmm(bases)) + # Multiplicative Update + coef = coef * numerator / (denominator + 1e-6) + + # (batch_size * MD_S, MD_D, N) @ (batch_size * MD_S, N, MD_R) -> (batch_size * MD_S, MD_D, MD_R) + numerator = torch.bmm(x, coef) + # (batch_size * MD_S, MD_D, MD_R) @ [(batch_size * MD_S, N, MD_R)^T @ (batch_size * MD_S, N, MD_R)] + # -> (batch_size * MD_S, D, MD_R) + denominator = bases.bmm(coef.transpose(1, 2).bmm(coef)) + # Multiplicative Update + bases = bases * numerator / (denominator + 1e-6) + + return bases, coef + + def compute_coef(self, x, bases, coef): + # (batch_size * MD_S, MD_D, N)^T @ (batch_size * MD_S, MD_D, MD_R) -> (batch_size * MD_S, N, MD_R) + numerator = torch.bmm(x.transpose(1, 2), bases) + # (batch_size * MD_S, N, MD_R) @ (batch_size * MD_S, MD_D, MD_R)^T @ (batch_size * MD_S, MD_D, MD_R) + # -> (batch_size * MD_S, N, MD_R) + denominator = coef.bmm(bases.transpose(1, 2).bmm(bases)) + # multiplication update + coef = coef * numerator / (denominator + 1e-6) + return coef + + + + +if __name__ == "__main__": + a = torch.ones(2, 3, 128, 128).to(device="cuda") + n = NMF2D( + json.dumps( + { + "SPATIAL": True, + "MD_S": 1, + "MD_D": 512, + "MD_R": 16, + "TRAIN_STEPS": 6, + "EVAL_STEPS": 7, + "INV_T": 1, + "ETA": 0.9, + "RAND_INIT": True, + "return_bases": False, + "device": "cuda" + } + ) + ) + print(n(a).shape) \ No newline at end of file diff --git a/data_utils.py b/data_utils.py new file mode 100644 index 0000000..2a838c0 --- /dev/null +++ b/data_utils.py @@ -0,0 +1,442 @@ +import json +import math + +from torch.utils.data import Dataset +from pathlib import Path +from torch.utils.data.dataset import T_co +from PIL import Image +import numpy as np +import torch +from torchvision import transforms +import arguments +import utils +import torch.nn.functional as F + + +""" + 处理labels中1~224的像素,即进行如下处理: + 224 -> 1 + 223 -> 2 + ... 
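+    (precisely, as implemented below: every label v in [128, 254] is mapped
+    to 255 - v, so 254 -> 1, 253 -> 2, ..., 128 -> 127)
+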
+ labels: 标签集合/模型预测集合,[batch_size, channels=1, height, width] + + 返回值: + labels, [batch_size, channels=1, height, width] +""" +@torch.no_grad() +def converge_labels(labels: torch.Tensor, device=torch.device("cuda" if torch.cuda.is_available() else "cpu")): + assert len(labels.shape) == 4 and labels.shape[1] == 1 + labels = labels.to(device) + for num in range(254, 127, -1): + labels[labels == num] = 255 - num + return labels + + +""" + 对labels进行独热编码 + classes_num: 编码的类别数量 + labels: 标签集合, [batch_size, channels=1, height, width] + + 返回值:独热编码后的矩阵, [batch_size, height * width, classes_num] +""" +@torch.no_grad() +def one_hot( + classes_num: int, + labels: torch.Tensor, + device=torch.device("cuda" if torch.cuda.is_available() else "cpu")): + assert len(labels.shape) == 4 and labels.shape[1] == 1 + labels = labels.to(device) + # (batch_size, channels, height, width) -> (batch_size, channels, height * width) + labels = torch.flatten(labels, start_dim=-2) + # (batch_size, channels, height * width) -> (batch_size, height * width, channels) + labels = torch.transpose(labels, -2, -1) + assert labels.shape[-1] == 1 + # (batch_size, height * width, channels) -> (batch_size, height * width) + labels = torch.squeeze(labels, dim=-1).long() + # (batch_size, height * width, classes_num) + one_hot_labels = torch.zeros(*labels.shape, classes_num).to(device) + return torch.scatter(input=one_hot_labels, dim=-1, index=torch.unsqueeze(labels, -1), value=1.) + +""" + 将模型的输出反独热编码 + outputs: [batch_size, classes_num, height, width] + + 返回值: + 反独热编码后的张量, [batch_size, 1, height, width] +""" +@torch.no_grad() +def inv_one_hot_of_outputs( + outputs, + device=torch.device("cuda" if torch.cuda.is_available() else "cpu") +): + assert len(outputs.shape) == 4 + + result = torch.argmax( + F.log_softmax( + input=outputs.to(device).permute(0, 2, 3, 1), + dim=-1 + ), + dim=-1, + keepdim=True + ).permute(0, 3, 1, 2) + return result + +""" + 将PIL读取格式的图片或np转换为tensor格式,同时将维度顺序和数量进行转换 + + 返回值:[channels, height, width] +""" +@torch.no_grad() +def pil2tensor(pil, device=torch.device("cuda" if torch.cuda.is_available() else "cpu")): + to_tensor = transforms.ToTensor() + return to_tensor(pil).to(device) + + +class Pic2PicDataset(Dataset): + """ + root: 数据集存放的目录,该目录中存放了数据(x)及其对应的标签(y) + x_dir_name: root下数据(x)所处的目录名 + y_dir_name: root下标签(y)所处的目录名 + + """ + def __init__(self, root: str, x_dir_name="images", y_dir_name="labels", device=torch.device("cuda" if torch.cuda.is_available() else "cpu")): + super(Pic2PicDataset, self).__init__() + + self.device = device + x_paths = (Path(root) / x_dir_name).glob(pattern="*") + y_paths = (Path(root) / y_dir_name).glob(pattern="*") + + self.x2y_paths = list(zip(x_paths, y_paths)) + + + def __len__(self): + return len(self.x2y_paths) + + def __getitem__(self, index) -> T_co: + item = self.x2y_paths[index] + x_path, y_path = item + x = Image.open(x_path) + y = Image.open(y_path) + y_np = np.array(y) + y.close() + y = converge_labels(torch.from_numpy(y_np).unsqueeze(0).unsqueeze(0), device=self.device) + return pil2tensor(x, self.device), y.squeeze(0) + +class ConfusionMatrix: + + def __init__(self, classes_num): + self.classes_num = classes_num + # matrix的维度:[classes_num, classes_num] + self.matrix = None + + + """ + 计算混淆矩阵 + labels: 真实标签,[batch_size, channels=1, height, width] + labels已经经过converge_labels()处理,其中的像素值都是类别对应的较小label + + predictions: 预测值,[batch_size, channels=1, height, width] + predictions也已经经过converge_labels()处理,其中的像素值也已经被处理为类别对应的较小label + """ + @torch.no_grad() + def update(self, 
labels, predictions, device=torch.device("cuda" if torch.cuda.is_available() else "cpu")): + assert len(labels.shape) == 4 and len(predictions.shape) == 4 and labels.shape[1] == 1 and predictions.shape[1] == 1 + if self.matrix is None: + labels = labels.to(device) + predictions = predictions.to(device) + # [batch_size, channels=1, height, width] -> [batch_size, height, width] + labels = torch.squeeze(labels, dim=1) + # [batch_size, channels=1, height, width] -> [batch_size, height, width] + predictions = torch.squeeze(predictions, dim=1) + # mask: [batch_size, height, width] + mask = (labels < self.classes_num) | (predictions < self.classes_num) + # labels_masked: [batch_size, height, width] + labels_masked = labels[mask] + # predictions_masked: [batch_size, height, width] + predictions_masked = predictions[mask] + assert labels_masked.shape == predictions_masked.shape + + # matrix: [classes_num, classes_num], all ele is 0 + self.matrix = torch.zeros(self.classes_num, self.classes_num, dtype=torch.float32, device=device) + + for row in range(0, self.classes_num): + for col in range(0, self.classes_num): + cnt = torch.sum((labels_masked == row) & (predictions_masked == col)) + self.matrix[row, col] = cnt + + """ + 清空混淆矩阵 + """ + def reset(self): + self.matrix = None + + """ + 获取计算出的混淆矩阵 + """ + def get_confusion_matrix(self): + assert self.matrix is not None + return self.matrix + + """ + 计算某一个标签对应的类别的精度 + + label_of_cls: 类别的标签值 + 返回值: + (cls_name, precision) + """ + @torch.no_grad() + def adjust_cls_precision(self, label_of_cls): + assert self.matrix is not None and 0 <= label_of_cls < self.classes_num + result = ( + utils.get_cls_of_label(arguments.classes, label_of_cls), + (self.matrix[label_of_cls, label_of_cls] / torch.sum(self.matrix[:, label_of_cls])).item() + ) + return result if not np.isnan(result[-1]) else (utils.get_cls_of_label(arguments.classes, label_of_cls), 0.) + + """ + 计算所有类别的精度 + + 返回值: + 列表, [(cls_name, precision), ...] + """ + @torch.no_grad() + def adjust_classes_precision(self): + cls_precision_list = [] + # 0是background(背景)的标签值 + for label_of_cls in range(0, self.classes_num): + cls_precision_list.append(self.adjust_cls_precision(label_of_cls)) + return cls_precision_list + + + """ + 计算平均预测精度 + + 返回值: + precision + """ + @torch.no_grad() + def adjust_avg_precision(self): + assert self.matrix is not None + try: + return math.fsum([tp[-1] for tp in self.adjust_classes_precision()]) / self.classes_num + except ZeroDivisionError as e: + return 0. + + + """ + 计算某一个标签对应的类别的召回率 + + 返回值: + (cls_name, recall) + """ + @torch.no_grad() + def adjust_cls_recall(self, label_of_cls): + assert self.matrix is not None and 0 <= label_of_cls < self.classes_num + result = ( + utils.get_cls_of_label(arguments.classes, label_of_cls), + (self.matrix[label_of_cls, label_of_cls] / torch.sum(self.matrix[label_of_cls, :])).item() + ) + + return result if not np.isnan(result[-1]) else (utils.get_cls_of_label(arguments.classes, label_of_cls), 0.) + + + """ + 计算所有类别的召回率 + + 返回值: + 列表, [(cls_name, recall), ...] 
+ """ + @torch.no_grad() + def adjust_classes_recall(self): + cls_recall_list = [] + # 0是background(背景)的标签值 + for label_of_cls in range(0, self.classes_num): + cls_recall_list.append(self.adjust_cls_recall(label_of_cls)) + return cls_recall_list + + + """ + 计算平均召回率 + + 返回值: + recall + """ + @torch.no_grad() + def adjust_avg_recall(self): + assert self.matrix is not None + try: + return math.fsum([tp[-1] for tp in self.adjust_classes_recall()]) / self.classes_num + except ZeroDivisionError as e: + return 0. + + """ + 计算准确率 + """ + @torch.no_grad() + def adjust_accuracy(self): + assert self.matrix is not None + try: + return (torch.sum(torch.diag(self.matrix)) / torch.sum(self.matrix)).item() + except ZeroDivisionError as e: + return 0. + + + """ + 计算某一个标签对应的类别的iou + + 返回值: + (cls_name, iou) + """ + @torch.no_grad() + def adjust_cls_iou(self, label_of_cls): + assert self.matrix is not None and 0 <= label_of_cls < self.classes_num + result = ( + utils.get_cls_of_label(arguments.classes, label_of_cls), + (self.matrix[label_of_cls, label_of_cls] / + (torch.sum( + torch.cat( + [ + self.matrix[label_of_cls, :].view(-1), + self.matrix[:, label_of_cls].view(-1) + ] + ) + ) - self.matrix[label_of_cls, label_of_cls])).item() + ) + return result if not np.isnan(result[-1]) else (utils.get_cls_of_label(arguments.classes, label_of_cls), 0.) + + + """ + 计算所有类别的iou + + 返回值: + 列表, [(cls_name, iou), ...] + """ + @torch.no_grad() + def adjust_classes_iou(self): + cls_iou_list = [] + # 0是background(背景)的标签值 + for label_of_cls in range(0, self.classes_num): + cls_iou_list.append(self.adjust_cls_iou(label_of_cls)) + return cls_iou_list + + """ + 计算平均iou + + 返回值: + iou + """ + + @torch.no_grad() + def adjust_avg_iou(self): + assert self.matrix is not None + try: + return math.fsum([tp[-1] for tp in self.adjust_classes_iou()]) / self.classes_num + except ZeroDivisionError as e: + return 0. 
+ + """ + 返回评价指标 + 一个函数全部包括 + + 返回值: + 字典 + { + "classes_precision": [(cls_name, precision), ...], + "avg_precision": precision, + "classes_recall": [(cls_name, recall), ...], + "avg_recall": recall, + "classes_iou": [(cls_name, iou), ...], + "avg_iou": iou, + "accuracy": accuracy + } + """ + @torch.no_grad() + def get_scores(self): + return { + "classes_precision": self.adjust_classes_precision(), + "avg_precision": self.adjust_avg_precision(), + "classes_recall": self.adjust_classes_recall(), + "avg_recall": self.adjust_avg_recall(), + "classes_iou": self.adjust_classes_iou(), + "avg_iou": self.adjust_avg_iou(), + "accuracy": self.adjust_accuracy() + } + + +""" + 对图片的每个通道进行标准化 + result = (pixel_value - mean) / std + + images: 输入的图像, [batch_size, channels, height, width] + + 返回值: + 标准化后的张量, std: [batch_size=1, channels, height, width], mean: [batch_size=1, channels, height, width] +""" +@torch.no_grad() +def normalize_channels(images): + assert len(images.shape) == 4 + + std_mean_tuple = torch.std_mean( + input=images, + dim=0 + ) + + images = (images - std_mean_tuple[0]) / std_mean_tuple[1] + + return images, *std_mean_tuple + + +if __name__ == "__main__": + pass + # labels = torch.tensor( + # [ + # [ + # [ + # [1, 2, 3, 4], + # [3, 3, 4, 0] + # ] + # ], + # [ + # [ + # [1, 2, 3, 3], + # [2, 0, 4, 4] + # ] + # ] + # ] + # ) + # + # predictions = torch.tensor( + # [ + # [ + # [ + # [1, 4, 3, 2], + # [2, 2, 4, 3] + # ] + # ], + # [ + # [ + # [1, 4, 4, 2], + # [0, 1, 4, 3] + # ] + # ] + # ] + # ) + # + # print(labels.shape) + # print(predictions.shape) + # + # cm = ConfusionMatrix(classes_num=5) + # cm.update(labels, predictions) + # scores = cm.get_scores() + # + # utils.confusion_matrix_scores2table(scores) + # + # utils.avg_confusion_matrix_scores_list( + # [scores, scores] + # ) + # utils.confusion_matrix_scores2table(scores) + + # data = torch.ones(2, 3, 4, 5).to(device="cuda", dtype=torch.float32) + # print(normalize_channels(data)[0]) + # a = np.ones((224, 224, 3)) + # print(pil2tensor(a).shape) \ No newline at end of file diff --git a/learning_rate_scheduler.py b/learning_rate_scheduler.py new file mode 100644 index 0000000..70aa797 --- /dev/null +++ b/learning_rate_scheduler.py @@ -0,0 +1,317 @@ +import math +from enum import Enum + +import numpy as np +import torch +import torch.optim as optim + + +class SchedulerType(Enum): + STEP_SCHEDULER = "step", + MULTI_STEP_SCHEDULER = "multi_step", + EXPONENTIAL_SCHEDULER = "exponential", + COSINE_ANNEALING_SCHEDULER = "cosine_annealing", + LINEAR_WARMUP_THEN_POLY_SCHEDULER = "linear_warmup_then_poly" + + +class StepScheduler: + """ + optimizer: 优化器 + step_size: 每间隔多少步,就去计算优化器的学习率并将其更新 + gamma: lr_(t+1) = lr_(t) * gamma + verbose: 是否跟踪学习率的变化并打印到控制台中,默认False(不跟踪) + """ + def __init__(self, optimizer, step_size=30, gamma=0.1, verbose=False): + self.optimizer = optimizer + self.step_size = step_size + self.gamma = gamma + self.verbose = verbose + self.lr_scheduler = optim.lr_scheduler.StepLR( + optimizer=self.optimizer, + step_size=self.step_size, + gamma=self.gamma, + last_epoch=-1, + verbose=self.verbose + ) + + + """ + 调用学习率调度器 + """ + def step(self): + self.lr_scheduler.step() + + + + """ + 获得学习率调度器的状态 + """ + def get_state_dict(self): + return self.lr_scheduler.state_dict() + + """ + 加载学习率调度器的状态字典 + """ + def load_state_dict(self, state_dict: dict): + self.lr_scheduler.load_state_dict(state_dict) + + +class MultiStepScheduler: + """ + optimizer: 优化器 + milestones: 列表,列表内的数据必须是整数且递增,每一个数表示调度器被执行了对应次数后,就更新优化器的学习率 + gamma: lr_(t+1) = lr_(t) 
* gamma + verbose: 是否跟踪学习率的变化并打印到控制台中,默认False(不跟踪) + """ + def __init__(self, optimizer, milestones, gamma, verbose=False): + self.optimizer = optimizer + self.milestones = milestones + self.gamma = gamma + self.verbose = verbose + self.lr_scheduler = optim.lr_scheduler.MultiStepLR( + optimizer=self.optimizer, + milestones=self.milestones, + gamma=gamma, + last_epoch=-1, + verbose=self.verbose + ) + + """ + 调用学习率调度器 + """ + def step(self): + self.lr_scheduler.step() + + + """ + 获得学习率调度器的状态 + """ + def get_state_dict(self): + return self.lr_scheduler.state_dict() + + + """ + 加载学习率调度器的状态字典 + """ + def load_state_dict(self, state_dict: dict): + self.lr_scheduler.load_state_dict(state_dict) + + +class ExponentialScheduler: + + """ + optimizer: 优化器 + gamma: lr_(t+1) = lr_(t) * gamma, 每一次调用,优化器的学习率都会更新 + verbose: 是否跟踪学习率的变化并打印到控制台中,默认False(不跟踪) + """ + def __init__(self, optimizer, gamma=0.95, verbose=False): + self.optimizer = optimizer + self.gamma = gamma + self.verbose = verbose + self.lr_scheduler = optim.lr_scheduler.ExponentialLR( + optimizer=self.optimizer, + gamma=self.gamma, + last_epoch=-1, + verbose=self.verbose + ) + + """ + 调用学习率调度器 + """ + + def step(self): + self.lr_scheduler.step() + + """ + 获得学习率调度器的状态 + """ + + def get_state_dict(self): + return self.lr_scheduler.state_dict() + + """ + 加载学习率调度器的状态字典 + """ + + def load_state_dict(self, state_dict: dict): + self.lr_scheduler.load_state_dict(state_dict) + + +class CosineAnnealingScheduler: + + """ + optimizer: 优化器,优化器中有一个已经设定的初始学习率,这个初始学习率就是调度器能达到的最大学习率(max_lr) + t_max: 周期,调度器每被调用2 * t_max,优化器的学习率就会从max_lr -> min_lr -> max_lr + min_lr: 最小学习率 + verbose: 是否跟踪学习率的变化并打印到控制台中,默认False(不跟踪) + """ + def __init__(self, optimizer, t_max=5, min_lr=0, verbose=False): + self.optimizer = optimizer + self.t_max = t_max + self.min_lr = min_lr + self.verbose = verbose + self.lr_scheduler = optim.lr_scheduler.CosineAnnealingLR( + optimizer=self.optimizer, + T_max=self.t_max, + eta_min=self.min_lr, + last_epoch=-1, + verbose=self.verbose + ) + + """ + 调用学习率调度器 + """ + def step(self): + self.lr_scheduler.step() + + + """ + 获得学习率调度器的状态 + """ + def get_state_dict(self): + return self.lr_scheduler.state_dict() + + + """ + 加载学习率调度器的状态字典 + """ + def load_state_dict(self, state_dict: dict): + self.lr_scheduler.load_state_dict(state_dict) + +class LinearWarmupThenPolyScheduler: + + """ + 预热阶段采用Linear,之后采用Poly + optimizer: 优化器 + warmup_iters: 预热步数 + total_iters: 总训练步数 + min_lr: 最低学习率 + """ + def __init__(self, optimizer, warmup_iters=1500, total_iters=2000, warmup_ratio=1e-6, min_lr=0., power=1.): + self.optimizer = optimizer + self.current_iters = 0 + self.warmup_iters = warmup_iters + self.total_iters = total_iters + self.warmup_ration = warmup_ratio + self.min_lr = min_lr + self.power = power + + self.base_lr = None + self.regular_lr = None + self.warmup_lr = None + + def get_base_lr(self): + return np.array([param_group.setdefault("initial_lr", param_group["lr"]) for param_group in self.optimizer.param_groups]) + + def get_lr(self): + coeff = (1 - self.current_iters / self.total_iters) ** self.power + return (self.base_lr - np.full_like(self.base_lr, self.min_lr)) * coeff + np.full_like(self.base_lr, self.min_lr) + + def get_regular_lr(self): + return self.get_lr() + + def get_warmup_lr(self): + k = (1 - self.current_iters / self.warmup_iters) * (1 - self.warmup_ration) + return (1 - k) * self.regular_lr + + def update(self): + assert 0 <= self.current_iters < self.total_iters + self.current_iters = self.current_iters + 1 + self.base_lr = 
self.get_base_lr() + self.regular_lr = self.get_regular_lr() + self.warmup_lr = self.get_warmup_lr() + + def set_lr(self): + if self.current_iters <= self.warmup_iters: + for idx, param_group in enumerate(self.optimizer.param_groups): + param_group["lr"] = self.warmup_lr[idx] + elif self.current_iters <= self.total_iters: + for idx, param_group in enumerate(self.optimizer.param_groups): + param_group["lr"] = self.regular_lr[idx] + + def step(self): + self.update() + self.set_lr() + + + + + + + + + +""" + 获取学习率调度器 + optimizer: 使用学习率调度器的优化器 + scheduler_type: 要获取的调度器的类型 + kwargs: 参数字典,作用于调度器 + + 需要改变优化器的参数,在该方法中调整 +""" +def get_lr_scheduler(optimizer: optim, scheduler_type: SchedulerType, kwargs=None): + if kwargs is None: + # 返回默认设置的调度器 + if scheduler_type == SchedulerType.STEP_SCHEDULER: + return StepScheduler( + optimizer=optimizer, + step_size=30, + gamma=0.1, + verbose=False + ) + elif scheduler_type == SchedulerType.MULTI_STEP_SCHEDULER: + return MultiStepScheduler( + optimizer=optimizer, + milestones=[30, 60, 90], + gamma=0.1, + verbose=False + ) + elif scheduler_type == SchedulerType.EXPONENTIAL_SCHEDULER: + return ExponentialScheduler( + optimizer=optimizer, + gamma=0.95, + verbose=False + ) + elif scheduler_type == SchedulerType.COSINE_ANNEALING_SCHEDULER: + return CosineAnnealingScheduler( + optimizer=optimizer, + t_max=5, + min_lr=0, + verbose=False + ) + elif scheduler_type == SchedulerType.LINEAR_WARMUP_THEN_POLY_SCHEDULER: + return LinearWarmupThenPolyScheduler( + optimizer=optimizer, + warmup_iters=1500, + total_iters=2000, + warmup_ratio=1e-6, + min_lr=0., + power=1. + ) + else: + # 返回自定义设置的调度器 + if scheduler_type == SchedulerType.STEP_SCHEDULER: + return StepScheduler( + optimizer=optimizer, + **kwargs + ) + elif scheduler_type == SchedulerType.MULTI_STEP_SCHEDULER: + return MultiStepScheduler( + optimizer=optimizer, + **kwargs + ) + elif scheduler_type == SchedulerType.EXPONENTIAL_SCHEDULER: + return ExponentialScheduler( + optimizer=optimizer, + **kwargs + ) + elif scheduler_type == SchedulerType.COSINE_ANNEALING_SCHEDULER: + return CosineAnnealingScheduler( + optimizer=optimizer, + **kwargs + ) + elif scheduler_type == SchedulerType.LINEAR_WARMUP_THEN_POLY_SCHEDULER: + return LinearWarmupThenPolyScheduler( + optimizer=optimizer, + **kwargs + ) \ No newline at end of file diff --git a/losses.py b/losses.py new file mode 100644 index 0000000..717b3ff --- /dev/null +++ b/losses.py @@ -0,0 +1,28 @@ +import torch +import torch.nn as nn + + +class FocalLoss(nn.Module): + """ + weight: 每一种类别的权重,越大,说明该类别越重要 + [weight_1, weight_2, ...] 
+ len(weight) = classes_num + gamma: 为0表示关闭该参数的影响,如果需要使用,范围应为(0.5, 10.0) + """ + def __init__(self, weight=None, reduction='mean', gamma=0, eps=1e-7): + super(FocalLoss, self).__init__() + self.gamma = gamma + self.eps = eps + self.ce = torch.nn.CrossEntropyLoss(weight=weight, reduction=reduction) + + def forward(self, x, y): + logp = self.ce(x, y) + p = torch.exp(-logp) + loss = (1 - p) ** self.gamma * logp + return loss.mean() + + + + +if __name__ == "__main__": + pass \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..f447e37 --- /dev/null +++ b/main.py @@ -0,0 +1,23 @@ +import yaml +from pathlib import Path +import utils +import torch + +if __name__ == "__main__": + model_config = Path("config") / "model.yaml" + with model_config.open("r", encoding="utf-8") as f: + model_config = yaml.load(f, yaml.FullLoader) + + # 类别 + classes = model_config["classes"] + + # 类别对应的语义颜色,按照顺序对应 + colors = utils.get_colors(len(classes)) + + + train_config = Path("config") / "train.yaml" + with train_config.open("r", encoding="utf-8") as f: + train_config = yaml.load(f, yaml.FullLoader) + + # 类别对应的权重 + weight = torch.tensor(train_config["weight"]) if len(train_config["weight"]) != 1 else torch.ones(len(classes)) \ No newline at end of file diff --git a/model.py b/model.py new file mode 100644 index 0000000..96c738a --- /dev/null +++ b/model.py @@ -0,0 +1,534 @@ +import json +import math + +import torch.nn as nn +import torch +import bricks +import torch.nn.functional as F +from abc import * +import utils + +""" + [batch_size, in_channels, height, width] -> [batch_size, out_channels, height // 4, width // 4] +""" +class StemConv(nn.Module): + + def __init__(self, in_channels, out_channels, norm_layer=None): + super(StemConv, self).__init__() + + self.proj = nn.Sequential( + bricks.DownSampling( + in_channels=in_channels, + out_channels=out_channels // 2, + kernel_size=(3, 3), + stride=(2, 2), + norm_layer=norm_layer + ), + bricks.DownSampling( + in_channels=out_channels // 2, + out_channels=out_channels, + kernel_size=(3, 3), + stride=(2, 2), + norm_layer=norm_layer + ), + ) + + def forward(self, x): + out = self.proj(x) + return out + + +class MSCA(nn.Module): + + def __init__(self, in_channels): + super(MSCA, self).__init__() + + self.conv = bricks.DepthwiseConv( + in_channels=in_channels, + kernel_size=(5, 5), + padding=(2, 2), + bias=True + ) + + + self.conv7 = nn.Sequential( + bricks.DepthwiseConv( + in_channels=in_channels, + kernel_size=(1, 7), + padding=(0, 3), + bias=True + ), + bricks.DepthwiseConv( + in_channels=in_channels, + kernel_size=(7, 1), + padding=(3, 0), + bias=True + ) + ) + + self.conv11 = nn.Sequential( + bricks.DepthwiseConv( + in_channels=in_channels, + kernel_size=(1, 11), + padding=(0, 5), + bias=True + ), + bricks.DepthwiseConv( + in_channels=in_channels, + kernel_size=(11, 1), + padding=(5, 0), + bias=True + ) + ) + + self.conv21 = nn.Sequential( + bricks.DepthwiseConv( + in_channels=in_channels, + kernel_size=(1, 21), + padding=(0, 10), + bias=True + ), + bricks.DepthwiseConv( + in_channels=in_channels, + kernel_size=(21, 1), + padding=(10, 0), + bias=True + ) + ) + + self.fc = nn.Conv2d( + in_channels=in_channels, + out_channels=in_channels, + kernel_size=(1, 1) + ) + + def forward(self, x): + u = x + out = self.conv(x) + + branch1 = self.conv7(out) + branch2 = self.conv11(out) + branch3 = self.conv21(out) + + out = self.fc(out + branch1 + branch2 + branch3) + out = out * u + return out + + +class Attention(nn.Module): + + def 
__init__(self, in_channels):
+        super(Attention, self).__init__()
+
+        self.fc1 = nn.Conv2d(
+            in_channels=in_channels,
+            out_channels=in_channels,
+            kernel_size=(1, 1)
+        )
+        self.msca = MSCA(in_channels=in_channels)
+        self.fc2 = nn.Conv2d(
+            in_channels=in_channels,
+            out_channels=in_channels,
+            kernel_size=(1, 1)
+        )
+
+    def forward(self, x):
+        out = F.gelu(self.fc1(x))
+        out = self.msca(out)
+        out = self.fc2(out)
+        return out
+
+
+class FFN(nn.Module):
+
+    def __init__(self, in_features, hidden_features, out_features, drop_prob=0.):
+        super(FFN, self).__init__()
+
+        self.fc1 = nn.Conv2d(
+            in_channels=in_features,
+            out_channels=hidden_features,
+            kernel_size=(1, 1)
+        )
+        self.dw = bricks.DepthwiseConv(
+            in_channels=hidden_features,
+            kernel_size=(3, 3),
+            bias=True
+        )
+        self.fc2 = nn.Conv2d(
+            in_channels=hidden_features,
+            out_channels=out_features,
+            kernel_size=(1, 1)
+        )
+        self.dropout = nn.Dropout(drop_prob)
+
+    def forward(self, x):
+        out = self.fc1(x)
+        out = F.gelu(self.dw(out))
+        out = self.fc2(out)
+        out = self.dropout(out)
+        return out
+
+class Block(nn.Module):
+
+    def __init__(self, in_channels, expand_ratio, drop_prob=0., drop_path_prob=0.):
+        super(Block, self).__init__()
+
+        self.norm1 = nn.BatchNorm2d(num_features=in_channels)
+        self.attention = Attention(in_channels=in_channels)
+        # stochastic depth: only wrap in DropPath when the probability is positive
+        self.drop_path = bricks.DropPath(drop_prob=drop_path_prob) if drop_path_prob > 0. else nn.Identity()
+        self.norm2 = nn.BatchNorm2d(num_features=in_channels)
+        self.ffn = FFN(
+            in_features=in_channels,
+            hidden_features=int(expand_ratio * in_channels),
+            out_features=in_channels,
+            drop_prob=drop_prob
+        )
+
+        layer_scale_init_value = 1e-2
+        self.layer_scale1 = nn.Parameter(
+            layer_scale_init_value * torch.ones(in_channels),
+            requires_grad=True
+        )
+        self.layer_scale2 = nn.Parameter(
+            layer_scale_init_value * torch.ones(in_channels),
+            requires_grad=True
+        )
+
+    def forward(self, x):
+        out = self.norm1(x)
+        out = self.attention(out)
+        out = x + self.drop_path(
+            self.layer_scale1.unsqueeze(-1).unsqueeze(-1) * out
+        )
+        x = out
+
+        out = self.norm2(out)
+        out = self.ffn(out)
+        out = x + self.drop_path(
+            self.layer_scale2.unsqueeze(-1).unsqueeze(-1) * out
+        )
+
+        return out
+
+class Stage(nn.Module):
+
+    def __init__(
+        self,
+        stage_id,
+        in_channels,
+        out_channels,
+        expand_ratio,
+        blocks_num,
+        drop_prob=0.,
+        drop_path_prob=[0.]
+ ): + super(Stage, self).__init__() + + + assert blocks_num == len(drop_path_prob) + + if stage_id == 0: + self.down_sampling = StemConv( + in_channels=in_channels, + out_channels=out_channels + ) + else: + self.down_sampling = bricks.DownSampling( + in_channels=in_channels, + out_channels=out_channels, + kernel_size=(3, 3), + stride=(2, 2) + ) + + self.blocks = nn.Sequential( + *[ + Block( + in_channels=out_channels, + expand_ratio=expand_ratio, + drop_prob=drop_prob, + drop_path_prob=drop_path_prob[i] + ) for i in range(0, blocks_num) + ] + ) + + self.norm = nn.LayerNorm(out_channels) + + def forward(self, x): + out = self.down_sampling(x) + out = self.blocks(out) + # [batch_size, channels, height, width] -> [batch_size, channels, height * width] + batch_size, channels, height, width = out.shape + out = out.view(batch_size, channels, -1) + # [batch_size, channels, height * width] -> [batch_size, height * width, channels] + out = torch.transpose(out, -2, -1) + out = self.norm(out) + + # [batch_size, height * width, channels] -> [batch_size, channels, height * width] + out = torch.transpose(out, -2, -1) + # [batch_size, channels, height * width] -> [batch_size, channels, height, width] + out = out.view(batch_size, -1, height, width) + + return out + +class MSCAN(nn.Module): + + def __init__( + self, + embed_dims=[3, 32, 64, 160, 256], + expand_ratios=[8, 8, 4, 4], + depths=[3, 3, 5, 2], + drop_prob=0.1, + drop_path_prob=0.1 + ): + super(MSCAN, self).__init__() + + dpr = [x.item() for x in torch.linspace(0, drop_path_prob, sum(depths))] + self.stages = nn.Sequential( + *[ + Stage( + stage_id=stage_id, + in_channels=embed_dims[stage_id], + out_channels=embed_dims[stage_id + 1], + expand_ratio=expand_ratios[stage_id], + blocks_num=depths[stage_id], + drop_prob=drop_prob, + drop_path_prob=dpr[sum(depths[: stage_id]): sum(depths[: stage_id + 1])] + ) for stage_id in range(0, len(depths)) + ] + ) + + def forward(self, x): + out = x + outputs = [] + + for idx, stage in enumerate(self.stages): + out = stage(out) + if idx != 0: + outputs.append(out) + + # outputs: [output_of_stage1, output_of_stage2, output_of_stage3] + # output_of_stage1: [batch_size, embed_dims[2], height / 8, width / 8] + # output_of_stage2: [batch_size, embed_dims[3], height / 16, width / 16] + # output_of_stage3: [batch_size, embed_dims[4], height / 32, width / 32] + return [x, *outputs] + +class Hamburger(nn.Module): + + def __init__( + self, + hamburger_channels=256, + nmf2d_config=json.dumps( + { + "SPATIAL": True, + "MD_S": 1, + "MD_D": 512, + "MD_R": 64, + "TRAIN_STEPS": 6, + "EVAL_STEPS": 7, + "INV_T": 1, + "ETA": 0.9, + "RAND_INIT": True, + "return_bases": False, + "device": "cuda" + } + ) + ): + super(Hamburger, self).__init__() + self.ham_in = nn.Sequential( + nn.Conv2d( + in_channels=hamburger_channels, + out_channels=hamburger_channels, + kernel_size=(1, 1) + ) + ) + + self.ham = bricks.NMF2D(args=nmf2d_config) + + self.ham_out = nn.Sequential( + nn.Conv2d( + in_channels=hamburger_channels, + out_channels=hamburger_channels, + kernel_size=(1, 1), + bias=False + ), + nn.GroupNorm( + num_groups=32, + num_channels=hamburger_channels + ) + ) + + def forward(self, x): + out = self.ham_in(x) + out = self.ham(out) + out = self.ham_out(out) + out = F.relu(x + out) + return out + + +class LightHamHead(nn.Module): + + def __init__( + self, + in_channels_list=[64, 160, 256], + hidden_channels=256, + out_channels=256, + classes_num=150, + drop_prob=0.1, + nmf2d_config=json.dumps( + { + "SPATIAL": True, + "MD_S": 1, + "MD_D": 
512, + "MD_R": 64, + "TRAIN_STEPS": 6, + "EVAL_STEPS": 7, + "INV_T": 1, + "ETA": 0.9, + "RAND_INIT": True, + "return_bases": False, + "device": "cuda" + } + ) + ): + super(LightHamHead, self).__init__() + + self.cls_seg = nn.Sequential( + nn.Dropout2d(drop_prob), + nn.Conv2d( + in_channels=out_channels, + out_channels=classes_num, + kernel_size=(1, 1) + ) + ) + + self.squeeze = nn.Sequential( + nn.Conv2d( + in_channels=sum(in_channels_list), + out_channels=hidden_channels, + kernel_size=(1, 1), + bias=False + ), + nn.GroupNorm( + num_groups=32, + num_channels=hidden_channels, + ), + nn.ReLU() + ) + + self.hamburger = Hamburger( + hamburger_channels=hidden_channels, + nmf2d_config=nmf2d_config + ) + + self.align = nn.Sequential( + nn.Conv2d( + in_channels=hidden_channels, + out_channels=out_channels, + kernel_size=(1, 1), + bias=False + ), + nn.GroupNorm( + num_groups=32, + num_channels=out_channels + ), + nn.ReLU() + ) + + + + + + # inputs: [x, x_1, x_2, x_3] + # x: [batch_size, channels, height, width] + def forward(self, inputs): + assert len(inputs) >= 2 + o = inputs[0] + batch_size, _, standard_height, standard_width = inputs[1].shape + standard_shape = (standard_height, standard_width) + inputs = [ + F.interpolate( + input=x, + size=standard_shape, + mode="bilinear", + align_corners=False + ) + for x in inputs[1:] + ] + + # x: [batch_size, channels_1 + channels_2 + channels_3, standard_height, standard_width] + x = torch.cat(inputs, dim=1) + + # out: [batch_size, channels_1 + channels_2 + channels_3, standard_height, standard_width] + out = self.squeeze(x) + out = self.hamburger(out) + out = self.align(out) + + # out: [batch_size, classes_num, standard_height, standard_width] + out = self.cls_seg(out) + + _, _, original_height, original_width = o.shape + # out: [batch_size, original_height * original_width, classes_num] + out = F.interpolate( + input=out, + size=(original_height, original_width), + mode="bilinear", + align_corners=False + ) + out = torch.transpose(out.view(batch_size, -1, original_height * original_width), -2, -1) + + return out + + +class SegNeXt(nn.Module): + + def __init__( + self, + embed_dims=[3, 32, 64, 160, 256], + expand_rations=[8, 8, 4, 4], + depths=[3, 3, 5, 2], + drop_prob_of_encoder=0.1, + drop_path_prob=0.1, + hidden_channels=256, + out_channels=256, + classes_num=150, + drop_prob_of_decoder=0.1, + nmf2d_config=json.dumps( + { + "SPATIAL": True, + "MD_S": 1, + "MD_D": 512, + "MD_R": 64, + "TRAIN_STEPS": 6, + "EVAL_STEPS": 7, + "INV_T": 1, + "ETA": 0.9, + "RAND_INIT": False, + "return_bases": False, + "device": "cuda" + } + ) + ): + super(SegNeXt, self).__init__() + + self.encoder = MSCAN( + embed_dims=embed_dims, + expand_ratios=expand_rations, + depths=depths, + drop_prob=drop_prob_of_encoder, + drop_path_prob=drop_path_prob + ) + + self.decoder = LightHamHead( + in_channels_list=embed_dims[-3:], + hidden_channels=hidden_channels, + out_channels=out_channels, + classes_num=classes_num, + drop_prob=drop_prob_of_decoder, + nmf2d_config=nmf2d_config + ) + + def forward(self, x): + out = self.encoder(x) + out = self.decoder(out) + return out \ No newline at end of file diff --git a/model_utils.py b/model_utils.py new file mode 100644 index 0000000..ff657e5 --- /dev/null +++ b/model_utils.py @@ -0,0 +1,291 @@ +import copy +import math +import os.path +from pathlib import Path +import torch.nn as nn +import torch +import yaml +import model +import json +import re +import torch.optim as optim +import learning_rate_scheduler + +""" + 获取模型 + @:param train: 
是否获取模型进行训练 + 如果为True,使用模型进行训练; + 如果为False,使用模型进行预测。 + @:param model_config: 模型配置文件路径 + @:param train_config: 训练配置文件路径 + @:param predict_config: 预测配置文件路径 + @:return 实例化模型 +""" +def get_model( + train: bool, + model_config=Path("config") / "model.yaml", + train_config=Path("config") / "train.yaml", + predict_config=Path("config") / "predict.yaml" +): + with model_config.open("r", encoding="utf-8") as mcf: + model_config = yaml.load(mcf, Loader=yaml.FullLoader) + + nmf2d_config = model_config["nmf2d_config"] + if train: + with train_config.open("r", encoding="utf-8") as tcf: + train_config = yaml.load(tcf, Loader=yaml.FullLoader) + device = train_config["device"] + else: + with predict_config.open("r", encoding="utf-8") as pcf: + predict_config = yaml.load(pcf, Loader=yaml.FullLoader) + device = predict_config["device"] + nmf2d_config["device"] = device + + net = model.SegNeXt( + embed_dims=model_config["embed_dims"], + expand_rations=model_config["expand_rations"], + depths=model_config["depths"], + drop_prob_of_encoder=model_config["drop_prob_of_encoder"], + drop_path_prob=model_config["drop_path_prob"], + hidden_channels=model_config["channels_of_hamburger"], + out_channels=model_config["channels_of_hamburger"], + classes_num=len(model_config["classes"]), + drop_prob_of_decoder=model_config["drop_prob_of_decoder"], + nmf2d_config=json.dumps(nmf2d_config) + ).to(device=device) + return net + +""" + 分割模型中的参数 + named_parameters: 带名称的参数 + regex_expr: 正则表达式(r"") + + 返回值: + target, left + target: 表示符合正则表达式的参数 + left: 表示不符合正则表达式的参数 +""" +def split_parameters(named_parameters, regex_expr): + target = [] + left = [] + + pattern = re.compile(regex_expr) + for name, param in named_parameters: + if pattern.fullmatch(name): + target.append((name, param)) + else: + left.append((name, param)) + + return target, left + + +""" + 获取优化器 + @:param net: 网络模型 + @:param optimizer_config: 优化器配置文件路径 + @:return 优化器 +""" +def get_optimizer( + net, + optimizer_config=Path("config") / "optimizer.yaml" +): + with optimizer_config.open("r", encoding="utf-8") as f: + optimizer_config = yaml.load(f, Loader=yaml.FullLoader) + + base_config = optimizer_config["base_config"] + lr = eval(base_config["kwargs"])["lr"] + weight_decay = eval(base_config["kwargs"])["weight_decay"] + + + parameters_config = optimizer_config["parameters"][1:] + left = net.named_parameters() + parameters = [] + + for params_config in parameters_config[1:]: + params, left = split_parameters( + named_parameters=left, + regex_expr=r'' + next(iter(params_config.values()))["regex_expr"] + ) + params = list( + map( + lambda tp: tp[-1], params + ) + ) + parameters.append(params) + + parameters = [ + list( + map( + lambda tp: tp[-1], left + ) + ), + *parameters + ] + params = [ + { + 'params': param, + 'lr': lr * next(iter(params_config.values())).setdefault('lr_mult', 1.0), + 'weight_decay': weight_decay * next(iter(params_config.values())).setdefault('weight_decay', 0.) 
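+            # lr and weight_decay are scaled per group by the 'lr_mult' and
+            # 'weight_decay' multipliers read from optimizer.yaml (defaulting
+            # to 1.0 and 0. when a group does not set them)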
+ } + for idx, params_config in enumerate(parameters_config) for param in parameters[idx] + ] + + optimizer = eval(f"optim.{base_config['optim_type']}")(params, **eval(base_config["kwargs"])) + return optimizer + +""" + 获取学习率调度器 + @:param optimizer: 优化器 + @:param lr_scheduler_config: 学习率调度器配置文件路径 + @:return 学习率调度器 +""" +def get_lr_scheduler( + optimizer, + lr_scheduler_config=Path("config") / "lr_scheduler.yaml" +): + lr_scheduler = None + with lr_scheduler_config.open("r", encoding="utf-8") as f: + lr_scheduler_config = yaml.load(f, yaml.FullLoader) + lr_scheduler = learning_rate_scheduler.get_lr_scheduler( + optimizer=optimizer, + scheduler_type=eval(f"learning_rate_scheduler.SchedulerType.{lr_scheduler_config['scheduler_type']}"), + kwargs=eval(lr_scheduler_config["kwargs"]) + ) + return lr_scheduler + + +""" + 搜寻模型权重文件和自己创建的模型中第一个不同的参数 + left: 元组,("模型名称": state_dict) + right: 元组,("模型名称": state_dict) + ignore_counts: 忽略不同的数目 + 列表: + { + "row_num": 0, + "模型名称1": "name1", + "模型名称2": "name2" + } +""" +def first_diff(left: tuple, right: tuple, ignore_counts=0): + left = copy.deepcopy(left) + left_name, left_state = left + left_state = list(left_state.keys()) + left_ord = 0 + + right = copy.deepcopy(right) + right_name, right_state = right + right_state = list(right_state.keys()) + right_ord = 0 + + response = None + + while left_ord < len(left_state) and right_ord < len(right_state): + left_sign = left_state[left_ord].split(".")[-1] + right_sign = right_state[right_ord].split(".")[-1] + print(f"{left_ord}: {left_state[left_ord]} --> {right_state[right_ord]}") + if left_sign != right_sign: + if ignore_counts != 0: + ignore_counts -= 1 + left_ord += 1 + right_ord += 1 + continue + + assert left_ord == right_ord + response = { + "row_num": left_ord, + left_name: left_state[left_ord], + right_name: right_state[right_ord] + } + return response + + left_ord += 1 + right_ord += 1 + + while ignore_counts: + left_ord += 1 + right_ord += 1 + ignore_counts -= 1 + + if left_ord < len(left_state) and right_ord >= len(right_state): + response = { + "row_num": left_ord, + left_name: left_state[left_ord], + right_name: "None" + } + if left_ord >= len(left_state) and right_ord < len(right_state): + response = { + "row_num": right_ord, + left_name: "None", + right_name: right_state[right_ord] + } + if left_ord >= len(left_state) and right_ord >= len(right_state): + response = { + "row_num": -1, + left_name: "same", + right_name: "same" + } + print(f"{response['row_num']}: {response[left_name]} --> {response[right_name]}") + return response + + +""" + 初始化模型 + @:param train: + True表示,初始化用来训练的网络; + False表示,初始化用来预测的网络. + net: 网络模型 + optimizer: 优化器 + pretrained: 是否加载预训练权重 + @:param train_config: 训练配置文件路径 +""" +def init_model( + train, + net, + optimizer=None, + train_config=Path("config") / "train.yaml", + predict_config=Path("config") / "predict.yaml" +): + # 初始化权重 + for m in net.modules(): + if isinstance(m, nn.Linear): + if m.weight is not None: + nn.init.trunc_normal_(m.weight, std=.02) + if m.bias is not None: + nn.init.constant_(m.bias, 0.) + elif isinstance(m, nn.LayerNorm): + if m.weight is not None: + nn.init.constant_(m.weight, 1.0) + if m.bias is not None: + nn.init.constant_(m.bias, 0.) + elif isinstance(m, nn.Conv2d): + fan_out = m.kernel_size[0] * m.kernel_size[1] * m.out_channels + fan_out //= m.groups + if m.weight is not None: + nn.init.normal_(m.weight, math.sqrt(2.0 / fan_out)) + if m.bias is not None: + nn.init.normal_(m.bias, 0.) 
+ + if train: + with train_config.open("r", encoding="utf-8") as tcf: + config = yaml.load(tcf, yaml.FullLoader) + else: + with predict_config.open("r", encoding="utf-8") as pcf: + config = yaml.load(pcf, yaml.FullLoader) + + mode = config["mode"] + if mode == -1: + return + + checkpoint = torch.load(os.path.sep.join(config["checkpoint"])) + if mode == 0: + for regex_expr in config["regex_expr"]: + checkpoint["state_dict"] = { + tp[0]: tp[-1] + for tp in zip(net.state_dict().keys(), checkpoint["state_dict"].values()) + if re.compile(r"" + regex_expr).fullmatch(tp[0]) + } + checkpoint["optimizer"]["state"] = dict() + + net.load_state_dict(checkpoint["state_dict"], strict=False) + if train: + optimizer.load_state_dict(checkpoint["optimizer"]) \ No newline at end of file diff --git a/predict.py b/predict.py new file mode 100644 index 0000000..0b81328 --- /dev/null +++ b/predict.py @@ -0,0 +1,115 @@ +import os + +import numpy as np +import yaml +from PIL import Image +import data_utils +import torch +from pathlib import Path +import model_utils +import utils +from matplotlib import pyplot as plt + + + +""" + 预测 + @:param net: 网络模型 + @:param image: 图像 + @:param cls_name: 类别名 + @:param predict_config: 预测配置文件路径 + @:param model_config: 模型配置文件路径 + + @:return mask: [image_height, image_width],元素类型为bool +""" +def predict( + net, + image: Image, + cls_name, + predict_config=Path("config") / "predict.yaml", + model_config=Path("config") / "model.yaml" +): + with model_config.open("r", encoding="utf-8") as mcf: + model_config = yaml.load(mcf, Loader=yaml.FullLoader) + classes = model_config["classes"] + + with predict_config.open("r", encoding="utf-8") as pcf: + predict_config = yaml.load(pcf, yaml.FullLoader) + device = predict_config["device"] + image = data_utils.pil2tensor(image, device) + if len(image.shape) == 3: + image = torch.unsqueeze(image, dim=0) + batch_size, _, image_height, image_width = image.shape + + prediction = data_utils.inv_one_hot_of_outputs( + torch.transpose( + net(image), + -2, + -1 + ).reshape(batch_size, len(classes), image_height, image_width), + device + ) + + mask = torch.squeeze( + prediction == utils.get_label_of_cls(classes, cls_name)[0] + ) + + return mask + + +""" + 将预测结果与原图混合 + + @:param net: 神经网络模型 + @:param image: 原图 + @:param mask: predict的对应某一类别的mask + @:param mask: 神经网络的预测结果 + @:param classes: 所有类别 + @:param cls_name: 类别 + @:param colors: 所有类别对应的颜色列表 + @:return 混合后的图像 +""" +def blend( + image: Image, + mask, + classes, + cls_name, + colors +): + mask = mask.to(device="cpu").numpy() + new_image = np.zeros((*mask.shape, 3), dtype=np.uint8) + new_image[mask] = utils.get_color_of_cls(classes, colors, cls_name) + new_image = Image.fromarray(new_image) + blend_image = Image.blend(image, new_image, 0.5) + return blend_image + + + +""" + 展示图像 + @:param 需要进行展示的图像,图像尺寸应为[height, width, channels=3] +""" +def show_image(image): + plt.imshow(image) + plt.show() + + + + +if __name__ == "__main__": + with Path(os.path.sep.join(["config", "model.yaml"])).open("r", encoding="utf-8") as f: + model_config = yaml.load(f, Loader=yaml.FullLoader) + classes = model_config["classes"] + + colors = utils.get_colors(len(classes)) + + image_path = os.path.sep.join([ + "dataset", "test", "biomass_image_train_0233_8.jpg" + ]) + + cls_name = "leaf" + net = model_utils.get_model(False) + model_utils.init_model(False, net) + image = Image.open(image_path) + mask = predict(net, image, cls_name) + show_image(blend(image, mask, classes, cls_name, colors)) \ No newline at end of file diff --git 
a/train.py b/train.py new file mode 100644 index 0000000..44cce08 --- /dev/null +++ b/train.py @@ -0,0 +1,280 @@ +import math +import os.path +import numpy as np +import torch +import yaml +from pathlib import Path +from tqdm import tqdm +import utils +import data_utils +import model_utils +from torch.utils.data import DataLoader +import losses +from datetime import datetime + +""" + 1 epoch train + @:param epochs: 总共的epoch数 + @:param epoch: 当前epoch + @:param net: 神经网络模型 + @:param train_data_loader: 训练数据加载器 + @:param image_size: 图片大小 + @:param classes_num: 类别数 + @:param loss_fn: 损失函数 + @:param lr_scheduler: 学习率调度器 + @:param optimizer: 优化器 + @:param device: 运行场地 + @:return 1 epoch train avg loss, 1 epoch train avg scores +""" +def fit( + epochs, + epoch, + net, + train_data_loader, + image_size, + classes_num, + loss_fn, + lr_scheduler, + optimizer, + device="cuda" +): + matrix = data_utils.ConfusionMatrix(classes_num) + scores_list = [] + loss_list = [] + progress_bar = tqdm(train_data_loader) + for idx, data in enumerate(progress_bar): + images, labels = data + lr_scheduler.step() + optimizer.zero_grad() + predictions = torch.transpose(net(images), -2, -1).view(-1, classes_num, *image_size) + matrix.update(labels, data_utils.inv_one_hot_of_outputs(predictions, device), device) + scores = matrix.get_scores() + matrix.reset() + scores_list.append(scores) + + loss = loss_fn( + predictions, + torch.squeeze(labels, dim=1).to(dtype=torch.long) + ) + loss_value = loss.item() + if np.isnan(loss_value): + loss_value = max(loss_list) if len(loss_list) != 0 else 1.0 + loss_list.append(loss_value) + + loss.backward() + optimizer.step() + + progress_bar.set_description( + f"train --> Epoch {epoch + 1} / {epochs}, batch_loss: {loss_value:.3f}, batch_iou: {scores['avg_iou']:.3f}, batch_accuracy: {scores['accuracy']:.3f}" + ) + progress_bar.close() + return sum(loss_list) / len(loss_list), utils.avg_confusion_matrix_scores_list(scores_list) + +""" + 1 epoch train + @:param epochs: 总共的epoch数 + @:param epoch: 当前epoch + @:param net: 神经网络模型 + @:param train_data_loader: 验证数据加载器 + @:param image_size: 图片大小 + @:param classes_num: 类别数 + @:param loss_fn: 损失函数 + @:param device: 运行场地 + @:return val avg loss, val avg scores +""" +@torch.no_grad() +def val( + epochs, + epoch, + net, + val_data_loader, + image_size, + classes_num, + loss_fn, + device="cuda" +): + matrix = data_utils.ConfusionMatrix(classes_num) + scores_list = [] + loss_list = [] + progress_bar = tqdm(val_data_loader) + for idx, data in enumerate(progress_bar): + images, labels = data + predictions = torch.transpose(net(images), -2, -1).view(-1, classes_num, *image_size) + matrix.update(labels, data_utils.inv_one_hot_of_outputs(predictions, device), device) + scores = matrix.get_scores() + matrix.reset() + scores_list.append(scores) + + loss = loss_fn( + predictions, + torch.squeeze(labels, dim=1).to(dtype=torch.long) + ) + loss_value = loss.item() + if np.isnan(loss_value): + loss_value = max(loss_list) if len(loss_list) != 0 else 1.0 + loss_list.append(loss_value) + + progress_bar.set_description( + f"val ---> Epoch {epoch + 1} / {epochs}, batch_loss: {loss_value:.3f}, batch_iou: {scores['avg_iou']:.3f}, batch_accuracy: {scores['accuracy']:.3f}" + ) + progress_bar.close() + return sum(loss_list) / len(loss_list), utils.avg_confusion_matrix_scores_list(scores_list) + + +""" + 模型训练 + + net: 网络模型 + optimizer: 优化器, + lr_scheduler: 学习率调度器, + weight: 每一类的权重 + root_path: 存储训练数据和验证数据的根目录 + train_dir_names: 存储训练数据的目录,元组形式(images_path, labels_path) + 
val_dir_names: 存储验证数据的目录, 元组形式(images_path, labels_path) + classes_num: 类别数量 + yaml_path: 配置文件路径 +""" +def train( + net, + optimizer, + lr_scheduler, + train_config=Path("config") / "train.yaml", + model_config=Path("config") / "model.yaml" +): + with model_config.open("r", encoding="utf-8") as mcf: + model_config = yaml.load(mcf, yaml.FullLoader) + classes_num = len(model_config["classes"]) + + with train_config.open("r", encoding="utf-8") as tcf: + train_config = yaml.load(tcf, Loader=yaml.Loader) + device = train_config["device"] + epochs = train_config["epochs"] + + train_images_dataset = data_utils.Pic2PicDataset( + root=os.path.sep.join(train_config["root"]), + x_dir_name=Path(os.path.sep.join(train_config["train_dir_name"])) / train_config["images_dir_name"], + y_dir_name=Path(os.path.sep.join(train_config["train_dir_name"])) / train_config["labels_dir_name"] + ) + train_data_loader = DataLoader( + dataset=train_images_dataset, + batch_size=train_config["batch_size"], + shuffle=True, + num_workers=train_config["workers"] + ) + + val_images_dataset = data_utils.Pic2PicDataset( + root=os.path.sep.join(train_config["root"]), + x_dir_name=Path(os.path.sep.join(train_config["val_dir_name"])) / train_config["images_dir_name"], + y_dir_name=Path(os.path.sep.join(train_config["val_dir_name"])) / train_config["labels_dir_name"] + ) + val_data_loader = DataLoader( + dataset=val_images_dataset, + batch_size=train_config["batch_size"], + shuffle=False, + num_workers=train_config["workers"] + ) + + image_height, image_width = train_config["image_height"], train_config["image_width"] + weight = torch.tensor(train_config["weight"]) if len(train_config["weight"]) != 1 else torch.ones(classes_num) + loss_fn = losses.FocalLoss( + weight=weight.to(device) + ) + + max_train_iou, max_val_iou = -np.inf, -np.inf + best_train_model, best_val_model = None, None + + for epoch in range(0, epochs): + # 训练 + net.train() + train_avg_loss, train_avg_scores = fit( + epochs=epochs, + epoch=epoch, + net=net, + train_data_loader=train_data_loader, + image_size=(image_height, image_width), + classes_num=classes_num, + loss_fn=loss_fn, + lr_scheduler=lr_scheduler, + optimizer=optimizer, + device=device + ) + print() + print(utils.confusion_matrix_scores2table(train_avg_scores)) + print(f"train_avg_loss: {train_avg_loss:.3f}") + + if max_train_iou < train_avg_scores["avg_iou"]: + max_train_iou = train_avg_scores["avg_iou"] + best_train_model = { + "state_dict": net.state_dict(), + "optimizer": optimizer.state_dict(), + "avg_iou": max_train_iou + } + + + + # 验证 + if (epoch + 1) % train_config["eval_every_n_epoch"] == 0: + net.eval() + val_avg_loss, val_avg_scores = val( + epochs=epochs, + epoch=epoch, + net=net, + val_data_loader=val_data_loader, + image_size=(image_height, image_width), + classes_num=classes_num, + loss_fn=loss_fn, + device=device + ) + print() + print(utils.confusion_matrix_scores2table(val_avg_scores)) + print(f"val_avg_loss: {val_avg_loss:.3f}") + + if max_val_iou < val_avg_scores["avg_iou"]: + max_val_iou = val_avg_scores["avg_iou"] + best_val_model = { + "state_dict": net.state_dict(), + "optimizer": optimizer.state_dict(), + "avg_iou": max_val_iou + } + + + + m = { + "state_dict": net.state_dict(), + "optimizer": optimizer.state_dict(), + "avg_iou": val_avg_scores["avg_iou"] + } + + torch.save( + obj=m, + f=f"{os.path.sep.join(train_config['save_path'])}_Iou{100 * best_val_model['avg_iou']:.3f}_{datetime.strftime(datetime.now(), '%Y%m%d%H%M%S')}.pth" + ) + + + torch.save( + obj=best_train_model, + 
f=f"{os.path.sep.join(train_config['save_path'])}_train_Iou{100 * best_train_model['avg_iou']:.3f}_{datetime.strftime(datetime.now(), '%Y%m%d%H%M%S')}.pth" + ) + torch.save( + obj=best_train_model, + f=f"{os.path.sep.join(train_config['save_path'])}_val_Iou{100 * best_val_model['avg_iou']:.3f}_{datetime.strftime(datetime.now(), '%Y%m%d%H%M%S')}.pth" + ) + + + + + +if __name__ == "__main__": + net = model_utils.get_model(True) + optimizer = model_utils.get_optimizer(net) + lr_scheduler = model_utils.get_lr_scheduler(optimizer=optimizer) + model_utils.init_model( + train=True, + net=net, + optimizer=optimizer + ) + train( + net=net, + optimizer=optimizer, + lr_scheduler=lr_scheduler + ) \ No newline at end of file diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..6363bf1 --- /dev/null +++ b/utils.py @@ -0,0 +1,537 @@ +import colorsys +import copy +import json +import math +import os +from pathlib import Path +import numpy as np +import torch +from PIL import Image, ImageDraw +from tabulate import tabulate +from torchvision.transforms import transforms, InterpolationMode + +""" + 生成num种颜色 + 返回值: color list,返回的color list的第一个数值永远是(0, 0, 0) +""" +def get_colors(num: int): + assert num >= 1 + if num <= 21: + colors = [ + (0, 0, 0), + (128, 0, 0), + (0, 128, 0), + (128, 128, 0), + (0, 0, 128), + (128, 0, 128), + (0, 128, 128), + (128, 128, 128), + (64, 0, 0), + (192, 0, 0), + (64, 128, 0), + (192, 128, 0), + (64, 0, 128), + (192, 0, 128), + (64, 128, 128), + (192, 128, 128), + (0, 64, 0), + (128, 64, 0), + (0, 192, 0), + (128, 192, 0), + (0, 64, 128), + (128, 64, 12) + ] + else: + hsv_tuples = [(x / num, 1., 1.) for x in range(0, num - 1)] + colors = list(map(lambda x: colorsys.hsv_to_rgb(*x), hsv_tuples)) + colors = list(map(lambda x: (int(x[0] * 255), int(x[1] * 255), int(x[2] * 255)), colors)) + if (0, 0, 0) in colors: + colors.remove((0, 0, 0)) + colors = [(0, 0, 0), *colors] + return colors + +""" + 获取某种颜色对应的标签 + 返回值:标签值 +""" +def get_label_of_color(colors, color): + low_label = colors.index(color) + return low_label, 255 - low_label + +""" + 获取某个标签值对应的颜色 + 返回值:元组(r, g, b) +""" +def get_color_of_label(colors, label): + low_label = label if label < 255 - label else 255 - label + return colors[low_label] + +""" + 获取某种类别对应的标签 + 返回值:标签值 +""" +def get_label_of_cls(classes, cls): + low_label = classes.index(cls) + return low_label, 255 - low_label + +""" + 获取某个标签值对应的类别 + 返回值:类别 +""" +def get_cls_of_label(classes, label): + low_label = label if label < 255 - label else 255 - label + return classes[low_label] + +""" + 获取某种颜色对应的类别 + 返回值:类别 + color: (r, g, b) +""" +def get_cls_of_color(classes, colors, color): + idx = colors.index(color) + return get_cls_of_label(classes, idx) + +""" + 获取某种类别对应的颜色 + 返回值:颜色,(r, g, b) +""" +def get_color_of_cls(classes, colors, cls): + idx = classes.index(cls) + return get_color_of_label(colors, idx) + + +def draw_mask(draw, points, shape_type, label, out_line_value, line_width=10, point_width=5): + points = [tuple(point) for point in points] + if shape_type == 'circle': + assert len(points) == 2, 'Shape of shape_type=circle must have 2 points' + (cx, cy), (px, py) = points + d = math.sqrt((cx - px) ** 2 + (cy - py) ** 2) + draw.ellipse([cx - d, cy - d, cx + d, cy + d], outline=out_line_value, fill=label) + elif shape_type == 'rectangle': + assert len(points) == 2, 'Shape of shape_type=rectangle must have 2 points' + draw.rectangle(points, outline=out_line_value, fill=label) + elif shape_type == 'line': + assert len(points) == 2, 'Shape of 
+
+def draw_mask(draw, points, shape_type, label, out_line_value, line_width=10, point_width=5):
+    points = [tuple(point) for point in points]
+    if shape_type == 'circle':
+        assert len(points) == 2, 'Shape of shape_type=circle must have 2 points'
+        (cx, cy), (px, py) = points
+        d = math.sqrt((cx - px) ** 2 + (cy - py) ** 2)
+        draw.ellipse([cx - d, cy - d, cx + d, cy + d], outline=out_line_value, fill=label)
+    elif shape_type == 'rectangle':
+        assert len(points) == 2, 'Shape of shape_type=rectangle must have 2 points'
+        draw.rectangle(points, outline=out_line_value, fill=label)
+    elif shape_type == 'line':
+        assert len(points) == 2, 'Shape of shape_type=line must have 2 points'
+        draw.line(xy=points, fill=out_line_value, width=line_width)
+    elif shape_type == 'linestrip':
+        draw.line(xy=points, fill=out_line_value, width=line_width)
+    elif shape_type == 'point':
+        assert len(points) == 1, 'Shape of shape_type=point must have 1 point'
+        cx, cy = points[0]
+        r = point_width
+        draw.ellipse([cx - r, cy - r, cx + r, cy + r], outline=out_line_value, fill=label)
+    else:
+        assert len(points) > 2, 'Polygon must have more than 2 points'
+        draw.polygon(xy=points, outline=out_line_value, fill=label)
+
+"""
+    Convert a labelme annotation (JSON) into a mask image.
+    classes: list of class names; classes[0] must be "background"
+"""
+def labelme_json2mask(classes, json_path: str, mask_saved_path: str):
+    assert classes is not None and classes[0] == "background"
+
+    json_path = Path(json_path)
+    if json_path.exists() and json_path.is_file():
+        with json_path.open(mode="r") as f:
+            json_data = json.load(f)
+            image_height = json_data["imageHeight"]
+            image_width = json_data["imageWidth"]
+            image_path = json_data["imagePath"]
+            shapes = json_data["shapes"]
+
+            cls_info_list = []
+            for shape in shapes:
+                cls_name_in_json = shape["label"]
+                assert cls_name_in_json in classes
+                points = shape["points"]
+                shape_type = shape["shape_type"]
+                label_of_cls = classes.index(cls_name_in_json)
+                cls_info_list.append(
+                    {
+                        "cls_name": cls_name_in_json,
+                        "label": label_of_cls,
+                        "points": points,
+                        "shape_type": shape_type
+                    }
+                )
+
+            mask = np.zeros(shape=(image_height, image_width), dtype=np.uint8)
+            mask = Image.fromarray(mask)
+            draw = ImageDraw.Draw(mask)
+            for cls_info in cls_info_list:
+                points = cls_info["points"]
+                shape_type = cls_info["shape_type"]
+                label = cls_info["label"]
+                draw_mask(draw, points, shape_type, label, 255 - label)
+
+            # os.path.splitext is safer than split(".") for names containing extra dots.
+            mask.save(str(Path(mask_saved_path) / (os.path.splitext(str(image_path))[0] + ".png")))
+
+        os.remove(json_path)
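+
+# A minimal sketch of the labelme JSON fields consumed above; only these keys
+# are read. The file name and coordinates are made-up placeholders:
+#
+#   {
+#     "imageHeight": 512,
+#     "imageWidth": 512,
+#     "imagePath": "example.jpg",
+#     "shapes": [
+#       {"label": "leaf", "shape_type": "polygon",
+#        "points": [[10, 10], [100, 40], [60, 120]]}
+#     ]
+#   }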
+
+"""
+    Process every labelme JSON under root_path:
+    1. JSONs with a matching original image are converted to masks stored under mask_saved_path
+    2. JSONs without an original image are deleted
+    3. images without a JSON get an all-black mask generated under mask_saved_path
+    root_path: holds the original images and JSON files; images should preferably end in .jpg
+"""
+def convert_labelme_jsons2masks(classes, root_path: str, mask_saved_path: str, original_image_suffix=".jpg"):
+    # At most 128 classes, so low labels (0..127) and outline labels (255..128)
+    # never collide in a uint8 mask.
+    assert 0 < len(classes) <= 128
+    original_images = set(
+        map(
+            lambda name: os.path.splitext(str(name))[0],
+            Path(root_path).glob(pattern=f"*{original_image_suffix}")
+        )
+    )
+    json_files = Path(root_path).glob(pattern="*.json")
+    for json_file in json_files:
+        filename = os.path.splitext(str(json_file))[0]
+        if filename in original_images:
+            labelme_json2mask(classes, str(json_file), mask_saved_path)
+            original_images.remove(filename)
+        else:
+            os.remove(json_file)
+
+    if len(original_images) != 0:
+        for image_filename in original_images:
+            image_path = image_filename + original_image_suffix
+            image = Image.open(image_path)
+            height, width = image.height, image.width
+            image.close()
+            mask = np.zeros((height, width), dtype=np.uint8)
+            mask = Image.fromarray(mask)
+            mask.save(str(Path(mask_saved_path) / (os.path.basename(image_filename) + ".png")))
+
+"""
+    Render confusion-matrix scores as console tables.
+    scores: the scores dict derived from a confusion matrix
+"""
+def confusion_matrix_scores2table(scores):
+    assert scores is not None and isinstance(scores, dict)
+
+    classes = [tp[0] for tp in scores["classes_precision"]]
+    cls_precision_list = [tp[-1] for tp in scores["classes_precision"]]
+    cls_recall_list = [tp[-1] for tp in scores["classes_recall"]]
+    cls_iou_list = [tp[-1] for tp in scores["classes_iou"]]
+    table1 = tabulate(
+        tabular_data=np.concatenate(
+            (
+                np.asarray(classes).reshape(-1, 1),
+                np.asarray(cls_precision_list).reshape(-1, 1),
+                np.asarray(cls_recall_list).reshape(-1, 1),
+                np.asarray(cls_iou_list).reshape(-1, 1)
+            ), 1
+        ),
+        headers=["classes", "precision", "recall", "iou"],
+        tablefmt="grid"
+    )
+
+    avg_precision = scores["avg_precision"]
+    avg_recall = scores["avg_recall"]
+    avg_iou = scores["avg_iou"]
+    accuracy = scores["accuracy"]
+    table2 = tabulate(
+        tabular_data=[(avg_precision, avg_recall, avg_iou, accuracy)],
+        headers=["avg_precision", "avg_recall", "avg_iou", "accuracy"],
+        tablefmt="grid"
+    )
+
+    table = tabulate(
+        tabular_data=np.concatenate(
+            (
+                np.asarray(["single", "overall"]).reshape(-1, 1),
+                np.asarray([table1, table2]).reshape(-1, 1)
+            ), 1
+        ),
+        headers=["table type", "table"],
+        tablefmt="grid"
+    )
+
+    return table
+
+
+"""
+    Add two confusion-matrix score dicts element-wise.
+    Note: scores_left is modified in place and returned.
+"""
+def sum_2_confusion_matrix_scores(scores_left: dict, scores_right: dict):
+    scores_left["classes_precision"] = [
+        (tp[0][0], tp[0][-1] + tp[-1][-1])
+        for tp in zip(scores_left["classes_precision"], scores_right["classes_precision"])
+    ]
+    scores_left["classes_recall"] = [
+        (tp[0][0], tp[0][-1] + tp[-1][-1])
+        for tp in zip(scores_left["classes_recall"], scores_right["classes_recall"])
+    ]
+    scores_left["classes_iou"] = [
+        (tp[0][0], tp[0][-1] + tp[-1][-1])
+        for tp in zip(scores_left["classes_iou"], scores_right["classes_iou"])
+    ]
+
+    scores_left["avg_precision"] = scores_left["avg_precision"] + scores_right["avg_precision"]
+    scores_left["avg_recall"] = scores_left["avg_recall"] + scores_right["avg_recall"]
+    scores_left["avg_iou"] = scores_left["avg_iou"] + scores_right["avg_iou"]
+    scores_left["accuracy"] = scores_left["accuracy"] + scores_right["accuracy"]
+
+    return scores_left
+
+"""
+    Sum a list of confusion-matrix score dicts.
+    @:param scores_list: list of score dicts
+    @:return the summed scores (the first element is modified in place)
+"""
+def sum_confusion_matrix_scores_list(scores_list):
+    if len(scores_list) == 1:
+        return scores_list[0]
+
+    result = scores_list[0]
+    for i in range(1, len(scores_list)):
+        result = sum_2_confusion_matrix_scores(result, scores_list[i])
+    return result
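+
+# For reference, the score-dict layout these helpers assume, inferred from the
+# accesses above (class names and numbers are illustrative only):
+#
+#   scores = {
+#       "classes_precision": [("background", 0.98), ("leaf", 0.91)],
+#       "classes_recall":    [("background", 0.97), ("leaf", 0.89)],
+#       "classes_iou":       [("background", 0.95), ("leaf", 0.82)],
+#       "avg_precision": 0.945,
+#       "avg_recall": 0.93,
+#       "avg_iou": 0.885,
+#       "accuracy": 0.96,
+#   }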
+
+"""
+    Sum a list of confusion-matrix score dicts, then average them.
+    Returns the averaged scores.
+"""
+def avg_confusion_matrix_scores_list(scores_list):
+    assert scores_list is not None and len(scores_list) >= 1
+    result = sum_confusion_matrix_scores_list(scores_list)
+
+    result["classes_precision"] = [
+        (tp[0], tp[-1] / len(scores_list)) for tp in result["classes_precision"]
+    ]
+    result["classes_recall"] = [
+        (tp[0], tp[-1] / len(scores_list)) for tp in result["classes_recall"]
+    ]
+    result["classes_iou"] = [
+        (tp[0], tp[-1] / len(scores_list)) for tp in result["classes_iou"]
+    ]
+
+    result["avg_precision"] = result["avg_precision"] / len(scores_list)
+    result["avg_recall"] = result["avg_recall"] / len(scores_list)
+    result["avg_iou"] = result["avg_iou"] / len(scores_list)
+    result["accuracy"] = result["accuracy"] / len(scores_list)
+
+    return result
+
+"""
+    Augment the raw input (x) images, producing same-sized variants
+    (rotation, flips, color jitter). Each transform in ts is applied to the
+    original; whenever the result differs from the original it is saved to disk
+    as an extra sample. Run this before annotating the raw images manually.
+    root_path must contain only images, all with the same suffix.
+
+    root_path: directory of the raw input (x) images
+    ts: the augmentation pipeline
+"""
+def augment_raw_images2(
+    root_path,
+    ts=transforms.Compose(
+        [
+            transforms.RandomHorizontalFlip(),
+            transforms.RandomVerticalFlip(),
+            transforms.RandomRotation(degrees=30),
+            transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.3)
+        ]
+    )
+):
+    image_paths = Path(root_path).glob(pattern="*")
+    for image_path in image_paths:
+        counter = 0
+        image_filename, image_suffix = os.path.splitext(image_path)
+
+        image = Image.open(image_path)
+        image_np = np.array(image)
+
+        for transform in ts.transforms:
+            new_image = transform(Image.fromarray(image_np))
+            new_image_np = np.array(new_image)
+
+            if not np.array_equal(image_np, new_image_np):
+                new_image_copy = Image.fromarray(new_image_np)
+                new_image_copy.save(str(Path(f"{image_filename}_{counter}{image_suffix}")))
+                new_image_copy.close()
+                counter += 1
+
+            new_image.close()
+
+        image.close()
+
+
+"""
+    Augment the raw input (x) images, producing image_cropped_shape-sized crops.
+    Each image is first resized to image_resized_shape, then cropped once and
+    randomly cropped once; the crops are kept and the original image is moved
+    into to_path. Run this before annotating the raw images manually.
+    from_path must contain only images, all with the same suffix.
+
+    from_path: directory of the raw input (x) images
+    to_path: where processed originals are moved; if None, originals are deleted
+    image_resized_shape: size after resizing
+    image_cropped_shape: size after cropping; each dimension must be smaller than image_resized_shape
+    ts: the augmentation pipeline
+"""
+def augment_raw_images(
+    from_path,
+    to_path="to/path",
+    image_resized_shape=(256, 256),
+    image_cropped_shape=(224, 224),
+    ts=None
+):
+    if ts is None:
+        ts = transforms.Compose(
+            [
+                transforms.Resize(image_resized_shape, interpolation=InterpolationMode.BILINEAR),
+                transforms.RandomCrop(image_cropped_shape),
+                transforms.RandomResizedCrop(image_cropped_shape)
+            ]
+        )
+    image_paths = Path(from_path).glob("*")
+    for image_path in image_paths:
+        counter = 0
+        image_filename, image_suffix = os.path.splitext(image_path)
+        with Image.open(image_path) as image:
+            image = ts.transforms[0](image)
+            image_copy_np = copy.deepcopy(np.array(image))
+            # Apply the crop transforms only; index 0 (Resize) has already been
+            # applied, and re-applying it saved a spurious duplicate.
+            for transform in ts.transforms[1:]:
+                image = transform(image)
+                image_np = np.array(image)
+                if not np.array_equal(image_np, image_copy_np):
+                    image.save(str(Path(f"{image_filename}_{counter}{image_suffix}")))
+                    counter = counter + 1
+            image.close()
+            image = Image.fromarray(image_copy_np)
+        # Move or delete the original only after the file handle is closed.
+        if to_path:
+            Path(image_path).rename(Path(to_path) / f"{os.path.basename(image_path)}")
+        else:
+            Path(image_path).unlink()
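+
+# A hedged usage sketch for the two augmentation helpers above; the directory
+# names mirror the dataset layout used in __main__ below and are assumptions:
+#
+#   augment_raw_images(
+#       from_path="dataset/train/images",
+#       to_path=None,                    # delete originals after cropping
+#       image_resized_shape=(256, 256),
+#       image_cropped_shape=(224, 224)
+#   )
+#   augment_raw_images2(root_path="dataset/train/images")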
f"{os.path.basename(image_path)}") + else: + Path(image_path).unlink() + + +""" + 对验证数据集中的图片进行大小的统一,以便其拥有统一的大小,可以进行批次训练 + from_path: 验证数据集所在的目录 + to_path: 原始数据应该转移到哪里 + resized_shape: (height, width), resize后的大小 +""" +def resize_val_images(from_path, to_path, resized_shape): + image_paths = Path(from_path).glob(pattern="*") + for image_path in image_paths: + original_image = Image.open(image_path) + original_image_np = np.array(original_image) + resized_image = Image.fromarray(original_image_np).resize(resized_shape) + original_image.close() + + if not to_path: + Path(image_path).unlink(missing_ok=True) + else: + Path(image_path).rename(Path(to_path) / os.path.basename(image_path)) + + resized_image.save(image_path) + resized_image.close() + + +""" + 将一张图片按照尺寸裁剪为多张图片 + @:param image: 图片 + @:param crop_size: 裁剪尺寸,为tuple(image_height, image_width) + + @:return 裁剪之后的图片列表 +""" +def crop_image2images(image: Image, crop_size): + image_np = np.array(image) + image_height, image_width = image_np.shape[:-1] + left_image_height, left_image_width = image_np.shape[:-1] + crop_height, crop_width = crop_size + left_upper = (0, 0) + right_lower = (crop_width, crop_height) + image_list = [] + + while left_image_width / crop_width >= 1 or left_image_height / crop_height >= 1: + if left_image_width / crop_width >= 1 and left_image_height / crop_height >= 1: + new_image = image.crop((*left_upper, *right_lower)) + left_image_width -= crop_width + left_upper = (left_upper[0] + crop_width, left_upper[-1]) + right_lower = (right_lower[0] + crop_width, right_lower[-1]) + image_list.append(new_image) + elif left_image_height / crop_height >= 1: + left_image_width = image_width + left_image_height -= crop_height + left_upper = (0, image_height - left_image_height) + right_lower = (crop_width, image_height - left_image_height + crop_height) + else: + break + return image_list + +""" + 将目录下的所有图片进行裁剪 + @:param root_path: 图片的目录 + @:param to: 原图片应该转移到哪里 + @:param crop_size: 裁剪大小, tuple(crop_height, crop_width) +""" +def crop_images2small_images(root_path, to, crop_size): + image_paths = Path(root_path).glob(pattern="*") + for image_path in image_paths: + image = Image.open(image_path) + image_cropped_list = crop_image2images(image, crop_size) + for idx, image_cropped in enumerate(image_cropped_list): + image_cropped.save( + f"_{idx}".join(os.path.splitext(image_path)) + ) + image_cropped.close() + image.close() + if to is None: + Path(image_path).unlink(missing_ok=True) + else: + Path(image_path).rename( + str( + Path(to) / os.path.basename(image_path) + ) + ) + +""" + 判断是否能够多gpu分布式并行运算 +""" +def distributed_enabled(): + return torch.cuda.is_available() and torch.cuda.device_count() > 1 and torch.__version__ >= "0.4.0" + +if __name__ == "__main__": + # crop_images2small_images( + # root_path="dataset/train/images", + # to=None, + # crop_size=(512, 512) + # ) + + + # augment_raw_images2(root_path="dataset/train/images") + + crop_images2small_images( + root_path="dataset/test", + to=None, + crop_size=(512, 512) + ) + + # augment_raw_images2(root_path="dataset/val/images") + + # resize_val_images( + # from_path="dataset/test", + # to_path=None, + # resized_shape=(1024, 1024) + # ) + + # convert_labelme_jsons2masks( + # classes=[ + # "background", + # "leaf" + # ], + # root_path="dataset/train/images", + # mask_saved_path="dataset/train/labels" + # ) \ No newline at end of file