How to use the uplink.utils module in uplink

To help you get started, we’ve selected a few uplink examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github prkumar / uplink / tests / unit / test_utils.py View on Github external
def test_get_arg_spec():
    """Check that ``utils.get_arg_spec`` reports argument names and annotations.

    The inspected function is created with ``exec`` so that the annotated
    signature is only compiled when not running on Python 2.
    """
    if is_py2:
        code = "def func(pos1, *args, **kwargs): pass"
    else:
        code = "def func(pos1, *args: 2, **kwargs: 3) -> 4: pass"

    # Execute into an explicit namespace instead of ``locals()``: inside a
    # function, each ``locals()`` call may return an independent snapshot
    # (Python 3.13+, PEP 667), so a name that ``exec`` writes into one
    # snapshot is not visible through a later ``locals()`` call.
    namespace = {}
    exec(code, globals(), namespace)
    signature = utils.get_arg_spec(namespace["func"])

    assert isinstance(signature, utils.Signature)
    assert signature.args == ["pos1", "args", "kwargs"]
    if not is_py2:
        assert signature.annotations == {"args": 2, "kwargs": 3}
        assert signature.return_annotation == 4
github prkumar / uplink / tests / unit / test_commands.py View on Github external
def test_call(self, mocker, annotation_mock):
        """Calling an ``HttpMethod`` on a function yields a request definition builder."""
        # Arrange: a trivial function and a canned signature that the
        # patched ``uplink.utils.get_arg_spec`` will hand back.
        def func():
            pass

        fake_spec = utils.Signature(
            args=["self", "arg1", "arg2"],
            annotations={"arg1": annotation_mock},
            return_annotation=None,
            positional_args=["self", "arg1", "arg2"],
            has_varargs=False,
            has_keywords=True,
        )
        patched_get_arg_spec = mocker.patch("uplink.utils.get_arg_spec")
        patched_get_arg_spec.return_value = fake_spec

        # Act: decorate the function with the HTTP method.
        decorate = commands.HttpMethod("METHOD", uri="/{hello}")
        definition_builder = decorate(func)

        # Assert: the builder mirrors the function and the method/URI given.
        assert isinstance(
            definition_builder, commands.RequestDefinitionBuilder
        )
        assert definition_builder.__name__ == func.__name__
        assert definition_builder.method == "METHOD"
        assert list(definition_builder.uri.remaining_variables) == ["hello"]
github prkumar / uplink / tests / unit / test_integration.py View on Github external
def _get_url(url):
    """Join ``url`` onto ``BASE_URL`` using ``utils.urlparse.urljoin``."""
    join = utils.urlparse.urljoin
    return join(BASE_URL, url)
github prkumar / uplink / uplink / ratelimit.py View on Github external
def __init__(
        self,
        calls=15,
        period=900,
        raise_on_limit=False,
        group_by=BY_HOST_AND_PORT,
        clock=now,
    ):
        """Store the rate-limit configuration on the instance.

        Args:
            calls: Call allowance; floored, then clamped to the range
                ``[1, sys.maxsize]``.
            period: Stored unchanged as ``self._period``.
            raise_on_limit: An ``Exception`` subclass or instance is kept
                as-is; any other truthy value selects
                ``self._create_rate_limit_exceeded``; a falsy value
                stores ``None``.
            group_by: Stored as ``self._group_by``; ``None`` is replaced
                with ``utils.no_op``.
            clock: Stored unchanged as ``self._clock``.
        """
        # Floor first, cap at sys.maxsize, then enforce a minimum of one.
        capped_calls = min(sys.maxsize, math.floor(calls))
        self._max_calls = max(1, capped_calls)
        self._period = period
        self._clock = clock
        self._limiter_cache = {}
        self._group_by = group_by if group_by is not None else utils.no_op

        is_exception = utils.is_subclass(
            raise_on_limit, Exception
        ) or isinstance(raise_on_limit, Exception)

        if is_exception:
            exception_factory = raise_on_limit
        elif raise_on_limit:
            exception_factory = self._create_rate_limit_exceeded
        else:
            exception_factory = None
        self._create_limit_reached_exception = exception_factory
github prkumar / uplink / uplink / arguments.py View on Github external
def from_func(cls, func):
        """Return the handler stored on ``func``, creating it on first use.

        When ``func`` has no handler attribute yet, a new ``cls`` instance
        is built from the function's argument spec, attached to ``func``,
        and then given the spec's annotations.
        """
        key = cls.__ANNOTATION_BUILDER_KEY
        if hasattr(func, key):
            return getattr(func, key)

        arg_spec = utils.get_arg_spec(func)
        new_handler = cls(func, arg_spec.args)
        # Attach before setting annotations, preserving the original order.
        setattr(func, key, new_handler)
        new_handler.set_annotations(arg_spec.annotations)
        return getattr(func, key)
github prkumar / uplink / uplink / arguments.py View on Github external
def handle_call(self, request_builder, args, kwargs):
        """Resolve the call arguments for ``self._func`` and dispatch them.

        ``args`` and ``kwargs`` are unpacked into ``utils.get_call_args``
        and the result is forwarded to ``handle_call_args``.
        """
        # NOTE(review): the literal ``None`` presumably fills the bound
        # instance slot of the wrapped function -- confirm against
        # ``utils.get_call_args``.
        resolved_arguments = utils.get_call_args(
            self._func, None, *args, **kwargs
        )
        self.handle_call_args(request_builder, resolved_arguments)
github prkumar / uplink / uplink / commands.py View on Github external
def __call__(
        self, func, request_definition_builder_factory=RequestDefinitionBuilder
    ):
        spec = utils.get_arg_spec(func)
        arg_handler = arguments.ArgumentAnnotationHandlerBuilder(
            func, spec.args
        )
        builder = request_definition_builder_factory(
            self._method,
            URIDefinitionBuilder(self._uri),
            arg_handler,
            decorators.MethodAnnotationHandlerBuilder(),
        )

        # Need to add the annotations after constructing the request
        # definition builder so it has a chance to attach its listener.
        arg_handler.set_annotations(spec.annotations)

        # Use return value type hint as expected return type
        if spec.return_annotation is not None:
github prkumar / uplink / uplink / commands.py View on Github external
def remaining_variables(self):
        """Return the URI's variables minus those in ``self._uri_variables``."""
        all_variables = utils.URIBuilder.variables(self._uri)
        return all_variables - self._uri_variables