Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def handler(event, context):
    """Return the duration of an OSS object as reported by ffprobe.

    ``event`` is a JSON document with ``bucket_name`` and ``object_key``.
    The object is handed to ``/code/ffprobe`` through a signed GET URL and
    the second field of ffprobe's CSV output is returned as a float.
    """
    payload = json.loads(event)
    bucket_name = payload["bucket_name"]
    key = payload["object_key"]

    # Build the bucket client from the credentials carried by the context.
    sts = context.credentials
    bucket = oss2.Bucket(
        oss2.StsAuth(sts.accessKeyId, sts.accessKeySecret, sts.securityToken),
        'oss-%s-internal.aliyuncs.com' % context.region,
        bucket_name,
    )
    signed_url = bucket.sign_url('GET', key, 15 * 60)

    probe_cmd = [
        "/code/ffprobe", "-show_entries", "format=duration",
        "-v", "quiet", "-of", "csv", "-i", signed_url,
    ]
    output = subprocess.check_output(probe_cmd).decode()

    # The output is treated as one CSV row; field 1 holds the duration.
    fields = output.replace("\n", "").strip().split(",")
    return float(fields[1])
def uploadBytes(self, bytes):
    """Upload ``bytes`` as a new ``.jpg`` object and return its ``oss://`` URI.

    The object key is ``<upload folder>/<file type>/<uuid1>.jpg``, built from
    the stored credentials and ``self._fileType``.

    Args:
        bytes: payload passed unchanged to ``bucket.put_object``.

    Returns:
        ``'oss://<upload bucket>/<object key>'`` for the uploaded object.

    Raises:
        RuntimeError: if ``self._getCredential()`` returns ``None``.
    """
    credentials = self._getCredential()
    # Identity check: ``== None`` can be hijacked by a custom ``__eq__``.
    if credentials is None:
        raise RuntimeError('can not get upload credentials')

    # NOTE(review): the accessors below read ``self._credentials`` rather
    # than the value returned above; presumably ``_getCredential()`` stores
    # its result there -- confirm against its definition.
    auth = oss2.StsAuth(
        self._credentials.get_AccessKeyId(),
        self._credentials.get_AccessKeySecret(),
        self._credentials.get_SecurityToken(),
    )
    bucket = oss2.Bucket(
        auth,
        self._credentials.get_OssEndpoint(),
        self._credentials.get_UploadBucket(),
    )
    objectKey = (self._credentials.get_UploadFolder() + '/' + self._fileType
                 + '/' + str(uuid.uuid1()) + '.jpg')
    bucket.put_object(objectKey, bytes)
    return 'oss://' + self._credentials.get_UploadBucket() + '/' + objectKey
