본문 바로가기

Machine Learning Tasks/Object Detection

Object Detection - YOLO v3 Pytorch 구현 (2)

반응형

지난 포스트

[Machine Learning/기타] - Object Detection - YOLO v3 Pytorch 구현 (1)


Training

Model

모델은 Darknet 을 사용합니다. YOLO 공식 홈페이지에 사전훈련된 Darknet 모델의 파라미터 "yolov3.weight"를 받을 수 있는데, 바이너리 파일이므로 Pytorch / Keras 프레임워크로 별도로 모델을 구성했다면 사전훈련된 파라미터 값을 레이어에 따라 적절하게 덮어씌우는 과정이 필요합니다. 즉, Darknet 모델을 불러오고 사전훈련된 파라미터 값을 불러온 이후에 backbone 모델 파라미터를 고정시키고 detector 부분만 따로 훈련시키는 전이학습을 (transfer learning) 수행합니다. 따라서 backbone 파라미터는 "requires_grad=False"로 설정하고 나머지 파라미터는 "requires_grad=True"로 설정합니다. 그리고 Adam optimizer 를 사용하며 1~2 epoch 정도만 훈련시킵니다.

model = Darknet()
model.load_state_dict(torch.load('weights/Dartnet_VOC_weights_ini'))

#  This section is to freeze all the network except the output three layers
for name, param in model.named_parameters():
    param.requires_grad = False
    if int(name.split('.')[1]) in (79, 80, 81, 91, 92, 93, 103, 104, 105):
        param.requires_grad = True

model = model.to(device)
optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()))

since = time.time()

best_model_wts = copy.deepcopy(model.state_dict())
best_F1_score = 0.0

num_epochs = 1
  • 구글 코랩을 사용한다면 배치 사이즈를 최대 8로밖에 잡을 수 없습니다. 훈련 데이터가 16000 여개이므로 여러 epoch 훈련시키고 싶어도 매우 오랜 시간이 걸립니다.
  • 이미지를 포워딩시키면 [52x52, 26x26, 13x13] feature map 에서의 결과와 각 feature map 을 입력 이미지에 대한 예측으로 바꾼 detection 결과가 리턴됩니다.

Train

지난 포스트에서 정의한 데이터로더와 로스함수를 이용하여 훈련 코드를 구성합니다. YOLO v3는 multi-scale scheme 을 사용하므로 [52x52, 26x26, 13x13] feature map 에서의 결과가 훈련에 사용됩니다. 또한, 성능평가를 위해 각 레이어의 feature map 결과를 Figure 1과 같이 2차원 텐서로 변환하는 "predict_transform" 함수를 구현합니다. 즉, feature map 형태가 [batch size, B*(5+C), grid, grid] 였다면 이를 [batch size, grid*grid*B, 5+C] 형태로 재구성하는 것이죠. 또한, "predict_transform" 함수에서는 모델의 출력을 YOLO v3에 맞게 (중심점으로부터의 오프셋과 사전정의된 앵커박스와의 높이, 너비 비율) 입력의 예측으로 바꾸어줍니다. 

Figure 1

