# SegNeXt/data_utils.py
#
# (Repository viewer metadata: 442 lines, 14 KiB, Python;
# last change 2023-04-07 22:29:25 +08:00)
import json
import math
from torch.utils.data import Dataset
from pathlib import Path
from torch.utils.data.dataset import T_co
from PIL import Image
import numpy as np
import torch
from torchvision import transforms
import arguments
import utils
import torch.nn.functional as F
"""
Fold the pixel values 128~254 in labels onto 1~127, i.e.
254 -> 1
253 -> 2
...
128 -> 127
labels: labels / model predictions, [batch_size, channels=1, height, width]
Returns:
labels, [batch_size, channels=1, height, width]
"""
@torch.no_grad()
def converge_labels(labels: torch.Tensor, device=torch.device("cuda" if torch.cuda.is_available() else "cpu")):
    """Fold the high label values 128..254 onto the low ids 1..127.

    Every pixel value ``v`` with ``128 <= v <= 254`` is replaced by
    ``255 - v`` (254 -> 1, 253 -> 2, ..., 128 -> 127). All other values,
    including 0 and 255, are left untouched.

    Args:
        labels: Labels or model predictions shaped
            [batch_size, 1, height, width], expected to hold integer ids.
        device: Device the tensor is moved to before remapping.

    Returns:
        The remapped tensor, [batch_size, 1, height, width], on ``device``.
        When ``labels`` already lives on ``device`` it is modified in place
        and that same tensor is returned.

    Raises:
        AssertionError: If ``labels`` is not 4-D with a channel axis of 1.
    """
    assert len(labels.shape) == 4 and labels.shape[1] == 1
    labels = labels.to(device)
    # A single masked write replaces 127 separate full-tensor passes. The
    # source range (128..254) and target range (1..127) do not overlap, so
    # remapping everything at once gives the same result as one value at a
    # time.
    high = (labels >= 128) & (labels <= 254)
    labels[high] = 255 - labels[high]
    return labels
"""
对labels进行独热编码
classes_num: 编码的类别数量
labels: 标签集合, [batch_size, channels=1, height, width]
返回值独热编码后的矩阵, [batch_size, height * width, classes_num]
"""
@torch.no_grad()
def one_hot(
        classes_num: int,
        labels: torch.Tensor,
        device=torch.device("cuda" if torch.cuda.is_available() else "cpu")):
    """One-hot encode a batch of single-channel label maps.

    Args:
        classes_num: Number of classes, i.e. the size of the encoded axis.
        labels: Label tensor shaped [batch_size, 1, height, width].
        device: Device on which the encoding is built.

    Returns:
        Float tensor shaped [batch_size, height * width, classes_num] with a
        1 at each pixel's class index and 0 elsewhere.
    """
    assert len(labels.shape) == 4 and labels.shape[1] == 1
    # The channel axis has size 1, so flattening everything after the batch
    # axis yields the per-pixel class ids as [batch_size, height * width].
    class_ids = torch.flatten(labels.to(device), start_dim=1).long()
    # [batch_size, height * width, classes_num], all zeros before scattering.
    encoded = torch.zeros(*class_ids.shape, classes_num).to(device)
    pixel_index = class_ids.unsqueeze(-1)
    return torch.scatter(input=encoded, dim=-1, index=pixel_index, value=1.)
"""
将模型的输出反独热编码
outputs: [batch_size, classes_num, height, width]
返回值
反独热编码后的张量, [batch_size, 1, height, width]
"""
@torch.no_grad()
def inv_one_hot_of_outputs(
        outputs,
        device=torch.device("cuda" if torch.cuda.is_available() else "cpu")
):
    """Collapse per-class model outputs to a map of class indices.

    Args:
        outputs: Model outputs shaped [batch_size, classes_num, height, width].
        device: Device on which the reduction is performed.

    Returns:
        Integer tensor shaped [batch_size, 1, height, width] holding, for each
        pixel, the index of the highest-scoring class.

    Raises:
        AssertionError: If ``outputs`` is not 4-D.
    """
    assert len(outputs.shape) == 4
    # log_softmax is strictly increasing along the class axis, so it never
    # changes which class wins; taking argmax over dim=1 directly also avoids
    # the permute round trip.
    return torch.argmax(outputs.to(device), dim=1, keepdim=True)
