Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
raise Fault(
message=get_text("faultstring"),
code=get_text("faultcode"),
actor=get_text("faultactor"),
detail=fault_node.find("detail"),
)
def _set_http_headers(self, serialized, operation):
    """Declare the serialized request body as UTF-8 encoded ``text/xml``.

    Only the ``Content-Type`` entry of ``serialized.headers`` is written;
    ``operation`` is accepted but not used.
    """
    content_type = "text/xml; charset=utf-8"
    serialized.headers["Content-Type"] = content_type
class Soap12Binding(SoapBinding):
nsmap = {
"soap": ns.SOAP_12,
"soap-env": ns.SOAP_ENV_12,
"wsdl": ns.WSDL,
"xsd": ns.XSD,
}
def process_error(self, doc, operation):
fault_node = doc.find("soap-env:Body/soap-env:Fault", namespaces=self.nsmap)
if fault_node is None:
raise Fault(
message="Unknown fault occured",
code=None,
actor=None,
detail=etree_to_string(doc),
)
def get_text(name):
def _verify_envelope_with_key(envelope, key):
soap_env = detect_soap_env(envelope)
header = envelope.find(QName(soap_env, "Header"))
if header is None:
raise SignatureVerificationFailed()
security = header.find(QName(ns.WSSE, "Security"))
signature = security.find(QName(ns.DS, "Signature"))
ctx = xmlsec.SignatureContext()
# Find each signed element and register its ID with the signing context.
refs = signature.xpath("ds:SignedInfo/ds:Reference", namespaces={"ds": ns.DS})
for ref in refs:
# Get the reference URI and cut off the initial '#'
referenced_id = ref.get("URI")[1:]
referenced = envelope.xpath(
"//*[@wsu:Id='%s']" % referenced_id, namespaces={"wsu": ns.WSU}
)[0]
ctx.register_id(referenced, "Id", ns.WSU)
ctx.key = key
try:
# Populate the WS-Security header of ``envelope`` with UsernameToken data.
#
# The security header comes from ``utils.get_security_header``. An existing
# ``UsernameToken`` child is reused; otherwise a new one is created and
# appended. The token is then extended with a ``Username`` element and, when
# a password or password digest is configured, with the elements produced by
# ``_create_password_digest`` or ``_create_password_text``.
#
# NOTE(review): leading indentation is missing in this excerpt, so the exact
# nesting of the conditionals below cannot be confirmed from here, and no
# return statement is visible - verify both against the full source.
def apply(self, envelope, headers):
security = utils.get_security_header(envelope)
# The token placeholder might already exist since it can be specified in
# the WSDL.
token = security.find("{%s}UsernameToken" % ns.WSSE)
if token is None:
token = utils.WSSE.UsernameToken()
security.append(token)
# An optional, pre-built timestamp token is attached to the security header.
if self.timestamp_token is not None:
security.append(self.timestamp_token)
# Create the sub elements of the UsernameToken element
elements = [utils.WSSE.Username(self.username)]
# Password elements are only produced when some form of password is set.
if self.password is not None or self.password_digest is not None:
if self.use_digest:
elements.extend(self._create_password_digest())
else:
elements.extend(self._create_password_text())
token.extend(elements)
return child.text
raise Fault(
message=get_text("faultstring"),
code=get_text("faultcode"),
actor=get_text("faultactor"),
detail=fault_node.find("detail"),
)
def _set_http_headers(self, serialized, operation):
    """Set the ``Content-Type`` header of the serialized request.

    The value is always ``text/xml; charset=utf-8``. ``operation`` is part
    of the signature but is not consulted.
    """
    header_value = "text/xml; charset=utf-8"
    serialized.headers["Content-Type"] = header_value
