Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def start_new_server(bind='', port=8806, first_size=None, max_size=None,
                     threads=None, down_rate=None, proxy=None, headers=None, **kwargs):
    """Apply optional ``RangeFetch`` overrides and start a server on a new thread.

    Every optional setting is applied only when its argument is truthy; a
    falsy value (``None``, ``0``, an empty container) leaves the matching
    ``RangeFetch`` class attribute as it was.

    Args:
        bind: Host part of the address handed to ``LocalTCPServer``.
        port: Port part of the address handed to ``LocalTCPServer``.
        first_size: Stored as ``RangeFetch.first_size``.
        max_size: Stored as ``RangeFetch.max_size``.
        threads: Stored as ``RangeFetch.threads``.
        down_rate: Sets ``RangeFetch.down_rate_min`` to ``int(down_rate * 2)``
            and ``RangeFetch.down_rate_max`` to that value plus ``down_rate``
            clamped to the range ``1024 * 100`` .. ``1024 * 200``.
        proxy: Stored as ``RangeFetch.proxy``.
        headers: Merged into ``RangeFetch.headers`` with ``update()``.
        **kwargs: Accepted and ignored.

    Returns:
        The new ``LocalTCPServer`` instance; its ``serve_forever`` has been
        handed to ``thread.start_new_thread`` before this function returns.
    """
    # These three map one-to-one onto same-named RangeFetch class attributes.
    simple_overrides = (
        ('first_size', first_size),
        ('max_size', max_size),
        ('threads', threads),
    )
    for attr_name, attr_value in simple_overrides:
        if attr_value:
            setattr(RangeFetch, attr_name, attr_value)

    if down_rate:
        rate_floor = int(down_rate * 2)
        RangeFetch.down_rate_min = rate_floor
        # The ceiling sits above the floor by down_rate, clamped to a fixed band.
        headroom = min(max(down_rate, 1024 * 100), 1024 * 200)
        RangeFetch.down_rate_max = rate_floor + headroom

    if proxy:
        RangeFetch.proxy = proxy

    if headers:
        RangeFetch.headers.update(headers)

    address = (bind, port)
    server = LocalTCPServer(address, RangeFetchHandler)
    # Keep the fetcher's buffer size aligned with the handler's.
    RangeFetch.bufsize = RangeFetchHandler.bufsize
    thread.start_new_thread(server.serve_forever, ())
    return server
def start_new_server(bind='', port=8806, first_size=None, max_size=None,
                     threads=None, down_rate=None, proxy=None, headers=None, **kwargs):
    """Apply optional ``RangeFetch`` overrides and start a server on a new thread.

    Every optional setting is applied only when its argument is truthy; a
    falsy value (``None``, ``0``, an empty container) leaves the matching
    ``RangeFetch`` class attribute as it was.

    Args:
        bind: Host part of the address handed to ``LocalTCPServer``.
        port: Port part of the address handed to ``LocalTCPServer``.
        first_size: Stored as ``RangeFetch.first_size``.
        max_size: Stored as ``RangeFetch.max_size``.
        threads: Stored as ``RangeFetch.threads``.
        down_rate: Sets ``RangeFetch.down_rate_min`` to ``int(down_rate * 2)``
            and ``RangeFetch.down_rate_max`` to that value plus ``down_rate``
            clamped to the range ``1024 * 100`` .. ``1024 * 200``.
        proxy: Stored as ``RangeFetch.proxy``.
        headers: Merged into ``RangeFetch.headers`` with ``update()``.
        **kwargs: Accepted and ignored.

    Returns:
        The new ``LocalTCPServer`` instance; its ``serve_forever`` has been
        handed to ``thread.start_new_thread`` before this function returns.
    """
    # These three map one-to-one onto same-named RangeFetch class attributes.
    simple_overrides = (
        ('first_size', first_size),
        ('max_size', max_size),
        ('threads', threads),
    )
    for attr_name, attr_value in simple_overrides:
        if attr_value:
            setattr(RangeFetch, attr_name, attr_value)

    if down_rate:
        rate_floor = int(down_rate * 2)
        RangeFetch.down_rate_min = rate_floor
        # The ceiling sits above the floor by down_rate, clamped to a fixed band.
        headroom = min(max(down_rate, 1024 * 100), 1024 * 200)
        RangeFetch.down_rate_max = rate_floor + headroom

    if proxy:
        RangeFetch.proxy = proxy

    if headers:
        RangeFetch.headers.update(headers)

    address = (bind, port)
    server = LocalTCPServer(address, RangeFetchHandler)
    # Keep the fetcher's buffer size aligned with the handler's.
    RangeFetch.bufsize = RangeFetchHandler.bufsize
    thread.start_new_thread(server.serve_forever, ())
    return server