# Fragment: the enclosing definition's header is not visible, and the last
# statement below is cut off inside an unclosed list.
# `evt`, `context`, `oss_bucket_name`, `object_key` and `tile` are defined
# outside this fragment.
#
# Optional event fields; each is converted to a string (except `scale`,
# which is used as given) so it can be placed in the ffmpeg argument list.
ss = evt.get("start", 0)
ss = str(ss)
# `t` stays falsy when no duration was supplied; that is tested again below.
t = evt.get("duration")
if t:
t = str(t)
itsoffset = evt.get("itsoffset", 0)
itsoffset = str(itsoffset)
scale = evt.get("scale", "-1:-1")
interval = str(evt.get("interval", 1))
padding = str(evt.get("padding", 0))
color = str(evt.get("color", "black"))
dst_type = str(evt.get("dst_type", "jpg"))
# Bucket client built from the credentials carried by the context.
creds = context.credentials
auth = oss2.StsAuth(creds.accessKeyId,
creds.accessKeySecret, creds.securityToken)
oss_client = oss2.Bucket(
auth, 'oss-%s-internal.aliyuncs.com' % context.region, oss_bucket_name)
# ffmpeg reads the object through a signed GET URL (expiry argument 3600,
# presumably seconds -- confirm against the oss2 documentation).
input_path = oss_client.sign_url('GET', object_key, 3600)
fileDir, shortname, extension = get_fileNameExt(object_key)
# Default command (no '-t'): outputs are written to /tmp/<shortname>%d.<dst_type>.
cmd = ['/code/ffmpeg', '-ss', ss, '-itsoffset', itsoffset, '-y', '-i', input_path,
'-f', 'image2', '-vf', "fps=1/{0},scale={1},tile={2}:padding={3}:color={4}".format(
interval, scale, tile, padding, color),
'/tmp/{0}%d.{1}'.format(shortname, dst_type)]
# When a duration was supplied, the command is rebuilt with '-t' added.
if t:
cmd = ['/code/ffmpeg', '-ss', ss, '-itsoffset', itsoffset, '-t', t, '-y', '-i', input_path,
'-f', 'image2', '-vf', "fps=1/{0},scale={1},tile={2}:padding={3}:color={4}".format(
interval, scale, tile, padding, color),
def handler(event, context):
    """Assemble the ffmpeg command that converts an OSS object to ``dst_type``.

    ``event`` is a JSON document with required ``bucket_name``,
    ``object_key``, ``output_dir`` and ``dst_type`` and optional ``ac`` and
    ``ar``; the optional values are forwarded as ``-ac`` / ``-ar`` when
    truthy. As written here, the body stops once ``cmd`` has been built.
    """
    LOGGER.info(event)
    payload = json.loads(event)
    oss_bucket_name = payload["bucket_name"]
    object_key = payload["object_key"]
    output_dir = payload["output_dir"]
    dst_type = payload["dst_type"]
    ac = payload.get("ac")
    ar = payload.get("ar")

    # Bucket client built from the credentials carried by the context.
    sts = context.credentials
    oss_client = oss2.Bucket(
        oss2.StsAuth(sts.accessKeyId, sts.accessKeySecret, sts.securityToken),
        'oss-%s-internal.aliyuncs.com' % context.region,
        oss_bucket_name,
    )
    input_path = oss_client.sign_url('GET', object_key, 3600)
    fileDir, shortname, extension = get_fileNameExt(object_key)

    # Grow the argument list step by step: "-ac" (if any) always comes
    # before "-ar" (if any), and the output path is always last.
    cmd = ['/code/ffmpeg', '-i', input_path]
    if ac:
        cmd += ["-ac", str(ac)]
    if ar:
        cmd += ["-ar", str(ar)]
    cmd.append('/tmp/{0}{1}'.format(shortname, dst_type))
def handler(event, context):
    """Assemble and log the ffmpeg watermark command for an OSS video.

    ``event`` is a JSON document with required ``bucket_name``,
    ``object_key`` and ``output_dir`` and optional ``vf_args`` and
    ``filter_complex_args``. When ``filter_complex_args`` is set, the
    command overlays ``/code/logo.gif`` via ``-filter_complex``; otherwise
    ``vf_args`` is passed to ``-vf``. The result path is
    ``/tmp/watermark_<shortname><extension>``.

    Raises:
        ValueError: if neither ``vf_args`` nor ``filter_complex_args`` is set.
    """
    LOGGER.info(event)
    evt = json.loads(event)
    oss_bucket_name = evt["bucket_name"]
    object_key = evt["object_key"]
    output_dir = evt["output_dir"]
    vf_args = evt.get("vf_args", "")
    filter_complex_args = evt.get("filter_complex_args")

    # The original used `assert "<message>"`, which is always true (a
    # non-empty string), so the check never fired. Raise explicitly instead.
    if not (vf_args or filter_complex_args):
        raise ValueError(
            "at least one of 'vf_args' and 'filter_complex_args' has value")

    # Bucket client built from the credentials carried by the context.
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId,
                        creds.accessKeySecret, creds.securityToken)
    oss_client = oss2.Bucket(
        auth, 'oss-%s-internal.aliyuncs.com' % context.region, oss_bucket_name)
    input_path = oss_client.sign_url('GET', object_key, 3600)
    fileDir, shortname, extension = get_fileNameExt(object_key)
    dst_video_path = os.path.join("/tmp", "watermark_" + shortname + extension)

    if filter_complex_args:  # gif
        cmd = ["/code/ffmpeg", "-y", "-i", input_path, "-ignore_loop", "0",
               "-i", "/code/logo.gif", "-filter_complex", filter_complex_args,
               dst_video_path]
    else:
        cmd = ["/code/ffmpeg", "-y", "-i", input_path,
               "-vf", vf_args, dst_video_path]
    # NOTE(review): this logs the signed URL contained in `cmd`.
    LOGGER.info("cmd = {}".format(" ".join(cmd)))
