def test_multiple_outputs_inline(self):
    def _partitions(task, to):
        root = to.build_target(task)
        return {t: root.partition(name="t_output/%s" % t) for t in task.t_types}

    class TTaskMultipleOutputs(TTask):
        t_types = parameter(default=[1, 2])[List]
        t_output = output.folder(output_factory=_partitions)

        def run(self):
            for t_name, t_target in six.iteritems(self.t_output):
                t_target.write("%s" % t_name)

    task = TTaskMultipleOutputs()
    assert_run_task(task)
    logger.error(task.t_output)
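# Note: _partitions returns one partition target per value in t_types, so
# task.t_output behaves as a dict mapping each t_type to its "t_output/<t>"
# target, which run() then writes individually.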
def test_optional(self):
    EmpD = TypedDict('EmpD', name=str, id=int)
    self.assertEqual(typing.Optional[EmpD], typing.Union[None, EmpD])
    self.assertNotEqual(typing.List[EmpD], typing.Tuple[EmpD])
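# Optional[X] is defined as Union[X, None], so the equality assertion holds
# by construction; List[EmpD] and Tuple[EmpD] are distinct generic types.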
@argument("hosts", type=List[str], description="list of hostnames")
@argument(
"sort",
type=str,
description="What field to sort by the tabular output",
choices=list(FIELDS),
)
async def status(
format="tabular",
sort="node_id",
nodes=None,
hosts=None,
hostnames=None,
extended=False,
):
"""
Next gen status command using the Thrift interfaces
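# The @argument decorator resembles python-nubia's CLI framework (an
# assumption based on the signature): it attaches type, description, and
# choices metadata to the matching keyword parameter of the command function.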
from typing import Any, List, Optional, Sequence

def create_repr(obj: Any, attrs: Optional[Sequence[str]] = None) -> str:
    """Build a repr-style string from the non-None attributes of obj."""
    if attrs is None:
        attrs = obj.__dict__.keys()
    attrs_kv: List[str] = []
    for attr in attrs:
        attr_value = getattr(obj, attr)
        if attr_value is not None:
            attrs_kv.append(f"{attr}={attr_value!r}")
    attrs_repr = ", ".join(attrs_kv)
    return f"{obj.__class__.__qualname__}({attrs_repr})"
def __eq__(self, other):
    # Equality is determined by matching owner_id and id.
    return self.owner_id == other.owner_id and self.id == other.id
class DocAttachmentType(Enum):
    doc = "doc"
    graffiti = "graffiti"
    audio_message = "audio_message"


class DocPreview(BaseModel):
    photo: "DocPreviewPhoto" = None
    video: "DocPreviewVideo" = None


class DocPreviewPhoto(BaseModel):
    sizes: typing.List["photos.PhotoSizes"] = None


class DocPreviewVideo(BaseModel):
    filesize: int = None
    height: int = None
    src: str = None
    width: int = None


class DocTypes(BaseModel):
    count: int = None
    id: int = None
    title: str = None


class DocUploadResponse(BaseModel):
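    # Assumed field (the class body is truncated in the original snippet):
    # VK's document upload flow returns an upload token under "file".
    file: str = None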
def __init__(self,
             scheduler: BackgroundScheduler,
             args: Tuple[Flask, Config]):
    """Initialize scheduler class."""
    self.scheduler = scheduler
    self.args = args
    self.modules: List[ModuleBase] = []
    self.__init_periodic_tasks()
    atexit.register(self.scheduler.shutdown)
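# Registering shutdown with atexit ensures the BackgroundScheduler's worker
# thread is stopped cleanly when the interpreter exits.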
from copy import deepcopy
from typing import Dict, List, Sequence, Union

from marshmallow import Schema

from authorityspoke.facts import Fact
from authorityspoke.holdings import Holding
from authorityspoke.io.name_index import Mentioned
from authorityspoke.io.name_index import RawFactor, RawPredicate
from authorityspoke.io.nesting import nest_fields
from authorityspoke.io import text_expansion
from authorityspoke.opinions import Opinion
from authorityspoke.pleadings import Pleading, Allegation
from authorityspoke.predicates import Predicate, ureg, Q_
from authorityspoke.procedures import Procedure
from authorityspoke.rules import Rule
from authorityspoke.utils.marshmallow_oneofschema.one_of_schema import OneOfSchema
RawSelector = Union[str, Dict[str, str]]
RawEnactment = Dict[str, Union[str, List[RawSelector]]]
RawProcedure = Dict[str, Sequence[RawFactor]]
RawRule = Dict[str, Union[RawProcedure, Sequence[RawEnactment], str, bool]]
RawHolding = Dict[str, Union[RawRule, str, bool]]
class ExpandableSchema(Schema):
    """Base schema for classes that can be cross-referenced by name in input JSON."""

    def get_from_mentioned(self, data, **kwargs):
        """Replace data to load with any object with the same name in "mentioned"."""
        if isinstance(data, str):
            mentioned = self.context.get("mentioned") or Mentioned()
            return deepcopy(mentioned.get_by_name(data))
        return data

    def consume_type_field(self, data, **kwargs):
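        # Assumed body (the original snippet is truncated here): discard the
        # redundant "type" discriminator before the remaining fields load.
        data.pop("type", None)
        return data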
# You should have received a copy of the GNU General Public License
# along with repology. If not, see <http://www.gnu.org/licenses/>.
import os
import pickle
from contextlib import ExitStack
from typing import Any, BinaryIO, Iterable, Iterator, List, Optional

from repology.package import Package


class ChunkedSerializer:
    path: str
    next_chunk_number: int
    chunk_size: int
    packages: List[Package]
    total_packages: int

    def __init__(self, path: str, chunk_size: int) -> None:
        self.path = path
        self.next_chunk_number = 0
        self.chunk_size = chunk_size
        self.packages = []
        self.total_packages = 0

    def _flush(self) -> None:
        if not self.packages:
            return

        packages = sorted(self.packages, key=lambda package: package.effname)

        with open(os.path.join(self.path, str(self.next_chunk_number)), 'wb') as outfile:
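            # Assumed completion (the original snippet is truncated here):
            # serialize the sorted chunk, then advance to a fresh buffer.
            pickle.dump(packages, outfile, protocol=pickle.HIGHEST_PROTOCOL)
        self.next_chunk_number += 1
        self.packages = []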
def __init__(
    self, repository: EventSourcedRepository[TAggregate, TAggregateEvent]
) -> None:
    self.repository = repository
    self.retrieved_aggregates: Dict[UUID, TAggregate] = {}
    self.causal_dependencies: List[Tuple[UUID, int]] = []
    self.orm_objs_pending_save: List[Any] = []
    self.orm_objs_pending_delete: List[Any] = []
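# The attributes above cache aggregates retrieved during processing along
# with causal dependencies and pending ORM writes; presumably they are
# flushed together when the process event is committed (inferred from the
# attribute names, not shown in this snippet).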
    stop = to_datetime(stop)
else:
    stop = start + timedelta(days=1)

if progressbar is True:
    if stop - start > date_delta:
        progressbar = tqdm
    else:
        progressbar = iter

if progressbar is False:
    progressbar = iter

progressbar = cast(Callable[[Iterable], Iterable], progressbar)

cumul: List[pd.DataFrame] = []
sequence = list(split_times(start, stop, date_delta))

for bt, at, bh, ah in progressbar(sequence):
    logging.info(
        f"Sending request between time {bt} and {at} "
        f"and hour {bh} and {ah}"
    )
    request = request_pattern.format(
        before_time=bt.timestamp(),
        after_time=at.timestamp(),
        before_hour=bh.timestamp(),
        after_hour=ah.timestamp(),
    )
    df = self._impala(