pixels_features_analysis.py 11.3 KB
#!/usr/bin/python
# -*- coding: utf-8 -*-

import os.path as op
import glob
import numpy as np
import matplotlib.pyplot as plt
import json
import otbApplication
from matplotlib.lines import Line2D
import collections
import argparse

   

def get_alcd_label_from_diff(diff_img, diff_class_nb, alcd_labeled_img):
    '''
    Get the pixels of the ALCD labeled image, which are of the class
    diff_class_nb (linked to the confusion matrix, so 1 2 3 or 4) of
    the image diff_img 
    '''
    Superimpose = otbApplication.Registry.CreateApplication("Superimpose")
    Superimpose.SetParameterString("inr", str(diff_img))
    Superimpose.SetParameterString("inm", str(alcd_labeled_img))
    Superimpose.SetParameterString("interpolator", "nn")
    Superimpose.UpdateParameters()
    Superimpose.Execute()
    

    BandMathX = otbApplication.Registry.CreateApplication("BandMathX")    

    BandMathX.AddParameterStringList("il", str(diff_img))
    BandMathX.AddImageToParameterInputImageList("il", Superimpose.GetParameterOutputImage("out"))
    #im1 is the diff image, im2 the ALCD ref
    # pixels that have a null value / no-data will be set to 0, 
    # the tones dont belong to the class will be set to -1,
    # and the others to their labeled class
    expression = "((im1b1 == {diff_class_nb} ? im2b1 : -1) + (im1b1 == 0 ? 1 : 0))".format(
    diff_class_nb = str(diff_class_nb))            
        
    BandMathX.SetParameterString("exp", expression)
    
    BandMathX.UpdateParameters()
    BandMathX.Execute()
    
    # Pass directly to numpy array, doesnt write on disk
    extraction_output = BandMathX.GetImageAsNumpyArray('out')
    alcd_masked = np.array(extraction_output).flatten().astype(int)
    
    # dict with number of pixels per class
    class_numbers = collections.Counter(alcd_masked)
    
    # number of valid pixels on the image
    nb_valid_pixels = len(alcd_masked)-class_numbers[0]


    return class_numbers, nb_valid_pixels

    


def get_miclassified_histograms(location, date, cas_alcd, processing_chain, pcc_sub_dir = None, diff_class_nb = 1, display = False):
    paths_configuration = json.load(open(op.join('..', 'paths_configuration.json')))
        
    data_alcd_path = paths_configuration["data_paths"]["data_alcd"]
    data_pcc_path = paths_configuration["data_paths"]["data_pcc"]
    if pcc_sub_dir != None:
        data_pcc_path = op.join(data_pcc_path, pcc_sub_dir)
    tile = paths_configuration["tile_location"][location]
    
    location_dir = location + '_' + tile + '_' + date
    raw_img = glob.glob(op.join(data_alcd_path, location_dir, 'In_data', 'Image', (location + '_bands.tif')))[0] 
       
    main_dir = op.join(data_pcc_path, '{}_{}_{}'.format(location, tile, date))

    alcd_labeled_img = op.join(main_dir, 'Original_data', 'labeled_img_original.tif')

    if cas_alcd == 1:
         reference_sub_dir = 'ALCD_initial'  
         if processing_chain == 'maja':
             processing_name = 'maja_ini'
         elif processing_chain == 'sen2cor':
             processing_name = 'sen2cor_ini'
         elif processing_chain == 'fmask':
             processing_name = 'fmask_ini'
    elif cas_alcd == 2:
         reference_sub_dir = 'ALCD_dilated'  
         if processing_chain == 'maja':
             processing_name = 'maja_ini'
         elif processing_chain == 'sen2cor':
             processing_name = 'sen2cor_dilat'
         elif processing_chain == 'fmask':
             processing_name = 'fmask_dilat'
    elif cas_alcd == 3:
         reference_sub_dir = 'ALCD_initial'  
         if processing_chain == 'maja':
             processing_name = 'maja_erode'
         elif processing_chain == 'sen2cor':
             processing_name = 'sen2cor_ini'
         elif processing_chain == 'fmask':
             processing_name = 'fmask_ini'
                  

    
    diff_img = glob.glob(op.join(main_dir, 'Binary_difference', reference_sub_dir, '*{}*.tif'.format(processing_name)))[0]

    numbers_dict, nb_valid_pixels = get_alcd_label_from_diff(diff_img, diff_class_nb, alcd_labeled_img)


    non_zero_class_nb = []
    nb_per_class = []


    all_classes_nb = np.arange(1,8)    
    for alcd_class_nb in all_classes_nb:  
        if numbers_dict[alcd_class_nb] > 0:
            non_zero_class_nb.append(alcd_class_nb)
            nb_per_class.append(numbers_dict[alcd_class_nb])
    
    
    total_samples = sum(nb_per_class)
    nb_per_class = [float(n)/total_samples for n in nb_per_class]
        
    proportion_in_image = float(total_samples)/float(nb_valid_pixels)
    
    return non_zero_class_nb, nb_per_class, proportion_in_image
    
    
    
