Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _loadedEntryGen(self) -> Iterator[BaseEntry]:
    """
    yield one entry per raw item stored in self._data

    each raw item is converted with Entry.fromRaw, using
    self._defaultDefiFormat as the default definition format.
    when both self.ui and self._progressbar are truthy, progress is
    initialized before the loop, updated after each yielded entry
    (once the consumer resumes the generator), and ended after the loop
    """
    total = len(self._data)
    showProgress = self.ui and self._progressbar
    if showProgress:
        self.progressInit("Writing")
    for position, raw in enumerate(self._data):
        entry = Entry.fromRaw(raw, defaultDefiFormat=self._defaultDefiFormat)
        yield entry
        if showProgress:
            self.progress(position, total)
    if showProgress:
        self.progressEnd()
def newEntry(self, word: str, defi: str, defiFormat: str = "") -> Entry:
    """
    build and return a new Entry for the given word and definition

    an empty (falsy) defiFormat is replaced by self._defaultDefiFormat
    """
    chosenFormat = defiFormat or self._defaultDefiFormat
    return Entry(word, defi, chosenFormat)
def __next__(self):
    """
    return the next item of self._slobObj wrapped in an Entry

    raises StopIteration when self._slobObj is falsy (after logging an
    error), or when the incremented self._refIndex reaches
    len(self._slobObj)
    """
    # NOTE(review): len() is applied to self._slobObj below, so an open
    # but empty object may also be falsy here and get reported as
    # "not open" -- confirm this is intended
    if not self._slobObj:
        log.error("iterating over a reader which is not open")
        raise StopIteration

    # the index is advanced before it is range-checked
    self._refIndex += 1
    if self._refIndex >= len(self._slobObj):
        raise StopIteration

    blob = self._slobObj[self._refIndex]
    # blob.key is str, blob.content is bytes
    return Entry(blob.key, toStr(blob.content))
# NOTE(review): the `def` line of this fragment is not visible here; the
# statements below look like the body of an iterator's __next__ -- confirm
# the position counter is advanced before anything is produced
self._pos += 1
try:
# entries queued in self._pendingEntries are handed out first, oldest first
return self._pendingEntries.pop(0)
except IndexError:
# the queue is empty: fall through and read the next pair
pass
###
try:
wordDefi = self.nextPair()
except StopIteration as e:
# record the position reached when nextPair() stopped, then re-raise
self._wordCount = self._pos
raise e
if not wordDefi:
# nextPair() gave a falsy value: None is returned instead of an Entry
return
word, defi = wordDefi
###
return Entry(word, defi)
def sortWords(
    self,
    key: Optional[Callable[[str], Any]] = None,
    cacheSize: int = 0,
) -> None:
    """
    sort the entries, or set up sorting, then refresh the iterator

    when self._readers is empty, self._data is sorted in place using
    Entry.getRawEntrySortKey(key).
    when self._readers is not empty, nothing is sorted here: key is stored
    in self._sortKey, and cacheSize is stored in self._sortCacheSize only
    if it is greater than zero.
    in both cases self._updateIter(sort=True) is called at the end
    """
    # only sort by main word, or list of words + alternates? FIXME
    if not self._readers:
        self._data.sort(key=Entry.getRawEntrySortKey(key))
    else:
        self._sortKey = key
        if cacheSize > 0:
            self._sortCacheSize = cacheSize  # FIXME
    self._updateIter(sort=True)
def loadInfo(self):
    """
    consume the leading info pairs and pass them to self._glos.setInfo

    pairs are read with self.nextPair() until the first word for which
    self.isInfoWord() is false; that pair is queued in
    self._pendingEntries as an Entry and reading stops.
    falsy pairs are skipped. every info pair increments
    self._leadingLinesCount, but it is only stored when
    self.fixInfoWord() gives a non-empty word.
    a StopIteration raised while reading ends the loop silently
    """
    self._pendingEntries = []
    self._leadingLinesCount = 0
    try:
        while True:
            pair = self.nextPair()
            if not pair:
                continue
            word, defi = pair
            if self.isInfoWord(word):
                self._leadingLinesCount += 1
                infoKey = self.fixInfoWord(word)
                if infoKey:
                    self._glos.setInfo(infoKey, defi)
                continue
            # first non-info pair: queue it and stop reading
            self._pendingEntries.append(Entry(word, defi))
            break
    except StopIteration:
        pass
depending on:
1- Whether or not direct mode is On (self._readers not empty)
or Off (self._readers empty)
2- Whether sort is True, and if it is,
checks for self._sortKey and self._sortCacheSize
"""
# NOTE(review): the `def` line and the opening of the docstring are not
# visible here; `sort` below is presumably a parameter -- confirm
if self._readers: # direct mode
if sort:
sortKey = self._sortKey
cacheSize = self._sortCacheSize
log.info("Stream sorting enabled, cache size: %s", cacheSize)
# only sort by main word, or list of words + alternates? FIXME
# hsortStreamList is given all readers, the cache size and the entry
# sort key; presumably it yields their entries in sorted order -- confirm
gen = hsortStreamList(
self._readers,
cacheSize,
key=Entry.getEntrySortKey(sortKey),
)
else:
# direct mode without sorting
gen = self._readersEntryGen()
else:
# self._readers is empty: entries come from self._loadedEntryGen()
gen = self._loadedEntryGen()
# whichever generator was chosen is passed through self._applyEntryFiltersGen
self._iter = self._applyEntryFiltersGen(gen)