def _postUrl(self, url,
             parameters={},
             headers={},
             extrasleep=None,
             usecache=True):
    '''
    When should cache be cleared or not used? logins...

    extrasleep is primarily for ffnet adapter which has extra
    sleeps.  Passed into fetches so it can be bypassed when
    cache hits.
    '''
    url = quote_plus(ensure_binary(url),safe=';/?:@&=+$,%&#')
    if self.getConfig('force_https'): ## For developer testing only.
        url = url.replace("http:","https:")
    cachekey=self._get_cachekey(url, parameters, headers)
    if usecache and self._has_cachekey(cachekey) and not cachekey.startswith('file:'):
        logger.debug("#####################################\npagecache(POST) HIT: %s"%safe_url(cachekey))
        data,redirecturl = self._get_from_pagecache(cachekey)
        return data

    logger.debug("#####################################\npagecache(POST) MISS: %s"%safe_url(cachekey))
    if not cachekey.startswith('file:'): # don't sleep for file: URLs.
        self.do_sleep(extrasleep)

    ## Request assumes POST when data!=None.  Also assumes data
    ## is application/x-www-form-urlencoded.
    if 'Content-type' not in headers:
        headers['Content-type'] = 'application/x-www-form-urlencoded'

    ## Completion sketch: perform the POST with the same opener
    ## pattern as the GET path in _fetchUrlRawOpened below --
    ## a non-None data argument makes the opener POST.
    self.opener.addheaders = list(headers.items())
    opened = self.opener.open(url,
                              ensure_binary(urlencode(parameters)),
                              float(self.getConfig('connect_timeout',30.0)))
    self._progressbar()
    data = self._decode(opened.read())
    ## postURL saves data to the pagecache *after* _decode() while
    ## fetchRaw saves it *before* _decode()--because raw.
    self._set_to_pagecache(cachekey,data,opened.url)
    return data
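
## Usage sketch (hypothetical call, not from this module): login POSTs
## pass usecache=False so a stale "not logged in" page is never served
## from the cache lookup (the fresh reply is still stored afterwards).
# data = self._postUrl('https://example.com/login',
#                      parameters={'username': user, 'password': passwd},
#                      usecache=False)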
def _ConvertStringToFile(self, html_data, out):
    html = HtmlProcessor(html_data)
    data = ensure_binary(html.CleanHtml())

    # collect offsets of '<mbp:pagebreak>' tags, use to make index list.
    # indexlist = [] # list of (offset,length) tuples.
    # not in current use.
    # j=0
    # lastj=0
    # while True:
    #     j=data.find('<mbp:pagebreak>',lastj+10) # plus a bit so we find the next.
    #     if j < 0:
    #         break
    #     indexlist.append((lastj,j-lastj))
    #     print "index offset: %d length: %d" % (lastj,j-lastj)
    #     lastj=j

    records = []

    # title = html.title
    # if title:
    #     self._header.SetTitle(title)
    record_id = 1
    # logger.debug("len(data):%s"%len(data))
    for start_pos in range(0, len(data), Record.MAX_SIZE):
        end = min(len(data), start_pos + Record.MAX_SIZE)
        record_data = data[start_pos:end]
        records.append(self._header.AddRecord(record_data, record_id))
        # logger.debug("HTML Record %03d: (size:%d) [[%s ... %s]]" % ( record_id, len(record_data), record_data[:20], record_data[-20:] ))
        record_id += 1
    self._header.SetImageRecordIndex(record_id)
    records[0:0] = [self._header.MobiHeader()]

    header, rec_offset = self._header.PDBHeader(len(records))
    out.write(ensure_binary(header))
    for record in records:
        record.WriteHeader(out, rec_offset)
        # logger.debug("rec_offset: %d len(record.data): %d" % (rec_offset,len(record.data)))
        rec_offset += (len(record.data)+1) # plus one for trailing null
    # Write two nuls for some reason
    out.write(b'\0\0')
    for record in records:
        record.WriteData(out)
        out.write(b'\0')
        # needs a trailing null, I believe it indicates zero length 'overlap'.
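
## Worked sketch of the record/offset bookkeeping above (standalone;
## 4096 as the record size and a zero first offset are illustrative
## assumptions -- in the real file Record.MAX_SIZE and PDBHeader()
## supply those values). Each record's file offset is the running sum
## of prior record lengths plus one trailing null per record.
def _sketch_record_offsets(data, max_size=4096, first_offset=0):
    offsets = []  # list of (offset, length) per record
    pos = first_offset
    for start in range(0, len(data), max_size):
        chunk = data[start:start + max_size]
        offsets.append((pos, len(chunk)))
        pos += len(chunk) + 1  # plus one for the trailing null
    return offsets

# _sketch_record_offsets(b'x' * 10000)
#   -> [(0, 4096), (4097, 4096), (8194, 1808)]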
def _ReplaceAnchorStubs(self):
    # TODO: Browsers allow extra whitespace in the href names.
    assembled_text = ensure_binary(unicode(self._soup))
    ## html5lib/bs4 creates close tags for <mbp:pagebreak>; strip them.
    assembled_text = assembled_text.replace(b'</mbp:pagebreak>',b'')
    del self._soup # shouldn't touch this anymore
    for anchor_num, original_ref in self._anchor_references:
        ref = unquote(original_ref[1:]) # remove leading '#'
        # Find the position of ref in the utf-8 document.
        # TODO(chatham): Using regexes and looking for name= would be better.
        newpos = assembled_text.find(b'name="'+ensure_binary(ref)) # .encode('utf-8')
        if newpos == -1:
            logger.warning('Could not find anchor "%s"' % original_ref)
            continue
        # instead of somewhere slightly *after* the <a> tag pointed to,
        # let's go right in front of it instead by looking for the page
        # break before it.
        newpos = assembled_text.rfind(b'<',0,newpos)
        # logger.debug("Anchor Pos: %s %s '%s|%s'"%((anchor_num, newpos,assembled_text[newpos-15:newpos],assembled_text[newpos:newpos+15])))
        old_filepos = b'filepos="%.10d"' % anchor_num
        new_filepos = b'filepos="%.10d"' % newpos
        assert assembled_text.find(old_filepos) != -1
        assembled_text = assembled_text.replace(old_filepos, new_filepos, 1)
    return assembled_text
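
## Why filepos uses %.10d: MOBI filepos attributes are byte offsets
## into the final text, so rewriting one must not change any string
## length -- otherwise every later offset would shift. Zero-padding to
## a fixed ten digits keeps the placeholder and the final value the
## same width, making the in-place replace above safe. Illustration:
old = b'filepos="%.10d"' % 7        # placeholder holds the anchor number
new = b'filepos="%.10d"' % 123456   # later swapped for the byte offset
assert len(old) == len(new)         # widths match, offsets stay valid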
def _fetchUrlRawOpened(self, url,
                       parameters=None,
                       extrasleep=None,
                       usecache=True,
                       referer=None):
    '''
    When should cache be cleared or not used? logins...

    extrasleep is primarily for ffnet adapter which has extra
    sleeps.  Passed into fetches so it can be bypassed when
    cache hits.
    '''
    url = quote_plus(ensure_binary(url),safe=';/?:@&=+$,%&#')
    if self.getConfig('force_https'): ## For developer testing only.
        url = url.replace("http:","https:")
    cachekey=self._get_cachekey(url, parameters)
    if usecache and self._has_cachekey(cachekey) and not cachekey.startswith('file:'):
        logger.debug("#####################################\npagecache(GET) HIT: %s"%safe_url(cachekey))
        data,redirecturl = self._get_from_pagecache(cachekey)
        class FakeOpened:
            ## Duck-types the pieces of a urllib response object that
            ## callers use, so cache hits look like live fetches.
            def __init__(self,data,url):
                self.data=data
                self.url=url
            def geturl(self): return self.url
            def read(self): return self.data
        return (data,FakeOpened(data,redirecturl))

    logger.debug("#####################################\npagecache(GET) MISS: %s"%safe_url(cachekey))
    if not cachekey.startswith('file:'): # don't sleep for file: URLs.
        self.do_sleep(extrasleep)

    ## Start the header list (sketch -- exactly which defaults belong
    ## here is an assumption; user_agent comes from config).
    headers = [('User-Agent', self.getConfig('user_agent'))]
    if referer:
        ## Could have defaulted to "" instead, but this way it's
        ## not present at all
        headers.append(('Referer',referer))
    # logger.debug("GET http login for SB xf2test %s"%url)
    # if "xf2test" in url:
    #     import base64
    #     base64string = base64.encodestring(b"sbreview2019:Fs2PwuVE9").replace(b'\n', b'')
    #     headers.append(('Authorization',b"Basic %s" % base64string))
    #     logger.debug("http login for SB xf2test")
    self.opener.addheaders = headers

    if parameters is not None:
        opened = self.opener.open(url,
                                  ensure_binary(urlencode(parameters)),
                                  float(self.getConfig('connect_timeout',30.0)))
    else:
        opened = self.opener.open(url,
                                  None,
                                  float(self.getConfig('connect_timeout',30.0)))
    self._progressbar()
    data = opened.read()
    ## postURL saves data to the pagecache *after* _decode() while
    ## fetchRaw saves it *before* _decode()--because raw.
    self._set_to_pagecache(cachekey,data,opened.url)
    return (data,opened)
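
## Minimal sketch of the pagecache helpers both fetchers above assume
## (the shapes are assumptions; the real class may store more): a dict
## mapping a normalized key to (data, redirecturl).
class PageCacheSketch:
    def __init__(self):
        self.cache = {}
    def _get_cachekey(self, url, parameters=None, headers=None):
        ## Parameters join the key so the same URL fetched with
        ## different form data caches separately.
        return '%s;%s' % (url, sorted((parameters or {}).items()))
    def _has_cachekey(self, cachekey):
        return cachekey in self.cache
    def _get_from_pagecache(self, cachekey):
        return self.cache[cachekey]
    def _set_to_pagecache(self, cachekey, data, redirecturl):
        self.cache[cachekey] = (data, redirecturl)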