def plot_confusion_repartition(location, date, processing_chain, cas_alcd, pcc_sub_dir = None):
    '''
    Plot the repartition of classes for each cell of the confusion
    matrix.
    e.g. what percentage of low_clouds and clouds_shadows in false positive
    ''' 
    paths_configuration = json.load(open(op.join('..', 'paths_configuration.json')))
    data_pcc_dir = paths_configuration["data_paths"]["data_pcc"]
    if pcc_sub_dir != None:
        data_pcc_dir = op.join(data_pcc_dir, pcc_sub_dir)    
    
    tile = paths_configuration["tile_location"][location]
    main_dir = op.join(data_pcc_dir, '{}_{}_{}'.format(location, tile, date))

    
    class_nb = []
    all_proportion = []
    mat_conf_proportion = []
    all_indexes = [0]
    all_colors = []
    all_classes_proportion = []
    all_labels = []
    colormap = create_colormap(color_pattern = 'labeled', class_call = 'number')
    ticks_positions = []
    
    alcd_class_dict = {'0':'null value', '1':'background', '2':'low clouds',
    '3':'high clouds', '4':'clouds shadows', '5':'land',
    '6':'water', '7':'snow'}    
    
    for confusion_mat_place in [1,2,3,4]:
        non_zero_class_nb, nb_per_class, proportion_in_image = get_miclassified_histograms(location, 
        date, cas_alcd, processing_chain, pcc_sub_dir = pcc_sub_dir, 
        diff_class_nb = confusion_mat_place, display = True)
        class_nb.append(non_zero_class_nb)
        all_proportion.append(nb_per_class)
        mat_conf_proportion.append(proportion_in_image)
        
        all_classes_proportion.extend(nb_per_class)
        
        for clas in non_zero_class_nb:
            all_colors.append(colormap[str(clas)])
            all_indexes.append(all_indexes[-1]+1)

        ticks_positions.append(float(all_indexes[-(len(non_zero_class_nb)+1)]+all_indexes[-1])/2)
        all_indexes[-1] = all_indexes[-1]+2    

    all_indexes.pop()
            
    # create plot
    fig, ax = plt.subplots()
    ax.yaxis.grid(True) # put horizontal grid
    
    # ticks creation
    ticks_labels = ['TP ({:.01f}%)'.format(mat_conf_proportion[0]*100),
    'FN ({:.01f}%)'.format(mat_conf_proportion[1]*100),
    'FP ({:.01f}%)'.format(mat_conf_proportion[2]*100),
    'TN ({:.01f}%)'.format(mat_conf_proportion[3]*100)]

    plt.xticks(ticks_positions, ticks_labels)    

    # legend customization
    all_legend_nb = list(set([item for sublist in class_nb for item in sublist]))
    all_legend_names = [alcd_class_dict[str(clas)] for clas in all_legend_nb]
    all_legend_colors = [colormap[str(clas)] for clas in all_legend_nb]
    custom_lines = [Line2D([0], [0], color=all_legend_colors[k], lw=4) for k in range(len(all_legend_colors))]
    
    plt.legend(custom_lines, all_legend_names, bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)        
    
    # plot the data
    bar_width = 1.
    opacity = 0.8
    
    rects = plt.bar(all_indexes, all_classes_proportion, bar_width,
                         alpha=opacity,
                         color=all_colors,
                         label = all_labels)    
 
    
    # axes modification
    plt.xlabel('Confusion matrix stat. (% of total pixels)')
    plt.ylabel('Proportion among the stat')    
    plt.ylim(0.,1.)
    plt.title('Repartition of the true classes in the confusion matrix \n for {}, {}, {}'.format(processing_chain, location, date))
    plt.tight_layout()
    
    # save the figure
    #~ plt.show()    
    out_fig = op.join(main_dir, 'Statistics', 'features', 'repartition_matconf_{}_{}_{}_case{}.png'.format(processing_chain, location, date, cas_alcd))
    plt.savefig(out_fig, bbox_inches='tight')    
        
        