def predict_transform(prediction, inp_dim, anchors, num_classes, CUDA = True):
    '''
    Arguments:
    - prediction: [batch size, B*(5+C), grid, grid]
    '''
    batch_size = prediction.size(0)
    stride =  inp_dim // prediction.size(2)
    grid_size = inp_dim // stride
    bbox_attrs = 5 + num_classes
    num_anchors = len(anchors)
    
    prediction = prediction.view(batch_size, bbox_attrs*num_anchors, grid_size*grid_size)
    prediction = prediction.transpose(1,2).contiguous()
    prediction = prediction.view(batch_size, grid_size*grid_size*num_anchors, bbox_attrs)
    anchors = [(a[0]/stride, a[1]/stride) for a in anchors]

    #Sigmoid the  centre_X, centre_Y. and object confidencce
    prediction[:,:,0] = torch.sigmoid(prediction[:,:,0])
    prediction[:,:,1] = torch.sigmoid(prediction[:,:,1])
    prediction[:,:,4] = torch.sigmoid(prediction[:,:,4])
    
    #Add the center offsets
    grid = np.arange(grid_size)
    a,b = np.meshgrid(grid, grid)

    x_offset = torch.FloatTensor(a).view(-1,1)
    y_offset = torch.FloatTensor(b).view(-1,1)

    if CUDA:
        x_offset = x_offset.cuda()
        y_offset = y_offset.cuda()

    x_y_offset = torch.cat((x_offset, y_offset), 1).repeat(1,num_anchors).view(-1,2).unsqueeze(0)

    prediction[:,:,:2] += x_y_offset

    #log space transform height and the width
    anchors = torch.FloatTensor(anchors)

    if CUDA:
        anchors = anchors.cuda()

    anchors = anchors.repeat(grid_size*grid_size, 1).unsqueeze(0)
    prediction[:,:,2:4] = torch.exp(prediction[:,:,2:4])*anchors
    
    prediction[:,:,5: 5 + num_classes] = torch.sigmoid((prediction[:,:, 5 : 5 + num_classes]))

    prediction[:,:,:4] *= stride
    
    return prediction
  • 입력으로 들어온 prediction 텐서를 [batch size, grid*grid*B, 5+20] 형태로 바꾸어줍니다.
  • 박스의 x, y 중심에 대해서는 sigmoid 함수를 적용하고 meshgrid 함수를 이용해 생성한 그리드 오프셋 "x_y_offset" 을 더해줍니다.
  • Object confidence / class score 에 대해서도 sigmoid 함수를 적용합니다.
  • 박스의 높이, 너비에 대해서는 먼저 사전정의된 앵커박스를 feature map 크기에 맞추어 $p_w, p_h$를 구해준 이후에 $b_w, b_h$를 구합니다.
  • 입력 이미지에 대한 예측을 산출해야하므로 박스 중심, 높이너비에 대해 입력 이미지 대비 feature map 크기 비율을 곱해주어 입력 이미지 상에서의 수치로 환원합니다.
  • 해상도가 높은 feature map 은 작은 객체, 낮은 feature map 은 큰 객체를 담당하므로 입력으로 들어오는 앵커박스는 52x52 feature map 에서 [(10, 13), (16, 30), (33, 23)], 26x26 feature map 에서 [(30, 61), (62, 45), (59, 119)], 13x13 feature map 에서 [(116, 90), (156, 198), (373, 326)]가 들어오게 됩니다.
  • 최종적으로 [52x52, 26x26, 13x13] feature map 을 변환하여 이어붙임으로서 [batch size, 10647, 25] 크기의 예측 결과를 리턴합니다.

이후에는 detection 예측 결과 [batch size, 10647, 25] 텐서에 대해 NMS (Non-Maximum Suppresion) 를 수행합니다. [batch size, 10647, 25] 크기의 텐서를 'prediction' 입력으로 받아 [객체 개수, 8] 크기의 텐서를 출력합니다.

def write_results(prediction, confidence, num_classes, nms_conf = 0.4):
    conf_mask = (prediction[:,:,4] > confidence).float().unsqueeze(2)
    prediction = prediction*conf_mask
    
    box_corner = prediction.new(prediction.shape)
    box_corner[:,:,0] = (prediction[:,:,0] - prediction[:,:,2]/2)
    box_corner[:,:,1] = (prediction[:,:,1] - prediction[:,:,3]/2)
    box_corner[:,:,2] = (prediction[:,:,0] + prediction[:,:,2]/2) 
    box_corner[:,:,3] = (prediction[:,:,1] + prediction[:,:,3]/2)
    prediction[:,:,:4] = box_corner[:,:,:4]
    
    batch_size = prediction.size(0)

    write = False

    for ind in range(batch_size):
        image_pred = prediction[ind]          #image Tensor
        # confidence threshholding 
        # NMS
    
        max_conf, max_conf_score = torch.max(image_pred[:,5:5+ num_classes], 1)
        max_conf = max_conf.float().unsqueeze(1)
        max_conf_score = max_conf_score.float().unsqueeze(1)
        seq = (image_pred[:,:5], max_conf, max_conf_score)
        image_pred = torch.cat(seq, 1)
        
        non_zero_ind =  (torch.nonzero(image_pred[:,4]))
        try:
            image_pred_ = image_pred[non_zero_ind.squeeze(),:].view(-1,7)
        except:
            continue
        
        if image_pred_.shape[0] == 0:
            continue           
  
        # Get the various classes detected in the image
        img_classes = unique(image_pred_[:,-1])  # -1 index holds the class index
        
        
        for cls in img_classes:
            #perform NMS
            #get the detections with one particular class
            cls_mask = image_pred_*(image_pred_[:,-1] == cls).float().unsqueeze(1)
            class_mask_ind = torch.nonzero(cls_mask[:,-2]).squeeze()
            image_pred_class = image_pred_[class_mask_ind].view(-1,7)
            
            #sort the detections such that the entry with the maximum objectness
            #confidence is at the top
            conf_sort_index = torch.sort(image_pred_class[:,4], descending = True )[1]
            image_pred_class = image_pred_class[conf_sort_index]
            idx = image_pred_class.size(0)   #Number of detections
            
            for i in range(idx):
                #Get the IOUs of all boxes that come after the one we are looking at 
                #in the loop
                try:
                    ious = bbox_iou(image_pred_class[i].unsqueeze(0), image_pred_class[i+1:])
                except ValueError:
                    break
            
                except IndexError:
                    break
            
                #Zero out all the detections that have IoU > treshhold
                iou_mask = (ious < nms_conf).float().unsqueeze(1)
                image_pred_class[i+1:] *= iou_mask       
            
                #Remove the non-zero entries
                non_zero_ind = torch.nonzero(image_pred_class[:,4]).squeeze()
                image_pred_class = image_pred_class[non_zero_ind].view(-1,7)
                
            #Repeat the batch_id for as many detections of the class cls in the image
            batch_ind = image_pred_class.new(image_pred_class.size(0), 1).fill_(ind)     
            seq = batch_ind, image_pred_class
            
            if not write:
                output = torch.cat(seq,1)
                write = True
            else:
                out = torch.cat(seq,1)
                output = torch.cat((output,out))

    try:
        return output
    except:
        return 0
  • 먼저 입력으로 주어진 confidence 보다 낮은 박스를 제거합니다. 
  • 박스 중심, 크기로부터 upper left, lower right 점을 구합니다.
  • 배치 안의 각 이미지 별로 bounding 박스마다 클래스 점수가 가장 높은 인덱스를 torch.max 함수로 추출합니다. (torch.max 함수에 축이 지정될 경우 (최대값, 최대값 인덱스) 형태가 출력됩니다.) 이후에 [박스 꼭지점 4개, 클래스 점수, 클래스 인덱스] 의 7개 요소를 가진 텐서를 구성하고 object 점수가 confidence 보다 높은 박스만을 추출합니다.
  • 하나의 이미지에서 검출된 여러 클래스에 대해서 NMS를 수행합니다. 먼저 각 클래스 별로 검출된 박스를 추리고 object score 순으로 정렬합니다. 이후에는 NMS 알고리즘대로 주어진 임계치 (0.4) 보다 IOU가 높은 다른 박스들을 제거합니다. 
  • 기존 7개 요소를 가진 텐서 왼쪽에 배치 인덱스를 가진 하나의 요소를 추가하여 최종적으로 [객체 개수, 8] 크기의 텐서를 리턴합니다.

