import os
import shutil
import tempfile

import imgaug.augmenters as iaa
import pytest

from perception import benchmarking, hashers, testing, tools
from perception.hashers import PDQHash


def test_deduplicate():
    # Copy a known test image so the corpus contains an exact duplicate.
    directory = tempfile.TemporaryDirectory()
    original = testing.DEFAULT_TEST_IMAGES[0]
    duplicate = os.path.join(directory.name, 'image1.jpg')
    shutil.copy(original, duplicate)
    # Deduplicate three files: two distinct images plus the copy of the first.
    pairs = tools.deduplicate(
        files=[
            testing.DEFAULT_TEST_IMAGES[0], testing.DEFAULT_TEST_IMAGES[1],
            duplicate
        ],
        hashers=[(hashers.PHash(hash_size=16), 0.25)])
    # Exactly one pair (original/duplicate) should be reported, in either order.
    assert len(pairs) == 1
    file1, file2 = pairs[0]
    assert ((file1 == duplicate) and
            (file2 == original)) or ((file1 == original) and
                                     (file2 == duplicate))
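
For reference, the same `tools.deduplicate` call works on any collection of image files, not just the bundled test images. A minimal usage sketch follows; the file paths and the 0.25 distance threshold are illustrative placeholders, not values required by the library.

from perception import hashers, tools

# Hypothetical local image paths; any readable image files would do.
candidate_files = ['photos/a.jpg', 'photos/b.jpg', 'photos/a_copy.jpg']

# Each (hasher, threshold) pair marks two files as duplicates when their
# hash distance falls within the threshold.
duplicate_pairs = tools.deduplicate(
    files=candidate_files,
    hashers=[(hashers.PHash(hash_size=16), 0.25)])

for file1, file2 in duplicate_pairs:
    print(f'{file1} appears to be a duplicate of {file2}')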
    'blackpad':
        benchmarking.video_transforms.get_black_frame_padding_transform(
            duration_s=1),
    'slideshow':
        benchmarking.video_transforms.get_slideshow_transform(
            frame_input_rate=1, frame_output_rate=1),
}
transformed = video_dataset.transform(
    storage_dir=tempfile.TemporaryDirectory().name, transforms=transforms)
assert len(transformed._df) == len(transforms) * len(video_dataset._df)
assert transformed._df['filepath'].isnull().sum() == 0
# We will compute hashes for each of the transformed
# videos and check the results for correctness.
phash_framewise_hasher = hashers.FramewiseHasher(
    frame_hasher=hashers.PHash(),
    interframe_threshold=-1,
    frames_per_second=2)
hashes = transformed.compute_hashes(
    hashers={'phashframewise': phash_framewise_hasher})
guid = hashes._df.guid.iloc[0]
df = hashes._df[hashes._df['guid'] == guid]
clip1s = df[(df.transform_name == 'clip1s')]
noop = df[(df.transform_name == 'noop')]
blackpad = df[(df.transform_name == 'blackpad')]
slideshow = df[(df.transform_name == 'slideshow')]
# We should have dropped two hashes from the beginning
# of the clipped video.
assert len(clip1s) == len(noop) - 2
def test_synchronized_hashing():
    video_hashers = {
        'phashframewise':
            hashers.FramewiseHasher(
                frame_hasher=hashers.PHash(hash_size=16),
                frames_per_second=1,
                interframe_threshold=0.2),
        'tmkl2':
            hashers.TMKL2(frames_per_second=15),
        'tmkl1':
            hashers.TMKL1(frames_per_second=15)
    }
    for filepath in [
            'perception/testing/videos/v1.m4v',
            'perception/testing/videos/v2.m4v'
    ]:
        # Ensure synchronized hashing
        hashes1 = {
            hasher_name: hasher.compute(filepath)
            for hasher_name, hasher in video_hashers.items()
        }
def test_video_hashing_common():
    testing.test_video_hasher_integrity(
        hasher=hashers.FramewiseHasher(
            frame_hasher=hashers.PHash(hash_size=16),
            interframe_threshold=0.1,
            frames_per_second=1))
@pytest.mark.parametrize(
    'hasher_class,pil_opencv_threshold,transform_threshold,opencv_hasher',
    [(hashers.WaveletHash, 0.1, 0.1, False), (hashers.PHash, 0.1, 0.1, False),
     (PDQHash, 0.1, 0.15, False), (hashers.DHash, 0.1, 0.1, False),
     (hashers.MarrHildreth, 0.1, 0.1, True),
     (hashers.BlockMean, 0.1, 0.1, True),
     (hashers.ColorMoment, 10, 0.1, True)])
def test_image_hashing_common(hasher_class, pil_opencv_threshold,
                              transform_threshold, opencv_hasher):
    testing.test_image_hasher_integrity(
        hasher=hasher_class(),
        pil_opencv_threshold=pil_opencv_threshold,
        transform_threshold=transform_threshold,
        opencv_hasher=opencv_hasher)
def test_benchmark_transforms():
    transformed = dataset.transform(
        transforms={
            'blur0.05': iaa.GaussianBlur(0.05),
            'noop': iaa.Resize(size=(256, 256))
        },
        storage_dir='/tmp/transforms')
    assert len(transformed._df) == len(files) * 2
    hashes = transformed.compute_hashes(hashers={'pdna': hashers.PHash()})
    tr = hashes.compute_threshold_recall().reset_index()
    # Invalidate one hash and confirm a warning is raised when the
    # threshold/recall metrics are recomputed.
    hashes._metrics = None
    hashes._df.at[0, 'hash'] = None
    with pytest.warns(UserWarning, match='invalid / empty hashes'):
        hashes.compute_threshold_recall()
    assert (tr[tr['transform_name'] == 'noop']['recall'] == 100.0).all()
    # This is a charting function but we execute it just to make sure
    # it runs without error.
    hashes.show_histograms()
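
The benchmarking test above follows a transform → hash → evaluate pattern. The condensed sketch below repeats only the calls exercised by the test; it assumes `dataset` is a perception benchmarking image dataset constructed elsewhere (its construction is not part of this snippet), and the transform names and parameters are illustrative.

import imgaug.augmenters as iaa
from perception import hashers

# `dataset` is assumed to be built elsewhere via perception's benchmarking
# module; only methods used in the test above appear here.
transformed = dataset.transform(
    transforms={
        'noop': iaa.Resize(size=(256, 256)),  # resize-only baseline
        'blur': iaa.GaussianBlur(1.0),        # illustrative perturbation
    },
    storage_dir='/tmp/benchmark-transforms')

# Hash every transformed image, then summarize recall as a function of the
# matching distance threshold.
hashes = transformed.compute_hashes(hashers={'phash': hashers.PHash()})
threshold_recall = hashes.compute_threshold_recall().reset_index()
print(threshold_recall.head())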
    '(4) username and password as `SAFER_MATCHING_SERVICE_USERNAME` and '
    '`SAFER_MATCHING_SERVICE_PASSWORD` env vars.')
if url is None:
    url = os.environ.get('SAFER_MATCHING_SERVICE_URL')
    if url is None:
        raise ValueError(
            'You must provide either the url or the SAFER_MATCHING_SERVICE_URL env var.'
        )
if urllib.parse.urlparse(url).scheme != 'https' and not os.environ.get(
        'SAFER_MATCHING_SERVICE_DEV_ALLOW_HTTP'):
    raise ValueError(
        'You must provide a URL that begins with `https://`.')
self.api_key = api_key
self.url = url
if hasher is None:
    hasher = perception_hashers.PHash(hash_size=16, highfreq_factor=4)
if hasher_api_id is None:
    hasher_api_id = 'phash'
self.hasher = hasher
self.hasher_api_id = hasher_api_id
self.quality_threshold = quality_threshold
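
The constructor fragment above falls back to environment variables for missing credentials, rejects non-HTTPS URLs unless a development override is set, and defaults the hasher to `PHash(hash_size=16, highfreq_factor=4)` under the `phash` API id. A hedged construction sketch follows; it assumes the class is `perception.tools.SaferMatcher` (the class name is not visible in the fragment) and uses placeholder credential values.

import os

from perception import tools

# Assumption: the constructor shown above belongs to tools.SaferMatcher.
# Both values below are placeholders; per the error-message text above, the
# constructor can also read credentials from SAFER_MATCHING_SERVICE_* env vars.
matcher = tools.SaferMatcher(
    api_key=os.environ.get('SAFER_MATCHING_SERVICE_API_KEY', 'YOUR_API_KEY'),
    url='https://matching.example.com',
)

# Omitting hasher/hasher_api_id selects the PHash default shown above.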