def start_new_server(bind='', port=8806, first_size=None, max_size=None,
                     threads=None, down_rate=None, proxy=None, headers=None, **kwargs):
    """Apply optional ``RangeFetch`` overrides and start a server on a new thread.

    Every optional setting is applied only when its argument is truthy; a
    falsy value (``None``, ``0``, an empty container) leaves the matching
    ``RangeFetch`` class attribute as it was.

    Args:
        bind: Host part of the address handed to ``LocalTCPServer``.
        port: Port part of the address handed to ``LocalTCPServer``.
        first_size: Stored as ``RangeFetch.first_size``.
        max_size: Stored as ``RangeFetch.max_size``.
        threads: Stored as ``RangeFetch.threads``.
        down_rate: Sets ``RangeFetch.down_rate_min`` to ``int(down_rate * 2)``
            and ``RangeFetch.down_rate_max`` to that value plus ``down_rate``
            clamped to the range ``1024 * 100`` .. ``1024 * 200``.
        proxy: Stored as ``RangeFetch.proxy``.
        headers: Merged into ``RangeFetch.headers`` with ``update()``.
        **kwargs: Accepted and ignored.

    Returns:
        The new ``LocalTCPServer`` instance; its ``serve_forever`` has been
        handed to ``thread.start_new_thread`` before this function returns.
    """
    # These three map one-to-one onto same-named RangeFetch class attributes.
    simple_overrides = (
        ('first_size', first_size),
        ('max_size', max_size),
        ('threads', threads),
    )
    for attr_name, attr_value in simple_overrides:
        if attr_value:
            setattr(RangeFetch, attr_name, attr_value)

    if down_rate:
        rate_floor = int(down_rate * 2)
        RangeFetch.down_rate_min = rate_floor
        # The ceiling sits above the floor by down_rate, clamped to a fixed band.
        headroom = min(max(down_rate, 1024 * 100), 1024 * 200)
        RangeFetch.down_rate_max = rate_floor + headroom

    if proxy:
        RangeFetch.proxy = proxy

    if headers:
        RangeFetch.headers.update(headers)

    address = (bind, port)
    server = LocalTCPServer(address, RangeFetchHandler)
    # Keep the fetcher's buffer size aligned with the handler's.
    RangeFetch.bufsize = RangeFetchHandler.bufsize
    thread.start_new_thread(server.serve_forever, ())
    return server
