Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# full HTTP request url
request_url = build_get_url(url, data, overwrite=True)
# split URL into base url and query string to use utility function
spliturl = request_url.split('?')
u = openURL(spliturl[0], spliturl[
1], method='Get', username=username, password=password,
headers=headers, verify=verify, cert=cert, timeout=self.timeout)
return etree.fromstring(
elif method == 'Post':
u = openURL(url, data, method='Post',
username=username, password=password,
headers=headers, verify=verify, cert=cert, timeout=timeout)
return etree.fromstring(
raise Exception("Unrecognized HTTP method: %s" % method)
def getCoverages(self, unpackdir='./unpacked'):
if self.urlType=='XML':
u_xml =
u_tree = etree.fromstring(u_xml)
for ref in u_tree.findall('{}Coverage/{}Reference'):
path = ref.attrib['{}href']
for ref in u_tree.findall('{}Coverage/{{}Reference'):
path = ref.attrib['{}href']
elif self.urlType=='Multipart':
#Decode multipart mime and return fileobjects
mpart =MpartMime(u_mpart)
paths= mpart.unpackToDir(unpackdir)
return paths
def read(self, service_url):
"""Get and parse a WCS capabilities document, returning an
elementtree tree
@type service_url: string
@param service_url: The base url, to which is appended the service,
version, and request parameters
@rtype: elementtree tree
@return: An elementtree tree representation of the capabilities document
request = self.capabilities_url(service_url)
req = Request(request)
if self.cookies is not None:
req.add_header('Cookie', self.cookies)
u = urlopen(req)
return etree.fromstring(
request['temporalFilter'] = eventTime
url_kwargs = {}
if 'timeout' in kwargs:
url_kwargs['timeout'] = kwargs.pop('timeout') # Client specified timeout value
if procedure is not None:
request['procedure'] = procedure
if kwargs:
for kw in kwargs:
request[kw] = kwargs[kw]
response = openURL(base_url, request, method, username=self.username, password=self.password, **url_kwargs).read()
tr = etree.fromstring(response)
if tr.tag == nspath_eval("ows:ExceptionReport", namespaces):
raise ows.ExceptionReport(tr)
return response
except ows.ExceptionReport:
except BaseException:
return response
def read(self, url, timeout=30):
"""Get and parse a WFS capabilities document, returning an
instance of WFSCapabilitiesInfoset
url : string
The URL to the WFS capabilities document.
timeout : number
A timeout value (in seconds) for the request.
request = self.capabilities_url(url)
u = openURL(request, timeout=timeout,
username=self.username, password=self.password)
return etree.fromstring(
# We're going to assume that anything with a content-length > 32k
# is data. We'll check anything smaller.
if 'Content-Length' in
length = int(['Content-Length'])
have_read = False
data =
have_read = True
length = len(data)
if length < 32000:
if not have_read:
data =
tree = etree.fromstring(data)
except BaseException:
# Not XML
return self._makeStringIO(data)
if tree.tag == "{%s}ServiceExceptionReport" % OGC_NAMESPACE:
se = tree.find(nspath('ServiceException', OGC_NAMESPACE))
raise ServiceException(str(se.text).strip())
return self._makeStringIO(data)
if have_read:
return self._makeStringIO(data)
return u
if cookies is not None:
rkwargs['cookies'] = cookies
req = requests.request(method.upper(), url_base, headers=headers, **rkwargs)
if req.status_code in [400, 401]:
raise ServiceException(req.text)
if req.status_code in [404, 500, 502, 503, 504]: # add more if needed
# check for service exceptions without the http header set
if 'Content-Type' in req.headers and \
req.headers['Content-Type'] in ['text/xml', 'application/xml', 'application/vnd.ogc.se_xml']:
# just in case 400 headers were not set, going to have to read the xml to see if it's an exception report.
se_tree = etree.fromstring(req.content)
# to handle the variety of namespaces and terms across services
# and versions, especially for "legacy" responses like WMS 1.3.0
possible_errors = [
for possible_error in possible_errors:
serviceException = se_tree.find(possible_error)
if serviceException is not None:
# and we need to deal with some message nesting
raise ServiceException('\n'.join([t.strip() for t in serviceException.itertext() if t.strip()]))
execution = WPSExecution(version=self.version, url=self.url,
username=self.username, password=self.password, verbose=self.verbose,
headers=self.headers, verify=self.verify, cert=self.cert)
# build XML request from parameters
if request is None:
requestElement = execution.buildRequest(identifier, inputs, output, mode=mode, lineage=lineage)
request = etree.tostring(requestElement)
execution.request = request
# submit the request to the live server
if response is None:
response = execution.submitRequest(request)
response = etree.fromstring(response)
# parse response
return execution
def parse_remote_metadata(self, timeout=30):
"""Parse remote metadata for MetadataURL of format 'XML' and add it as metadataUrl['metadata']"""
for metadataUrl in self.metadataUrls:
if metadataUrl['url'] is not None \
and metadataUrl['format'].lower() == 'xml':
content = openURL(metadataUrl['url'], timeout=timeout)
doc = etree.fromstring(
if metadataUrl['type'] == 'FGDC':
mdelem = doc.find('.//metadata')
if mdelem is not None:
metadataUrl['metadata'] = Metadata(mdelem)
metadataUrl['metadata'] = None
elif metadataUrl['type'] == 'TC211':
mdelem = doc.find('.//' + util.nspath_eval('gmd:MD_Metadata', n.get_namespaces(['gmd']))) \
or doc.find('.//' + util.nspath_eval('gmi:MI_Metadata', n.get_namespaces(['gmi'])))
if mdelem is not None:
metadataUrl['metadata'] = MD_Metadata(mdelem)
metadataUrl['metadata'] = None
except Exception:
metadataUrl['metadata'] = None