최종 훈련 코드는 다음과 같습니다. Validation 데이터셋에 대한 F1-score 가 증가할 때마다 모델을 저장합니다. 모델을 포워딩했을 시의 "Final_pre" 변수가 [batch size, 10647, 25] 크기의 입력 이미지 예측 텐서, 리스트로 구성된 "output" 텐서는 순서대로 [52x52, 26x26, 13x13] feature map 의 출력입니다.

for epoch in range(num_epochs):
    print('Epoch {}/{}'.format(epoch, num_epochs-1))
    print('-' * 10)

    # Each epoch has a training and validation phase
    for phase in ['train', 'val']:
        if phase == 'train':
            model.train() # set model to training mode
        else:
            model.eval() # set model to evaluate mode

        running_loss, running_xy_loss, running_wh_loss, running_conf_loss, running_cls_loss = 0.0, 0.0, 0.0, 0.0, 0.0
        running_recall, running_precision, running_F1_score = 0.0, 0.0, 0.0

        # iterate over data
        for i_batch, sample_batched in enumerate(dataloaders[phase]):
            inputs, labels = sample_batched['input_img'], sample_batched['label']
            inputs = inputs.to(device)
            labels = labels.to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward
            # track history if only in train
            with torch.set_grad_enabled(phase == 'train'):

                Final_pre, *output = model(inputs, CUDA)
                Final_pre = write_results(Final_pre, confidence=0.5, num_classes=20, nms_conf=0.4)

                anchors = (
                [(10, 13), (16, 30), (33, 23)], [(30, 61), (62, 45), (59, 119)], [(116, 90), (156, 198), (373, 326)])

                loss_item = {"total_loss": 0, "x": 0, "y": 0, "w": 0, "h": 0, "conf": 0, "cls": 0}

                for i in range(len(output)):
                    losses = Loss(output[i], labels.float(), anchors[i], inp_dim=inp_dim, num_anchors = 3, num_classes = 20)
                    for i, name in enumerate(loss_item):
                        loss_item[name] += losses[i]

                if isinstance(Final_pre, int) == False:
                    F1_score, precision, recall = eval(Final_pre.cpu(), labels, img_width=inp_dim, img_height=inp_dim)
                else:
                    F1_score, precision, recall = 0, 0, 0

                loss = loss_item['total_loss']
                xy_loss = loss_item['x']+loss_item['y']
                wh_loss = loss_item['w']+loss_item['h']
                conf_loss = loss_item['conf']
                cls_loss = loss_item['cls']

                # backward + optimize only if in training phase
                if phase == 'train':
                    loss.backward()
                    optimizer.step()

                # statistics
                running_loss += loss.item()
                running_xy_loss += xy_loss.item()
                running_wh_loss += wh_loss.item()
                running_conf_loss += conf_loss.item()
                running_cls_loss += cls_loss.item()
                running_recall += recall
                running_precision += precision
                running_F1_score += F1_score

        epoch_loss = running_loss / ((i_batch+1)*batch_size)
        epoch_xy_loss = running_xy_loss / ((i_batch+1)*batch_size)
        epoch_wh_loss = running_wh_loss / ((i_batch + 1) * batch_size)
        epoch_conf_loss = running_conf_loss / ((i_batch + 1) * batch_size)
        epoch_cls_loss = running_cls_loss / ((i_batch + 1) * batch_size)

        epoch_recall = running_recall / (i_batch+1)
        epoch_precision = running_precision / (i_batch+1)
        epoch_F1_score = running_F1_score / (i_batch+1)

        print(
            '{} Loss: {:.4f} Recall: {:.4f} Precision: {:.4f} F1 Score: {:.4f}'.format(phase, epoch_loss, epoch_recall,
                                                                                       epoch_precision, epoch_F1_score))
        print(
            '{} xy: {:.4f} wh: {:.4f} conf: {:.4f} class: {:.4f}'.format(phase, epoch_xy_loss, epoch_wh_loss,
                                                                                                        epoch_conf_loss,
                                                                                                        epoch_cls_loss))

        # deep copy the model
        if phase == 'val' and epoch_F1_score > best_F1_score:
            best_F1_score = epoch_F1_score
            best_model_wts = copy.deepcopy(model.state_dict())

