Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_download_ffmpeg_linux(path = tempfile.gettempdir()):
if os.name != 'nt':
try:
#inialize varibles
file_url = 'https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-{}-static.tar.xz'.format(getBitmode(False))
file_name = os.path.join(os.path.abspath(path),'ffmpeg-release-{}-static.tar.xz'.format(getBitmode(False)))
file_path = os.path.join(os.path.abspath(path), 'ffmpeg-4.1.3-{}-static/ffmpeg'.format(getBitmode(False)))
base_path, _ = os.path.split(file_path) #extract file base path
#check if file already exists
if os.path.isfile(file_path):
pass #skip download if does
else:
import requests
if check_python_version() == 2:
from backports import lzma
import tarfile
#check if given pth has write access
assert os.access(path, os.W_OK), "Permission Denied: Cannot write ffmpeg binaries to directory = " + path
#remove leftovers
if os.path.isfile(file_name):
os.remove(file_name)
#download and write file to the given path
with open(file_name, "wb") as f:
response = requests.get(file_url, stream=True)
total_length = response.headers.get('content-length')
if total_length is None: # no content length header
with open(file_name, "wb") as f:
response = requests.get(file_url, stream=True)
total_length = response.headers.get('content-length')
if total_length is None: # no content length header
f.write(response.content)
else:
dl = 0
total_length = int(total_length)
for data in response.iter_content(chunk_size=4096):
dl += len(data)
f.write(data)
done = int(50 * dl / total_length)
sys.stdout.write("\r[{}{}]{}{}".format('=' * done, ' ' * (50-done), done * 2, '%') )
sys.stdout.flush()
print("\nExtracting executables, Please Wait...")
if check_python_version()==2:
with lzma.LZMAFile(file_name, "r") as f:
with tarfile.open(fileobj=f) as tar:
tar.extractall(base_path)
else:
with tarfile.open(file_name, 'r:xz') as tar:
tar.extractall(base_path)
tar.close()
print("\nChecking Files...")
if os.path.isfile(file_path):
pass
else:
folder = os.path.join(os.path.abspath(os.path.join(file_path ,"..")), 'ffmpeg-4.1.3-{}-static'.format(getBitmode(False)))
folder_dest = os.path.abspath(os.path.join(file_path ,".."))
files = os.listdir(folder)
for file in files:
shutil.move(os.path.join(folder, file), folder_dest)
def test_get_valid_ffmpeg_path(paths, ffmpeg_download_paths, results):
_windows = True if os.name == 'nt' else False
try:
output = get_valid_ffmpeg_path(custom_ffmpeg = paths, is_windows = _windows, ffmpeg_download_path = ffmpeg_download_paths, logging = True)
if not (paths == 'wrong_test_path' or ffmpeg_download_paths == 'wrong_test_path'):
assert bool(output) == results, "FFmpeg excutables validation and correction Test failed at path: {} and FFmpeg ffmpeg_download_paths: {}".format(paths, ffmpeg_download_paths)
except Exception as e:
if paths == 'wrong_test_path' or ffmpeg_download_paths == 'wrong_test_path':
pass
else:
pytest.fail(str(e))
def test_writegear_class(f_name, compression, c_ffmpeg, output_params, result):
try:
WriteGear(output_filename = f_name, compression_mode = compression , custom_ffmpeg = c_ffmpeg, logging = True, **output_params)
except Exception as e:
if result:
pytest.fail(str(e))
def test_ffmpeg_binaries_download(paths):
_windows = True if os.name == 'nt' else False
file_path = ''
try:
file_path = download_ffmpeg_binaries(path = paths, os_windows = _windows)
if file_path:
assert os.path.isfile(file_path), "FFmpeg download failed!"
if paths != tempfile.gettempdir():
shutil.rmtree(os.path.abspath(os.path.join(file_path ,"../..")))
except Exception as e:
if paths == 'wrong_test_path' and "Permission Denied:" in str(e):
pass
else:
pytest.fail(str(e))
def test_validate_ffmpeg(paths):
FFmpeg_path = ''
if os.name == 'nt':
FFmpeg_path += os.path.join(paths, 'ffmpeg-latest-{}-static/bin/ffmpeg.exe'.format(getBitmode()))
else:
FFmpeg_path += os.path.join(paths, 'ffmpeg-4.1.3-{}-static/ffmpeg'.format(getBitmode(False)))
try:
output = validate_ffmpeg(FFmpeg_path, logging = True)
if paths != 'wrong_test_path':
assert bool(output), "Validation Test failed at path: {}".format(FFmpeg_path)
except Exception as e:
if paths == 'wrong_test_path':
pass
else:
pytest.fail(str(e))
if "-fps" not in self.output_parameters:
FPS = 25
#auto assign dimensions
HEIGHT = self.inputheight
WIDTH = self.inputwidth
#assign parameter dict values to variables
try:
for key, value in self.output_parameters.items():
if key == '-fourcc':
FOURCC = cv2.VideoWriter_fourcc(*(value.upper()))
elif key == '-fps':
FPS = float(value)
elif key =='-backend' and value.upper() in ['CAP_FFMPEG','CAP_GSTREAMER']:
BACKEND = capPropId(value.upper())
elif key == '-color':
COLOR = bool(int(value))
else:
pass
except Exception as e:
# log if something is wrong
if self.logging:
print(e)
raise ValueError('Wrong Values passed to OpenCV Writer, Kindly Refer Docs!')
if self.logging:
#log values for debugging
print('FILE_PATH: {}, FOURCC = {}, FPS = {}, WIDTH = {}, HEIGHT = {}, BACKEND = {}'.format(self.out_file,FOURCC, FPS, WIDTH, HEIGHT, BACKEND))
#start different process for with/without Backend.
#log it
if logging:
print('Enabling Threaded Queue Mode!')
else:
#otherwise disable it
self.threaded_queue_mode = False
#intiate screen dimension handler
screen_dims = {}
#initializing colorspace variable
self.color_space = None
try:
#reformat proper mss dict and assign to screen dimension handler
screen_dims = {k.strip(): v for k,v in options.items() if k.strip() in ["top", "left", "width", "height"]}
# separately handle colorspace value to int conversion
if not(colorspace is None):
self.color_space = capPropId(colorspace.strip())
except Exception as e:
# Catch if any error occurred
if logging:
print(e)
# intialize mss capture instance
self.mss_capture_instance = None
try:
# check whether user-defined dimensions are provided
if screen_dims and len(screen_dims) == 4:
self.mss_capture_instance = screen_dims #create instance from dimensions
else:
self.mss_capture_instance = monitor_instance #otherwise create instance from monitor
# extract global frame from instance
self.frame = np.asanyarray(self.mss_object.grab(self.mss_capture_instance))
if self.threaded_queue_mode:
#intitialize and append to queue
# Two parameters are available since OpenCV 4+ (master branch)
self.stream = cv2.VideoCapture(source, backend)
else:
# initialize the camera stream
self.stream = cv2.VideoCapture(source)
#initializing colorspace variable
self.color_space = None
try:
# try to apply attributes to source if specified
#reformat dict
options = {k.strip(): v for k,v in options.items()}
for key, value in options.items():
self.stream.set(capPropId(key),value)
# separately handle colorspace value to int conversion
if not(colorspace is None):
self.color_space = capPropId(colorspace.strip())
except Exception as e:
# Catch if any error occurred
if logging:
print(e)
#initialize and assign framerate variable
self.framerate = 0
try:
_fps = self.stream.get(cv2.CAP_PROP_FPS)
if _fps>1:
self.framerate = _fps
self.framerate = framerate
#initializing colorspace variable
self.color_space = None
#reformat dict
options = {k.strip(): v for k,v in options.items()}
try:
# apply attributes to source if specified
for key, value in options.items():
setattr(self.camera, key, value)
# separately handle colorspace value to int conversion
if not(colorspace is None):
self.color_space = capPropId(colorspace.strip())
except Exception as e:
# Catch if any error occurred
if logging:
print(e)
# enable rgb capture array thread and capture stream
self.rawCapture = PiRGBArray(self.camera, size=resolution)
self.stream = self.camera.capture_continuous(self.rawCapture,format="bgr", use_video_port=True)
#frame variable initialization
for stream in self.stream:
self.frame = stream.array
self.rawCapture.seek(0)
self.rawCapture.truncate()
break