"""
将PIL读取格式的图片或np转换为tensor格式同时将维度顺序和数量进行转换
返回值[channels, height, width]
"""
@torch.no_grad()
def pil2tensor(pil, device=torch.device("cuda" if torch.cuda.is_available() else "cpu")):
    """Convert a PIL image (or numpy array) to a tensor on ``device``.

    The conversion is delegated to torchvision's ``ToTensor`` transform.

    Args:
        pil: The image to convert.
        device: Device the resulting tensor is moved to.

    Returns:
        The converted tensor, laid out as [channels, height, width].
    """
    return transforms.ToTensor()(pil).to(device)
class Pic2PicDataset(Dataset):
    """Paired image/label dataset read from two sibling directories.

    Args:
        root: Directory that holds both the inputs (x) and their labels (y).
        x_dir_name: Name of the sub-directory of ``root`` with the inputs.
        y_dir_name: Name of the sub-directory of ``root`` with the labels.
        device: Device the returned tensors are placed on.
    """

    def __init__(self, root: str, x_dir_name="images", y_dir_name="labels", device=torch.device("cuda" if torch.cuda.is_available() else "cpu")):
        super(Pic2PicDataset, self).__init__()
        self.device = device
        # glob() yields entries in an unspecified, filesystem-dependent order,
        # so both listings are sorted before pairing by position.
        # Assumes an image and its label sort to the same position (e.g. they
        # share a file name) -- TODO confirm against the dataset layout.
        x_paths = sorted((Path(root) / x_dir_name).glob(pattern="*"))
        y_paths = sorted((Path(root) / y_dir_name).glob(pattern="*"))
        # NOTE(review): zip() drops trailing files when the two directories
        # hold a different number of entries.
        self.x2y_paths = list(zip(x_paths, y_paths))

    def __len__(self):
        """Return the number of (image, label) pairs."""
        return len(self.x2y_paths)

    def __getitem__(self, index) -> "tuple[torch.Tensor, torch.Tensor]":
        """Load the pair at ``index``.

        Returns:
            ``(image, label)``: the image converted by ``pil2tensor`` and the
            label map after ``converge_labels`` with its batch axis removed,
            both on ``self.device``.
        """
        x_path, y_path = self.x2y_paths[index]
        # Context managers close both files, including on error.
        with Image.open(y_path) as label_image:
            label_array = np.array(label_image)
        # converge_labels expects [batch, 1, height, width]; add the two
        # leading axes here and drop the batch axis again on return.
        label = converge_labels(
            torch.from_numpy(label_array).unsqueeze(0).unsqueeze(0),
            device=self.device,
        )
        with Image.open(x_path) as image:
            image_tensor = pil2tensor(image, self.device)
        return image_tensor, label.squeeze(0)
