How to use the mocket.Mocket class in mocket

To help you get started, we’ve selected a few mocket examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github mindflayer / python-mocket / tests / main / test_http.py View on Github external
def test_truesendall_with_dump_from_recording(self):
        """Send two GET requests to httpbin.org and inspect the recording dump.

        The dump is the ``<namespace>.json`` file inside the directory returned
        by ``Mocket.get_truesocket_recording_dir()``; it is expected to hold
        exactly two entries under ``httpbin.org`` / port ``"80"``.
        """
        for target in ("http://httpbin.org/ip", "http://httpbin.org/gzip"):
            # A fresh headers dict per call, as in two separate literal calls.
            requests.get(target, headers={"user-agent": "Fake-User-Agent"})

        recording_dir = Mocket.get_truesocket_recording_dir()
        dump_name = Mocket.get_namespace() + ".json"
        with io.open(os.path.join(recording_dir, dump_name)) as dump_file:
            recorded = json.load(dump_file)

        port_80_entries = recorded["httpbin.org"]["80"]
        self.assertEqual(len(port_80_entries.keys()), 2)
github mindflayer / python-mocket / tests / main / test_https.py View on Github external
def test_truesendall_with_recording_https():
    """Request the same HTTPS URL twice and expect one recorded entry.

    The second response must have status 200, and the ``<namespace>.json``
    dump must contain a single key under ``mockbin.com`` / port ``'443'``.
    """
    url = 'https://mockbin.com/ip'

    # Two identical requests; only the second response is inspected.
    for _ in range(2):
        resp = requests.get(url, headers={"Accept": "application/json"})
    print(resp.content)
    assert resp.status_code == 200

    recording_dir = Mocket.get_truesocket_recording_dir()
    dump_name = Mocket.get_namespace() + '.json'
    with io.open(os.path.join(recording_dir, dump_name)) as dump_file:
        recorded = json.load(dump_file)

    https_entries = recorded['mockbin.com']['443']
    assert len(https_entries.keys()) == 1
