Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, reference=None, values_form=None):
self.reference = reference
self.values_form = values_form
if not self.reference:
raise InvalidParameterValue("values reference is missing.")
wpsrequest.store_execute = response_document[
0].attrib.get('storeExecuteResponse', 'false')
wpsrequest.status = response_document[
0].attrib.get('status', 'false')
if tagname == self.WPS.GetCapabilities().tag:
self.operation = 'getcapabilities'
return parse_post_getcapabilities
elif tagname == self.WPS.DescribeProcess().tag:
self.operation = 'describeprocess'
return parse_post_describeprocess
elif tagname == self.WPS.Execute().tag:
self.operation = 'execute'
return parse_post_execute
else:
raise InvalidParameterValue(
'Unknown request %r' % tagname, 'request')
def _get_request(self):
"""HTTP GET request parser
"""
# service shall be WPS
service = _get_get_param(self.http_request, 'service')
if service:
if str(service).lower() != 'wps':
raise InvalidParameterValue(
'parameter SERVICE [%s] not supported' % service, 'service')
else:
raise MissingParameterValue('service', 'service')
operation = _get_get_param(self.http_request, 'request')
request_parser = self._get_request_parser(operation)
request_parser(self.http_request)
# catch error generated by process code
try:
wps_response = process.execute(wps_request)
except Exception as e:
raise NoApplicableCode('Service error: %s' % e)
# get the specified output as raw
if wps_request.raw:
for outpt in wps_request.outputs:
for proc_outpt in process.outputs:
if outpt == proc_outpt.identifier:
return Response(proc_outpt.data)
# if the specified identifier was not found raise error
raise InvalidParameterValue('')
return wps_response
outinputs = deque(maxlen=source.max_occurs)
for inpt in inputs:
data_input = source.clone()
frmt = data_input.supported_formats[0]
if 'mimeType' in inpt:
if inpt['mimeType']:
frmt = data_input.get_format(inpt['mimeType'])
else:
frmt = data_input.data_format
if frmt:
data_input.data_format = frmt
else:
raise InvalidParameterValue(
'Invalid mimeType value %s for input %s' %
(inpt.get('mimeType'), source.identifier),
'mimeType')
data_input.method = inpt.get('method', 'GET')
data_input.process(inpt)
outinputs.append(data_input)
if len(outinputs) < source.min_occurs:
raise MissingParameterValue(description="Given data input is missing", locator=source.identifier)
return outinputs
raise MissingParameterValue('', 'identifier')
identifier_elements = []
# 'all' keyword means all processes
if 'all' in (ident.lower() for ident in identifiers):
for process in self.processes:
try:
identifier_elements.append(self.processes[process].describe_xml())
except Exception as e:
raise NoApplicableCode(e)
else:
for identifier in identifiers:
try:
process = self.processes[identifier]
except KeyError:
raise InvalidParameterValue("Unknown process %r" % identifier, "identifier")
else:
try:
identifier_elements.append(process.describe_xml())
except Exception as e:
raise NoApplicableCode(e)
doc = WPS.ProcessDescriptions(
*identifier_elements
)
doc.attrib['{http://www.w3.org/2001/XMLSchema-instance}schemaLocation'] = 'http://www.opengis.net/wps/1.0.0 http://schemas.opengis.net/wps/1.0.0/wpsDescribeProcess_response.xsd'
doc.attrib['service'] = 'WPS'
doc.attrib['version'] = '1.0.0'
doc.attrib['{http://www.w3.org/XML/1998/namespace}lang'] = 'en-CA'
return xml_response(doc)
self.outputs = get_output_from_xml(doc)
self.raw = False
if xpath_ns(doc, '/wps:Execute/wps:ResponseForm/wps:RawDataOutput'):
self.raw = True
# executeResponse XML will not be stored
self.store_execute = 'false'
# check if response document tag has been set then retrieve
response_document = xpath_ns(doc, './wps:ResponseForm/wps:ResponseDocument')
if len(response_document) > 0:
self.lineage = response_document[0].attrib.get('lineage', 'false')
self.store_execute = response_document[0].attrib.get('storeExecuteResponse', 'false')
self.status = response_document[0].attrib.get('status', 'false')
else:
raise InvalidParameterValue(doc.tag)
else:
raise MethodNotAllowed()
def data_format(self, data_format):
"""self data_format setter
"""
if self._is_supported(data_format):
self._data_format = data_format
if not data_format.validate or data_format.validate == emptyvalidator:
data_format.validate = get_validator(data_format.mime_type)
else:
raise InvalidParameterValue("Requested format {}, {}, {} not supported".format(
data_format.mime_type,
data_format.encoding,
data_format.schema),
'mimeType')