How to use the zeep.transports.Transport function in zeep

To help you get started, we’ve selected a few zeep examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github mvantellingen / python-zeep / tests / test_wsdl.py View on Github external
def test_parse_soap_wsdl():
    client = Client("tests/wsdl_files/soap.wsdl", transport=Transport())

    response = """
        
        
           
           
              
                 120.123
              
           
        
    """.strip()

    client.set_ns_prefix("stoc", "http://example.com/stockquote.xsd")
github mvantellingen / python-zeep / tests / test_soap_multiref.py View on Github external
bar
                  
                
              

              
                foo
                bar
                
              
           
        
    """.strip()  # noqa

    client = Client(wsdl_file, transport=Transport())
    response = stub(status_code=200, headers={}, content=content)

    operation = client.service._binding._operations["TestOperation"]
    result = client.service._binding.process_reply(client, operation, response)

    assert result.item_1.subitem_1 == "foo"
    assert result.item_1.subitem_2 == "bar"
    assert result.item_2.subitem_1.subitem_1 == "foo"
    assert result.item_2.subitem_1.subitem_2 == "bar"
    assert result.item_2.subitem_2 == "bar"
github mvantellingen / python-zeep / tests / test_wsdl.py View on Github external
def test_parse_soap_header_wsdl():
    client = Client("tests/wsdl_files/soap_header.wsdl", transport=Transport())

    response = """
    
    
       
       
          
             120.123
          
       
    
    """.strip()

    with requests_mock.mock() as m:
github mvantellingen / python-zeep / src / zeep / client.py View on Github external
def __init__(self, *args, **kwargs):
        """Initialise the client with a cache-backed default transport.

        When the caller supplies no ``transport`` keyword (or a falsy one),
        a ``Transport`` using ``SqliteCache`` is created and passed on to the
        parent initialiser. All other arguments are forwarded unchanged.
        """
        # Imported at call time rather than at module import time.
        from zeep.cache import SqliteCache

        # ``setdefault`` is avoided on purpose: it would build the default
        # Transport (and its cache) even when the caller already passed one.
        if not kwargs.get("transport"):
            kwargs["transport"] = Transport(cache=SqliteCache())

        super(CachingClient, self).__init__(*args, **kwargs)
github AliAbdelaal / ATKSpy / atkspy / Parser.py View on Github external
def __init__(self, app_id):
        """Create the service client and remember the application id.

        Sets ``_WSDL`` and ``_PORT`` on the instance, builds a client for
        that WSDL/port over a dedicated session, and stores ``app_id`` in
        ``_app_id``.
        """
        self._WSDL = "https://atks.microsoft.com/Services/ParserService.svc"
        self._PORT = "HTTPS_IParserService"

        http_session = Session()
        # NOTE(review): presumably a requests session, in which case this
        # turns off TLS certificate verification -- confirm it is intended.
        http_session.verify = False

        transport = Transport(session=http_session)
        self._client = Client(
            wsdl=self._WSDL,
            port_name=self._PORT,
            transport=transport,
        )
        self._app_id = app_id
github AliAbdelaal / ATKSpy / atkspy / Transliteration.py View on Github external
def __init__(self, app_id):
        """Create the service client and remember the application id.

        Sets ``_WSDL`` and ``_PORT`` on the instance, builds a client for
        that WSDL/port over a dedicated session, and stores ``app_id`` in
        ``_app_id``.
        """
        self._WSDL = "https://atks.microsoft.com/Services/TransliteratorService.svc"
        self._PORT = "HTTPS_ITransliteratorService"

        http_session = Session()
        # NOTE(review): presumably a requests session, in which case this
        # turns off TLS certificate verification -- confirm it is intended.
        http_session.verify = False

        transport = Transport(session=http_session)
        self._client = Client(
            wsdl=self._WSDL,
            port_name=self._PORT,
            transport=transport,
        )
        self._app_id = app_id
github mytardis / mytardis / tardis / apps / publication_forms / doi.py View on Github external
def _init_client(self, endpoint):
        """Return a client for ``endpoint`` that authenticates with HTTP digest.

        The digest credentials are read from ``settings.MODC_DOI_API_ID`` and
        ``settings.MODC_DOI_API_PASSWORD`` and attached to a fresh session,
        which the client's transport then uses.
        """
        http_session = Session()
        digest_credentials = HTTPDigestAuth(
            settings.MODC_DOI_API_ID,
            settings.MODC_DOI_API_PASSWORD,
        )
        http_session.auth = digest_credentials

        transport = Transport(session=http_session)
        return Client(endpoint, transport=transport)
github AliAbdelaal / ATKSpy / atkspy / Part_of_Speech_Tagger.py View on Github external
def __init__(self, app_id):
        """Create the service client and remember the application id.

        Sets ``_WSDL`` and ``_PORT`` on the instance, builds a client for
        that WSDL/port over a dedicated session, and stores ``app_id`` in
        ``_app_id``.
        """
        self._WSDL = "https://atks.microsoft.com/Services/POSTaggerService.svc"
        self._PORT = "HTTPS_IPOSTaggerService"

        http_session = Session()
        # NOTE(review): presumably a requests session, in which case this
        # turns off TLS certificate verification -- confirm it is intended.
        http_session.verify = False

        transport = Transport(session=http_session)
        self._client = Client(
            wsdl=self._WSDL,
            port_name=self._PORT,
            transport=transport,
        )
        self._app_id = app_id
github indico / indico-plugins / vc_vidyo / indico_vc_vidyo / api / client.py View on Github external
def __init__(self, wsdl, settings):
        """Build a basic-auth client for ``wsdl`` and its ``ns0`` type factory.

        :param wsdl: WSDL location handed to ``Client``.
        :param settings: object with a ``get`` method providing the
            ``'username'`` and ``'password'`` values used for basic auth.
        """
        http_session = Session()
        cached_transport = Transport(session=http_session, cache=ZeepCache())

        # The credentials are attached after the transport is built; the
        # transport was given this same session object above.
        username = settings.get('username')
        password = settings.get('password')
        http_session.auth = HTTPBasicAuth(username, password)

        self.client = Client(wsdl, transport=cached_transport)
        self.factory = self.client.type_factory('ns0')
github jmagnusson / netsuite / netsuite / client.py View on Github external
if not is_success:
                response_detail = response_status["statusDetail"]
                raise NetsuiteResponseError(response_detail)

            if extract is not None:
                response = extract(response)

            return response

        return wrapper

    return decorator


class NetSuiteTransport(Transport):
    """
    NetSuite dynamic domain wrapper for zeep.transports.transport

    Latest NetSuite WSDL now uses relative definition addresses

    zeep maps reflective remote calls to the base WSDL address,
    rather than the dynamic subscriber domain

    Wrap the zeep transports service with our address modifications
    """

    def __init__(self, wsdl_url, *args, **kwargs):
        """
        Assign the dynamic host domain component to a class variable
        """
        parsed_wsdl_url = urlparse(wsdl_url)