Elevation mapping cupy

class elevation_mapping_cupy.elevation_mapping.ElevationMap(param: Parameter)

Bases: object

Core elevation mapping class.

__init__(param: Parameter)
Parameters:

param (elevation_mapping_cupy.parameter.Parameter)

clear()

Reset all the layers of the elevation & the semantic map.

get_position(position)

Return the position of the map center.

Parameters:

position (numpy.ndarray)

move(delta_position)

Shift the map along all three axes according to the input.

Parameters:

delta_position (numpy.ndarray)

move_to(position, R)

Shift the map to an absolute position and update the rotation of the robot.

Parameters:
  • position (numpy.ndarray)

  • R (cupy._core.core.ndarray)

pad_value(x, shift_value, idx=None, value=0.0)

Create a padding of the map along the x- and y-axes according to the amount the map has shifted.

Parameters:
  • x (cupy._core.core.ndarray)

  • shift_value (cupy._core.core.ndarray)

  • idx (Union[None, int])

  • value (float)

shift_map_xy(delta_pixel)

Shift the map along the horizontal axes according to the input.

Parameters:

delta_pixel (cupy._core.core.ndarray)
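
As a rough illustration of what shifting with padding means, the sketch below moves every cell of a small grid by a pixel offset and fills the vacated cells with a constant. Plain nested lists stand in for the CuPy arrays, and the name shift_grid is hypothetical; the library's own implementation may differ, for example in how it wraps or pads.

```python
def shift_grid(grid, delta_pixel, value=0.0):
    """Shift a 2D grid by (rows, cols) and fill vacated cells with `value`.

    Hypothetical helper for illustration; not part of elevation_mapping_cupy.
    """
    d_row, d_col = delta_pixel
    n_rows, n_cols = len(grid), len(grid[0])
    # Start from a grid that contains only the padding value.
    shifted = [[value] * n_cols for _ in range(n_rows)]
    for r in range(n_rows):
        for c in range(n_cols):
            new_r, new_c = r + d_row, c + d_col
            # Cells that would leave the grid are dropped.
            if 0 <= new_r < n_rows and 0 <= new_c < n_cols:
                shifted[new_r][new_c] = grid[r][c]
    return shifted


grid = [[1.0, 2.0, 3.0],
        [4.0, 5.0, 6.0],
        [7.0, 8.0, 9.0]]
shifted_down = shift_grid(grid, (1, 0))
```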

shift_map_z(delta_z)

Shift the relevant layers along the vertical axis.

Parameters:

delta_z (cupy._core.core.ndarray)

compile_kernels()

Compile all kernels belonging to the elevation map.

compile_image_kernels()

Compile kernels related to processing image messages.

shift_translation_to_map_center(t)

Deduct the map center to get the translation of a point w.r.t. the map center.

Parameters:

t (cupy._core.core.ndarray) – Absolute point position
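
A minimal sketch of the subtraction described above, using plain lists. The function name and the explicit center argument are assumptions made for illustration; the method itself takes only t and presumably reads the center from the map object.

```python
def relative_to_center(t, center):
    """Express an absolute point `t` relative to the map `center`."""
    # Element-wise t - center for the x, y and z components.
    return [ti - ci for ti, ci in zip(t, center)]


relative = relative_to_center([2.0, 1.0, 0.5], [1.5, 1.0, 0.0])
```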

update_map_with_kernel(points_all, channels, R, t, position_noise, orientation_noise)

Update map with new measurement.

Parameters:
  • points_all (cupy._core.core.ndarray)

  • channels (List[str])

  • R (cupy._core.core.ndarray)

  • t (cupy._core.core.ndarray)

  • position_noise (float)

  • orientation_noise (float)

clear_overlap_map(t)

Clear overlapping areas around the map center.

Parameters:

t (cupy._core.core.ndarray) – Absolute point position

get_additive_mean_error()

Return the additive mean error.

update_variance()

Add the time variance to the valid cells.

update_time()

Add the time interval to the time layer.

update_upper_bound_with_valid_elevation()

Filter the upper_bound and is_upper_bound layers of all invalid cells.

input_pointcloud(raw_points: cupy._core.core.ndarray, channels: List[str], R: cupy._core.core.ndarray, t: cupy._core.core.ndarray, position_noise: float, orientation_noise: float)

Input the point cloud and fuse the new measurements to update the elevation map.

Parameters:
  • raw_points (cupy._core.core.ndarray)

  • channels (List[str])

  • R (cupy._core.core.ndarray)

  • t (cupy._core.core.ndarray)

  • position_noise (float)

  • orientation_noise (float)

Return type:

None

input_image(image: List[cupy._core.core.ndarray], channels: List[str], R: cupy._core.core.ndarray, t: cupy._core.core.ndarray, K: cupy._core.core.ndarray, D: cupy._core.core.ndarray, distortion_model: str, image_height: int, image_width: int)

Input image and fuse the new measurements to update the elevation map.

Parameters:
  • sub_key (str) – Key used to identify the subscriber configuration

  • image (List[cupy._core.core.ndarray]) – List of arrays containing the individual image input channels

  • R (cupy._core.core.ndarray) – Camera optical center rotation

  • t (cupy._core.core.ndarray) – Camera optical center translation

  • K (cupy._core.core.ndarray) – Camera intrinsics

  • image_height (int) – Image height

  • image_width (int) – Image width

Return type:

None

update_normal(dilated_map)

Clear the normal map and then apply the normal kernel with dilated map as input.

Parameters:

dilated_map (cupy._core.core.ndarray)

process_map_for_publish(input_map, fill_nan=False, add_z=False, xp=cupy)

Process the input_map according to the fill_nan and add_z flags.

Parameters:
  • input_map (cupy._core.core.ndarray)

  • fill_nan (bool)

  • add_z (bool)

  • xp (module)

Return type:

cupy._core.core.ndarray

get_elevation()

Get the elevation layer.

Returns:

elevation layer

get_variance()

Get the variance layer.

Returns:

variance layer

get_traversability()

Get the traversability layer.

Returns:

traversability layer

get_time()

Get the time layer.

Returns:

time layer

get_upper_bound()

Get the upper bound layer.

Returns:

upper bound layer

Return type:

upper_bound

get_is_upper_bound()

Get the is_upper_bound layer.

Returns:

layer

Return type:

is_upper_bound

xp_of_array(array)

Indicate which library is used for xp.

Parameters:

array (cupy._core.core.ndarray)

Returns:

either np or cp

Return type:

module

copy_to_cpu(array, data, stream=None)

Convert the data to float32 and, if it is on the GPU, copy it to the CPU.

Parameters:
  • array (cupy._core.core.ndarray)

  • data (numpy.ndarray)

  • stream (Union[None, cupy.cuda.stream.Stream])

exists_layer(name)

Check if the layer exists in elevation map or in the semantic map.

Parameters:

name (str) – Layer name

Returns:

Indicates if layer exists.

Return type:

bool

get_map_with_name_ref(name, data)

Load the layer with the given name into data.

Parameters:
  • name (str) – Name of the layer.

  • data (numpy.ndarray) – Data structure that contains layer.

get_normal_maps()

Get the normal maps.

Returns:

the three normal values for each cell

Return type:

maps

get_normal_ref(normal_x_data, normal_y_data, normal_z_data)

Get the normal maps as reference.

Parameters:
  • normal_x_data

  • normal_y_data

  • normal_z_data

get_layer(name)

Return the layer with the name input.

Parameters:

name – The layer's name.

Returns:

The requested layer.

Return type:

return_map

get_polygon_traversability(polygon, result)

Check if input polygons are traversable.

Parameters:
  • polygon (cupy._core.core.ndarray)

  • result (numpy.ndarray)

Return type:

Union[None, int]

get_untraversable_polygon(untraversable_polygon)

Copy the untraversable polygons into the input untraversable_polygon.

Parameters:

untraversable_polygon (numpy.ndarray)

initialize_map(points, method='cubic')

Initialize the map from the given points, interpolating between them with the chosen method.

Parameters:
  • points (numpy.ndarray)

  • method (str) – Interpolation method [‘linear’, ‘cubic’, ‘nearest’]

class elevation_mapping_cupy.semantic_map.SemanticMap(param: Parameter)

Bases: object

__init__(param: Parameter)
Parameters:

param (elevation_mapping_cupy.parameter.Parameter)

clear()

Clear the semantic map.

initialize_fusion()

Initialize the fusion algorithms.

update_fusion_setting()

Update the fusion settings.

add_layer(name)

Add a new layer to the semantic map.

Parameters:

name (str) – The name of the new layer.

pad_value(x, shift_value, idx=None, value=0.0)

Create a padding of the map along the x- and y-axes according to the amount the map has shifted.

Parameters:
  • x (cupy._core.core.ndarray)

  • shift_value (cupy._core.core.ndarray)

  • idx (Union[None, int])

  • value (float)

shift_map_xy(shift_value)

Shift the map along x,y-axis according to shift values.

Parameters:

shift_value

get_fusion(channels: List[str], channel_fusions: Dict[str, str], layer_specs: Dict[str, str]) → List[str]

Get all fusion algorithms that need to be applied to a specific pointcloud.

Parameters:

channels (List[str])

get_matching_fusion(channel: str, fusion_algs: Dict[str, str])

Use regular expression to check if the fusion algorithm matches the channel name.
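
The matching step might look like the sketch below. It assumes that the keys of the channel-fusion dictionary are regular expressions and that the "default" entry is the fallback, as the Parameter defaults suggest. The function name match_fusion and the "feat_.*" pattern are hypothetical.

```python
import re


def match_fusion(channel, channel_fusions):
    """Return the fusion algorithm whose key pattern matches `channel`.

    Assumption: keys are regular expressions and "default" is a fallback.
    """
    for pattern, fusion in channel_fusions.items():
        if pattern != "default" and re.fullmatch(pattern, channel):
            return fusion
    # No pattern matched: fall back to the default entry, if any.
    return channel_fusions.get("default")


# "rgb" and "default" mirror the documented defaults; "feat_.*" is made up.
fusions = {"rgb": "color", "feat_.*": "average", "default": "class_average"}
rgb_fusion = match_fusion("rgb", fusions)
```

Using fullmatch rather than search keeps a pattern such as "rgb" from also matching a channel called "rgb_left"; whether the library makes the same choice is not stated in this reference.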

get_layer_indices(fusion_alg, layer_specs)

Get the indices of the layers that are used for a specific fusion algorithm.

Parameters:

fusion_alg (str) – fusion algorithm name

Returns:

indices of the layers

Return type:

cp.array

get_indices_fusion(pcl_channels: List[str], fusion_alg: str, layer_specs: Dict[str, str])

Computes the indices of the channels of the pointcloud and the layers of the semantic map of type fusion_alg.

Parameters:
  • pcl_channels (List[str]) – list of all channel names

  • fusion_alg (str) – fusion algorithm type we want to use for channel selection

Return type:

Union[Tuple[List[int], List[int]], Tuple[cupy._core.core.ndarray, cupy._core.core.ndarray]]

update_layers_pointcloud(points_all, channels, R, t, elevation_map)

Update the semantic map with the pointcloud.

Parameters:
  • points_all – semantic point cloud

  • channels – list of channel names

  • R – rotation matrix

  • t – translation vector

  • elevation_map – elevation map object

update_layers_image(image: cupy._core.core.ndarray, channels: List[str], uv_correspondence: cupy._core.core.ndarray, valid_correspondence: cupy._core.core.ndarray, image_height: cupy._core.core.ndarray, image_width: cupy._core.core.ndarray)

Update the semantic map with the new image.

Parameters:
  • sub_key

  • image

  • uv_correspondence

  • valid_correspondence

  • image_height

  • image_width

decode_max(mer)

Decode the float32 value into two 16-bit values containing the class probability and the class id.

Parameters:

mer

Returns:

The class probability and the class id, each as a cp.array.

Return type:

cp.array
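
To make the packing idea concrete, here is a sketch on a single scalar using only the standard library. The bit layout (probability as a float16 in the low 16 bits, class id in the high 16 bits) and the helper names are assumptions for illustration; the library operates on whole CuPy arrays and may arrange the bits differently.

```python
import struct


def encode_max(probability, class_id):
    """Pack a float16 probability and a 16-bit class id into one float32."""
    (prob_bits,) = struct.unpack("<H", struct.pack("<e", probability))
    packed = (class_id << 16) | prob_bits  # assumed layout: id high, prob low
    (value,) = struct.unpack("<f", struct.pack("<I", packed))
    return value


def decode_scalar(value):
    """Split a packed float32 back into (probability, class_id)."""
    (packed,) = struct.unpack("<I", struct.pack("<f", value))
    (probability,) = struct.unpack("<e", struct.pack("<H", packed & 0xFFFF))
    return probability, packed >> 16


# Small class ids keep the packed bits away from NaN/inf float32 patterns.
probability, class_id = decode_scalar(encode_max(0.75, 7))
```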

get_map_with_name(name)

Return the map with the given name.

Parameters:

name – layer name

Returns:

map

Return type:

cp.array

get_rgb(name)

Return the rgb map with the given name.

Parameters:

name

Returns:

rgb map

Return type:

cp.array

get_semantic(name)

Return the semantic map layer with the given name.

Parameters:

name (str) – layer name

Returns:

semantic map layer

Return type:

cp.array

process_map_for_publish(input_map)

Remove padding.

Parameters:

input_map (cp.array) – map layer

Returns:

map layer without padding

Return type:

cp.array
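
Removing the padding amounts to slicing off the border cells. The sketch below assumes a border one cell wide on each side and uses nested lists instead of a cp.array; remove_padding is a hypothetical name.

```python
def remove_padding(layer, pad=1):
    """Drop `pad` border cells on every side of a 2D layer (pad must be > 0)."""
    return [row[pad:-pad] for row in layer[pad:-pad]]


padded = [[0, 0, 0, 0],
          [0, 1, 2, 0],
          [0, 3, 4, 0],
          [0, 0, 0, 0]]
inner = remove_padding(padded)
```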

get_index(name)

Return the index of the layer with the given name.

Parameters:

name (str)

Returns:

index

Return type:

int

class elevation_mapping_cupy.parameter.Parameter(*args: Any, **kwargs: Any)

Bases: Serializable

This class holds the parameters for the elevation mapping algorithm.

resolution

The resolution in meters. (Default: 0.04)

Type:

float

subscriber_cfg

The configuration for the subscriber. (Default: { "front_cam": { "channels": ["rgb", "person"], "topic_name": "/elevation_mapping/pointcloud_semantic", "data_type": "pointcloud", } })

Type:

dict

additional_layers

The additional layers for the map. (Default: ["color"])

Type:

list

fusion_algorithms

The list of fusion algorithms. (Default: [ "image_color", "image_exponential", "pointcloud_average", "pointcloud_bayesian_inference", "pointcloud_class_average", "pointcloud_class_bayesian", "pointcloud_class_max", "pointcloud_color", ])

Type:

list

pointcloud_channel_fusions

The fusion for pointcloud channels. (Default: {"rgb": "color", "default": "class_average"})

Type:

dict

image_channel_fusions

The fusion for image channels. (Default: {"rgb": "color", "default": "exponential"})

Type:

dict

data_type

The data type for the map. (Default: np.float32)

Type:

str

average_weight

The weight for the average fusion. (Default: 0.5)

Type:

float

map_length

The map’s size in meters. (Default: 8.0)

Type:

float

sensor_noise_factor

The point’s noise is sensor_noise_factor*z^2 (z is distance from sensor). (Default: 0.05)

Type:

float
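
As worked arithmetic for the formula above, the sketch below evaluates sensor_noise_factor*z^2 for a given distance. The function name is hypothetical.

```python
def point_noise(z, sensor_noise_factor=0.05):
    """Noise term of a point at distance `z` from the sensor."""
    return sensor_noise_factor * z ** 2


noise_at_2m = point_noise(2.0)  # 0.05 * 2.0**2
```

Because the term grows quadratically, a point at 4 m carries four times the noise of a point at 2 m.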

mahalanobis_thresh

Points outside this distance are treated as outliers. (Default: 2.0)

Type:

float

outlier_variance

If a point is an outlier, add this value to the cell. (Default: 0.01)

Type:

float
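
One way to read mahalanobis_thresh and outlier_variance together is sketched below. It assumes a one-dimensional Mahalanobis distance between the measured height and the cell height, and that outlier_variance is added to the cell's variance; the kernel in the library may compute this differently. Both function names are hypothetical.

```python
import math


def is_outlier(z, cell_height, cell_variance, mahalanobis_thresh=2.0):
    """1-D Mahalanobis test: |z - h| / sqrt(variance) > threshold."""
    distance = abs(z - cell_height) / math.sqrt(cell_variance)
    return distance > mahalanobis_thresh


def variance_after_measurement(z, cell_height, cell_variance,
                               outlier_variance=0.01, mahalanobis_thresh=2.0):
    """Inflate the cell variance when the measurement is an outlier."""
    if is_outlier(z, cell_height, cell_variance, mahalanobis_thresh):
        return cell_variance + outlier_variance
    return cell_variance


# Cell at height 1.0 with variance 0.04 (standard deviation 0.2).
far_point = is_outlier(1.5, 1.0, 0.04)   # 2.5 standard deviations away
near_point = is_outlier(1.1, 1.0, 0.04)  # 0.5 standard deviations away
```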

drift_compensation_variance_inlier

Cells under this value are used for drift compensation. (Default: 0.1)

Type:

float

time_variance

Add this value when update_variance is called. (Default: 0.01)

Type:

float

time_interval

Time layer is updated with this interval. (Default: 0.1)

Type:

float

max_variance

The maximum variance for each cell. (Default: 1.0)

Type:

float

dilation_size

The dilation filter size before traversability filter. (Default: 2)

Type:

float

dilation_size_initialize

The dilation size after initialization. (Default: 10)

Type:

float

drift_compensation_alpha

The drift compensation alpha for smoother update of drift compensation. (Default: 1.0)

Type:

float

traversability_inlier

Cells with higher traversability are used for drift compensation. (Default: 0.1)

Type:

float

wall_num_thresh

If there are more points than this value, only points higher than the current height are used, to make the wall sharper. (Default: 100)

Type:

float

min_height_drift_cnt

Drift compensation only happens if there are more valid cells than this number. (Default: 100)

Type:

float

max_ray_length

The maximum length for ray tracing. (Default: 2.0)

Type:

float

cleanup_step

Subtract this value from the validity layer at visibility cleanup. (Default: 0.01)

Type:

float

cleanup_cos_thresh

Substitute this value from validity layer at visibility cleanup. (Default: 0.5)

Type:

float

min_valid_distance

Points with shorter distance will be filtered out. (Default: 0.3)

Type:

float

max_height_range

Points higher than this value above the sensor are filtered out to exclude the ceiling. (Default: 1.0)

Type:

float

ramped_height_range_a

If z > max(d - ramped_height_range_b, 0) * ramped_height_range_a + ramped_height_range_c, reject. (Default: 0.3)

Type:

float

ramped_height_range_b

If z > max(d - ramped_height_range_b, 0) * ramped_height_range_a + ramped_height_range_c, reject. (Default: 1.0)

Type:

float

ramped_height_range_c

If z > max(d - ramped_height_range_b, 0) * ramped_height_range_a + ramped_height_range_c, reject. (Default: 0.2)

Type:

float
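
The rejection rule shared by the three ramped_height_range parameters can be written out directly. The sketch below assumes z is the point's height and d its distance from the sensor, which this reference does not define explicitly; the function name is hypothetical.

```python
def exceeds_ramped_height(z, d, a=0.3, b=1.0, c=0.2):
    """Return True if the point is rejected by the ramped height limit.

    a, b, c correspond to ramped_height_range_a/b/c (documented defaults).
    """
    # The limit is flat (c) up to distance b, then rises with slope a.
    limit = max(d - b, 0.0) * a + c
    return z > limit


near_rejected = exceeds_ramped_height(z=0.25, d=0.5)  # limit is 0.2
far_rejected = exceeds_ramped_height(z=0.7, d=3.0)    # limit is about 0.8
```

With the defaults, the allowed height stays at 0.2 within 1.0 of the sensor and grows by 0.3 per unit of distance beyond that.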

safe_thresh

Cells with traversability smaller than this value are counted as unsafe. (Default: 0.5)

Type:

float

safe_min_thresh

A polygon is unsafe if any cell has lower traversability than this. (Default: 0.5)

Type:

float

max_unsafe_n

If the number of cells under safe_thresh exceeds this value, the polygon is unsafe. (Default: 20)

Type:

int
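
The three safety parameters above could combine as in the following sketch, which is based only on their descriptions and is not taken from the library's code. It takes the traversability values of the cells inside a polygon as a flat list; is_polygon_safe is a hypothetical name.

```python
def is_polygon_safe(traversability, safe_thresh=0.5, safe_min_thresh=0.5,
                    max_unsafe_n=20):
    """Apply the safe_min_thresh and max_unsafe_n rules to a polygon's cells."""
    values = list(traversability)
    # Rule 1: any cell below safe_min_thresh makes the polygon unsafe.
    if any(v < safe_min_thresh for v in values):
        return False
    # Rule 2: too many cells below safe_thresh make the polygon unsafe.
    unsafe_n = sum(1 for v in values if v < safe_thresh)
    return unsafe_n <= max_unsafe_n


all_good = is_polygon_safe([0.9, 0.8, 0.7])
one_bad_cell = is_polygon_safe([0.9, 0.4])
```

Note that with the documented defaults both thresholds are 0.5, so under this reading the first rule already rejects any polygon the second rule would count cells for.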

checker_layer

Layer used for checking safety. (Default: "traversability")

Type:

str

min_filter_size

The minimum size for the filter. (Default: 5)

Type:

int

min_filter_iteration

The minimum number of iterations for the filter. (Default: 3)

Type:

int

max_drift

The maximum drift for the compensation. (Default: 0.10)

Type:

float

overlap_clear_range_xy

XY range [m] for clearing the overlapped area. This defines the valid area for overlap clearance (used in multi-floor settings). (Default: 4.0)

Type:

float

overlap_clear_range_z

Z range [m] for clearing the overlapped area. Cells outside this range will be cleared (used in multi-floor settings). (Default: 2.0)

Type:

float

enable_edge_sharpen

Enable edge sharpening. (Default: True)

Type:

bool

enable_drift_compensation

Enable drift compensation. (Default: True)

Type:

bool

enable_visibility_cleanup

Enable visibility cleanup. (Default: True)

Type:

bool

enable_overlap_clearance

Enable overlap clearance. (Default: True)

Type:

bool

use_only_above_for_upper_bound

Use only above for upper bound. (Default: True)

Type:

bool

use_chainer

Use Chainer as the backend of the traversability filter. If False, PyTorch is used instead. PyTorch requires ~2GB more GPU memory than Chainer but runs faster. (Default: True)

Type:

bool

position_noise_thresh

If the position change is bigger than this value, drift compensation is performed. (Default: 0.1)

Type:

float

orientation_noise_thresh

If the orientation change is bigger than this value, drift compensation is performed. (Default: 0.1)

Type:

float
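
A possible reading of the two noise thresholds together with enable_drift_compensation is sketched below. Combining the two conditions with "or", and treating the position_noise and orientation_noise update inputs as the position and orientation change, are both assumptions; the function name is hypothetical.

```python
def should_compensate_drift(position_noise, orientation_noise,
                            position_noise_thresh=0.1,
                            orientation_noise_thresh=0.1,
                            enable_drift_compensation=True):
    """Decide whether drift compensation runs for this update."""
    if not enable_drift_compensation:
        return False
    # Assumed: exceeding either threshold is enough to trigger compensation.
    return (position_noise > position_noise_thresh
            or orientation_noise > orientation_noise_thresh)


triggered = should_compensate_drift(0.2, 0.0)
```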

plugin_config_file

Configuration file for the plugin. (Default: "config/plugin_config.yaml")

Type:

str

weight_file

Weight file for traversability filter. (Default: "config/weights.dat")

Type:

str

initial_variance

Initial variance for each cell. (Default: 10.0)

Type:

float

initialized_variance

Initialized variance for each cell. (Default: 10.0)

Type:

float

w1

Weights for the first layer. (Default: np.zeros((4, 1, 3, 3)))

Type:

numpy.ndarray

w2

Weights for the second layer. (Default: np.zeros((4, 1, 3, 3)))

Type:

numpy.ndarray

w3

Weights for the third layer. (Default: np.zeros((4, 1, 3, 3)))

Type:

numpy.ndarray

w_out

Weights for the output layer. (Default: np.zeros((1, 12, 1, 1)))

Type:

numpy.ndarray

true_map_length

True length of the map. (Default: None)

Type:

float

cell_n

Number of cells in the map. (Default: None)

Type:

int

true_cell_n

True number of cells in the map. (Default: None)

Type:

int

resolution: float = 0.04
subscriber_cfg: dict
additional_layers: list
fusion_algorithms: list
pointcloud_channel_fusions: dict
image_channel_fusions: dict
data_type

alias of float32

average_weight: float = 0.5
map_length: float = 8.0
sensor_noise_factor: float = 0.05
mahalanobis_thresh: float = 2.0
outlier_variance: float = 0.01
drift_compensation_variance_inlier: float = 0.1
time_variance: float = 0.01
time_interval: float = 0.1
max_variance: float = 1.0
dilation_size: float = 2
dilation_size_initialize: float = 10
drift_compensation_alpha: float = 1.0
traversability_inlier: float = 0.1
wall_num_thresh: float = 100
min_height_drift_cnt: float = 100
max_ray_length: float = 2.0
cleanup_step: float = 0.01
cleanup_cos_thresh: float = 0.5
min_valid_distance: float = 0.3
max_height_range: float = 1.0
ramped_height_range_a: float = 0.3
ramped_height_range_b: float = 1.0
ramped_height_range_c: float = 0.2
safe_thresh: float = 0.5
safe_min_thresh: float = 0.5
max_unsafe_n: int = 20
checker_layer: str = 'traversability'
min_filter_size: int = 5
min_filter_iteration: int = 3
max_drift: float = 0.1
overlap_clear_range_xy: float = 4.0
overlap_clear_range_z: float = 2.0
enable_edge_sharpen: bool = True
enable_drift_compensation: bool = True
enable_visibility_cleanup: bool = True
enable_overlap_clearance: bool = True
use_only_above_for_upper_bound: bool = True
use_chainer: bool = True
position_noise_thresh: float = 0.1
orientation_noise_thresh: float = 0.1
plugin_config_file: str = 'config/plugin_config.yaml'
weight_file: str = 'config/weights.dat'
initial_variance: float = 10.0
initialized_variance: float = 10.0
w1: ndarray
w2: ndarray
w3: ndarray
w_out: ndarray
true_map_length: float = None
cell_n: int = None
true_cell_n: int = None
load_weights(filename)

Load weights from a file into the model’s parameters.

Parameters:

filename (str) – The path to the file containing the weights.

get_names()

Get the names of the parameters.

Returns:

A list of parameter names.

Return type:

list

get_types()

Get the types of the parameters.

Returns:

A list of parameter types.

Return type:

list

set_value(name, value)

Set the value of a parameter.

Parameters:
  • name (str) – The name of the parameter.

  • value (any) – The new value for the parameter.

get_value(name)

Get the value of a parameter.

Parameters:

name (str) – The name of the parameter.

Returns:

The value of the parameter.

Return type:

any
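
The name-based accessors can be pictured with a small stand-in dataclass. MiniParameter below is hypothetical and holds only three of the documented fields; it is a sketch of how such accessors are commonly written on a dataclass, not the library's implementation.

```python
from dataclasses import dataclass, fields


@dataclass
class MiniParameter:
    """Hypothetical stand-in for Parameter with three documented fields."""
    resolution: float = 0.04
    map_length: float = 8.0
    checker_layer: str = "traversability"

    def get_names(self):
        return [f.name for f in fields(self)]

    def get_types(self):
        return [f.type for f in fields(self)]

    def set_value(self, name, value):
        setattr(self, name, value)

    def get_value(self, name):
        return getattr(self, name)


param = MiniParameter()
param.set_value("resolution", 0.1)
names = param.get_names()
```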

update()

Update the parameters related to the map size and resolution.
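
The derived fields true_map_length, cell_n and true_cell_n default to None, so they are presumably filled in by this method. The sketch below shows one plausible derivation; the formulas, the function name and the two extra padding cells are assumptions, not taken from the library.

```python
def derive_map_size(map_length=8.0, resolution=0.04, padding_cells=2):
    """Return (cell_n, true_cell_n, true_map_length) for a square map."""
    # Number of cells that actually cover the requested map length.
    true_cell_n = round(map_length / resolution)
    # Assumed: the stored grid carries extra border cells as padding.
    cell_n = true_cell_n + padding_cells
    # Length covered once the cell count has been rounded to an integer.
    true_map_length = true_cell_n * resolution
    return cell_n, true_cell_n, true_map_length


cell_n, true_cell_n, true_map_length = derive_map_size()
```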

__init__(resolution: float = 0.04, subscriber_cfg: dict = <factory>, additional_layers: list = <factory>, fusion_algorithms: list = <factory>, pointcloud_channel_fusions: dict = <factory>, image_channel_fusions: dict = <factory>, data_type: str = <class 'numpy.float32'>, average_weight: float = 0.5, map_length: float = 8.0, sensor_noise_factor: float = 0.05, mahalanobis_thresh: float = 2.0, outlier_variance: float = 0.01, drift_compensation_variance_inlier: float = 0.1, time_variance: float = 0.01, time_interval: float = 0.1, max_variance: float = 1.0, dilation_size: float = 2, dilation_size_initialize: float = 10, drift_compensation_alpha: float = 1.0, traversability_inlier: float = 0.1, wall_num_thresh: float = 100, min_height_drift_cnt: float = 100, max_ray_length: float = 2.0, cleanup_step: float = 0.01, cleanup_cos_thresh: float = 0.5, min_valid_distance: float = 0.3, max_height_range: float = 1.0, ramped_height_range_a: float = 0.3, ramped_height_range_b: float = 1.0, ramped_height_range_c: float = 0.2, safe_thresh: float = 0.5, safe_min_thresh: float = 0.5, max_unsafe_n: int = 20, checker_layer: str = 'traversability', min_filter_size: int = 5, min_filter_iteration: int = 3, max_drift: float = 0.1, overlap_clear_range_xy: float = 4.0, overlap_clear_range_z: float = 2.0, enable_edge_sharpen: bool = True, enable_drift_compensation: bool = True, enable_visibility_cleanup: bool = True, enable_overlap_clearance: bool = True, use_only_above_for_upper_bound: bool = True, use_chainer: bool = True, position_noise_thresh: float = 0.1, orientation_noise_thresh: float = 0.1, plugin_config_file: str = 'config/plugin_config.yaml', weight_file: str = 'config/weights.dat', initial_variance: float = 10.0, initialized_variance: float = 10.0, w1: ~numpy.ndarray = <factory>, w2: ~numpy.ndarray = <factory>, w3: ~numpy.ndarray = <factory>, w_out: ~numpy.ndarray = <factory>, true_map_length: float = None, cell_n: int = None, true_cell_n: int = None) None
class elevation_mapping_cupy.plugins.plugin_manager.PluginParams(name: str, layer_name: str, fill_nan: bool = False, is_height_layer: bool = False)

Bases: object

name: str
layer_name: str
fill_nan: bool = False
is_height_layer: bool = False
__init__(name: str, layer_name: str, fill_nan: bool = False, is_height_layer: bool = False) → None
class elevation_mapping_cupy.plugins.plugin_manager.PluginBase(*args, **kwargs)

Bases: ABC

Base class for plugins.

__init__(*args, **kwargs)
Parameters:
  • plugin_params (PluginParams) – The parameter of callback

get_layer_data(elevation_map: cupy.ndarray, layer_names: List[str], plugin_layers: cupy.ndarray, plugin_layer_names: List[str], semantic_map: cupy.ndarray, semantic_layer_names: List[str], name: str) → cupy.ndarray | None

Retrieve a copy of the layer data from the elevation, plugin, or semantic maps based on the layer name.

Parameters:
  • elevation_map (cp.ndarray) – The elevation map containing various layers.

  • layer_names (List[str]) – A list of names for each layer in the elevation map.

  • plugin_layers (cp.ndarray) – The plugin layers containing additional data.

  • plugin_layer_names (List[str]) – A list of names for each layer in the plugin layers.

  • semantic_map (cp.ndarray) – The semantic map containing various layers.

  • semantic_layer_names (List[str]) – A list of names for each layer in the semantic map.

  • name (str) – The name of the layer to retrieve.

Returns:

A copy of the requested layer as a cupy ndarray if found, otherwise None.

Return type:

Optional[cp.ndarray]
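
The lookup described above can be sketched with nested lists in place of cp.ndarray. The helper name find_layer, the sample data and the search order (sources are tried in the order given) are assumptions for illustration.

```python
import copy


def find_layer(maps_and_names, name):
    """Return a copy of the first layer called `name`, or None if absent."""
    for layers, names in maps_and_names:
        if name in names:
            # Copy so that callers cannot modify the stored map.
            return copy.deepcopy(layers[names.index(name)])
    return None


# Each map is a list of 2D layers, paired with the list of layer names.
elevation_map = [[[0.1, 0.2]], [[1.0, 1.0]]]
layer_names = ["elevation", "variance"]
semantic_map = [[[0.9, 0.1]]]
semantic_layer_names = ["person"]

sources = [(elevation_map, layer_names), (semantic_map, semantic_layer_names)]
person_layer = find_layer(sources, "person")
```

Returning a copy means a plugin can modify the result freely without touching the map it came from.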

class elevation_mapping_cupy.plugins.plugin_manager.PluginManager(cell_n: int)

Bases: object

This manages the plugins.

__init__(cell_n: int)
init(plugin_params: List[PluginParams], extra_params: List[Dict])
load_plugin_settings(file_path: str)
get_layer_names()
get_plugin_names()
get_plugin_index_with_name(name: str) → int
get_layer_index_with_name(name: str) → int
update_with_name(name: str, elevation_map: cupy.ndarray, layer_names: List[str], semantic_map=None, semantic_params=None, rotation=None, elements_to_shift={})
get_map_with_name(name: str) → cupy.ndarray
get_param_with_name(name: str) → PluginParams