How to use the instawow.api.Request class in instawow

To help you get started, we’ve selected a few instawow examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github layday / instawow / tests / test_ws_api.py View on Github external
from unittest.mock import patch

import aiohttp
import pytest

from instawow import api
from instawow.api import _PkgConverter as Pkg
from instawow.config import Config
from instawow.manager import WsManager
from instawow.models import PkgCoercer, PkgFolderCoercer, PkgOptionsCoercer


PORT = 55439


class FailRequest(api.Request):
    """Request whose prepared response always fails with ``ValueError``."""

    _name = 'fail'

    def prepare_response(self, manager):
        """Return a coroutine object that raises ``ValueError`` when awaited.

        ``manager`` is not used.
        """
        async def always_fail():
            raise ValueError

        failing = always_fail()
        return failing


@pytest.fixture(scope='module', autouse=True)
def config(tmp_path_factory):
    """Yield a ``Config`` written out with fresh temporary directories.

    Both the config directory and the add-on directory are created through
    ``tmp_path_factory.mktemp``, named after this module.
    """
    config_dir = tmp_path_factory.mktemp(f'{__name__}_config')
    addon_dir = tmp_path_factory.mktemp(f'{__name__}_addons')
    cfg = Config(config_dir=config_dir, addon_dir=addon_dir)
    cfg.write()
    yield cfg
github layday / instawow / instawow / api.py View on Github external
method: Literal['update']
    params: UpdateRequestParams

    def prepare_response(self, manager: WsManager) -> Awaitable:
        """Pass the request URIs, each run through ``split_uri``, to ``manager.prep_update``."""
        prepare = manager.prep_update
        return prepare([split_uri(manager, uri) for uri in self.params.uris])

    def consume_result(self, result: Any) -> SuccessResponse:
        """Wrap ``result.new_pkg`` in a ``SuccessResponse`` tagged with this request's id."""
        new_pkg = result.new_pkg
        return SuccessResponse(result=new_pkg, id=self.id)


class RemoveRequestParams(RequestParams):
    """Parameters carried by a ``RemoveRequest``."""

    # URI strings; ``RemoveRequest.prepare_response`` runs each one through
    # ``split_uri`` before calling ``manager.prep_remove``.
    uris: List[str]


class RemoveRequest(Request):
    """Request for the ``'remove'`` method, carrying ``RemoveRequestParams``."""

    method: Literal['remove']
    params: RemoveRequestParams

    def prepare_response(self, manager: WsManager) -> Awaitable:
        """Pass the request URIs, each run through ``split_uri``, to ``manager.prep_remove``."""
        prepare = manager.prep_remove
        return prepare([split_uri(manager, uri) for uri in self.params.uris])

    def consume_result(self, result: Any) -> SuccessResponse:
        """Return a ``SuccessResponse`` with a ``None`` result; ``result`` is ignored."""
        request_id = self.id
        return SuccessResponse(result=None, id=request_id)


class GetRequestParams(RequestParams):
    """Parameters carried by a ``GetRequest``."""

    # URI strings identifying what to look up.
    # NOTE(review): presumably split with ``split_uri`` by the consuming
    # request, like the other ``uris`` fields in this module -- confirm.
    uris: List[str]
github layday / instawow / instawow / api.py View on Github external
params: RequestParams

    def prepare_response(self, manager: WsManager) -> Awaitable:
        """Unimplemented placeholder; always raises ``NotImplementedError``.

        The ``Request`` subclasses in this module (e.g. ``SetupRequest``)
        define their own ``prepare_response``.
        """
        raise NotImplementedError

    def consume_result(self, result: Any) -> SuccessResponse:
        """Unimplemented placeholder; always raises ``NotImplementedError``.

        The ``Request`` subclasses in this module (e.g. ``SetupRequest``)
        define their own ``consume_result``.
        """
        raise NotImplementedError


class SetupRequestParams(RequestParams):
    """Parameters carried by a ``SetupRequest``.

    ``SetupRequest.prepare_response`` expands these fields as keyword
    arguments to ``Config``.
    """

    # Add-on directory, forwarded to ``Config`` as ``addon_dir``.
    addon_dir: Path
    # Restricted to the two literal values listed.
    game_flavour: Literal['retail', 'classic']