github mindflayer / python-mocket / tests / main / test_http.py View on Github external
def test_register(self):
        with mock.patch("time.gmtime") as tt:
            tt.return_value = time.struct_time((2013, 4, 30, 10, 39, 21, 1, 120, 0))
            Entry.register(
                Entry.GET,
                "http://testme.org/get?a=1&b=2#test",
                Response('{"a": "€"}', headers={"content-type": "application/json"}),
            )
        entries = Mocket._entries[("testme.org", 80)]
        self.assertEqual(len(entries), 1)
        entry = entries[0]
        self.assertEqual(entry.method, "GET")
        self.assertEqual(entry.schema, "http")
        self.assertEqual(entry.path, "/get")
        self.assertEqual(entry.query, "a=1&b=2")
        self.assertEqual(len(entry.responses), 1)
        response = entry.responses[0]
        self.assertEqual(response.body, b'{"a": "\xe2\x82\xac"}')
        self.assertEqual(response.status, 200)
        self.assertEqual(
            response.headers,
            {
                "Status": "200",
                "Date": "Tue, 30 Apr 2013 10:39:21 GMT",
                "Connection": "close",
github mindflayer / python-mocket / tests / main / test_mocket.py View on Github external
def test_recv_into(self):
        """Check that ``recv_into`` reports the size of each registered payload.

        Two payloads are registered for ``localhost:80``; after each
        ``sendall`` the next ``recv_into`` call must return that payload's
        length (12 for ``b'Long payload'``, then 5 for ``b'Short'``).
        """
        address = ('localhost', 80)
        payloads = [b'Long payload', b'Short']
        Mocket.register(MocketEntry(address, payloads))

        buffer = io.BytesIO()
        with Mocketizer():
            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            client.connect(address)

            client.sendall(b'first\r\n')
            first_size = client.recv_into(buffer, 4096)
            assert first_size == 12

            client.sendall(b'second\r\n')
            second_size = client.recv_into(buffer)
            assert second_size == 5
github mindflayer / python-mocket / mocket / utils.py View on Github external
def write(self, content):
        """Write *content* through the parent class, then mirror it to a descriptor.

        After the parent ``write`` runs, *content* is also passed to
        ``os.write(Mocket.w_fd, ...)``, but only when both ``Mocket.r_fd`` and
        ``Mocket.w_fd`` are truthy. Nothing is returned.
        """
        super(MocketSocketCore, self).write(content)

        # Imported at call time rather than at module load.
        # NOTE(review): presumably to avoid a circular import — confirm.
        from mocket import Mocket

        if not Mocket.r_fd or not Mocket.w_fd:
            return
        os.write(Mocket.w_fd, content)
github mindflayer / python-mocket / mocket / plugins / httpretty / __init__.py View on Github external
super(Response, self).set_base_headers()
        self.headers = httprettifier_headers(self.headers)

    # Bind a second name in this scope to the same set_base_headers function.
    # NOTE(review): presumably kept so the function stays reachable if
    # set_base_headers is later rebound — confirm against callers.
    original_set_base_headers = set_base_headers

    def set_extra_headers(self, headers):
        """Merge *headers* into ``self.headers`` in place via its ``update`` method.

        Unlike the code just above, no ``httprettifier_headers`` call is applied
        to the merged values here.
        """
        self.headers.update(headers)


class Entry(MocketHttpEntry):
    # Point response_cls at the Response class used in this module.
    # NOTE(review): presumably MocketHttpEntry reads response_cls to build
    # its responses — confirm in the base class.
    response_cls = Response


# Module-level aliases: `activate` and `httprettified` are two names for
# mocketize, and enable/disable/reset are bound to the Mocket attributes of
# the same name.
# NOTE(review): the names look chosen to mirror HTTPretty's API (this is the
# httpretty plugin module) — confirm.
activate = mocketize
httprettified = mocketize
enable = Mocket.enable
disable = Mocket.disable
reset = Mocket.reset

# Re-export Entry's method constants so they can be read from the module itself.
GET = Entry.GET
PUT = Entry.PUT
POST = Entry.POST
DELETE = Entry.DELETE
HEAD = Entry.HEAD
PATCH = Entry.PATCH
OPTIONS = Entry.OPTIONS


def register_uri(
    method,
    uri,
    body="HTTPretty :)",
github lyft / amundsenmetadatalibrary / metadata_service / proxy / aws4authwebsocket / transport.py View on Github external
def register(
        cls,
        *,
        uri: str,
        body: str = '',
        status: int = 101,
        headers: Mapping[str, str] = {'Connection': 'Upgrade', 'Upgrade': 'WebSocket'}
) -> 'SelfRecordingWebSocketEntry':
        """Build an entry for *uri*, register it with ``mocket.Mocket``, return it.

        All arguments are keyword-only. The host and port come from
        ``cls.host_and_port(uri)``; the response object is created by
        ``cls.response_cls`` from *body*, *status* and *headers*.
        """
        # NOTE(review): the default ``headers`` dict is a single object shared by
        # every call that omits the argument; it is passed through untouched here,
        # so it must not be mutated downstream — confirm in response_cls.
        hostname, port_number = cls.host_and_port(uri)
        canned_response = cls.response_cls(body=body, status=status, headers=headers)
        new_entry = cls(hostname, port_number, canned_response)
        mocket.Mocket.register(new_entry)
        return new_entry
github mindflayer / python-mocket / mocket / plugins / httpretty / __init__.py View on Github external
# Bind a second name in this scope to the same set_base_headers function.
# NOTE(review): presumably kept so the function stays reachable if
# set_base_headers is later rebound — confirm against callers.
original_set_base_headers = set_base_headers

    def set_extra_headers(self, headers):
        """Merge *headers* into ``self.headers`` in place via its ``update`` method."""
        self.headers.update(headers)


class Entry(MocketHttpEntry):
    # Point response_cls at the Response name available in this module.
    # NOTE(review): presumably MocketHttpEntry reads response_cls to build
    # its responses — confirm in the base class.
    response_cls = Response


# Module-level aliases: `activate` and `httprettified` are two names for
# mocketize, and enable/disable/reset are bound to the Mocket attributes of
# the same name.
# NOTE(review): the names look chosen to mirror HTTPretty's API (this is the
# httpretty plugin module) — confirm.
activate = mocketize
httprettified = mocketize
enable = Mocket.enable
disable = Mocket.disable
reset = Mocket.reset

# Re-export Entry's method constants so they can be read from the module itself.
GET = Entry.GET
PUT = Entry.PUT
POST = Entry.POST
DELETE = Entry.DELETE
HEAD = Entry.HEAD
PATCH = Entry.PATCH
OPTIONS = Entry.OPTIONS


def register_uri(
    method,
    uri,
    body="HTTPretty :)",
    adding_headers=None,
    forcing_headers=None,