class ConfusionMatrix:
    """Confusion matrix for segmentation plus the metrics derived from it.

    Rows of the matrix index the ground-truth class and columns index the
    predicted class.
    """

    def __init__(self, classes_num):
        self.classes_num = classes_num
        # [classes_num, classes_num] float32 tensor; None until update() runs.
        self.matrix = None

    @torch.no_grad()
    def update(self, labels, predictions, device=torch.device("cuda" if torch.cuda.is_available() else "cpu")):
        """Compute the confusion matrix for one batch.

        Args:
            labels: Ground truth, [batch_size, 1, height, width]. Expected to
                have been passed through converge_labels() already, so pixel
                values are the small per-class label ids.
            predictions: Predicted labels, [batch_size, 1, height, width],
                prepared the same way as ``labels``.
            device: Device on which the matrix is built and stored.

        Pixels whose label or prediction lies outside
        ``[0, classes_num)`` are not counted.
        """
        assert len(labels.shape) == 4 and len(predictions.shape) == 4 and labels.shape[1] == 1 and predictions.shape[1] == 1
        if self.matrix is not None:
            # NOTE(review): a matrix is only computed while none is stored, so
            # a second call without reset() is ignored rather than accumulated
            # -- confirm this is the intended contract.
            return
        labels = labels.to(device)
        predictions = predictions.to(device)
        size = self.classes_num
        label_ids = labels.long()
        prediction_ids = predictions.long()
        # Keep only pixels where both values are valid class ids. The
        # equality checks drop non-integer values, which never matched a
        # class under an exact comparison.
        valid = (
            (label_ids >= 0) & (label_ids < size)
            & (prediction_ids >= 0) & (prediction_ids < size)
            & (label_ids == labels) & (prediction_ids == predictions)
        )
        # Encode each (label, prediction) pair as one flat index and count all
        # pairs in a single pass instead of classes_num ** 2 comparisons.
        pair_index = label_ids[valid] * size + prediction_ids[valid]
        counts = torch.bincount(pair_index, minlength=size * size)
        self.matrix = counts.view(size, size).to(dtype=torch.float32, device=device)

    def reset(self):
        """Discard the stored confusion matrix."""
        self.matrix = None

    def get_confusion_matrix(self):
        """Return the stored confusion matrix; update() must have run."""
        assert self.matrix is not None
        return self.matrix

    @staticmethod
    def _safe_ratio(numerator, denominator):
        """Return numerator / denominator as a float, mapping NaN (0 / 0) to 0."""
        value = (numerator / denominator).item()
        return 0. if math.isnan(value) else value

    @staticmethod
    def _class_name(label_of_cls):
        """Look up the class name registered for a label value."""
        return utils.get_cls_of_label(arguments.classes, label_of_cls)

    def _average(self, class_scores):
        """Average the score of (cls_name, score) pairs over classes_num."""
        if self.classes_num == 0:
            return 0.
        return math.fsum(score for _, score in class_scores) / self.classes_num

    @torch.no_grad()
    def adjust_cls_precision(self, label_of_cls):
        """Compute the precision of one class.

        Args:
            label_of_cls: Label value of the class.

        Returns:
            ``(cls_name, precision)``; precision is 0 when nothing was
            predicted as this class.
        """
        assert self.matrix is not None and 0 <= label_of_cls < self.classes_num
        hits = self.matrix[label_of_cls, label_of_cls]
        predicted_total = torch.sum(self.matrix[:, label_of_cls])
        return self._class_name(label_of_cls), self._safe_ratio(hits, predicted_total)

    @torch.no_grad()
    def adjust_classes_precision(self):
        """Return ``[(cls_name, precision), ...]`` for every class.

        Label 0 is the background class.
        """
        return [self.adjust_cls_precision(label) for label in range(self.classes_num)]

    @torch.no_grad()
    def adjust_avg_precision(self):
        """Return the precision averaged over all classes."""
        assert self.matrix is not None
        return self._average(self.adjust_classes_precision())

    @torch.no_grad()
    def adjust_cls_recall(self, label_of_cls):
        """Compute the recall of one class.

        Args:
            label_of_cls: Label value of the class.

        Returns:
            ``(cls_name, recall)``; recall is 0 when the class never occurs
            in the ground truth.
        """
        assert self.matrix is not None and 0 <= label_of_cls < self.classes_num
        hits = self.matrix[label_of_cls, label_of_cls]
        actual_total = torch.sum(self.matrix[label_of_cls, :])
        return self._class_name(label_of_cls), self._safe_ratio(hits, actual_total)

    @torch.no_grad()
    def adjust_classes_recall(self):
        """Return ``[(cls_name, recall), ...]`` for every class.

        Label 0 is the background class.
        """
        return [self.adjust_cls_recall(label) for label in range(self.classes_num)]

    @torch.no_grad()
    def adjust_avg_recall(self):
        """Return the recall averaged over all classes."""
        assert self.matrix is not None
        return self._average(self.adjust_classes_recall())

    @torch.no_grad()
    def adjust_accuracy(self):
        """Return the overall pixel accuracy (trace / total).

        Tensor division yields NaN instead of raising when the matrix is
        empty, so that case is mapped to 0 explicitly, matching the
        per-class metrics.
        """
        assert self.matrix is not None
        correct = torch.sum(torch.diag(self.matrix))
        return self._safe_ratio(correct, torch.sum(self.matrix))

    @torch.no_grad()
    def adjust_cls_iou(self, label_of_cls):
        """Compute the IoU of one class.

        Args:
            label_of_cls: Label value of the class.

        Returns:
            ``(cls_name, iou)``; iou is 0 when the class appears in neither
            the ground truth nor the predictions.
        """
        assert self.matrix is not None and 0 <= label_of_cls < self.classes_num
        hits = self.matrix[label_of_cls, label_of_cls]
        # Row sum + column sum counts the diagonal cell twice; subtract it
        # once to get the size of the union.
        union = (
            torch.sum(self.matrix[label_of_cls, :])
            + torch.sum(self.matrix[:, label_of_cls])
            - hits
        )
        return self._class_name(label_of_cls), self._safe_ratio(hits, union)

    @torch.no_grad()
    def adjust_classes_iou(self):
        """Return ``[(cls_name, iou), ...]`` for every class.

        Label 0 is the background class.
        """
        return [self.adjust_cls_iou(label) for label in range(self.classes_num)]

    @torch.no_grad()
    def adjust_avg_iou(self):
        """Return the IoU averaged over all classes."""
        assert self.matrix is not None
        return self._average(self.adjust_classes_iou())

    @torch.no_grad()
    def get_scores(self):
        """Return every metric in one dictionary.

        Returns:
            {
                "classes_precision": [(cls_name, precision), ...],
                "avg_precision": precision,
                "classes_recall": [(cls_name, recall), ...],
                "avg_recall": recall,
                "classes_iou": [(cls_name, iou), ...],
                "avg_iou": iou,
                "accuracy": accuracy
            }
        """
        return {
            "classes_precision": self.adjust_classes_precision(),
            "avg_precision": self.adjust_avg_precision(),
            "classes_recall": self.adjust_classes_recall(),
            "avg_recall": self.adjust_avg_recall(),
            "classes_iou": self.adjust_classes_iou(),
            "avg_iou": self.adjust_avg_iou(),
            "accuracy": self.adjust_accuracy()
        }
