Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_segments_for_ranges(traj, ranges):
    """
    Cut one segment out of a trajectory for every given range.

    Parameters
    ----------
    traj : trajectory object
        Source trajectory. It is copied once per range, so the object
        passed in is not modified by this function.
    ranges : iterable
        Range objects exposing ``t_0`` and ``t_n`` (segment start and end).
        For ranges that are ``SpatioTemporalRange`` instances, the copy's
        ``df`` is first replaced by the result of
        ``create_entry_and_exit_points(traj, the_range)``.

    Returns
    -------
    list
        The extracted segments, in the order of ``ranges``. Each segment's
        ``id`` is ``"<traj.id>_<n>"`` where ``n`` counts the segments
        extracted so far, and its ``parent`` is set to ``traj``.
    """
    segments = []  # list of trajectories
    for the_range in ranges:
        # Work on a copy so that replacing ``df`` below never touches ``traj``.
        temp_traj = traj.copy()
        if isinstance(the_range, SpatioTemporalRange):
            temp_traj.df = create_entry_and_exit_points(traj, the_range)
        try:
            segment = temp_traj.get_segment_between(the_range.t_0, the_range.t_n)
        except ValueError:
            # No segment could be extracted for this range; skip it without
            # consuming an id so that segment ids stay consecutive.
            continue
        # len(segments) is the number of segments extracted so far, i.e. the
        # running index of this segment.
        segment.id = "{}_{}".format(traj.id, len(segments))
        segment.parent = traj
        segments.append(segment)
    return segments
length = row['line'].length
t0 = t + (t_delta * row['line'].project(pt0)/length)
tn = t + (t_delta * row['line'].project(ptn)/length)
# to avoid numerical issues with microseconds beyond six digits, we reconstruct the timestamps
t0 = datetime(t0.year, t0.month, t0.day, t0.hour, t0.minute, t0.second, t0.microsecond)
tn = datetime(tn.year, tn.month, tn.day, tn.hour, tn.minute, tn.second, tn.microsecond)
# to avoid intersection issues with zero length lines
if ptn == translate(pt0, 0.00000001, 0.00000001):
t0 = row['prev_t']
tn = row['t']
# to avoid numerical issues with timestamps
if is_equal(tn, row['t']):
tn = row['t']
if is_equal(t0, row['prev_t']):
t0 = row['prev_t']
return SpatioTemporalRange(pt0, ptn, t0, tn)
else:
return None
end = r.t_n
pt0 = r.pt_0
ptn = r.pt_n
elif end == r.t_0:
end = r.t_n
ptn = r.pt_n
elif r.t_0 > end and is_equal(r.t_0, end):
end = r.t_n
ptn = r.pt_n
else:
new.append(SpatioTemporalRange(pt0, ptn, start, end))
start = r.t_0
end = r.t_n
pt0 = r.pt_0
ptn = r.pt_n
new.append(SpatioTemporalRange(pt0, ptn, start, end))
return new
for r in ranges:
if r is None:
continue # raise ValueError('Received range that is None!')
if start is None:
start = r.t_0
end = r.t_n
pt0 = r.pt_0
ptn = r.pt_n
elif end == r.t_0:
end = r.t_n
ptn = r.pt_n
elif r.t_0 > end and is_equal(r.t_0, end):
end = r.t_n
ptn = r.pt_n
else:
new.append(SpatioTemporalRange(pt0, ptn, start, end))
start = r.t_0
end = r.t_n
pt0 = r.pt_0
ptn = r.pt_n
new.append(SpatioTemporalRange(pt0, ptn, start, end))
return new
t1 : datetime.datetime
Start time for the segment
t2 : datetime.datetime
End time for the segment
method : str
Extraction method
Returns
-------
shapely LineString
Extracted trajectory segment
"""
if method not in ['interpolated', 'within']:
raise ValueError('Invalid split method {}. Must be one of [interpolated, within]'.format(method))
if method == 'interpolated':
st_range = SpatioTemporalRange(self.get_position_at(t1), self.get_position_at(t2), t1, t2)
temp_df = create_entry_and_exit_points(self, st_range)
temp_df = temp_df[t1:t2]
return point_gdf_to_linestring(temp_df)
else:
try:
return point_gdf_to_linestring(self.get_segment_between(t1, t2).df)
except RuntimeError:
raise RuntimeError("Cannot generate linestring between {0} and {1}".format(t1, t2))