Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import dataclasses
import magma as m
@dataclasses.dataclass(frozen=True, eq=True)
class GlobalBufferParams:
    """Frozen, comparable bundle of global-buffer (GLB) sizing parameters.

    Every field has a default. The derived defaults (``TILE_SEL_ADDR_WIDTH``,
    ``CGRA_PER_GLB``, ``BANK_SEL_ADDR_WIDTH``) are computed once, while the
    class body executes, from the *default* values of the fields they
    reference; passing a different base value to the constructor does not
    recompute them.
    """

    # Tile parameters
    NUM_GLB_TILES: int = 16
    # Derived from the default NUM_GLB_TILES via m.bitutils.clog2.
    TILE_SEL_ADDR_WIDTH: int = m.bitutils.clog2(NUM_GLB_TILES)

    # CGRA Tiles
    NUM_CGRA_TILES: int = 32

    # CGRA tiles per GLB tile
    CGRA_PER_GLB: int = NUM_CGRA_TILES // NUM_GLB_TILES  # 32 // 16 == 2

    # Bank parameters
    BANKS_PER_TILE: int = 2
    # Derived from the default BANKS_PER_TILE via m.bitutils.clog2.
    BANK_SEL_ADDR_WIDTH: int = m.bitutils.clog2(BANKS_PER_TILE)
    BANK_DATA_WIDTH: int = 64
    BANK_ADDR_WIDTH: int = 17
import difflib
import collections
import dataclasses
from typing import Set, FrozenSet, Tuple, Any, List
# Small immutable record types used by the package/metadata analysis.
Metadata = collections.namedtuple("Metadata", ["version"])
Package = collections.namedtuple("Package", ["name", "type"])
Info = collections.namedtuple("Info", ["packages", "metadata"])
SimilarPackages = collections.namedtuple("SimilarPackages", ["pkg", "missed"])
ProbableMatch = collections.namedtuple("ProbableMatch", ["pkg", "ratio"])
@dataclasses.dataclass
class Analysis:
    """
    Package/metadata analysis built from raw syft and inline scan data.

    Callers supply the two raw ``Info`` records; the remaining fields hold
    information derived from them so that package and metadata comparisons
    can be interpreted by the caller.
    """

    # Raw inputs: the syft report and the inline scan report.
    syft_data: Info
    inline_data: Info

    # Derived from the raw data above. Declared with ``init=False``, so these
    # are not constructor parameters and have no default.
    # NOTE(review): the code that populates them is not visible here.
    overlapping_packages: FrozenSet[Package] = dataclasses.field(init=False)
    extra_packages: FrozenSet[Package] = dataclasses.field(init=False)
    missing_packages: FrozenSet[Package] = dataclasses.field(init=False)
    inline_metadata: Set[Tuple[Any, Any]] = dataclasses.field(init=False)
from dataclasses import dataclass, field
from typing import Dict, List
from graphql import GraphQLResolveInfo
from examples.library.types import Author, Book
from typegql import ID, RequiredListInputArgument
from typegql.pubsub import pubsub
@dataclass(init=False, repr=False)
class Mutation:
create_books: List[ID] = field(
metadata={
"description": "Create new `Book`s and return a list of ids for the created objects",
"arguments": [RequiredListInputArgument[Book](name="books")],
}
)
async def mutate_create_books(self, _: GraphQLResolveInfo, books: List[Dict]):
    """Publish and return the ids for a ``create_books`` request.

    The ``books`` payload is not inspected: the id list is the fixed value
    ``[1]``. That same list is passed to ``pubsub.publish`` under the name
    ``"books_added"`` before being returned.
    NOTE(review): presumably the resolver for the ``create_books`` field,
    going by the name - confirm against the typegql conventions.
    """
    created_ids = [1]
    pubsub.publish("books_added", created_ids)
    return created_ids
