Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def make_traj(nodes, crs=CRS_METRIC, id=1, parent=None):
nodes = [node.to_dict() for node in nodes]
df = pd.DataFrame(nodes).set_index('t')
geo_df = GeoDataFrame(df, crs=crs)
return Trajectory(id, geo_df, parent=parent)
def copy(self):
"""
Return a copy of the trajectory.
Returns
-------
Trajectory
"""
return Trajectory(self.df.copy(), self.id, parent=self.parent)
if __name__ == '__main__':
xmin, xmax, ymin, ymax = 116.3685035,116.3702945,39.904675,39.907728
polygon = Polygon([(xmin, ymin), (xmin, ymax), (xmax, ymax), (xmax, ymin), (xmin, ymin)])
t_start = datetime.now()
df = read_file(os.path.join(script_path,'demodata_geolife.gpkg'))
df['t'] = pd.to_datetime(df['t'])
df = df.set_index('t')
print("Finished reading {} rows in {}".format(len(df),datetime.now() - t_start))
t_start = datetime.now()
trajectories = []
for key, values in df.groupby(['trajectory_id']):
trajectory = Trajectory(key, values)
print(trajectory)
trajectories.append(trajectory)
print("Finished creating {} trajectories in {}".format(len(trajectories),datetime.now() - t_start))
t_start = datetime.now()
intersections = []
for key, values in df.groupby(['trajectory_id']):
traj = Trajectory(key, values)
for intersection in traj.clip(polygon):
intersections.append(intersection)
print("Found {} intersections in {}".format(len(intersections),datetime.now() - t_start))
def _df_to_trajectories(self, df, traj_id_col, obj_id_col):
trajectories = []
for traj_id, values in df.groupby([traj_id_col]):
if len(values) < 2:
continue
if obj_id_col in values.columns:
obj_id = values.iloc[0][obj_id_col]
else:
obj_id = None
trajectory = Trajectory(values, traj_id, obj_id=obj_id)
if trajectory.get_length() < self.min_length:
continue
trajectories.append(trajectory)
return trajectories
def df_to_trajectories(self, df, trajectory_id):
trajectories = []
for key, values in df.groupby([trajectory_id]):
if len(values) < 2:
continue
trajectory = Trajectory(key, values)
if trajectory.get_length() < self.min_length:
continue
trajectories.append(trajectory)
return trajectories
import pandas as pd
from geopandas import read_file
script_path = os.path.dirname(__file__)
sys.path.append(os.path.join(script_path,".."))
from movingpandas.trajectory import Trajectory
if __name__ == '__main__':
df = read_file(os.path.join(script_path,'demodata_geolife.gpkg'))
df['t'] = pd.to_datetime(df['t'])
df = df.set_index('t')
trajectories = []
for key, values in df.groupby(['trajectory_id']):
trajectories.append(Trajectory(key, values))
intersections = []
polygon_file = fiona.open(os.path.join(script_path,'demodata_grid.gpkg'), 'r')
for feature in polygon_file:
for traj in trajectories:
for intersection in traj.intersection(feature):
intersections.append(intersection)
for intersection in intersections:
print(intersection)
polygon_file.close()
Returns
-------
list
List of trajectories
"""
result = []
temp_df = self.df.copy()
temp_df['t'] = temp_df.index
temp_df['gap'] = temp_df['t'].diff() > gap
temp_df['gap'] = temp_df['gap'].apply(lambda x: 1 if x else 0).cumsum()
dfs = [group[1] for group in temp_df.groupby(temp_df['gap'])]
for i, df in enumerate(dfs):
df = df.drop(columns=['t', 'gap'])
if len(df) > 1:
result.append(Trajectory(df, '{}_{}'.format(self.id, i)))
return result
"""
Return Trajectory segment between times t1 and t2.
Parameters
----------
t1 : datetime.datetime
Start time for the segment
t2 : datetime.datetime
End time for the segment
Returns
-------
Trajectory
Extracted trajectory segment
"""
segment = Trajectory(self.df[t1:t2], self.id, parent=self)
if not segment.is_valid():
raise RuntimeError("Failed to extract valid trajectory segment between {} and {}".format(t1, t2))
return segment