def create_colormap(color_pattern = 'diff', class_call = 'number'):
    '''
    Create a colormap for each class
    '''
    
    class_color = {}

    if color_pattern == 'diff':
        class_color["full_image"] = np.divide([100., 0., 255.], 255.)
        class_color["null_value"] = np.divide([0., 0., 0., 0.], 255.)
        class_color["TP"] = np.divide([10., 192., 56.], 255.)
        class_color["FN"] = np.divide([255., 0., 0.], 255.)
        class_color["FP"] = np.divide([192., 52., 193.], 255.)
        class_color["TN"] = np.divide([10., 10., 10.], 255.)
        
        if class_call == 'number':
            class_color["-1"] = class_color["full_image"]
            class_color["0"] = class_color["null_value"]
            class_color["1"] = class_color["TP"]
            class_color["2"] = class_color["FN"]
            class_color["3"] = class_color["FP"]
            class_color["4"] = class_color["TN"]
    else:
        class_color["full_image"] = np.divide([100., 0., 255.], 255.)
        class_color["null_value"] = np.divide([195., 255., 42.], 255.)
        class_color["background"] = np.divide([187., 83., 58.], 255.)
        class_color["low_clouds"] = np.divide([170., 170., 170.], 255.)
        class_color["high_clouds"] = np.divide([255., 123., 184.], 255.)
        class_color["clouds_shadows"] = np.divide([10., 10., 10.], 255.)
        class_color["land"] = np.divide([0., 151., 56.], 255.)
        class_color["water"] = np.divide([0., 142., 208.], 255.)
        class_color["snow"] = np.divide([77., 77., 77.], 255.)
        if class_call == 'number':
            class_color["-1"] = class_color["full_image"]
            class_color["0"] = class_color["null_value"]
            class_color["1"] = class_color["background"]
            class_color["2"] = class_color["low_clouds"]
            class_color["3"] = class_color["high_clouds"]
            class_color["4"] = class_color["clouds_shadows"]
            class_color["5"] = class_color["land"]
            class_color["6"] = class_color["water"]
            class_color["7"] = class_color["snow"]
            
    
    return class_color


def main():
    parser = argparse.ArgumentParser()
    
    parser.add_argument('-l', action='store', default=None, dest='location', help='Name of the location', required=True)
    parser.add_argument('-d', action='store', default=None, dest='date', help='Date to perform', required=True)
    parser.add_argument('-s', action='store', default=None, dest='pcc_sub_dir', help='Name of the PCC subdir (e.g. HOT016)')
    parser.add_argument('-chain', action='store', default=None, dest='processing_chain', help='Name of the processing_chain (maja, sen2cor or fmask)')
    parser.add_argument('-case', action='store', default=2, dest='cas_alcd', help='Number of the ALCD case (1, 2 or 3)')
    results = parser.parse_args()   
    
    pcc_sub_dir = results.pcc_sub_dir    
    cas_alcd = int(results.cas_alcd)   
    location = results.location
    date = results.date
    processing_chain = results.processing_chain
        
    #~ location = 'Arles'
    #~ date = '20171002'
    #~ processing_chain = 'fmask'
    #~ cas_alcd = 1

    plot_confusion_repartition(location, date, processing_chain, cas_alcd, pcc_sub_dir)



    


    
if __name__ == '__main__':
    main()