Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Note the removal of the fragment ID. This is necessary, per the HTTP spec
url = name.split('#')[0]
if socket.getfqdn().endswith('.w3.org'):
import checkremote
checkremote.check_url_safety(url)
if 'Accept' not in additional_headers:
additional_headers['Accept'] = 'text/html, application/xhtml+xml'
import requests
r = requests.get(url, headers=additional_headers)
self.data = r.content
self.headers = r.headers
if URIOpener.CONTENT_TYPE in self.headers :
# The call below will remove the possible media type parameters, like charset settings
ct = content_type(self.headers[URIOpener.CONTENT_TYPE])
self.content_type = ct.media_type
if 'charset' in ct.parmdict :
self.charset = ct.parmdict['charset']
else :
self.charset = None
# print
else :
# check if the suffix can be used for the content type; this may be important
# for file:// type URI or if the server is not properly set up to return the right
# mime type
self.charset = None
self.content_type = ""
for suffix in preferred_suffixes.keys() :
if name.endswith(suffix) :
self.content_type = preferred_suffixes[suffix]
break
self.charset = ct.parmdict['charset']
else :
self.charset = None
# print
else :
# check if the suffix can be used for the content type; this may be important
# for file:// type URI or if the server is not properly set up to return the right
# mime type
self.charset = None
self.content_type = ""
for suffix in preferred_suffixes.keys() :
if name.endswith(suffix) :
self.content_type = preferred_suffixes[suffix]
break
if URIOpener.CONTENT_LOCATION in self.headers :
self.location = urljoin(r.url,self.headers[URIOpener.CONTENT_LOCATION])
else :
self.location = name
self.expiration_date = datetime.datetime.utcnow() + datetime.timedelta(days=1)
if URIOpener.EXPIRES in self.headers :
try :
# Thanks to Deron Meranda for the HTTP date conversion method...
self.expiration_date = parse_http_datetime(self.headers[URIOpener.EXPIRES])
except :
# The Expires date format was wrong, sorry, forget it...
pass
self.last_modified_date = None
if URIOpener.LAST_MODIFIED in self.headers :
try :
@type name: string or a file-like object
@return: a file-like object if opening "name" is possible and successful, "name" otherwise
"""
try :
# Python 2 branch
isstring = isinstance(name, basestring)
except :
# Python 3 branch
isstring = isinstance(name, str)
try :
if isstring :
# check if this is a URI, ie, if there is a valid 'scheme' part
# otherwise it is considered to be a simple file
if urlparse(name)[0] != "" :
url_request = URIOpener(name)
self.base = url_request.location
if self.media_type == "" :
if url_request.content_type in content_to_host_language :
self.media_type = url_request.content_type
else :
self.media_type = MediaTypes.xml
self.options.set_host_language(self.media_type)
self.charset = url_request.charset
if self.required_base == None :
self.required_base = name
return url_request.data
else :
# Creating a File URI for this thing
if self.required_base == None :
self.required_base = "file://" + os.path.join(os.getcwd(),name)
if self.media_type == "" :
@param options: used as a place where warnings can be sent
@param newCache: in case this is used with caching, whether a new cache is generated; that modifies the warning text
@return: A tuple consisting of an RDFLib Graph instance and an expiration date; None if the dereferencing or the parsing was unsuccessful
"""
def return_to_cache(msg) :
    """
    Register a C{VocabReferenceError} warning about C{uri} on C{options}.

    The warning text is C{err_unreachable_vocab} when C{newCache} is true and
    C{err_outdated_cache} otherwise; in both cases C{uri} is interpolated into it.

    @param msg: value supplied by the caller (the caught exception value at the visible
    call sites); it is accepted but not used when building the warning
    """
    # newCache, options and uri are not defined in this function; they are
    # taken from the surrounding scope.
    warning_template = err_unreachable_vocab if newCache else err_outdated_cache
    options.add_warning(warning_template % uri, warning_type=VocabReferenceError)
retval = None
expiration_date = None
content = None
try :
content = URIOpener(uri,
{'Accept' : 'text/html;q=0.8, application/xhtml+xml;q=0.8, text/turtle;q=1.0, application/rdf+xml;q=0.9'})
except HTTPError :
(type,value,traceback) = sys.exc_info()
return_to_cache(value)
return (None,None)
except RDFaError :
(type,value,traceback) = sys.exc_info()
return_to_cache(value)
return (None,None)
except Exception :
(type,value,traceback) = sys.exc_info()
return_to_cache(value)
return (None,None)
# Store the expiration date of the newly accessed data
expiration_date = content.expiration_date
if URIOpener.CONTENT_LOCATION in self.headers :
self.location = urljoin(r.url,self.headers[URIOpener.CONTENT_LOCATION])
else :
self.location = name
self.expiration_date = datetime.datetime.utcnow() + datetime.timedelta(days=1)
if URIOpener.EXPIRES in self.headers :
try :
# Thanks to Deron Meranda for the HTTP date conversion method...
self.expiration_date = parse_http_datetime(self.headers[URIOpener.EXPIRES])
except :
# The Expires date format was wrong, sorry, forget it...
pass
self.last_modified_date = None
if URIOpener.LAST_MODIFIED in self.headers :
try :
# Thanks to Deron Meranda for the HTTP date conversion method...
self.last_modified_date = parse_http_datetime(self.headers[URIOpener.LAST_MODIFIED])
except :
# The last modified date format was wrong, sorry, forget it...
pass
except urllib_HTTPError :
e = sys.exc_info()[1]
from . import HTTPError
msg = BaseHTTPRequestHandler.responses[e.code]
raise HTTPError('%s' % msg[1], e.code)
except Exception :
e = sys.exc_info()[1]
from . import RDFaError
raise RDFaError('%s' % e)
self.content_type = ""
for suffix in preferred_suffixes.keys() :
if name.endswith(suffix) :
self.content_type = preferred_suffixes[suffix]
break
if URIOpener.CONTENT_LOCATION in self.headers :
self.location = urljoin(r.url,self.headers[URIOpener.CONTENT_LOCATION])
else :
self.location = name
self.expiration_date = datetime.datetime.utcnow() + datetime.timedelta(days=1)
if URIOpener.EXPIRES in self.headers :
try :
# Thanks to Deron Meranda for the HTTP date conversion method...
self.expiration_date = parse_http_datetime(self.headers[URIOpener.EXPIRES])
except :
# The Expires date format was wrong, sorry, forget it...
pass
self.last_modified_date = None
if URIOpener.LAST_MODIFIED in self.headers :
try :
# Thanks to Deron Meranda for the HTTP date conversion method...
self.last_modified_date = parse_http_datetime(self.headers[URIOpener.LAST_MODIFIED])
except :
# The last modified date format was wrong, sorry, forget it...
pass
except urllib_HTTPError :
e = sys.exc_info()[1]
from . import HTTPError