Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _compute_distance(self, row):
pt0 = row['prev_pt']
pt1 = row[self.get_geom_column_name()]
if not isinstance(pt0, Point):
return 0.0
if pt0 == pt1:
return 0.0
if self.is_latlon:
dist_meters = measure_distance_spherical(pt0, pt1)
else: # The following distance will be in CRS units that might not be meters!
dist_meters = measure_distance_euclidean(pt0, pt1)
return dist_meters
def is_representative_max_distance(self):
p_i = self.traj.df.iloc[self.i][self.traj.get_geom_column_name()]
p_j = self.traj.df.iloc[self.j][self.traj.get_geom_column_name()]
if self.traj.is_latlon:
d_space = measure_distance_spherical(p_i, p_j)
else:
d_space = measure_distance_euclidean(p_i, p_j)
if d_space >= self.max_distance:
self.significant_points.append(p_i)
return True
else:
return False
def more_points_further_than_min_distance(self):
for self.k in range(self.j + 1, self.n):
p_j = self.traj.df.iloc[self.j][self.traj.get_geom_column_name()]
p_k = self.traj.df.iloc[self.k][self.traj.get_geom_column_name()]
if self.traj.is_latlon:
d_space_j_k = measure_distance_spherical(p_j, p_k)
else:
d_space_j_k = measure_distance_euclidean(p_j, p_k)
if d_space_j_k >= self.min_distance:
return True
return False
def _compute_speed(self, row):
pt0 = row['prev_pt']
pt1 = row[self.get_geom_column_name()]
if not isinstance(pt0, Point):
return 0.0
if not isinstance(pt1, Point):
raise ValueError('Invalid trajectory! Got {} instead of point!'.format(pt1))
if pt0 == pt1:
return 0.0
if self.is_latlon:
dist_meters = measure_distance_spherical(pt0, pt1)
else: # The following distance will be in CRS units that might not be meters!
dist_meters = measure_distance_euclidean(pt0, pt1)
return dist_meters / row['delta_t'].total_seconds()
def _generalize_traj(self, traj, tolerance):
temp_df = traj.df.copy()
prev_pt = temp_df.iloc[0][traj.get_geom_column_name()]
keep_rows = [0]
i = 0
for index, row in temp_df.iterrows():
pt = row[traj.get_geom_column_name()]
if traj.is_latlon:
dist = measure_distance_spherical(pt, prev_pt)
else:
dist = measure_distance_euclidean(pt, prev_pt)
if dist >= tolerance:
keep_rows.append(i)
prev_pt = pt
i += 1
keep_rows.append(len(traj.df)-1)
new_df = traj.df.iloc[keep_rows]
new_traj = Trajectory(new_df, traj.id)
return new_traj