Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def main():
args = arg_parser()
handlers = []
if args.timeout:
socket.setdefaulttimeout(args.timeout)
if args.user_agent:
fake_headers['User-Agent'] = args.user_agent
if args.http_proxy:
proxy_handler = ProxyHandler({
'http': args.http_proxy,
'https': args.http_proxy
})
handlers.append(proxy_handler)
elif args.socks_proxy:
try:
import socks
addr, port = args.socks_proxy.split(':')
socks.set_default_proxy(socks.SOCKS5, addr, int(port))
socket.socket = socks.socksocket
except:
print('Failed to set socks5 proxy. Please install PySocks.', file=sys.stderr)
def load_m3u8_playlist(url):
stream_types = []
streams = {}
m = m3u8.load(url, headers=fake_headers).playlists
for l in m:
stream_types.append(str(l.stream_info.bandwidth))
streams[str(l.stream_info.bandwidth)] = {'container': 'm3u8', 'video_profile': str(l.stream_info.bandwidth), 'src' : [l.absolute_uri], 'size': 0}
stream_types.sort()
return stream_types, streams
def launch_ffmpeg_download(url, name, live):
print('Now downloading: %s' % name)
if live:
print('stop downloading by press \'q\'')
cmd = ['ffmpeg', '-headers', ''.join('%s: %s\r\n' % x for x in fake_headers.items()), '-y']
url = encode_for_wrap(url)
if os.path.isfile(url):
cmd += ['-protocol_whitelist', 'file,http,https,tls,rtp,tcp,udp,crypto,httpproxy']
cmd += ['-i', url, '-c', 'copy', '-bsf:a', 'aac_adtstoasc', '-hide_banner', name]
subprocess.call(cmd)
def load_m3u8(url):
urls = []
m = m3u8.load(url, headers=fake_headers)
for seg in m.segments:
urls.append(seg.absolute_uri)
return urls
def prepare(self):
info = VideoInfo(self.name)
info.extra['referer'] = 'https://www.bilibili.com/'
info.extra['ua'] = fake_headers['User-Agent']
self.vid, info.title, info.artist = self.get_page_info()
assert self.vid, "can't play this video: {}".format(self.url)
def get_video_info(qn=0):
# need login with "qn=112"
if int(qn) > 80:
return
api_url = self.get_api_url(qn)
html = get_content(api_url)
self.logger.debug('HTML> ' + html)
code = match1(html, '<code>([^<])')
if code:
return</code>
from logging import getLogger
from ykdl.compact import (
Queue, thread, urlsplit,
BaseHTTPRequestHandler, SocketServer
)
from ykdl.util.html import fake_headers as _fake_headers
import urllib3
import re
import socket
from time import time, sleep
logger = getLogger('RangeFetch')
fake_headers = _fake_headers.copy()
# Set 'keep-alive'
fake_headers['Connection'] = 'keep-alive'
del fake_headers['Accept-Encoding']
class LocalTCPServer(SocketServer.ThreadingTCPServer):
request_queue_size = 2
allow_reuse_address = True
def server_bind(self):
sock = self.socket
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, True)
self.RequestHandlerClass.bufsize = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
SocketServer.TCPServer.server_bind(self)