"""
Standardize the images with statistics taken over the batch dimension (dim=0)
result = (pixel_value - mean) / std
images: input images, [batch_size, channels, height, width]
Returns:
the standardized tensor, std: [channels, height, width], mean: [channels, height, width]
"""
@torch.no_grad()
def normalize_channels(images):
    """Standardize a batch with statistics taken over the batch axis.

    Computes ``(pixel_value - mean) / std`` where ``mean`` and ``std`` are
    reduced over dim 0, i.e. one statistic per (channel, row, column)
    position.

    Args:
        images: Input images, [batch_size, channels, height, width].

    Returns:
        ``(normalized, std, mean)``: the standardized images with the input's
        shape, followed by the statistics, each [channels, height, width].

    Raises:
        AssertionError: If ``images`` is not 4-D.
    """
    assert len(images.shape) == 4
    # torch.std_mean returns (std, mean) in that order; unpack by name so the
    # two cannot be swapped in the formula below.
    std, mean = torch.std_mean(input=images, dim=0)
    normalized = (images - mean) / std
    return normalized, std, mean
# Nothing runs when this module is executed directly; the commented-out
# snippet that follows is a manual smoke test kept for reference.
if __name__ == "__main__":
    pass
# labels = torch.tensor(
# [
# [
# [
# [1, 2, 3, 4],
# [3, 3, 4, 0]
# ]
# ],
# [
# [
# [1, 2, 3, 3],
# [2, 0, 4, 4]
# ]
# ]
# ]
# )
#
# predictions = torch.tensor(
# [
# [
# [
# [1, 4, 3, 2],
# [2, 2, 4, 3]
# ]
# ],
# [
# [
# [1, 4, 4, 2],
# [0, 1, 4, 3]
# ]
# ]
# ]
# )
#
# print(labels.shape)
# print(predictions.shape)
#
# cm = ConfusionMatrix(classes_num=5)
# cm.update(labels, predictions)
# scores = cm.get_scores()
#
# utils.confusion_matrix_scores2table(scores)
#
# utils.avg_confusion_matrix_scores_list(
# [scores, scores]
# )
# utils.confusion_matrix_scores2table(scores)
# data = torch.ones(2, 3, 4, 5).to(device="cuda", dtype=torch.float32)
# print(normalize_channels(data)[0])
# a = np.ones((224, 224, 3))
# print(pil2tensor(a).shape)