threads=None, down_rate=None, proxy=None, headers=None, **kwargs):
if first_size:
RangeFetch.first_size = first_size
if max_size:
RangeFetch.max_size = max_size
if threads:
RangeFetch.threads = threads
if down_rate:
RangeFetch.down_rate_min = int(down_rate * 2)
RangeFetch.down_rate_max = RangeFetch.down_rate_min + min(max(down_rate, 1024 * 100), 1024 * 200)
if proxy:
RangeFetch.proxy = proxy
if headers:
RangeFetch.headers.update(headers)
new_server = LocalTCPServer((bind, port), RangeFetchHandler)
RangeFetch.bufsize = RangeFetchHandler.bufsize
thread.start_new_thread(new_server.serve_forever, ())
return new_server
def start_new_server(bind='', port=8806, first_size=None, max_size=None,
                     threads=None, down_rate=None, proxy=None, headers=None, **kwargs):
    """Apply optional ``RangeFetch`` overrides and start a server on a new thread.

    Every optional setting is applied only when its argument is truthy; a
    falsy value (``None``, ``0``, an empty container) leaves the matching
    ``RangeFetch`` class attribute as it was.

    Args:
        bind: Host part of the address handed to ``LocalTCPServer``.
        port: Port part of the address handed to ``LocalTCPServer``.
        first_size: Stored as ``RangeFetch.first_size``.
        max_size: Stored as ``RangeFetch.max_size``.
        threads: Stored as ``RangeFetch.threads``.
        down_rate: Sets ``RangeFetch.down_rate_min`` to ``int(down_rate * 2)``
            and ``RangeFetch.down_rate_max`` to that value plus ``down_rate``
            clamped to the range ``1024 * 100`` .. ``1024 * 200``.
        proxy: Stored as ``RangeFetch.proxy``.
        headers: Merged into ``RangeFetch.headers`` with ``update()``.
        **kwargs: Accepted and ignored.

    Returns:
        The new ``LocalTCPServer`` instance; its ``serve_forever`` has been
        handed to ``thread.start_new_thread`` before this function returns.
    """
    # These three map one-to-one onto same-named RangeFetch class attributes.
    simple_overrides = (
        ('first_size', first_size),
        ('max_size', max_size),
        ('threads', threads),
    )
    for attr_name, attr_value in simple_overrides:
        if attr_value:
            setattr(RangeFetch, attr_name, attr_value)

    if down_rate:
        rate_floor = int(down_rate * 2)
        RangeFetch.down_rate_min = rate_floor
        # The ceiling sits above the floor by down_rate, clamped to a fixed band.
        headroom = min(max(down_rate, 1024 * 100), 1024 * 200)
        RangeFetch.down_rate_max = rate_floor + headroom

    if proxy:
        RangeFetch.proxy = proxy

    if headers:
        RangeFetch.headers.update(headers)

    address = (bind, port)
    server = LocalTCPServer(address, RangeFetchHandler)
    # Keep the fetcher's buffer size aligned with the handler's.
    RangeFetch.bufsize = RangeFetchHandler.bufsize
    thread.start_new_thread(server.serve_forever, ())
    return server
def start_new_server(bind='', port=8806, first_size=None, max_size=None,
                     threads=None, down_rate=None, proxy=None, headers=None, **kwargs):
    """Apply optional ``RangeFetch`` overrides and start a server on a new thread.

    Every optional setting is applied only when its argument is truthy; a
    falsy value (``None``, ``0``, an empty container) leaves the matching
    ``RangeFetch`` class attribute as it was.

    Args:
        bind: Host part of the address handed to ``LocalTCPServer``.
        port: Port part of the address handed to ``LocalTCPServer``.
        first_size: Stored as ``RangeFetch.first_size``.
        max_size: Stored as ``RangeFetch.max_size``.
        threads: Stored as ``RangeFetch.threads``.
        down_rate: Sets ``RangeFetch.down_rate_min`` to ``int(down_rate * 2)``
            and ``RangeFetch.down_rate_max`` to that value plus ``down_rate``
            clamped to the range ``1024 * 100`` .. ``1024 * 200``.
        proxy: Stored as ``RangeFetch.proxy``.
        headers: Merged into ``RangeFetch.headers`` with ``update()``.
        **kwargs: Accepted and ignored.

    Returns:
        The new ``LocalTCPServer`` instance; its ``serve_forever`` has been
        handed to ``thread.start_new_thread`` before this function returns.
    """
    # These three map one-to-one onto same-named RangeFetch class attributes.
    simple_overrides = (
        ('first_size', first_size),
        ('max_size', max_size),
        ('threads', threads),
    )
    for attr_name, attr_value in simple_overrides:
        if attr_value:
            setattr(RangeFetch, attr_name, attr_value)

    if down_rate:
        rate_floor = int(down_rate * 2)
        RangeFetch.down_rate_min = rate_floor
        # The ceiling sits above the floor by down_rate, clamped to a fixed band.
        headroom = min(max(down_rate, 1024 * 100), 1024 * 200)
        RangeFetch.down_rate_max = rate_floor + headroom

    if proxy:
        RangeFetch.proxy = proxy

    if headers:
        RangeFetch.headers.update(headers)

    address = (bind, port)
    server = LocalTCPServer(address, RangeFetchHandler)
    # Keep the fetcher's buffer size aligned with the handler's.
    RangeFetch.bufsize = RangeFetchHandler.bufsize
    thread.start_new_thread(server.serve_forever, ())
    return server
