Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# go over all the sequences of each user
seq_list = glob.glob(os.path.join(pose_path, '*.cdf'))
seq_list.sort()
for seq_i in seq_list:
# sequence info
seq_name = seq_i.split('/')[-1]
action, camera, _ = seq_name.split('.')
action = action.replace(' ', '_')
# irrelevant sequences
if action == '_ALL':
continue
# 3D pose file
poses_3d = pycdf.CDF(seq_i)['Pose'][0]
# 2D pose file
pose2d_file = os.path.join(pose2d_path, seq_name)
poses_2d = pycdf.CDF(pose2d_file)['Pose'][0]
# bbox file
bbox_file = os.path.join(bbox_path, seq_name.replace('cdf', 'mat'))
bbox_h5py = h5py.File(bbox_file)
# video file
if extract_img:
vid_file = os.path.join(vid_path, seq_name.replace('cdf', 'mp4'))
imgs_path = os.path.join(dataset_path, 'images')
vidcap = cv2.VideoCapture(vid_file)
# go over each frame of the sequence
def load_targets(target_path, file_name, subject):
target_prefix = "/MyPoseFeatures/D2_Positions"
target_path = os.path.join(target_path, subject + target_prefix)
file_meta = file_name.split("_")
activity = file_meta[0]
cdf = pycdf.CDF(os.path.join(target_path, activity + ".cdf"))
targets = cdf[0]
targets = targets[0, :, :]
cdf.close()
return targets
for seq_i in seq_list:
# sequence info
seq_name = seq_i.split('/')[-1]
action, camera, _ = seq_name.split('.')
action = action.replace(' ', '_')
# irrelevant sequences
if action == '_ALL':
continue
# 3D pose file
poses_3d = pycdf.CDF(seq_i)['Pose'][0]
# 2D pose file
pose2d_file = os.path.join(pose2d_path, seq_name)
poses_2d = pycdf.CDF(pose2d_file)['Pose'][0]
# bbox file
bbox_file = os.path.join(bbox_path, seq_name.replace('cdf', 'mat'))
bbox_h5py = h5py.File(bbox_file)
# video file
if extract_img:
vid_file = os.path.join(vid_path, seq_name.replace('cdf', 'mp4'))
imgs_path = os.path.join(dataset_path, 'images')
vidcap = cv2.VideoCapture(vid_file)
# go over each frame of the sequence
for frame_i in range(poses_3d.shape[0]):
# read video frame
if extract_img:
success, image = vidcap.read()
def process_view(out_dir, subject, action, subaction, camera):
subj_dir = path.join('extracted', subject)
base_filename = metadata.get_base_filename(subject, action, subaction, camera)
# Load joint position annotations
with pycdf.CDF(path.join(subj_dir, 'Poses_D2_Positions', base_filename + '.cdf')) as cdf:
poses_2d = np.array(cdf['Pose'])
poses_2d = poses_2d.reshape(poses_2d.shape[1], 32, 2)
with pycdf.CDF(path.join(subj_dir, 'Poses_D3_Positions_mono_universal', base_filename + '.cdf')) as cdf:
poses_3d_univ = np.array(cdf['Pose'])
poses_3d_univ = poses_3d_univ.reshape(poses_3d_univ.shape[1], 32, 3)
with pycdf.CDF(path.join(subj_dir, 'Poses_D3_Positions_mono', base_filename + '.cdf')) as cdf:
poses_3d = np.array(cdf['Pose'])
poses_3d = poses_3d.reshape(poses_3d.shape[1], 32, 3)
# Infer camera intrinsics
camera_int = infer_camera_intrinsics(poses_2d, poses_3d)
camera_int_univ = infer_camera_intrinsics(poses_2d, poses_3d_univ)
frame_indices = select_frame_indices_to_include(subject, poses_3d_univ)
frames = frame_indices + 1
video_file = path.join(subj_dir, 'Videos', base_filename + '.mp4')
frames_dir = path.join(out_dir, 'imageSequence', camera)
makedirs(frames_dir, exist_ok=True)
ls = ftp.nlst( fl0 )
fl = ls[-1]
fl_path = os.path.join( self.path, fl )
ftp.retrbinary( "RETR " + fl,
open( fl_path, 'wb' ).write )
except :
self.mesg_txt( 'fail', date_str )
return
# If the file now exists, try to load it; otherwise, abort.
self.mesg_txt( 'load', date_str )
if ( os.path.isfile( fl_path ) ) :
try :
cdf = pycdf.CDF( fl_path )
except :
self.mesg_txt( 'fail', date_str )
return
else :
self.mesg_txt( 'fail', date_str )
return
# Add the CDF object and tags for each spectrum to the arrays.
c = len( self.arr_cdf )
self.arr_cdf = self.arr_cdf + [ cdf ] # arr_cdf and arr_date of same size
self.arr_date = self.arr_date + [ date_str ]
n_spec = len( cdf['Epoch'] )
self.arr_tag = self.arr_tag + [ fc_tag( c=c, s=s,
n_video_frames = None
for view in views:
# print(subject, sequence, view)
try:
video_path = _video_path(subject, sequence, view)
with imageio.get_reader(video_path) as reader:
n = len(reader)
n_video_frames = n if n_video_frames is None else \
min(n_video_frames, n)
except Exception:
print('Failed to read video file. Skipping...')
return False
n = n_video_frames
with pycdf.CDF(_p3_path(subject, sequence)) as cdf:
p3 = cdf['Pose'][0]
n_frames = p3.shape[0]
n = min(n_frames, n)
p3 = p3.reshape(n_frames, -1, 3)[:n, _filter_indices]
p3d = group.create_dataset('p3', p3.shape, dtype=np.int32)
p3d[...] = p3
group.attrs['len'] = n
# p3 = normalize(np.array(p3d, dtype=np.float32), subject)
# theta = group.create_dataset('theta', (n,), dtype=np.float32)
# rel_poses = group.create_dataset(
# 'rel_p3', (n, skeleton.n_joints, 3), dtype=np.float32)
# for i, abs_pose in enumerate(p3):
# rel_poses[i], theta[i] = abs_to_rel(abs_pose)
except :
self.mesg_txt( 'fail', date_str )
return
# If the file now exists locally, try to load it; otherwise,
# abort.
self.mesg_txt( 'load', date_str )
if ( os.path.isfile( fl_path ) ) :
try :
cdf = pycdf.CDF( fl_path )
except :
self.mesg_txt( 'fail', date_str )
return
else :
self.mesg_txt( 'fail', date_str )
return
# Append the requested date to the list of dates loaded.
ind = len( self.arr_date )
fl = ls[-1]
fl_path = os.path.join( self.path, fl )
ftp.retrbinary( "RETR " + fl,
open( fl_path, 'wb' ).write )
except :
self.mesg_txt( 'fail', date_str )
return
# If the file now exists, try to load it; otherwise,
# abort.
self.mesg_txt( 'load', date_str )
if ( os.path.isfile( fl_path ) ) :
try :
cdf = pycdf.CDF( fl_path )
except :
self.mesg_txt( 'fail', date_str )
return
else :
self.mesg_txt( 'fail', date_str )
return
# Extract the data from the loaded file and select those data
# which seem to have valid (versus fill) values.
if ( self.use_h2 ) :
# Extract the data from the loaded file.
sub_t = cdf['Epoch'][:,0]
sub_b_x = cdf['BGSE'][:,0]
#!/usr/bin/python
import sys
import os
os.putenv("CDF_LIB", "/usr/local/cdf/lib")
import spacepy.pycdf as cdf
#cdf_file = cdf.CDF('~/Download/QinDenton_1min_merged_20101229-v2.cdf')
cdf_file = cdf.CDF('~/Download/QinDenton_hour_merged_20120206-v5.cdf')
Epoch = cdf_file['Epoch']
Year = cdf_file['Year']
DOY = cdf_file['DOY']
hour = cdf_file['hour']
min = cdf_file['min']
ByIMF = cdf_file['ByIMF']
BzIMF = cdf_file['BzIMF']
V_SW = cdf_file['V_SW']
Den_P = cdf_file['Den_P']
Pdyn = cdf_file['Pdyn']
G1 = cdf_file['G1']
G2 = cdf_file['G2']
G3 = cdf_file['G3']
ByIMF_status = cdf_file['ByIMF_status']
BzIMF_status = cdf_file['BzIMF_status']
V_SW_status = cdf_file['V_SW_status']
def get_3d_angles(self, actor, action, sub_action=0):
"""
:param actor:
:param action:
:param sub_action:
:return:
"""
cdf_file = self.get_cdf_file('D3_Angles',
actor, action, sub_action)
cdf = pycdf.CDF(cdf_file)
angles3d = np.squeeze(cdf['Pose'])
return angles3d