import numpy as np

from perception import hashers

video_hashers = {
    'tmkl2': hashers.TMKL2(frames_per_second=15),
    'tmkl1': hashers.TMKL1(frames_per_second=15)
}

for filepath in [
        'perception/testing/videos/v1.m4v',
        'perception/testing/videos/v2.m4v'
]:
    # Ensure synchronized hashing yields the same hashes as computing
    # each hash separately.
    hashes1 = {
        hasher_name: hasher.compute(filepath)
        for hasher_name, hasher in video_hashers.items()
    }
    hashes2 = hashers.tools.compute_synchronized_video_hashes(
        filepath=filepath, hashers=video_hashers)
    assert hashes1 == hashes2
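# Synchronized hashing exists because decoding is the expensive step:
# the file is read once at a framerate that satisfies every hasher,
# instead of once per hasher. A minimal sketch of the underlying frame
# iteration, with read_video's signature taken from the process_frame
# loop further down in this document:
from perception.hashers import tools

for frame, frame_index, frame_timestamp in tools.read_video(
        filepath='perception/testing/videos/v1.m4v', frames_per_second=15):
    print(frame_index, frame_timestamp, frame.shape)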
def test_opencv_hasher(hasher: hashers.ImageHasher, image1: str, image2: str):
    # For OpenCV hashers, make sure the distance we compute
    # matches the distance computed inside OpenCV.
    f1 = image1
    f2 = image2
    opencv_distance = hasher.hasher.compare(
        hasher.hasher.compute(hashers.tools.read(f1)),
        hasher.hasher.compute(hashers.tools.read(f2)))
    if hasher.distance_metric == 'hamming':
        opencv_distance /= hasher.hash_length
    np.testing.assert_approx_equal(
        opencv_distance,
        hasher.compute_distance(hasher.compute(f1), hasher.compute(f2)),
        significant=4)
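# A usage sketch for the check above; MarrHildreth is one of the
# OpenCV-backed hashers (the inner cv2.img_hash object is exposed as
# .hasher, which the comparison above relies on). The image paths are
# hypothetical.
test_opencv_hasher(
    hashers.MarrHildreth(),
    image1='perception/testing/images/image1.jpg',
    image2='perception/testing/images/image2.jpg')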
Args:
    hashers: A dictionary of hashers.
    max_workers: Maximum number of workers for parallel hash
        computation.

Returns:
    hashes: A BenchmarkHashes object.
"""
# Group the hashers that sample frames by a common decode framerate so
# each video only has to be read as few times as possible.
id_rates = {
    hasher_name: hasher.frames_per_second
    for hasher_name, hasher in hashers.items()
    if hasher.frames_per_second is not None
}
framerates = tools.get_common_framerates(id_rates) if id_rates else {}
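# The output structure of get_common_framerates, inferred from how
# `framerates` is consumed below, maps a shared framerate to the hashers
# it can feed. An illustrative (hypothetical) example:
#   tools.get_common_framerates({'tmkl1': 15, 'framewise': 5})
#   -> {15: ['tmkl1', 'framewise']}  # 15 fps serves 5 fps via every 3rd frame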
with concurrent.futures.ProcessPoolExecutor(
        max_workers=max_workers) as executor:
    futures = [
        executor.submit(
            _process_row,
            row=row,
            framerates=framerates,
            hashers=hashers) for index, row in self._df.iterrows()
    ]
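    # The fragment ends before the submitted jobs are gathered; a
    # minimal sketch of collecting the per-row results (the variable
    # name is hypothetical, and the real method may accumulate them
    # differently):
    rows_of_hashes = [
        future.result()
        for future in concurrent.futures.as_completed(futures)
    ]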
Returns:
    A dictionary of matches. See the Safer matching service
    documentation (contact Thorn for a copy).
"""
raw_hashes = [
    self.hasher.compute_with_quality(
        image if isinstance(image, str) else image[0])
    for image in images
]
hashes = [{
    'id': image if isinstance(image, str) else image[1],
    self.hasher_api_id: hash_string,
    'md5': (perception_hashers.tools.compute_md5(image)
            if isinstance(image, str) else
            (perception_hashers.tools.compute_md5(image[0])
             if isinstance(image[0], str) else None))
} for image, (hash_string, quality) in zip(images, raw_hashes)
  if quality > self.quality_threshold]
for hash_dict in hashes:
    # We cannot include an md5 key if we don't have the md5.
    if hash_dict['md5'] is None:
        del hash_dict['md5']
if not hashes:
    warnings.warn(
        message="No images of sufficient quality were found.",
        category=UserWarning)
    return {}
body = {'hashes': hashes, 'version': 'v2'}
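# The Safer matching service itself is private, so what happens to
# `body` past this point is not documented here. A hypothetical sketch
# of submitting it; the endpoint, header name, and helper below are
# placeholders, not the real API:
import requests

def post_hashes(body: dict, url: str, api_key: str) -> dict:
    # url and api_key would come from the matcher's configuration;
    # the header name is a placeholder, not the actual auth scheme.
    response = requests.post(url, json=body, headers={'x-api-key': api_key})
    response.raise_for_status()
    return response.json()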
def _compute_with_quality(self, image: np.ndarray):
    return self._compute(image), tools.compute_quality(image)
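# A usage sketch of quality-aware hashing based on the fragments above:
# compute_with_quality returns a (hash, quality) pair, and callers such
# as the matcher drop inputs below a quality threshold. The PHash
# choice, image path, and threshold are assumptions.
quality_hasher = hashers.PHash()
hash_value, quality = quality_hasher.compute_with_quality('image1.jpg')
if quality > 50:  # hypothetical threshold
    print(hash_value)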
    batch_size: The minimum number of hashes to include in each batch.
"""

def convert(scenes):
    if hash_format == 'vector':
        return scenes
    if self.base_hasher.returns_multiple:
        return [([
            self.vector_to_string(h, hash_format=hash_format)
            for h in hs
        ], frames) for hs, frames in scenes]
    return [(self.vector_to_string(h, hash_format=hash_format), frames)
            for h, frames in scenes]

state = None
for frame, frame_index, frame_timestamp in tools.read_video(
        filepath=filepath,
        frames_per_second=self.frames_per_second,
        errors=errors):
    state = self.process_frame(
        frame=frame,
        frame_index=frame_index,
        frame_timestamp=frame_timestamp,
        state=state,
        batch_mode=True)
    if len(state['scenes']) >= batch_size:
        yield convert(state['scenes'])
        state['scenes'] = []
assert state is not None
if state['substate']:
    self.handle_scene(state)
if state['scenes']:
    yield convert(state['scenes'])
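# A usage sketch for the generator above, which reads like the body of
# a compute_with_batches method on a scene-detection video hasher:
# hashes are yielded in batches of at least batch_size as scenes
# complete, plus one final partial batch. The class, method name, and
# video path below are assumptions.
scene_hasher = hashers.SimpleSceneDetection(base_hasher=hashers.TMKL1())
for batch in scene_hasher.compute_with_batches('video.m4v', batch_size=32):
    for hash_value, frames in batch:
        print(hash_value, len(frames))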
def _compute_isometric(self, image):
    return {
        transform_name: self._dct_to_hash(dct)
        for transform_name, dct in tools.get_isometric_dct_transforms(
            self._compute_dct(image)).items()
    }
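# The trick in _compute_isometric: the DCT of a rotated or flipped
# image can be derived from the original image's DCT coefficients, so
# hashes for all eight isometries come from a single decode and a
# single DCT. A sketch of calling it, assuming the public entry point
# is compute_isometric and using a hypothetical image path:
isometric_hasher = hashers.PHash()
for transform_name, hash_value in isometric_hasher.compute_isometric(
        'image1.jpg').items():
    print(transform_name, hash_value)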
def _process_row(row, hashers, framerates):
    error = None
    try:
        assert not pd.isnull(row['filepath']), 'No filepath provided.'
        hashes = tools.compute_synchronized_video_hashes(
            filepath=row['filepath'],
            hashers=hashers,
            framerates=framerates,
            hash_format='base64')
    # pylint: disable=broad-except
    except Exception as exception:
        error = str(exception)
        hashes = {
            hasher_name: [None] if hasher.returns_multiple else None
            for hasher_name, hasher in hashers.items()
        }
    base_dict = {
        'guid': row['guid'],
        'filepath': row['filepath'],
        'error': error,
        'category': row['category'],