if ('range=' in url_parts.query or
'live=1' in url_parts.query or
'range/' in url_parts.path):
self.send_error(500,
'Range request not be accepted, range fetch can not be finished, url: %s' % self.url)
return
request_range = self.headers.get('Range')
if request_range:
request_range = getbytes(request_range)
range_start, range_end = [int(n) if n else 0 for n in request_range.group(1, 2)]
else:
range_start = range_end = 0
RangeFetch(self, range_start, range_end).fetch()
def start_new_server(bind='', port=8806, first_size=None, max_size=None,
                     threads=None, down_rate=None, proxy=None, headers=None, **kwargs):
    """Apply optional ``RangeFetch`` overrides and start a server on a new thread.

    Every optional setting is applied only when its argument is truthy; a
    falsy value (``None``, ``0``, an empty container) leaves the matching
    ``RangeFetch`` class attribute as it was.

    Args:
        bind: Host part of the address handed to ``LocalTCPServer``.
        port: Port part of the address handed to ``LocalTCPServer``.
        first_size: Stored as ``RangeFetch.first_size``.
        max_size: Stored as ``RangeFetch.max_size``.
        threads: Stored as ``RangeFetch.threads``.
        down_rate: Sets ``RangeFetch.down_rate_min`` to ``int(down_rate * 2)``
            and ``RangeFetch.down_rate_max`` to that value plus ``down_rate``
            clamped to the range ``1024 * 100`` .. ``1024 * 200``.
        proxy: Stored as ``RangeFetch.proxy``.
        headers: Merged into ``RangeFetch.headers`` with ``update()``.
        **kwargs: Accepted and ignored.

    Returns:
        The new ``LocalTCPServer`` instance; its ``serve_forever`` has been
        handed to ``thread.start_new_thread`` before this function returns.
    """
    # These three map one-to-one onto same-named RangeFetch class attributes.
    simple_overrides = (
        ('first_size', first_size),
        ('max_size', max_size),
        ('threads', threads),
    )
    for attr_name, attr_value in simple_overrides:
        if attr_value:
            setattr(RangeFetch, attr_name, attr_value)

    if down_rate:
        rate_floor = int(down_rate * 2)
        RangeFetch.down_rate_min = rate_floor
        # The ceiling sits above the floor by down_rate, clamped to a fixed band.
        headroom = min(max(down_rate, 1024 * 100), 1024 * 200)
        RangeFetch.down_rate_max = rate_floor + headroom

    if proxy:
        RangeFetch.proxy = proxy

    if headers:
        RangeFetch.headers.update(headers)

    address = (bind, port)
    server = LocalTCPServer(address, RangeFetchHandler)
    # Keep the fetcher's buffer size aligned with the handler's.
    RangeFetch.bufsize = RangeFetchHandler.bufsize
    thread.start_new_thread(server.serve_forever, ())
    return server