Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def read_from_cache(
self, cache_filename: Union[Path, str]) -> Iterator[PackType]:
r"""Reads one or more Packs from ``cache_filename``, and yields Pack(s)
from the cache file.
Args:
cache_filename: Path to the cache file.
Returns: List of cached data packs.
"""
logger.info("reading from cache file %s", cache_filename)
with open(cache_filename, "r") as cache_file:
for line in cache_file:
pack = data_utils.deserialize(self._pack_manager, line.strip())
if not isinstance(pack, self.pack_type):
raise TypeError(
f"Pack deserialized from {cache_filename} "
f"is {type(pack)}, but expect {self.pack_type}")
yield pack
# pylint: disable=protected-access
with open(os.path.join(
self.configs.data_path, multi_pack_path)) as m_data:
# m_pack: MultiPack = MultiPack.deserialize(m_data.read())
m_pack: MultiPack = deserialize(self._pack_manager, m_data.read())
for pid in m_pack._pack_ref:
sub_pack_path = self.__pack_index[pid]
if self._pack_manager.get_remapped_id(pid) >= 0:
# This pid is already been read.
continue
with open(os.path.join(
self.configs.data_path, sub_pack_path)) as pack_data:
# pack: DataPack = DataPack.deserialize(pack_data.read())
pack: DataPack = deserialize(self._pack_manager,
pack_data.read())
# Add a reference count to this pack, because the multipack
# needs it.
self._pack_manager.reference_pack(pack)
m_pack.realign_packs()
yield m_pack
def _parse_pack(self, data_source: str) -> Iterator[DataPack]:
if data_source is None:
raise ProcessExecutionException(
"Data source is None, cannot deserialize.")
# pack: DataPack = DataPack.deserialize(data_source)
pack: DataPack = deserialize(self._pack_manager, data_source)
if pack is None:
raise ProcessExecutionException(
f"Cannot recover pack from the following data source: \n"
f"{data_source}")
yield pack
resource_name, info_box_data = collection
if resource_name in self.redirects:
resource_name = self.redirects[resource_name]
if resource_name in self.pack_index:
print_progress(f'Add infobox to resource: [{resource_name}]')
pack_path = os.path.join(
self.pack_dir,
self.pack_index[resource_name]
)
if os.path.exists(pack_path):
with open(pack_path) as pack_file:
pack = data_utils.deserialize(
self._pack_manager, pack_file.read())
add_info_boxes(pack, info_box_data['literals'])
add_info_boxes(pack, info_box_data['objects'])
add_property(pack, info_box_data['properties'])
yield pack
else:
print_notice(f"Resource {resource_name} is not in the raw packs.")
self.logger.warning("Resource %s is not in the raw packs.",
resource_name)
def _parse_pack(self, multi_pack_path: str) -> Iterator[MultiPack]:
# pylint: disable=protected-access
with open(os.path.join(
self.configs.data_path, multi_pack_path)) as m_data:
# m_pack: MultiPack = MultiPack.deserialize(m_data.read())
m_pack: MultiPack = deserialize(self._pack_manager, m_data.read())
for pid in m_pack._pack_ref:
sub_pack_path = self.__pack_index[pid]
if self._pack_manager.get_remapped_id(pid) >= 0:
# This pid is already been read.
continue
with open(os.path.join(
self.configs.data_path, sub_pack_path)) as pack_data:
# pack: DataPack = DataPack.deserialize(pack_data.read())
pack: DataPack = deserialize(self._pack_manager,
pack_data.read())
# Add a reference count to this pack, because the multipack
# needs it.
self._pack_manager.reference_pack(pack)