create_authors: List[ID] = field(
metadata={
"description": "Create new `Author`s and return a list of ids for the created objects",
from raiden.constants import EMPTY_ADDRESS, UINT256_MAX
from raiden.utils.formatting import to_checksum_address
from raiden.utils.typing import (
Address,
ChainID,
ChannelID,
T_Address,
T_ChainID,
T_ChannelID,
TokenNetworkAddress,
typecheck,
)
@dataclass(frozen=True, order=True)
class CanonicalIdentifier:
chain_identifier: ChainID
token_network_address: TokenNetworkAddress
channel_identifier: ChannelID
def validate(self) -> None:
    """Type-check the three identifier fields and range-check the channel id.

    Each field is passed to ``typecheck`` with its expected type, in
    declaration order (presumably raising on a mismatch - see ``typecheck``).

    Raises:
        ValueError: if ``channel_identifier`` is negative or greater than
            ``UINT256_MAX``.
    """
    expected_types = (
        (self.chain_identifier, T_ChainID),
        (self.token_network_address, T_Address),
        (self.channel_identifier, T_ChannelID),
    )
    for value, expected in expected_types:
        typecheck(value, expected)

    # Valid channel ids lie in the closed range [0, UINT256_MAX].
    if not 0 <= self.channel_identifier <= UINT256_MAX:
        raise ValueError("channel id is invalid")
def __str__(self) -> str:
return (
"CanonicalIdentifier("
class Node(typing.Generic[T_collection]):
    """A named node holding a collection of arguments."""

    name: str
    args: T_collection

    def replace_with(self, node: "Node") -> None:
        """Overwrite this node's name and args, in place, with those of ``node``."""
        self.name = node.name
        self.args = node.args  # type: ignore

    def could_equal(self, other_node: "Node") -> bool:
        """Return True when both nodes share a name and have the same number of args.

        Only the name and the argument count are compared; the arguments
        themselves are not inspected.
        """
        if self.name != other_node.name:
            return False
        return len(self.args) == len(other_node.args)
# One entry per argument position: a node name, or None.
ArgNames = typing.Collection[typing.Optional[str]]


@dataclasses.dataclass(frozen=True)
class Replacement:
    """Immutable record pairing a name and argument names with a callable.

    NOTE(review): how the fields are consumed is not visible here;
    presumably ``name``/``arg_names`` select the nodes to rewrite and
    ``replace`` builds the substitute ``Node`` - confirm against the caller.
    """

    name: str
    arg_names: ArgNames
    replace: typing.Callable[..., Node]


Replacements = typing.Collection[Replacement]
def args_match(args: typing.Collection, arg_names: ArgNames) -> bool:
    """Return True when ``args`` lines up with the expected ``arg_names``.

    The two collections must have the same length. They are then compared
    position by position:

    * an expected name of ``None`` accepts any argument at that position;
    * any other expected name requires the argument to be a ``Node`` whose
      ``name`` equals it.

    Two empty collections match.
    """
    # zip() would silently truncate to the shorter input, so compare sizes first.
    if len(args) != len(arg_names):
        return False
    return all(
        expected is None or (isinstance(arg, Node) and arg.name == expected)
        for arg, expected in zip(args, arg_names)
    )
from typing import Optional
from asyncio import Queue
from itertools import chain
from collections import defaultdict
from dataclasses import dataclass
from . import images
from .utils import MessageType, terminate
from .types import DockerImage
from .tasks import build_image
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
@dataclass(frozen=True)
class Dep:
    """Immutable pairing of an optional image name with a ``DockerImage``.

    Being a frozen dataclass with the default ``eq=True``, instances are
    hashable and can be stored in sets.
    """

    # May be None.
    image: Optional[str]
    docker_image: DockerImage
class ImagesCollector:
def __init__(self, images_map, services_map):
self._images_map = images_map
self._services_map = services_map
self._services_seen = set()
self._deps = set()
@classmethod
def collect(cls, images_map, services_map, obj):
self = cls(images_map, services_map)
from dataclasses import dataclass, field
from typing import List, Dict
import uuid
from common.database import Database
from models.item import Item
from models.user import User
from models.model import Model
@dataclass(eq=False)
class Alert(Model):
collection: str = field(init=False, default="alerts")
name: str
item_id: str
price_limit: str
user_email: str
_id: str = field(default_factory=lambda: uuid.uuid4().hex)
def __post_init__(self):
    """Attach the related item and user objects after the dataclass init.

    Looks up ``item_id`` via ``Item.get_by_id`` and ``user_email`` via
    ``User.find_by_email`` and stores the results as ``self.item`` and
    ``self.user``. The item lookup runs first.
    """
    # Order is kept as-is: both calls are external lookups.
    self.item = Item.get_by_id(self.item_id)
    self.user = User.find_by_email(self.user_email)
def json(self) -> Dict:
return {
"_id": self._id,
"name": self.name,
def settings_wrapper(_cls):
    """Turn ``_cls`` into a dataclass and pass it through ``wrap_cls``.

    Steps, in order:
      1. call ``_resolve_from_env`` on the raw class with ``prefix``,
         ``case_sensitive`` and ``aliases``;
      2. apply ``dataclasses.dataclass`` with ``frozen=frozen``;
      3. wrap the result with ``wrap_cls(..., jsonschema=False)``.

    ``prefix``, ``case_sensitive``, ``aliases`` and ``frozen`` are free
    variables, presumably supplied by an enclosing decorator factory that is
    not visible here.
    """
    _resolve_from_env(_cls, prefix, case_sensitive, aliases)
    as_dataclass = dataclasses.dataclass(_cls, frozen=frozen)
    return wrap_cls(as_dataclass, jsonschema=False)
from ....typing import BrandID, UserID
# Distinct identifier types created with NewType so the different kinds of
# ID are not interchangeable for a type checker.
# ChannelID is based on str; the remaining IDs are based on UUID.
ChannelID = NewType('ChannelID', str)
ItemID = NewType('ItemID', UUID)
ItemVersionID = NewType('ItemVersionID', UUID)
ImageID = NewType('ImageID', UUID)
@dataclass(frozen=True)
class Channel:
    """Immutable channel record: its id, the brand it refers to, and a URL prefix."""

    id: ChannelID
    brand_id: BrandID
    url_prefix: str
@dataclass(frozen=True)
class Image:
    """Immutable image record tied to an item via ``item_id``."""

    id: ImageID
    created_at: datetime
    creator_id: UserID
    item_id: ItemID
    number: int
    filename: str
    url_path: str
    # May be None.
    alt_text: Optional[str]
def wrapper(cls):
cls = dataclass(cls)
cls._csv_file = csv_file
cls._csv_id_field = None
cls._csv_parent_field = None
cls._csv_index_field = None
cls._csv_node_list_fields = []
cls._csv_node_fields = []
cls._csv_fields = {}
for field in fields(cls):
csv_type = field.metadata.get(FIELD_METADATA_CSVMODEL_TYPE, None)
if csv_type == 'id':
assert cls._csv_id_field is None, "Already has an ID field"
cls._csv_id_field = field
elif csv_type == 'child_parent_node':
assert cls._csv_parent_field is None, "Already has a parent ID field"