#!/usr/bin/python
# -*- coding: UTF-8 -*-
import tensorflow as tf
import numpy as np
import math
import cv2
from collections import namedtuple
__author__ = 'Vien'
'''
注释
'''
# Container for the hyper-parameters of one SSD configuration.
SSDParams = namedtuple(
    'SSDParameters',
    (
        'img_shape',            # input image size, e.g. 300x300
        'num_classes',          # number of classes, background included (e.g. 20 + 1)
        'no_annotation_label',
        'feature_layers',       # names of the feature maps used as detection layers
        'feature_shapes',       # spatial size of each detection feature map
        'anchor_size_bounds',   # lower and upper bounds of the anchor sizes
        'anchor_sizes',         # anchor sizes for each detection layer
        'anchor_ratios',        # anchor aspect ratios for each detection layer
        'anchor_steps',         # cell size (in pixels) of each detection layer
        'anchor_offset',        # offset of an anchor centre from its cell's top-left corner
        'normalizations',       # normalization setting of each detection layer
        'prior_scaling',
    ),
)
class SSD(object):
    """SSD-300 detection graph (TensorFlow 1.x) on a VGG-16 style backbone.

    In inference mode (``is_training=False``) the constructor builds the
    network, decodes the raw predictions against the default anchors and keeps
    the boxes whose best non-background score exceeds ``self.threshold``.
    In training mode the constructor only sets up the parameters; no graph is
    built.
    """

    def __init__(self, x=None, is_training=True):
        """Set up the SSD parameters and, for inference, build the graph.

        Args:
            x: optional input image tensor. When ``None`` a float32 placeholder
                of shape ``[None, 300, 300, 3]`` is created by ``_built_net``.
            is_training: when True nothing is built; when False the inference
                graph and the box-selection ops are created.
        """
        self.is_training = is_training
        self.threshold = 0.7  # minimum class score for a box to be kept
        self.params = SSDParams(
            img_shape=(300, 300),
            num_classes=2,
            no_annotation_label=2,
            feature_layers=['block4', 'block7', 'block8', 'block9', 'block10', 'block11'],
            feature_shapes=[(38, 38), (19, 19), (10, 10), (5, 5), (3, 3), (1, 1)],
            anchor_size_bounds=[0.15, 0.90],  # differs from the original paper
            anchor_sizes=[],
            anchor_ratios=[[2, .5], [2, .5, 3, 1. / 3], [2, .5, 3, 1. / 3],
                           [2, .5, 3, 1. / 3], [2, .5], [2, .5]],
            anchor_steps=[8, 16, 32, 64, 100, 300],
            anchor_offset=0.5,
            normalizations=[20, -1, -1, -1, -1, -1],
            prior_scaling=[0.1, 0.1, 0.2, 0.2]
        )
        # The namedtuple's own anchor_sizes stays empty; the values actually
        # used are derived from the size bounds and stored on the instance.
        self.anchor_sizes = ssd_size_bounds_to_values(self.params.anchor_size_bounds,
                                                      len(self.params.feature_layers),
                                                      self.params.img_shape)
        print(self.anchor_sizes)
        if not is_training:
            # SSD300 network (300x300 input). `x` is forwarded so that a
            # caller-supplied input tensor is used instead of being ignored.
            classes_all_pred, locations_all_pred = self._built_net(x)
            # Decode the network output and filter the bounding boxes.
            self._classes, self._scores, self._b_boxes = self._b_boxes_select(classes_all_pred,
                                                                              locations_all_pred)
        # Training mode: the training graph is intentionally not built here.

    def _built_net(self, x=None):
        """Build the backbone, the extra SSD layers and the prediction heads.

        Args:
            x: input image tensor, or ``None`` to create a placeholder.

        Returns:
            ``(class_probabilities, locations)`` as per-layer lists; in
            training mode a third per-layer list with the raw class logits is
            appended.
        """
        if x is None:
            x = tf.placeholder(dtype=tf.float32,
                               shape=[None, self.params.img_shape[0],
                                      self.params.img_shape[1],
                                      3])
        self._input_images = x
        check_points = dict()
        with tf.variable_scope('ssd_300_vgg'):
            # block 1
            net = conv2d(self._input_images, filters=64, kernel_size=3, name='conv1_1')
            net = conv2d(net, filters=64, kernel_size=3, name='conv1_2')
            check_points['block1'] = net
            net = max_pooling2d(net, pool_size=2, name='pool1')
            # block 2
            net = conv2d(net, filters=128, kernel_size=3, name='conv2_1')
            net = conv2d(net, filters=128, kernel_size=3, name='conv2_2')
            check_points['block2'] = net
            net = max_pooling2d(net, pool_size=2, name='pool2')
            # block 3
            net = conv2d(net, filters=256, kernel_size=3, name='conv3_1')
            net = conv2d(net, filters=256, kernel_size=3, name='conv3_2')
            net = conv2d(net, filters=256, kernel_size=3, name='conv3_3')
            check_points['block3'] = net
            net = max_pooling2d(net, pool_size=2, name='pool3')
            # block 4
            net = conv2d(net, filters=512, kernel_size=3, name='conv4_1')
            net = conv2d(net, filters=512, kernel_size=3, name='conv4_2')
            net = conv2d(net, filters=512, kernel_size=3, name='conv4_3')
            check_points['block4'] = net
            net = max_pooling2d(net, pool_size=2, name='pool4')
            # block 5
            net = conv2d(net, filters=512, kernel_size=3, name='conv5_1')
            net = conv2d(net, filters=512, kernel_size=3, name='conv5_2')
            net = conv2d(net, filters=512, kernel_size=3, name='conv5_3')
            check_points['block5'] = net
            net = max_pooling2d(net, pool_size=3, stride=1, name='pool5')
            # Everything above is the first five blocks of VGG16.
            # SSD layers
            # block 6
            net = conv2d(net, filters=1024, kernel_size=3, dilation_rate=6, name='conv6')
            check_points['block6'] = net
            net = dropout(net, training=self.is_training)
            # block 7
            net = conv2d(net, filters=1024, kernel_size=1, name='conv7')
            check_points['block7'] = net
            net = dropout(net, training=self.is_training)
            # block 8
            net = conv2d(net, filters=256, kernel_size=1, name='conv8_1x1')
            net = pad2d(net)
            net = conv2d(net, filters=512, kernel_size=3, stride=2, padding='valid', name='conv8_3x3')
            check_points['block8'] = net
            # block 9
            net = conv2d(net, filters=128, kernel_size=1, name='conv9_1x1')
            net = pad2d(net)
            net = conv2d(net, filters=256, kernel_size=3, stride=2, padding='valid', name='conv9_3x3')
            check_points['block9'] = net
            # block 10
            net = conv2d(net, filters=128, kernel_size=1, name="conv10_1x1")
            net = conv2d(net, filters=256, kernel_size=3, padding="valid", name="conv10_3x3")
            check_points['block10'] = net
            # block 11
            net = conv2d(net, filters=128, kernel_size=1, name="conv11_1x1")
            net = conv2d(net, filters=256, kernel_size=3, padding="valid", name="conv11_3x3")
            check_points['block11'] = net

            # Location and class prediction heads, one pair per detection layer.
            classes_all_pred = []
            locations_all_pred = []
            logits_all_pred = []  # only returned in training mode
            for i, layer in enumerate(self.params.feature_layers):
                loc_pred_layer, cls_pred_layer = self._predictions_layer(
                    check_points[layer],
                    sizes=self.anchor_sizes[i],
                    ratios=self.params.anchor_ratios[i],
                    is_l2norm=self.params.normalizations[i],
                    name=layer + '_box')
                # Class scores are turned into probabilities with softmax.
                classes_all_pred.append(tf.nn.softmax(cls_pred_layer, axis=-1))
                locations_all_pred.append(loc_pred_layer)
                logits_all_pred.append(cls_pred_layer)
            if self.is_training:
                return classes_all_pred, locations_all_pred, logits_all_pred
            return classes_all_pred, locations_all_pred

    def _predictions_layer(self, x, sizes, ratios, is_l2norm, name='multibox'):
        """Add the location and class prediction heads on one feature map.

        Args:
            x: feature map of shape ``(?, feat_h, feat_w, channels)``.
            sizes: anchor sizes for this layer.
            ratios: anchor aspect ratios for this layer.
            is_l2norm: the input is L2-normalized first when this is > 0.
            name: variable scope of the heads.

        Returns:
            ``(loc_pred, cls_pred)``, both 5-D: ``(?, feat_h, feat_w,
            num_anchors, 4)`` and ``(?, feat_h, feat_w, num_anchors,
            num_classes)``.
        """
        # Spatial dims of the feature map, with -1 standing for the batch size.
        shape = [-1] + x.shape.as_list()[1:-1]
        with tf.variable_scope(name):
            if is_l2norm > 0:
                x = l2norm(x)
            num_anchors = len(sizes) + len(ratios)
            # Position head (regression): 4 values per anchor.
            loc_pred = conv2d(x, filters=num_anchors * 4, kernel_size=3, activation=None, name='conv_loc')
            loc_pred = tf.reshape(loc_pred, shape + [num_anchors, 4])
            # Category head (classification): num_classes values per anchor.
            cls_pred = conv2d(x, filters=num_anchors * self.params.num_classes, kernel_size=3,
                              activation=None, name='conv_cls')
            cls_pred = tf.reshape(cls_pred, shape + [num_anchors, self.params.num_classes])
            return loc_pred, cls_pred

    def anchors(self):
        """Return the default anchors ``(y, x, h, w)`` of every detection layer."""
        return ssd_anchors_all_layers(self.params.img_shape,
                                      self.params.feature_shapes,
                                      self.anchor_sizes,
                                      self.params.anchor_ratios,
                                      self.params.anchor_steps,
                                      self.params.anchor_offset,
                                      np.float32)

    def _b_boxes_decode(self, locations, anchors):
        """Decode predicted offsets of one layer into corner-format boxes.

        With anchor ``d = (d_cx, d_cy, d_w, d_h)`` and prediction
        ``l = (l_cx, l_cy, l_w, l_h)`` (each scaled by ``prior_scaling``):
        ``b_cx = d_w * l_cx + d_cx``, ``b_cy = d_h * l_cy + d_cy``,
        ``b_w = d_w * exp(l_w)``, ``b_h = d_h * exp(l_h)``.

        Args:
            locations: 5-D prediction tensor whose last axis is
                ``(cx, cy, w, h)``.
            anchors: ``(d_cy, d_cx, d_h, d_w)`` for the layer.

        Returns:
            Tensor with last axis ``(y_min, x_min, y_max, x_max)``.
        """
        d_cy, d_cx, d_h, d_w = anchors
        scaling = self.params.prior_scaling
        # d_cy / d_cx carry a trailing unit axis and d_h / d_w are per-anchor
        # vectors, so the arithmetic below relies on broadcasting.
        b_cx = d_w * locations[:, :, :, :, 0] * scaling[0] + d_cx
        b_cy = d_h * locations[:, :, :, :, 1] * scaling[1] + d_cy
        b_w = d_w * tf.exp(locations[:, :, :, :, 2] * scaling[2])
        b_h = d_h * tf.exp(locations[:, :, :, :, 3] * scaling[3])
        # Box corner coordinates.
        y_min = b_cy - b_h / 2.
        x_min = b_cx - b_w / 2.
        y_max = b_cy + b_h / 2.
        x_max = b_cx + b_w / 2.
        return tf.stack([y_min, x_min, y_max, x_max], axis=-1)

    def _b_boxes_select_layer(self, classes, locations, anchors):
        """Pick the confident boxes of one detection layer.

        Every anchor gets its best non-background class; anchors whose best
        score is not above ``self.threshold`` are dropped. The returned boxes
        are the decoded predictions, not the anchors themselves.

        Args:
            classes: 5-D class probabilities
                ``(?, feat_h, feat_w, anchors_per_cell, num_classes)``.
            locations: 5-D location predictions of the same layer.
            anchors: anchors of the layer as ``(y, x, h, w)``.

        Returns:
            ``(classes, scores, b_boxes)`` for the kept anchors.
        """
        # Number of boxes = feat_h * feat_w * anchors_per_cell (e.g. 38*38*4).
        # NOTE: the reshapes below fold the batch axis into that count, so they
        # only succeed for a batch of one image.
        num_anchors = int(np.prod(classes.get_shape().as_list()[1:-1]))
        b_boxes = self._b_boxes_decode(locations, anchors)
        b_boxes = tf.reshape(b_boxes, [num_anchors, 4])
        classes = tf.reshape(classes, [num_anchors, self.params.num_classes])
        # Drop the background column (index 0).
        predictions_except_bg = classes[:, 1:]
        # Best class per anchor; +1 because background occupied index 0.
        classes = tf.argmax(predictions_except_bg, axis=1) + 1
        scores = tf.reduce_max(predictions_except_bg, axis=1)
        # Boolean mask: True keeps the anchor, False discards it.
        filter_mask = scores > self.threshold
        classes = tf.boolean_mask(classes, filter_mask)
        scores = tf.boolean_mask(scores, filter_mask)
        b_boxes = tf.boolean_mask(b_boxes, filter_mask)
        return classes, scores, b_boxes

    def _b_boxes_select(self, classes, locations):
        """Run ``_b_boxes_select_layer`` on every layer and concatenate.

        Args:
            classes: per-layer list of class probability tensors.
            locations: per-layer list of location prediction tensors.

        Returns:
            ``(classes, scores, b_boxes)`` gathered over all layers.
        """
        anchors_all = self.anchors()
        classes_all = []
        scores_all = []
        b_boxes_all = []
        for n, classes_n in enumerate(classes):
            anchors_layer = list(map(tf.convert_to_tensor, anchors_all[n]))
            classes_layer, scores_layer, b_boxes_layer = self._b_boxes_select_layer(
                classes_n, locations[n], anchors_layer)
            classes_all.append(classes_layer)
            scores_all.append(scores_layer)
            b_boxes_all.append(b_boxes_layer)
        # Merge the per-layer selections along the first axis.
        classes = tf.concat(classes_all, axis=0)
        scores = tf.concat(scores_all, axis=0)
        b_boxes = tf.concat(b_boxes_all, axis=0)
        return classes, scores, b_boxes

    def detections(self):
        """Return the ``(classes, scores, b_boxes)`` tensors built for inference."""
        return self._classes, self._scores, self._b_boxes

    def input_images(self):
        """Return the input image tensor the network was built on."""
        return self._input_images
# =========================================================================== #
# Tools
# =========================================================================== #
def l2norm(x, trainable=True, scope='L2Normalization'):
    """L2-normalize `x` along axis 3 and rescale it with a learned `gamma`.

    Args:
        x: input tensor; its last dimension gives the length of `gamma`.
        trainable: whether the `gamma` variable is trainable.
        scope: variable scope in which `gamma` is created.

    Returns:
        The normalized tensor multiplied by `gamma`.
    """
    channels = x.get_shape().as_list()[-1]
    # Normalization runs over the channel axis only, i.e. per pixel.
    normalized = tf.nn.l2_normalize(x, dim=[3], epsilon=1e-12)
    with tf.variable_scope(scope):
        # No initializer is passed, so TensorFlow's default one is used.
        gamma = tf.get_variable("gamma", shape=[channels, ], dtype=tf.float32,
                                trainable=trainable)
        return normalized * gamma
def conv2d(x, filters, kernel_size, stride=1, dilation_rate=1, padding='same', activation=tf.nn.relu,
           name='conv2d'):
    """Square 2-D convolution, a thin wrapper around `tf.layers.conv2d`.

    Args:
        x: input tensor.
        filters: number of output channels.
        kernel_size: side length of the square kernel.
        stride: stride applied to both spatial dimensions.
        dilation_rate: dilation applied to both spatial dimensions.
        padding: padding mode forwarded to `tf.layers.conv2d`.
        activation: activation callable, or None for a linear output.
        name: layer name.

    Returns:
        The output tensor of the convolution layer.
    """
    # The scalar arguments are expanded to [value, value] pairs.
    return tf.layers.conv2d(inputs=x, filters=filters, kernel_size=[kernel_size] * 2,
                            strides=[stride] * 2, dilation_rate=[dilation_rate] * 2,
                            padding=padding, activation=activation, name=name)
def max_pooling2d(x, pool_size, stride=None, padding='same', name='max_pooling2d'):
    """Square 2-D max pooling, a thin wrapper around `tf.layers.max_pooling2d`.

    Args:
        x: input tensor.
        pool_size: side length of the square pooling window.
        stride: stride for both spatial dimensions; defaults to `pool_size`.
        padding: padding mode forwarded to the layer.
        name: layer name.

    Returns:
        The pooled tensor.
    """
    step = pool_size if stride is None else stride
    return tf.layers.max_pooling2d(inputs=x, pool_size=[pool_size] * 2, strides=[step] * 2,
                                   padding=padding, name=name)
def pad2d(x, pad=1):
    """Pad axes 1 and 2 of a 4-D tensor by `pad` on each side; axes 0 and 3 are untouched."""
    paddings = [[0, 0], [pad] * 2, [pad] * 2, [0, 0]]
    return tf.pad(tensor=x, paddings=paddings)
def dropout(x, rate=0.5, training=True):
    """Apply `tf.layers.dropout` to `x` with the given rate and training flag."""
    dropped = tf.layers.dropout(inputs=x, rate=rate, training=training)
    return dropped
def ssd_size_bounds_to_values(size_bounds,
                              n_feat_layers,
                              img_shape=(300, 300)):
    """Compute the reference anchor sizes of every layer from relative bounds.

    The sizes are returned as fractions of the image side (they are not
    multiplied by the image size). Percentages are stepped evenly from the
    lower to the upper bound, with one extra, smaller scale prepended.

    Args:
        size_bounds: ``(lower, upper)`` relative bounds of the anchor sizes.
        n_feat_layers: number of detection feature layers.
        img_shape: image shape; only checked to be square.

    Returns:
        A list whose first entry is the list ``[lower / 2, lower]`` and whose
        remaining entries are ``(size, next_size)`` tuples. For each scale the
        aspect ratios only apply to the first value.
    """
    assert img_shape[0] == img_shape[1]
    # Round instead of truncating: a product such as 0.29 * 100 evaluates to
    # 28.999999999999996 and int() alone would turn it into 28.
    min_ratio = int(round(size_bounds[0] * 100))
    max_ratio = int(round(size_bounds[1] * 100))
    step = int(math.floor((max_ratio - min_ratio) / (n_feat_layers - 2)))
    # Start with the following smallest sizes.
    sizes = [[size_bounds[0] / 2, size_bounds[0]]]
    for ratio in range(min_ratio, max_ratio + 1, step):
        sizes.append((ratio / 100.,
                      (ratio + step) / 100.))
    return sizes
def ssd_anchor_one_layer(img_shape,
                         feat_shape,
                         sizes,
                         ratios,
                         step,
                         offset=0.5,
                         dtype=np.float32):
    """Compute the SSD default anchor boxes of one feature layer.

    Produces the grid of anchor centres, expressed as fractions of the
    feature-map extent, together with the height and width of every anchor
    attached to a cell.

    Arguments:
      img_shape: image shape; accepted but not used by this computation;
      feat_shape: feature-map shape, used to build the centre grid;
      sizes: reference sizes, used exactly as given (no rescaling);
      ratios: aspect ratios applied to ``sizes[0]``;
      step: cell size of the layer; accepted but not used by this computation;
      offset: offset added to the integer cell index before normalizing;
      dtype: NumPy dtype of the returned arrays.

    Return:
      y, x, h, w: centre grids of shape ``(rows, cols, 1)`` and per-anchor
      height / width vectors of length ``len(sizes) + len(ratios)``.
    """
    rows, cols = np.mgrid[0:feat_shape[0], 0:feat_shape[1]]
    # Cell centres as fractions of the feature map; the trailing unit axis
    # lets them broadcast against the per-anchor h / w vectors.
    y = ((rows.astype(dtype) + offset) / feat_shape[0])[..., np.newaxis]
    x = ((cols.astype(dtype) + offset) / feat_shape[1])[..., np.newaxis]

    base = sizes[0]
    anchor_count = len(sizes) + len(ratios)
    h = np.zeros((anchor_count,), dtype=dtype)
    w = np.zeros((anchor_count,), dtype=dtype)

    # Slot 0: square anchor of the base size.
    h[0] = w[0] = base
    first_ratio_slot = 1
    if len(sizes) > 1:
        # Slot 1: square anchor at the geometric mean of the two sizes.
        h[1] = w[1] = math.sqrt(base * sizes[1])
        first_ratio_slot = 2
    # Remaining slots: one anchor per aspect ratio, built on the base size.
    for slot, ratio in enumerate(ratios, start=first_ratio_slot):
        root = math.sqrt(ratio)
        h[slot] = base / root
        w[slot] = base * root
    return y, x, h, w
def ssd_anchors_all_layers(img_shape,
                           layers_shape,
                           anchor_sizes,
                           anchor_ratios,
                           anchor_steps,
                           offset=0.5,
                           dtype=np.float32):
    """Compute the anchor boxes of every feature layer.

    Calls `ssd_anchor_one_layer` once per entry of `layers_shape`, taking the
    sizes, ratios and step found at the same index, and returns the results
    as a list in layer order.
    """
    return [
        ssd_anchor_one_layer(img_shape, feat_shape,
                             anchor_sizes[idx],
                             anchor_ratios[idx],
                             anchor_steps[idx],
                             offset=offset, dtype=dtype)
        for idx, feat_shape in enumerate(layers_shape)
    ]
def tf_ssd_bboxes_encode(labels,
                         bboxes,
                         anchors,
                         num_classes,
                         no_annotation_label,
                         ignore_threshold=0.5,
                         prior_scaling=(0.1, 0.1, 0.2, 0.2),
                         dtype=tf.float32,
                         scope='ssd_bboxes_encode'):
    """Encode ground-truth labels and boxes against the anchors of all layers.

    Runs `tf_ssd_bboxes_encode_layer` once per entry of `anchors`, each call
    inside its own name scope, and regroups the results by kind.

    Returns:
        ``(target_labels, target_localizations, target_scores)``, three lists
        holding one tensor per anchor layer.
    """
    with tf.name_scope(scope):
        encoded_layers = []
        for idx, layer_anchors in enumerate(anchors):
            with tf.name_scope('bboxes_encode_block_%i' % idx):
                encoded_layers.append(
                    tf_ssd_bboxes_encode_layer(labels, bboxes, layer_anchors,
                                               num_classes, no_annotation_label,
                                               ignore_threshold,
                                               prior_scaling, dtype))
        target_labels = [encoded[0] for encoded in encoded_layers]
        target_localizations = [encoded[1] for encoded in encoded_layers]
        target_scores = [encoded[2] for encoded in encoded_layers]
        return target_labels, target_localizations, target_scores
def tf_ssd_bboxes_encode_layer(labels,
                               bboxes,
                               anchors_layer,
                               num_classes,
                               no_annotation_label,
                               ignore_threshold=0.5,
                               prior_scaling=(0.1, 0.1, 0.2, 0.2),
                               dtype=tf.float32):
    """Encode ground-truth labels and boxes against the anchors of one layer.

    Every anchor is assigned the ground-truth box with which it has the
    highest Jaccard overlap (IoU). Anchors that overlap no box keep label 0
    (background). The assigned box is then expressed as offsets relative to
    the anchor, i.e. the inverse of the decoding done at prediction time.

    Args:
        labels: 1-D tensor of ground-truth class labels.
        bboxes: ground-truth boxes, one ``(ymin, xmin, ymax, xmax)`` row per
            label.
        anchors_layer: ``(y, x, h, w)`` of the layer; `y` and `x` are centre
            grids of shape ``(N, M, 1)`` and `h`, `w` are per-anchor vectors.
        num_classes: labels that are not below this value are never assigned.
        no_annotation_label: accepted for interface compatibility; unused
            because the no-annotation handling is disabled.
        ignore_threshold: accepted for interface compatibility; unused for the
            same reason.
        prior_scaling: scaling of the encoded ``(cx, cy, w, h)`` offsets, in
            that order.
        dtype: dtype of the score and coordinate tensors.

    Returns:
        ``(feat_labels, feat_localizations, feat_scores)``: the assigned label
        per anchor, the encoded ``(cx, cy, w, h)`` offsets per anchor and the
        IoU with the assigned ground-truth box.
    """
    yref, xref, href, wref = anchors_layer
    # Anchors in corner form.
    ymin = yref - href / 2.
    xmin = xref - wref / 2.
    ymax = yref + href / 2.
    xmax = xref + wref / 2.
    # Anchor areas.
    vol_anchors = (xmax - xmin) * (ymax - ymin)

    # (feature_map_height, feature_map_width, anchors_per_cell): one slot per
    # anchor of the layer.
    shape = (yref.shape[0], yref.shape[1], href.size)
    feat_labels = tf.zeros(shape, dtype=tf.int64)
    feat_scores = tf.zeros(shape, dtype=dtype)
    # Unmatched anchors keep the box (0, 0, 1, 1).
    feat_ymin = tf.zeros(shape, dtype=dtype)
    feat_xmin = tf.zeros(shape, dtype=dtype)
    feat_ymax = tf.ones(shape, dtype=dtype)
    feat_xmax = tf.ones(shape, dtype=dtype)

    def jaccard_with_anchors(bbox):
        """Jaccard overlap (IoU) between one box and every anchor."""
        int_ymin = tf.maximum(ymin, bbox[0])
        int_xmin = tf.maximum(xmin, bbox[1])
        int_ymax = tf.minimum(ymax, bbox[2])
        int_xmax = tf.minimum(xmax, bbox[3])
        h = tf.maximum(int_ymax - int_ymin, 0.)
        w = tf.maximum(int_xmax - int_xmin, 0.)
        # Volumes.
        inter_vol = h * w
        union_vol = vol_anchors - inter_vol \
            + (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
        jaccard = tf.div(inter_vol, union_vol)
        return jaccard

    def condition(i, feat_labels, feat_scores,
                  feat_ymin, feat_xmin, feat_ymax, feat_xmax):
        """Loop condition: i < len(labels)."""
        r = tf.less(i, tf.shape(labels))
        return r[0]

    def body(i, feat_labels, feat_scores, feat_ymin, feat_xmin, feat_ymax, feat_xmax):
        """Loop body: fold ground-truth box `i` into the per-anchor targets."""
        label = labels[i]
        bbox = bboxes[i]
        # IoU of every anchor of the layer with ground-truth box i.
        jaccard = jaccard_with_anchors(bbox)
        # An anchor switches to this box only when the overlap beats the best
        # one recorded so far (scores start at 0, so any positive overlap wins
        # on the first match).
        mask = tf.greater(jaccard, feat_scores)
        # NOTE(review): presumably a guard for anchors whose score was set to
        # -1 by the no-annotation handling, which is disabled here - confirm.
        mask = tf.logical_and(mask, feat_scores > -0.5)
        mask = tf.logical_and(mask, label < num_classes)
        # Integer and float versions of the mask for the blends below.
        imask = tf.cast(mask, tf.int64)
        fmask = tf.cast(mask, dtype)
        # Selected anchors take the new label / score / box; the others keep
        # their previous values.
        feat_labels = imask * label + (1 - imask) * feat_labels
        feat_scores = tf.where(mask, jaccard, feat_scores)
        feat_ymin = fmask * bbox[0] + (1 - fmask) * feat_ymin
        feat_xmin = fmask * bbox[1] + (1 - fmask) * feat_xmin
        feat_ymax = fmask * bbox[2] + (1 - fmask) * feat_ymax
        feat_xmax = fmask * bbox[3] + (1 - fmask) * feat_xmax
        # NOTE: anchors overlapping a `no_annotation_label` box are not
        # handled specially; that step is intentionally left out.
        return [i + 1, feat_labels, feat_scores,
                feat_ymin, feat_xmin, feat_ymax, feat_xmax]

    # Iterate over all ground-truth boxes, comparing each with every anchor.
    i = 0
    [i, feat_labels, feat_scores,
     feat_ymin, feat_xmin,
     feat_ymax, feat_xmax] = tf.while_loop(condition, body,
                                           [i, feat_labels, feat_scores,
                                            feat_ymin, feat_xmin,
                                            feat_ymax, feat_xmax])
    # Back to centre / size form.
    feat_cy = (feat_ymax + feat_ymin) / 2.
    feat_cx = (feat_xmax + feat_xmin) / 2.
    feat_h = feat_ymax - feat_ymin
    feat_w = feat_xmax - feat_xmin
    # Encode as scaled offsets from the anchor. The prior_scaling index follows
    # the channel order of the stacked output (cx, cy, w, h), which is also the
    # order the decoder applies; results are unchanged for scalings whose first
    # two and last two entries are equal, such as the default.
    feat_cx = (feat_cx - xref) / wref / prior_scaling[0]
    feat_cy = (feat_cy - yref) / href / prior_scaling[1]
    feat_w = tf.log(feat_w / wref) / prior_scaling[2]
    feat_h = tf.log(feat_h / href) / prior_scaling[3]
    feat_localizations = tf.stack([feat_cx, feat_cy, feat_w, feat_h], axis=-1)
    return feat_labels, feat_localizations, feat_scores
def ssd_losses(logits, localisations,
               gclasses, glocalisations, gscores,
               match_threshold=0.5,
               negative_ratio=3.,
               alpha=1.,
               label_smoothing=0.,
               device='/cpu:0',
               scope=None):
    """Compute the SSD classification and localization losses.

    Args:
        logits: per-layer list of 5-D class logit tensors.
        localisations: per-layer list of predicted location tensors.
        gclasses: per-layer list of target class labels.
        glocalisations: per-layer list of target (encoded) locations.
        gscores: per-layer list of target scores (IoU with the matched box).
        match_threshold: anchors with a score above this are positives.
        negative_ratio: number of negatives kept per positive.
        alpha: weight of the localization loss.
        label_smoothing: accepted but not used by this implementation.
        device: accepted but not used by this implementation.
        scope: optional name scope.

    Returns:
        ``(loss_pos, loss_neg, loss_loc)``: cross-entropy over the positives,
        cross-entropy over the mined hard negatives and the smooth-L1
        localization loss, each divided by the batch size.
    """
    with tf.name_scope(scope, 'ssd_losses'):
        lshape = get_shape(logits[0], 5)
        num_classes = lshape[-1]
        batch_size = lshape[0]

        # Flatten every layer and concatenate over all anchors.
        flogits = []
        fgclasses = []
        fgscores = []
        flocalisations = []
        fglocalisations = []
        for i, layer_logits in enumerate(logits):
            flogits.append(tf.reshape(layer_logits, [-1, num_classes]))
            fgclasses.append(tf.reshape(gclasses[i], [-1]))
            fgscores.append(tf.reshape(gscores[i], [-1]))
            flocalisations.append(tf.reshape(localisations[i], [-1, 4]))
            fglocalisations.append(tf.reshape(glocalisations[i], [-1, 4]))
        logits = tf.concat(flogits, axis=0)
        gclasses = tf.concat(fgclasses, axis=0)
        gscores = tf.concat(fgscores, axis=0)
        localisations = tf.concat(flocalisations, axis=0)
        glocalisations = tf.concat(fglocalisations, axis=0)
        dtype = logits.dtype

        # get_shape returns the batch size as a Python int when it is static
        # and as an integer tensor otherwise. The divisions below need a value
        # of the same dtype as the loss, so cast once here.
        batch_size_f = tf.cast(batch_size, dtype)

        # Positives: anchors matched well enough to a ground-truth box.
        pmask = gscores > match_threshold
        fpmask = tf.cast(pmask, dtype)
        n_positives = tf.reduce_sum(fpmask)

        # Hard negative mining: among the non-positive anchors, keep those
        # with the lowest predicted background probability.
        no_classes = tf.cast(pmask, tf.int32)
        predictions = tf.nn.softmax(logits)
        nmask = tf.logical_and(tf.logical_not(pmask),
                               gscores > -0.5)
        fnmask = tf.cast(nmask, dtype)
        nvalues = tf.where(nmask,
                           predictions[:, 0],
                           1. - fnmask)
        nvalues_flat = tf.reshape(nvalues, [-1])
        # Number of negatives: negative_ratio per positive, plus the batch
        # size, capped by the number of available negatives.
        max_neg_entries = tf.cast(tf.reduce_sum(fnmask), tf.int32)
        n_neg = tf.cast(negative_ratio * n_positives, tf.int32) + batch_size
        n_neg = tf.minimum(n_neg, max_neg_entries)
        val, _ = tf.nn.top_k(-nvalues_flat, k=n_neg)
        max_hard_pred = -val[-1]
        nmask = tf.logical_and(nmask, nvalues < max_hard_pred)
        fnmask = tf.cast(nmask, dtype)

        # Classification loss of the positives.
        with tf.name_scope('cross_entropy_pos'):
            loss = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits,
                                                                  labels=gclasses)
            loss_pos = tf.div(tf.reduce_sum(loss * fpmask), batch_size_f, name='value')

        # Classification loss of the negatives.
        with tf.name_scope('cross_entropy_neg'):
            loss = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits,
                                                                  labels=no_classes)
            loss_neg = tf.div(tf.reduce_sum(loss * fnmask), batch_size_f, name='value')

        # Box localization loss: smooth L1 over the positives only.
        with tf.name_scope('localization'):
            weights = tf.expand_dims(alpha * fpmask, axis=-1)
            loss = abs_smooth(localisations - glocalisations)
            loss_loc = tf.div(tf.reduce_sum(loss * weights), batch_size_f, name='value')

        return loss_pos, loss_neg, loss_loc
def get_shape(x, rank=None):
    """Return the dimensions of a tensor as a list of ints or scalar tensors.

    Args:
        x: N-d tensor.
        rank: expected rank of the tensor; taken from the static shape when
            None.

    Returns:
        A list ``[d1, d2, ..., dN]``. Dimensions that are statically known are
        Python integers; the others are integer scalar tensors read from
        ``tf.shape(x)``.
    """
    static = x.get_shape()
    if static.is_fully_defined():
        return static.as_list()

    if rank is None:
        known_dims = static.as_list()
        rank = len(known_dims)
    else:
        known_dims = x.get_shape().with_rank(rank).as_list()
    runtime_dims = tf.unstack(tf.shape(x), rank)
    # Prefer the static value and fall back to the runtime one.
    return [known if known is not None else runtime
            for known, runtime in zip(known_dims, runtime_dims)]
def abs_smooth(x):
    """Smoothed absolute value, used to compute a smooth-L1 error.

    Defined as:
        x^2 / 2         if abs(x) < 1
        abs(x) - 0.5    otherwise

    Both branches are obtained from one expression built on abs(x) and
    min(abs(x), 1), so no explicit conditional is needed.
    """
    magnitude = tf.abs(x)
    clipped = tf.minimum(magnitude, 1)
    return 0.5 * ((magnitude - 1) * clipped + magnitude)
if __name__ == '__main__':
    # Script entry point: instantiate the model with its default arguments.
    ssd = SSD()
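# Copyright viencoding.com. Reposting is permitted, provided the source and the original link are credited: https://viencoding.com/article/247
# (Web-page footer captured with the code: readers are invited to leave a message in the comment section below the article.)
# (Web-page footer captured with the code: a link for tipping the author if the article was helpful.)
# (Web-page footer captured with the code: "There are no comments yet.")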