def handler(event, context):
    """Download an OSS video and split it into segments with ffmpeg.

    ``event`` is a JSON document with ``video_key``, ``oss_bucket_name`` and
    ``segment_time_seconds``. The video is downloaded into a per-request
    directory ``NAS_ROOT + context.request_id`` and split with stream copy
    into ``split_<shortname>_piece_NN<extension>`` files in that directory.
    If the requested segment length would yield more than ``MAX_SPLIT_NUM``
    pieces, the segment length is increased.

    Raises:
        OSError: if the working directory cannot be created or its mode
            cannot be changed.
        ValueError: if ``segment_time_seconds`` is not an integer string.
    """
    evt = json.loads(event)
    video_key = evt['video_key']
    oss_bucket_name = evt['oss_bucket_name']
    segment_time_text = str(evt['segment_time_seconds'])
    shortname, extension = get_fileNameExt(video_key)
    video_name = shortname + extension

    video_proc_dir = NAS_ROOT + context.request_id
    os.mkdir(video_proc_dir)
    # The directory was just created and is empty, so a single chmod is
    # equivalent to the former shell `chmod -R 777`, without building a
    # shell command string and without silently ignoring a failure.
    os.chmod(video_proc_dir, 0o777)

    # Bucket client built from the credentials carried by the context.
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    oss_client = oss2.Bucket(
        auth, 'oss-%s-internal.aliyuncs.com' % context.region, oss_bucket_name)

    input_path = os.path.join(video_proc_dir, video_name)
    oss_client.get_object_to_file(video_key, input_path)

    video_duration = getVideoDuration(input_path)
    segment_time_seconds = int(segment_time_text)
    split_num = math.ceil(video_duration / segment_time_seconds)
    # Cap the number of pieces by lengthening each segment.
    if split_num > MAX_SPLIT_NUM:
        segment_time_seconds = int(math.ceil(video_duration / MAX_SPLIT_NUM)) + 1

    segment_pattern = (video_proc_dir + "/split_" + shortname
                       + '_piece_%02d' + extension)
    exec_FFmpeg_cmd([FFMPEG_BIN, '-i', input_path, "-c", "copy",
                     "-f", "segment", "-segment_time", str(segment_time_seconds),
                     "-reset_timestamps", "1", segment_pattern])
# Builds the ffmpeg command that converts an OSS object to /tmp/<shortname>.gif.
# The event JSON carries required "bucket_name", "object_key" and
# "output_dir", and optional "vframes", "start" (default 0) and "duration".
# NOTE: this definition is truncated in this view -- the last statement is
# cut off inside an unclosed list.
def handler(event, context):
LOGGER.info(event)
evt = json.loads(event)
oss_bucket_name = evt["bucket_name"]
object_key = evt["object_key"]
output_dir = evt["output_dir"]
# Optional values are stringified only when truthy, so the `if t:` /
# `if vframes:` tests below still see a falsy value when they were omitted.
vframes = evt.get("vframes")
if vframes:
vframes = str(vframes)
ss = evt.get("start", 0)
ss = str(ss)
t = evt.get("duration")
if t:
t = str(t)
# Bucket client built from the credentials carried by the context.
creds = context.credentials
auth = oss2.StsAuth(creds.accessKeyId,
creds.accessKeySecret, creds.securityToken)
oss_client = oss2.Bucket(
auth, 'oss-%s-internal.aliyuncs.com' % context.region, oss_bucket_name)
# ffmpeg reads the object through a signed GET URL (expiry argument 3600,
# presumably seconds -- confirm against the oss2 documentation).
input_path = oss_client.sign_url('GET', object_key, 3600)
fileDir, shortname, extension = get_fileNameExt(object_key)
gif_path = os.path.join("/tmp", shortname + ".gif")
# Default command: seek to `ss`, no duration or frame limit.
cmd = ["/code/ffmpeg", "-y", "-ss", ss, "-accurate_seek",
"-i", input_path, "-pix_fmt", "rgb24", gif_path]
# A supplied duration takes precedence; `vframes` is only considered when
# no duration was given.
if t:
cmd = ["/code/ffmpeg", "-y", "-ss", ss, "-t", t, "-accurate_seek",
"-i", input_path, "-pix_fmt", "rgb24", gif_path]
else:
if vframes:
cmd = ["/code/ffmpeg", "-y", "-ss", ss, "-accurate_seek", "-i",
def handler(event, context):
    """Download an OSS video and prepare paths for its ``.mp3`` output.

    ``event`` is a JSON document with ``video_key``, ``output_prefix`` and
    ``oss_bucket_name``. The video is downloaded into the per-request
    directory ``NAS_ROOT + context.request_id``; any existing
    ``<shortname>.mp3`` in that directory is removed. As written here, the
    body stops after that cleanup.

    Raises:
        OSError: if the working directory cannot be created.
    """
    evt = json.loads(event)
    video_key = evt['video_key']
    output_prefix = evt['output_prefix']
    oss_bucket_name = evt['oss_bucket_name']

    # Bucket client built from the credentials carried by the context.
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    oss_client = oss2.Bucket(
        auth, 'oss-%s-internal.aliyuncs.com' % context.region, oss_bucket_name)

    video_proc_dir = NAS_ROOT + context.request_id
    # Replaces the shell `mkdir -p`: no command string is built from the
    # path, and a failure raises instead of being silently ignored.
    os.makedirs(video_proc_dir, exist_ok=True)

    shortname, extension = get_fileNameExt(video_key)
    video_name = shortname + extension
    input_path = os.path.join(video_proc_dir, video_name)
    oss_client.get_object_to_file(video_key, input_path)

    mp3_filename = '{}.mp3'.format(shortname)
    mp3_filepath = video_proc_dir + "/" + mp3_filename
    mp3_key = os.path.join(output_prefix, shortname, mp3_filename)

    # Remove a stale output file left in the working directory.
    if os.path.exists(mp3_filepath):
        os.remove(mp3_filepath)