# ``SoapBinding`` subclass wired to the SOAP 1.2 namespace constants
# (``ns.SOAP_12`` / ``ns.SOAP_ENV_12``).
class Soap12Binding(SoapBinding):
# Prefix -> namespace URI map; passed as ``namespaces`` to ``find`` below.
nsmap = {
"soap": ns.SOAP_12,
"soap-env": ns.SOAP_ENV_12,
"wsdl": ns.WSDL,
"xsd": ns.XSD,
}
# Look up ``soap-env:Body/soap-env:Fault`` in ``doc``. When no such node
# exists, raise ``Fault`` with a generic message, no code or actor, and the
# serialized document (``etree_to_string(doc)``) as its detail.
#
# NOTE(review): this excerpt stops after the missing-fault branch; how a
# fault node that *is* present gets handled is not visible here. The
# ``operation`` argument is not used in the visible lines.
def process_error(self, doc, operation):
fault_node = doc.find("soap-env:Body/soap-env:Fault", namespaces=self.nsmap)
if fault_node is None:
raise Fault(
message="Unknown fault occured",
code=None,
actor=None,
detail=etree_to_string(doc),
)
import uuid
from lxml import etree
from lxml.builder import ElementMaker
from zeep import ns
from zeep.plugins import Plugin
from zeep.wsdl.utils import get_or_create_header
# Element factory bound to the ``ns.WSA`` namespace, registered under the
# ``wsa`` prefix; used below as ``WSA.Action(...)`` / ``WSA.MessageID(...)``.
WSA = ElementMaker(namespace=ns.WSA, nsmap={"wsa": ns.WSA})
class WsAddressingPlugin(Plugin):
nsmap = {"wsa": ns.WSA}
def egress(self, envelope, http_headers, operation, binding_options):
"""Apply the ws-addressing headers to the given envelope."""
wsa_action = operation.input.abstract.wsa_action
if not wsa_action:
wsa_action = operation.soapaction
header = get_or_create_header(envelope)
headers = [
WSA.Action(wsa_action),
WSA.MessageID("urn:uuid:" + str(uuid.uuid4())),
import logging
import six
from lxml import etree
from zeep import ns
from zeep.exceptions import Fault
from zeep.utils import qname_attr
from zeep.wsdl import messages
from zeep.wsdl.definitions import Binding, Operation
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Prefix -> namespace URI map used for prefixed lookups such as
# ``xmlelement.find("http:address", namespaces=NSMAP)``.
NSMAP = {"http": ns.HTTP, "wsdl": ns.WSDL, "mime": ns.MIME}
class HttpBinding(Binding):
# Build a message for ``operation`` by delegating to ``operation.create``
# with the remaining positional and keyword arguments.
#
# ``operation`` may be given as a string, in which case it is first replaced
# by the result of ``self.get(operation)``. A falsy operation raises
# ``ValueError("Operation not found")``.
#
# NOTE(review): leading indentation is missing in this excerpt, so it cannot
# be confirmed from here whether the ``if not operation`` check is nested
# inside the string branch or also applies to non-string arguments.
def create_message(self, operation, *args, **kwargs):
if isinstance(operation, six.string_types):
operation = self.get(operation)
if not operation:
raise ValueError("Operation not found")
return operation.create(*args, **kwargs)
def process_service_port(self, xmlelement, force_https=False):
address_node = xmlelement.find("http:address", namespaces=NSMAP)
if address_node is None:
raise ValueError("No `http:address` node found")
# Force the usage of HTTPS when the force_https boolean is true
return self
def signature(self, schema=None, standalone=True):
    """Return a short textual signature for this element.

    The base is ``self.restrict.name`` when ``self.restrict`` is truthy and
    the literal ``"ANY"`` otherwise. When ``self.accepts_multiple`` is
    truthy the base is formatted with a trailing ``[]``. ``schema`` and
    ``standalone`` are accepted but not consulted.
    """
    base = self.restrict.name if self.restrict else "ANY"
    return ("%s[]" % base) if self.accepts_multiple else base
class AnyAttribute(Base):
name = None
_ignore_attributes = [etree.QName(ns.XSI, "type")]
def __init__(self, process_contents="strict"):
    """Store ``process_contents`` and start with no qualified name.

    ``process_contents`` defaults to ``"strict"``; ``qname`` is initialised
    to ``None``.
    """
    self.process_contents = process_contents
    self.qname = None
def parse(self, attributes, context=None):
    """Return a new dict of the attributes that are not ignored.

    Every ``key``/``value`` pair of ``attributes`` is copied unless the key
    is contained in ``self._ignore_attributes``. ``context`` is accepted but
    not used.
    """
    return {
        attr_name: attr_value
        for attr_name, attr_value in attributes.items()
        if attr_name not in self._ignore_attributes
    }
def resolve(self):
    """Return the instance itself."""
    resolved = self
    return resolved
def render(self, parent, value, render_path=None):
called later to do that.
Adds a Reference node to the signature with URI attribute pointing to the
target node, and registers the target node's ID so XMLSec will be able to
find the target node by ID when it signs.
"""
# Ensure the target node has a wsu:Id attribute and get its value.
node_id = ensure_id(target)
# Unlike HTML, XML doesn't have a single standardized Id. WSSE suggests the
# use of the wsu:Id attribute for this purpose, but XMLSec doesn't
# understand that natively. So for XMLSec to be able to find the referenced
# node by id, we have to tell xmlsec about it using the register_id method.
ctx.register_id(target, "Id", ns.WSU)
# Add reference to signature with URI attribute pointing to that ID.
ref = xmlsec.template.add_reference(
signature, digest_method or xmlsec.Transform.SHA1, uri="#" + node_id
)
# This is an XML normalization transform which will be performed on the
# target node contents before signing. This ensures that changes to
# irrelevant whitespace, attribute ordering, etc won't invalidate the
# signature.
xmlsec.template.add_transform(ref, xmlsec.Transform.EXCL_C14N)
def xsi_ns(localname):
    """Build the ``etree.QName`` for ``localname`` in the ``ns.XSI`` namespace."""
    qualified_name = etree.QName(ns.XSI, localname)
    return qualified_name