time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
print('Best F1 score: {:4f}'.format(best_F1_score))

model.load_state_dict(best_model_wts)
torch.save(model.state_dict(), 'Dartnet_VOC_Weights')

 

Evaluation

Validation 을 위해서는 NMS로 추려진 bounding 박스 결과에 대해 ground-truth 박스와 비교하는 evaluation 과정을 거쳐야 합니다. 먼저 [batch size, max object, 5] 크기의 라벨 텐서 값을 입력 이미지 크기에 맞는 값으로 바꿔주는 코드를 작성합니다.

def convert_label(image_anno, img_width, img_height):

    """
    Function: convert image annotation : center x, center y, w, h (normalized) to x1, y1, x2, y2 for corresponding img
    """
    x_center = image_anno[:, 1]
    y_center = image_anno[:, 2]
    width = image_anno[:, 3]
    height = image_anno[:, 4]

    output = torch.zeros_like(image_anno)
    output[:,0] = image_anno[:,0]
    output[:, 1], output[:, 3] = x_center - width / 2, x_center + width / 2
    output[:, 2], output[:, 4] = y_center - height / 2, y_center + height / 2

    output[:, [1, 3]] *= img_width
    output[:, [2, 4]] *= img_height

    return output.type(torch.FloatTensor)

이후에는 "write_results" 함수에서 작성한 배치의 [객체 개수, 8] 텐서를 입력으로 받아 recall / precision / F1-score 를 계산하는 eval 함수를 작성합니다. Object score가 0.5 이상인 것을 전체 참값으로 (true positive + false positive) 두고 배치 안의 데이터, 데이터 안의 객체 별로 ground-truth 박스와의 IOU가 0.5 이상이고 클래스 라벨이 맞으면 true positive 라고 간주합니다.

def eval(output, labels, img_width, img_height):

    nProposals = int((output[:, 5] > 0.5).sum().item())
    nGT = 0
    nCorrect = 0
    for b in range(labels.shape[0]):  # for each image
        prediction = output[output[:,0] == b]  # filter out the predictions of corresponding image
        for t in range(labels.shape[1]):  # for each object
            if labels[b, t].sum() == 0:  # if the row is empty
                continue
            nGT += 1
            gt_label = convert_label(labels[b, t].unsqueeze(0), img_width, img_height)
            gt_box = gt_label[:, 1:5]
            for i in range(prediction.shape[0]):
                pred_box = prediction[i, 1:5].unsqueeze(0)
                iou = bbox_iou(pred_box, gt_box)
                pred_label = prediction[i, -1]
                target_label = gt_label[0, 0]
                if iou > 0.5 and pred_label == target_label:
                    nCorrect += 1
    recall = float(nCorrect / nGT) if nGT else 1
    precision = float(nCorrect / nProposals) if nProposals else 0
    F1_score = 2 * recall * precision / (recall + precision + 1e-16)

    return F1_score, precision, recall
반응형