class SetupRequest(Request):
    """Request for the ``'setup'`` method, carrying ``SetupRequestParams``."""

    method: Literal['setup']
    params: SetupRequestParams

    def prepare_response(self, manager: WsManager) -> Awaitable:
        """Return a coroutine that writes a new config and finalises ``manager`` with it."""
        async def setup() -> Config:
            # The request params become ``Config`` keyword arguments; the
            # value returned by ``write()`` is used for everything after.
            settings = dict(self.params)
            config = Config(**settings).write()
            setup_logging(config)
            manager.finalise(config)
            return config

        pending = setup()
        return pending

    def consume_result(self, result: Config) -> SuccessResponse:
        """Wrap the config, round-tripped through its JSON form, in a ``SuccessResponse``."""
        config_data = json_loads(result.json())
        return SuccessResponse(result={'config': config_data}, id=self.id)
github layday / instawow / instawow / api.py View on Github external
method: Literal['remove']
    params: RemoveRequestParams

    def prepare_response(self, manager: WsManager) -> Awaitable:
        """Pass the request URIs, each run through ``split_uri``, to ``manager.prep_remove``."""
        prepare = manager.prep_remove
        return prepare([split_uri(manager, uri) for uri in self.params.uris])

    def consume_result(self, result: Any) -> SuccessResponse:
        """Return a ``SuccessResponse`` with a ``None`` result; ``result`` is ignored."""
        request_id = self.id
        return SuccessResponse(result=None, id=request_id)


class GetRequestParams(RequestParams):
    """Parameters carried by a ``GetRequest``."""

    # URI strings to look up; ``GetRequest.prepare_response`` treats an
    # empty list as "return every ``Pkg`` in the database".
    uris: List[str]


class GetRequest(Request):
    """Request for the ``'get'`` method, carrying ``GetRequestParams``."""

    method: Literal['get']
    params: GetRequestParams

    def prepare_response(self, manager: WsManager) -> Awaitable:
        """Return a coroutine that looks up the requested packages.

        With no URIs the coroutine returns every ``Pkg`` row from
        ``manager.db``; otherwise it returns one ``manager.get`` result per
        URI, in request order.
        """
        async def get() -> list:
            if not self.params.uris:
                return manager.db.query(Pkg).all()
            return [manager.get(*split_uri(manager, uri))
                    for uri in self.params.uris]

        pending = get()
        return pending

    def consume_result(self, result: Any) -> SuccessResponse:
        """Wrap ``result`` unchanged in a ``SuccessResponse`` tagged with this request's id."""
        request_id = self.id
        return SuccessResponse(result=result, id=request_id)
github layday / instawow / instawow / api.py View on Github external
def consume_result(self, result: Any) -> SuccessResponse:
        """Wrap the values of the ``result`` mapping, as a list, in a ``SuccessResponse``."""
        values = list(result.values())
        return SuccessResponse(result=values, id=self.id)


class InstallRequestParams(RequestParams):
    """Parameters carried by an ``InstallRequest``.

    ``InstallRequest.prepare_response`` forwards all three fields to
    ``manager.prep_install``.
    """

    # URI strings; each is run through ``split_uri`` before installation.
    uris: List[str]
    # Forwarded as the second argument of ``manager.prep_install``.
    resolution_strategy: Strategies
    # Forwarded as the third argument of ``manager.prep_install``.
    replace: bool

    class Config:
        # NOTE(review): presumably pydantic model config, where this option
        # stores an enum member's value instead of the member -- confirm.
        use_enum_values = True


class InstallRequest(Request):
    """Request for the ``'install'`` method, carrying ``InstallRequestParams``."""

    method: Literal['install']
    params: InstallRequestParams

    def prepare_response(self, manager: WsManager) -> Awaitable:
        """Call ``manager.prep_install`` with the split URIs, strategy and replace flag."""
        prepare = manager.prep_install
        targets = [split_uri(manager, uri) for uri in self.params.uris]
        strategy = self.params.resolution_strategy
        replace = self.params.replace
        return prepare(targets, strategy, replace)

    def consume_result(self, result: Any) -> SuccessResponse:
        """Wrap ``result.new_pkg`` in a ``SuccessResponse`` tagged with this request's id."""
        new_pkg = result.new_pkg
        return SuccessResponse(result=new_pkg, id=self.id)


class UpdateRequestParams(RequestParams):
    """Parameters carried by an ``UpdateRequest``."""

    # URI strings; ``UpdateRequest.prepare_response`` runs each one through
    # ``split_uri`` before calling ``manager.prep_update``.
    uris: List[str]
