import os
import sys
import yaml
import pickle
import rospkg
import logging
import numpy as np
from collections import namedtuple
[docs]def n2s(x, precision=3):
"""
converts numpy array to string.
Arguments
---------
`x`: `np.array`
The numpy array to convert to a string.
`precision`: `int`
The precision desired on each entry in the array.
"""
return np.array2string(x, precision=precision, separator=',', suppress_small=True)
[docs]def t2s(x):
"""
the equivalent of printing out
a pytorch tensor, but in string form.
"""
return str(x)
[docs]def load_robot_configs(configdir, robot):
"""
Loads robot's DH parameters, SUs' DH parameters and their poses
configdir: str
Path to the config directory where robot yaml files exist
robot: str
Name of the robot
"""
filepath = os.path.join(configdir, robot + '.yaml')
try:
with open(filepath) as file:
return yaml.load(file, Loader=yaml.FullLoader)
except Exception:
raise ValueError('Please provide a valid config directory with robot yaml files')
[docs]def initialize_logging(log_level, filename=None):
"""
Initialize Logging Module with given log_lvel
Arguments
----------
log_level: str
filename: str
"""
numeric_level = getattr(logging, log_level.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError('Invalid log level: %s' % log_level)
if filename:
overwrite = 'y'
if os.path.exists(filename):
overwrite = input('File {} already exists. Do you want to overwrite [y/n]?'.format(filename))
if overwrite == 'y':
logging.basicConfig(level=numeric_level, filename=filename, filemode='w')
logging.basicConfig(level=numeric_level)
[docs]def parse_datadir(datadir):
if datadir is None:
try:
datadir = os.path.join(rospkg.RosPack().get_path('ros_robotic_skin'), 'data')
except Exception:
raise OSError('ros_robotic_skin not installed in the catkin workspace or pass --datadir=PATH_TO_DATA_DIRECTORY')
return datadir
[docs]def load_data(robot, directory):
"""
Function for collecting acceleration data with poses
Returns
--------
data: Data
Data includes static and dynamic accelerations data
"""
def read_pickle(filename, robot):
filename = '_'.join([filename, robot])
filepath = os.path.join(directory, filename + '_jan16' + '.pickle')
# filepath = os.path.join(directory, filename + '.pickle')
with open(filepath, 'rb') as f:
if sys.version_info[0] == 2:
return pickle.load(f)
else:
return pickle.load(f, encoding='latin1')
static = read_pickle('static_data', robot)
dynamic = read_pickle('dynamic_data', robot)
Data = namedtuple('Data', 'static dynamic')
# load all of the data!
data = Data(static, dynamic)
filepath = os.path.join(directory, 'imu_mappings.pickle')
with open(filepath, 'rb') as f:
if sys.version_info[0] == 2:
imu_mappings = pickle.load(f)
else:
imu_mappings = pickle.load(f, encoding='latin1')
return data, imu_mappings
[docs]def add_noise(data, data_types, sigma=1):
"""
Arguments
----------
data_types: List[str]
"""
return add_outlier(data, data_types, sigma, 1)
[docs]def add_outlier(data, data_types, sigma=3, outlier_ratio=0.25): # noqa:C901
"""
Arguments
----------
data_types: List[str]
"""
for data_type in data_types:
if data_type not in ['static', 'dynamic']:
raise ValueError('There is no such data_type='+data_type)
d = getattr(data, data_type)
imu_indices = {
'static': [4, 5, 6],
'dynamic': [0, 1, 2]}
imu_index = imu_indices[data_type]
pose_names = list(data.dynamic.keys())
joint_names = list(data.dynamic[pose_names[0]].keys())
imu_names = list(data.dynamic[pose_names[0]][joint_names[0]].keys())
n_dynamic_pose = len(list(data.dynamic.keys()))
n_static_pose = len(list(data.static.keys()))
if data_type == 'static':
for i in range(n_static_pose):
for imu in imu_names:
flag = np.random.choice([0, 1], p=[1-outlier_ratio, outlier_ratio])
if not flag:
continue
d[pose_names[i]][imu][imu_index] += np.random.normal(0, sigma, size=3)
return
n_pose = n_dynamic_pose
for i in range(n_pose):
for joint in joint_names:
for imu in imu_names:
flag = np.random.choice([0, 1], p=[1-outlier_ratio, outlier_ratio])
if not flag:
continue
shape = d[pose_names[i]][joint][imu][0][imu_index].shape
d[pose_names[i]][joint][imu][0][imu_index] += np.random.normal(0, sigma, size=shape)
[docs]def outlier_index(data, outlier_ratio):
n_data = data.shape[0]
index = np.random.choice(np.arange(n_data), size=int(n_data*outlier_ratio), replace=False)
if index.size == 0:
return None
elif index.size == 1:
return index[0]
return index