Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
sorted_s1_indexes, sorted_s2_indexes = find_indexes_of_next_arrival_times(
sorted_s1_trip_values,
sorted_s1_departure_time_values,
sorted_s2_trip_values,
sorted_s2_arrival_time_values
)
else:
# for non-loop routes, there should be at most 1 departure/arrival for a particular trip ID.
# sort by trip ID, then use snp for better performance to find the indexes of matching trip IDs
sort_order = np.argsort(s1_trip_values)
sorted_s1_trip_values = s1_trip_values[sort_order]
sorted_s2_trip_values, sorted_s2_arrival_time_values = sort_parallel(s2_trip_values, s2_arrival_time_values)
_, (sorted_s1_indexes, sorted_s2_indexes) = snp.intersect(sorted_s1_trip_values, sorted_s2_trip_values, indices=True)
# start with an array of all nans
s1_s2_arrival_time_values = np.full(len(s1_trip_values), np.nan)
# find original s1 indexes corresponding to sorted s1 indexes
result_indexes = sort_order[sorted_s1_indexes]
s1_s2_arrival_time_values[result_indexes] = sorted_s2_arrival_time_values[sorted_s2_indexes]
trip_min = (s1_s2_arrival_time_values - s1_departure_time_values) / 60
return trip_min, s1_s2_arrival_time_values
# If s1 and s2 are the same stop, this will compute the time to complete 1 full loop
if not assume_sorted:
s1_departure_time_values, s1_trip_values = sort_parallel(s1_departure_time_values, s1_trip_values)
s2_arrival_time_values, s2_trip_values = sort_parallel(s2_arrival_time_values, s2_trip_values)
s1_indexes, s2_indexes = find_indexes_of_next_arrival_times(
s1_trip_values, s1_departure_time_values,
s2_trip_values, s2_arrival_time_values
)
else:
if not assume_sorted:
s1_trip_values, s1_departure_time_values = sort_parallel(s1_trip_values, s1_departure_time_values)
s2_trip_values, s2_arrival_time_values = sort_parallel(s2_trip_values, s2_arrival_time_values)
_, (s1_indexes, s2_indexes) = snp.intersect(s1_trip_values, s2_trip_values, indices=True)
return (s2_arrival_time_values[s2_indexes] - s1_departure_time_values[s1_indexes]) / 60
def match_keypoints(self, kps_0, kps_1, ids_0, ids_1, ratio, reprojThresh):
    """Match keypoints between 2 frames and find the homography matrix.

    Keypoints are paired through the ids common to ``ids_0`` and ``ids_1``
    (``snp.intersect`` on the flattened id arrays). The matched points of
    ``kps_1`` are passed as source points and those of ``kps_0`` as
    destination points to ``cv2.findHomography`` with RANSAC.

    Args:
        kps_0: Keypoints of the first frame; reshaped to ``(-1, 2)``.
        kps_1: Keypoints of the second frame; reshaped to ``(-1, 2)``.
        ids_0: Ids of the keypoints in ``kps_0``; flattened before matching.
            NOTE(review): sortednp presumably needs these sorted -- confirm
            against the caller.
        ids_1: Ids of the keypoints in ``kps_1``; same handling as ``ids_0``.
        ratio: Not used by this method; kept for interface compatibility.
        reprojThresh: Not used by this method (the call relies on the
            default RANSAC threshold); kept for interface compatibility.

    Returns:
        The homography matrix returned by ``cv2.findHomography``, or
        ``None`` when 12 or fewer ids match or no homography can be
        estimated.
    """
    _, (matched_idx_0, matched_idx_1) = snp.intersect(
        ids_0.reshape(-1), ids_1.reshape(-1), indices=True)
    kps_0 = kps_0.reshape(-1, 2)  # 2D keypoints
    kps_1 = kps_1.reshape(-1, 2)  # 2D keypoints

    # A homography needs at least 4 correspondences; this method is more
    # conservative and only attempts an estimate with more than 12 matches.
    n_matches = matched_idx_0.shape[0]
    if n_matches <= 12:
        return None

    # Gather the matched points once instead of re-indexing for every use.
    dst_points = kps_0[matched_idx_0]
    src_points = kps_1[matched_idx_1]

    # Homography between points in kps_1 and kps_0
    homography, status = cv2.findHomography(
        src_points, dst_points, cv2.RANSAC)

    # On failure OpenCV returns None for the matrix and the mask; bail out
    # instead of crashing on ``status.sum()``.
    if homography is None or status is None:
        return None

    n_inliers = status.sum()
    if n_inliers != n_matches:
        print('WARNING: cv2.findHomography can not match all detected'
              ' keypoints (%d/%d matches).' % (
                  n_inliers, n_matches))
    return homography