github layday / instawow / instawow / api.py View on Github external
return setup()

    def consume_result(self, result: Config) -> SuccessResponse:
        """Wrap the config, round-tripped through its JSON form, in a ``SuccessResponse``."""
        config_data = json_loads(result.json())
        return SuccessResponse(result={'config': config_data}, id=self.id)


class ResolveRequestParams(RequestParams):
    """Parameters carried by a ``ResolveRequest``.

    ``ResolveRequest.prepare_response`` forwards both fields to
    ``manager.resolve``.
    """

    # URI strings; each is run through ``split_uri`` before resolution.
    uris: List[str]
    # Forwarded as the second argument of ``manager.resolve``.
    resolution_strategy: Strategies

    class Config:
        # NOTE(review): presumably pydantic model config, where this option
        # stores an enum member's value instead of the member -- confirm.
        use_enum_values = True


class ResolveRequest(Request):
    """Request for the ``'resolve'`` method, carrying ``ResolveRequestParams``."""

    method: Literal['resolve']
    params: ResolveRequestParams

    def prepare_response(self, manager: WsManager) -> Awaitable:
        """Call ``manager.resolve`` with the split URIs and the requested strategy."""
        resolve = manager.resolve
        targets = [split_uri(manager, uri) for uri in self.params.uris]
        strategy = self.params.resolution_strategy
        return resolve(targets, strategy)

    def consume_result(self, result: Any) -> SuccessResponse:
        """Wrap the values of the ``result`` mapping, as a list, in a ``SuccessResponse``."""
        values = list(result.values())
        return SuccessResponse(result=values, id=self.id)


class InstallRequestParams(RequestParams):

    uris: List[str]
    resolution_strategy: Strategies
github layday / instawow / instawow / api.py View on Github external
def parse_request(message: str) -> Request:
    "Parse a JSON string into a sub-``Request`` object."
    try:
        data = json_loads(message)
    except JSONDecodeError as error:
        raise ApiError.PARSE_ERROR('request is not valid JSON', str(error))
    try:
        Request(**data)
    except TypeError as error:
        raise ApiError.INVALID_REQUEST('request is malformed',
                                       *error.args, None)
    except ValidationError as error:
        raise ApiError.INVALID_REQUEST('request is malformed',
                                       error.json(indent=None),
                                       getattr(data, 'get', lambda _: None)('id'))

    cls = _methods.get(data['method'])
    if cls:
        try:
            return cls(**data)
        except ValidationError as error:
            raise ApiError.INVALID_PARAMS('request params are invalid',
                                          error.json(indent=None), data.get('id'))
    else:
github layday / instawow / instawow / api.py View on Github external
def prepare_response(self, manager: WsManager) -> Awaitable:
        """Call ``manager.prep_install`` with the split URIs, strategy and replace flag."""
        prepare = manager.prep_install
        targets = [split_uri(manager, uri) for uri in self.params.uris]
        strategy = self.params.resolution_strategy
        replace = self.params.replace
        return prepare(targets, strategy, replace)

    def consume_result(self, result: Any) -> SuccessResponse:
        """Wrap ``result.new_pkg`` in a ``SuccessResponse`` tagged with this request's id."""
        new_pkg = result.new_pkg
        return SuccessResponse(result=new_pkg, id=self.id)


class UpdateRequestParams(RequestParams):
    """Parameters carried by an ``UpdateRequest``."""

    # URI strings; ``UpdateRequest.prepare_response`` runs each one through
    # ``split_uri`` before calling ``manager.prep_update``.
    uris: List[str]


class UpdateRequest(Request):
    """Request for the ``'update'`` method, carrying ``UpdateRequestParams``."""

    method: Literal['update']
    params: UpdateRequestParams

    def prepare_response(self, manager: WsManager) -> Awaitable:
        """Pass the request URIs, each run through ``split_uri``, to ``manager.prep_update``."""
        prepare = manager.prep_update
        return prepare([split_uri(manager, uri) for uri in self.params.uris])

    def consume_result(self, result: Any) -> SuccessResponse:
        """Wrap ``result.new_pkg`` in a ``SuccessResponse`` tagged with this request's id."""
        new_pkg = result.new_pkg
        return SuccessResponse(result=new_pkg, id=self.id)


class RemoveRequestParams(RequestParams):
    """Parameters carried by a ``RemoveRequest``."""

    # URI strings; ``RemoveRequest.prepare_response`` runs each one through
    # ``split_uri`` before calling ``manager.prep_remove``.
    uris: List[str]