import dataclasses

from dataclass_csv import DataclassReader


def test_reader_values(create_csv):
    csv_file = create_csv(
        [{'name': 'User1', 'age': 40}, {'name': 'User2', 'age': 30}]
    )
    with csv_file.open() as f:
        reader = DataclassReader(f, User)
        items = list(reader)
        assert items and len(items) == 2
        for item in items:
            assert dataclasses.is_dataclass(item)
        user1, user2 = items[0], items[1]
        assert user1.name == 'User1'
        assert user1.age == 40
        assert user2.name == 'User2'
        assert user2.age == 30
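The test above assumes a `User` dataclass and a `create_csv` pytest fixture defined elsewhere in the suite; a minimal sketch of what they might look like (the fixture body and CSV layout here are assumptions for illustration, not the library's actual test helpers):

import csv
import dataclasses

import pytest


@dataclasses.dataclass
class User:
    name: str
    age: int


@pytest.fixture
def create_csv(tmp_path):
    def _create(rows):
        # Write the given list of dicts to a temporary CSV file and return its path.
        path = tmp_path / "data.csv"
        with path.open("w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(rows[0]))
            writer.writeheader()
            writer.writerows(rows)
        return path
    return _create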
def _is_property_name(t: Type[T], property_name: str) -> bool:
    return (
        is_dataclass(t) and property_name in (f.name for f in fields(t))
    ) or property_name in dir(t)
from dataclasses import asdict, is_dataclass


def _annotation_to_dict(dc):
    # Convenience helper: recursively convert dataclasses (and containers of
    # dataclasses) into plain dicts, lists, and scalar values.
    if is_dataclass(dc):
        d = asdict(dc)
        ret = dict()
        for k, v in d.items():
            ret[k] = _annotation_to_dict(v)
        return ret
    elif isinstance(dc, dict):
        ret = dict()
        for k, v in dc.items():
            k = _annotation_to_dict(k)
            v = _annotation_to_dict(v)
            ret[k] = v
        return ret
    elif isinstance(dc, str):
        return dc
    elif isinstance(dc, (set, frozenset, list, tuple)):
        ret = []
        # Convert each element; sets and tuples collapse to a plain list.
        for item in dc:
            ret.append(_annotation_to_dict(item))
        return ret
    return dc
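For illustration, here is how the helper behaves on a small nested dataclass (the `Point` and `Shape` classes are invented for this example, and the expected output assumes the sequence branch above collapses tuples into plain lists):

from dataclasses import dataclass
from typing import Tuple


@dataclass
class Point:
    x: int
    y: int


@dataclass
class Shape:
    name: str
    points: Tuple[Point, ...]


# _annotation_to_dict(Shape('line', (Point(0, 0), Point(1, 1))))
# -> {'name': 'line', 'points': [{'x': 0, 'y': 0}, {'x': 1, 'y': 1}]}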
def create_cloned_field(field: ModelField) -> ModelField:
    original_type = field.type_
    if is_dataclass(original_type) and hasattr(original_type, "__pydantic_model__"):
        original_type = original_type.__pydantic_model__  # type: ignore
    use_type = original_type
    if lenient_issubclass(original_type, BaseModel):
        original_type = cast(Type[BaseModel], original_type)
        use_type = create_model(
            original_type.__name__, __config__=original_type.__config__
        )
        for f in original_type.__fields__.values():
            use_type.__fields__[f.name] = f
        use_type.__validators__ = original_type.__validators__
    if PYDANTIC_1:
        new_field = ModelField(
            name=field.name,
            type_=use_type,
            class_validators={},
            default=None,
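The excerpt above relies on FastAPI and pydantic v1 internals (`ModelField`, `create_model`, `lenient_issubclass`). As a rough sketch of what `lenient_issubclass` does, written from memory of pydantic v1 rather than copied from the source:

from typing import Any, Tuple, Type, Union


def lenient_issubclass(
    cls: Any, class_or_tuple: Union[Type[Any], Tuple[Type[Any], ...]]
) -> bool:
    # issubclass() raises TypeError when `cls` is not a class (e.g. List[int]);
    # this wrapper answers False instead of raising.
    return isinstance(cls, type) and issubclass(cls, class_or_tuple)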
def get_fields(
    self, graph: Type, is_mutation=False
) -> Dict[str, Union[GraphQLField, GraphQLInputField]]:
    result: Dict[str, Union[GraphQLField, GraphQLInputField]] = dict()
    hints = get_type_hints(graph)
    if not is_dataclass(graph):
        raise ValueError(f'Expected dataclass for {graph}.')
    for field in fields(graph):
        if field.name.startswith('_') or field.metadata.get('skip') is True:
            continue
        _type = hints.get(field.name, field.type)
        if is_optional(_type):
            _type = _type.__args__[0]
        if is_mutation and self.is_readonly(field):
            continue
        graph_type = self.map_type(_type, is_mutation=is_mutation)
        if not graph_type:
            continue
        if self.is_required(field):
            graph_type = graphql.GraphQLNonNull(graph_type)
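`is_optional` is called but not shown in the excerpt; a minimal sketch of what such a helper typically looks like (the name comes from the snippet, the body is an assumption):

from typing import Union, get_args, get_origin


def is_optional(tp) -> bool:
    # Optional[X] is Union[X, None] at runtime, so an "optional" annotation
    # is any Union whose arguments include NoneType.
    return get_origin(tp) is Union and type(None) in get_args(tp)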
import collections.abc
import dataclasses

import torch

string_classes = (str, bytes)  # what older torch exposed as torch._six.string_classes


def pin_memory(batch):
    """
    This is ripped off from dataloader. The only difference is that it preserves
    the type of Mapping so that the OrderedDict is maintained.
    """
    if isinstance(batch, torch.Tensor):
        return batch.pin_memory().cuda(non_blocking=True)
    elif isinstance(batch, string_classes):
        return batch
    elif dataclasses.is_dataclass(batch):
        return dataclasses.replace(
            batch,
            **{
                field.name: pin_memory(getattr(batch, field.name))
                for field in dataclasses.fields(batch)
            }
        )
    elif isinstance(batch, collections.abc.Mapping):
        # NB: preserving OrderedDict
        return type(batch)((k, pin_memory(sample)) for k, sample in batch.items())
    elif isinstance(batch, tuple) and hasattr(batch, "_asdict"):
        # Namedtuples; this is mainly for WorkerDone
        return type(batch)(
            **{name: pin_memory(value) for name, value in batch._asdict().items()}
        )
    elif isinstance(batch, collections.abc.Sequence):
        # Plain sequences: pin each element, mirroring the default dataloader behaviour.
        return [pin_memory(sample) for sample in batch]
    return batch
def isinstance(o, t):
    # Shadows the builtin isinstance so that `dataclass` and typing generics
    # such as Dict[K, V] can be used as the second argument; `original_isinstance`
    # is assumed to be the builtin, captured before this override.
    if t is dataclass:
        return original_isinstance(o, type) and is_dataclass(o)
    if original_isinstance(t, GenericMeta):
        if t is Dict:
            return original_isinstance(o, dict)
        if get_origin(t) in (dict, Dict):
            key_type, value_type = get_args(t)
            return original_isinstance(o, dict) and all(
                isinstance(key, key_type) and isinstance(value, value_type)
                for key, value in o.items()
            )
    return original_isinstance(o, t)
def _hydrate_dataclass_properties(cls):
    assert dataclasses.is_dataclass(cls)
    # Ensure that the `_list_fields` classproperty is converted into a simple attribute in the
    # resulting class.
    list_fields = cls._list_fields
    cls._list_fields = list_fields
    # NB: Delete the `abstractmethod`s defined in superclasses. `dataclass`es do *not* have any
    # class-level `property` or anything defined for their fields, which appear to only get set when
    # an instance of the `dataclass` object is created! In order to avoid errors saying that
    # `abstractproperty`s haven't been resolved, we have to *both* set them to None and remove them
    # from the `__abstractmethods__` frozenset.
    for name in cls.__dataclass_fields__.keys():
        prev_field_value = getattr(cls, name)
        assert isinstance(prev_field_value, (_list_field, abstractproperty))
        setattr(cls, name, None)
        assert name in cls.__abstractmethods__
        cls.__abstractmethods__ = cls.__abstractmethods__ - frozenset([name])
    return cls
    >>> protocols = typic.protocols(Foo)

    See Also
    --------
    :py:class:`SerdeProtocol`
    """
    if not any(
        (inspect.ismethod(obj), inspect.isfunction(obj), inspect.isclass(obj))
    ):
        obj = obj.__class__
    hints = util.cached_type_hints(obj)
    params = util.safe_get_params(obj)
    fields: Mapping[str, dataclasses.Field] = {}
    if dataclasses.is_dataclass(obj):
        fields = {f.name: f for f in dataclasses.fields(obj)}
    ann = {}
    for name in params.keys() | hints.keys():
        param = params.get(name)
        hint = hints.get(name)
        field = fields.get(name)
        annotation = hint or param.annotation  # type: ignore
        annotation = util.resolve_supertype(annotation)
        param = param or inspect.Parameter(
            name,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            default=EMPTY,
            annotation=hint or annotation,
        )
        if repr(param.default) == "<factory>":
            param = param.replace(default=EMPTY)