Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_wps_request7():
# Process input/ouutput arguments
processid = "wordcount"
textdoc = ComplexDataInput("http://emu.readthedocs.org/en/latest/index.html")
inputs = [("text", textdoc), ]
outputs = [("output", False)]
# Build XML request for WPS process execution, sync request
execution = WPSExecution()
requestElement = execution.buildRequest(processid, inputs, output=outputs, mode=SYNC, lineage=False)
request = etree.tostring(requestElement)
# Compare to cached XML request
_request = open(resource_file('wps_EmuExecuteRequest7.xml'), 'rb').read()
print(request)
assert compare_xml(request, _request) is True
("STATISTICS", "MAXIMUM"),
("STATISTICS", "WEIGHT_SUM"),
("STATISTICS", "VARIANCE"),
("STATISTICS", "STD_DEV"),
("STATISTICS", "COUNT"),
("GROUP_BY", "STATISTIC"),
("SUMMARIZE_TIMESTEP", "true"),
("SUMMARIZE_FEATURE_ATTRIBUTE", "true"),
("FEATURE_COLLECTION", featureCollection)
]
output = "OUTPUT"
# build XML request for WPS process execution
execution = WPSExecution()
requestElement = execution.buildRequest(processid, inputs, output=[(output, True)])
request = etree.tostring(requestElement)
# Compare to cached XML request
_request = open(resource_file('wps_USGSExecuteRequest4.xml'), 'rb').read()
assert compare_xml(request, _request) is True
if verify and not auth.verify:
auth.verify = verify
else:
auth = Authentication(username, password, cert, verify)
if auth.username and auth.password:
rkwargs['auth'] = (auth.username, auth.password)
rkwargs['cert'] = auth.cert
rkwargs['verify'] = auth.verify
# FIXUP for WFS in particular, remove xml style namespace
# @TODO does this belong here?
method = method.split("}")[-1]
if method.lower() == 'post':
try:
etree.fromstring(data)
headers['Content-Type'] = 'text/xml'
except (ParseError, UnicodeEncodeError):
pass
rkwargs['data'] = data
elif method.lower() == 'get':
rkwargs['params'] = data
else:
raise ValueError("Unknown method ('%s'), expected 'get' or 'post'" % method)
if cookies is not None:
rkwargs['cookies'] = cookies
req = requests.request(method.upper(), url_base, headers=headers, **rkwargs)
def axml_resource(d):
"""
encodes an OwcResource as dict into atom xml tree
:param d:
:return:
"""
entry = etree.Element("entry", nsmap=ns)
etree.SubElement(entry, "id").text = d['id']
geospatial_extent = extract_p('geometry', d, None)
if geospatial_extent is not None:
try:
gml = etree.fromstring(geospatial_extent)
georss = etree.SubElement(entry, ns_elem("georss", "where"))
georss.append(gml)
except Exception as ex:
log.warn('could encode geometry into georss:where', ex)
pass
title = d['properties']['title']
if title is not None: etree.SubElement(entry, "title").text = title
subtitle = extract_p('properties.abstract', d, None)
# <content type="text">
if subtitle is not None: etree.SubElement(entry, "content").text = subtitle
update_date = extract_p('properties.updated', d, None)
if update_date is not None: etree.SubElement(entry, "updated").text = update_date
</content>
def axml_author(d):
# ..
#
#
if is_empty(d):
return None
else:
try:
author = etree.Element("author", nsmap=ns)
name = extract_p('name', d, None)
if name is not None: etree.SubElement(author, "name").text = name
email = extract_p('email', d, None)
if email is not None: etree.SubElement(author, "email").text = email
uri = extract_p('uri', d, None)
if uri is not None: etree.SubElement(author, "uri").text = uri
return author
except Exception as ex:
log.warn('could encode author', ex)
return None
x for x in filter(
list,
([pv.get('url')
for const in pv.get('constraints')
if 'kvp' in [x.lower() for x in const.values]]
for pv in get_verbs if pv.get('constraints'))))[0]
elif len(get_verbs) == 1:
base_url = get_verbs[0].get('url')
except StopIteration:
pass
u = openURL(base_url, data, headers=self.headers, auth=self.auth, timeout=self.timeout)
# check for service exceptions, and return
if u.info()['Content-Type'] == 'application/vnd.ogc.se_xml':
se_xml = u.read()
se_tree = etree.fromstring(se_xml)
err_message = str(se_tree.find('ServiceException').text)
raise ServiceException(err_message.strip(), se_xml)
return u
def _setconstraint(self, parent, qtype=None, propertyname='csw:AnyText', keywords=[], bbox=None, cql=None, identifier=None):
if keywords or bbox is not None or qtype is not None or cql is not None or identifier is not None:
node0 = etree.SubElement(parent, util.nspath_eval('csw:Constraint', namespaces))
node0.set('version', '1.1.0')
if identifier is not None: # set identifier filter, overrides all other parameters
flt = fes.FilterRequest()
node0.append(flt.set(identifier=identifier))
elif cql is not None: # send raw CQL query
# CQL passed, overrides all other parameters
node1 = etree.SubElement(node0, util.nspath_eval('csw:CqlText', namespaces))
node1.text = cql
else: # construct a Filter request
flt = fes.FilterRequest()
node0.append(flt.set(qtype=qtype, keywords=keywords, propertyname=propertyname,bbox=bbox))
def gmlas_config(self):
path = self.gmlasConfigLineEdit.text()
if path == '':
raise InputError(self.tr("You must select a GMLAS config file"))
xmlConfig = etree.parse(self.gmlasConfigLineEdit.text())
# Set parameters
c = xmlConfig.getroot()
for l in c.iter('ExposeMetadataLayers'):
l.text = str(self.ogrExposeMetadataLayersCheckbox.isChecked()).lower()
for l in c.iter('LayerBuildingRules'):
for n in l.iter('RemoveUnusedLayers'):
n.text = str(self.ogrRemoveUnusedLayersCheckbox.isChecked()).lower()
for n in l.iter('RemoveUnusedFields'):
n.text = str(self.ogrRemoveUnusedFieldsCheckbox.isChecked()).lower()
for l in c.findall("XLinkResolution/URLSpecificResolution/HTTPHeader"):
name = l.find('Name').text
if name == 'Accept-Language':
l.find('Value').text = self.acceptLanguageHeaderInput.text()
----------
- parent: parent etree.Element object
- propertyname: the PropertyName
- literal: the Literal value
- wildcard: the wildCard character (default is '%')
- singlechar: the singleChar character (default is '_')
- escapechar: the escapeChar character (default is '\')
"""
tmp = etree.SubElement(parent, util.nspath_eval('ogc:PropertyIsLike', namespaces))
tmp.set('wildCard', wildcard)
tmp.set('singleChar', singlechar)
tmp.set('escapeChar', escapechar)
etree.SubElement(tmp, util.nspath_eval('ogc:PropertyName', namespaces)).text = propertyname
etree.SubElement(tmp, util.nspath_eval('ogc:Literal', namespaces)).text = literal