Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# class and root for all backends.
self.backends = {}
self.backends['sqlite'] = {
'class': LocalStatusSQLiteFolder,
'root': os.path.join(account.getaccountmeta(), 'LocalStatus-sqlite')
}
self.backends['plain'] = {
'class': LocalStatusFolder,
'root': os.path.join(account.getaccountmeta(), 'LocalStatus')
}
if self.account.getconf('status_backend', None) is not None:
raise OfflineImapError(
"the 'status_backend' configuration option is not supported"
" anymore; please, remove this configuration option.",
OfflineImapError.ERROR.REPO
)
# Set class and root for sqlite.
self.setup_backend('sqlite')
if not os.path.exists(self.root):
os.mkdir(self.root, 0o700)
# self._folders is a dict of name:LocalStatusFolders().
self._folders = {}
# Got new UID, change the local uid.
# Save uploaded status in the statusfolder
statusfolder.savemessage(new_uid, message, flags, rtime)
# Check whether the mail has been seen
if 'S' not in flags:
self.have_newmail = True
elif new_uid == 0:
# Message was stored to dstfolder, but we can't find its UID
# This means we can't link current message to the one created
# in IMAP. So we just delete local message and on next run
# we'll sync it back
# XXX This could cause infinite loop on syncing between two
# IMAP servers ...
self.deletemessage(uid)
else:
raise OfflineImapError("Trying to save msg (uid %d) on folder "
"%s returned invalid uid %d"% (uid, dstfolder.getvisiblename(),
new_uid), OfflineImapError.ERROR.MESSAGE)
except (KeyboardInterrupt): # bubble up CTRL-C
raise
except OfflineImapError as e:
if e.severity > OfflineImapError.ERROR.MESSAGE:
raise # bubble severe errors up
self.ui.error(e, exc_info()[2])
except Exception as e:
self.ui.error(e, exc_info()[2],
msg = "Copying message %s [acc: %s]"% (uid, self.accountname))
raise #raise on unknown errors, so we can fix those
def getfolder(self, foldername):
    """Look up a folder of this repository by its name.

    Candidates are taken from self.getfolders(), which scans and caches
    the folder list *if* necessary, so that only existing folders are
    returned and two lookups of the same name give back the same object.

    :param foldername: name compared against each folder's ``name``.
    :returns: the first folder whose ``name`` equals *foldername*.
    :raises OfflineImapError: with severity ERROR.FOLDER when no folder
        of that name is known."""
    # A private sentinel keeps "nothing matched" distinguishable from
    # any value a folder object could have.
    nothing = object()
    hit = next(
        (folder for folder in self.getfolders()
         if foldername == folder.name),
        nothing)
    if hit is nothing:
        raise OfflineImapError("getfolder() asked for a nonexisting "
                               "folder '%s'."% foldername,
                               OfflineImapError.ERROR.FOLDER)
    return hit
# create and return a LocalStatusRepository.
name = account.getconf('localrepository')
return LocalStatusRepository(name, account)
else:
errstr = "Repository type %s not supported" % reqtype
raise OfflineImapError(errstr, OfflineImapError.ERROR.REPO)
# Get repository type.
config = account.getconfig()
try:
repostype = config.get('Repository ' + name, 'type').strip()
except NoSectionError as e:
errstr = ("Could not find section '%s' in configuration. Required "
"for account '%s'." % ('Repository %s' % name, account))
raise OfflineImapError(errstr, OfflineImapError.ERROR.REPO), \
None, exc_info()[2]
try:
repo = typemap[repostype]
except KeyError:
errstr = "'%s' repository not supported for '%s' repositories."% \
(repostype, reqtype)
raise OfflineImapError(errstr, OfflineImapError.ERROR.REPO), \
None, exc_info()[2]
return repo(name, account)
# Create new folders from remote to local.
for remote_name, remote_folder in remote_hash.items():
# Don't create on local_repo, if it is readonly.
if not local_repo.should_create_folders():
break
# Apply remote nametrans and fix separator.
local_name = remote_folder.getvisiblename().replace(
remote_repo.getsep(), local_repo.getsep())
if remote_folder.sync_this and not local_name in local_hash.keys():
try:
local_repo.makefolder(local_name)
# Need to refresh list.
local_repo.forgetfolders()
except OfflineImapError as e:
self.ui.error(e, exc_info()[2],
"Creating folder %s on repository %s"%
(local_name, local_repo))
raise
status_repo.makefolder(local_name.replace(
local_repo.getsep(), status_repo.getsep()))
# Create new folders from local to remote.
for local_name, local_folder in local_hash.items():
if not remote_repo.should_create_folders():
# Don't create missing folder on readonly repo.
break
# Apply reverse nametrans and fix separator.
remote_name = local_folder.getvisiblename().replace(
local_repo.getsep(), remote_repo.getsep())
def serverdiagnostics(self, repository, type):
"""Connect to repository and output useful information for debugging."""
conn = None
self._msg("%s repository '%s': type '%s'" % (type, repository.name,
self.getnicename(repository)))
try:
if hasattr(repository, 'gethost'): # IMAP
self._msg("Host: %s Port: %s SSL: %s"% (repository.gethost(),
repository.getport(), repository.getssl()))
try:
conn = repository.imapserver.acquireconnection()
except OfflineImapError as e:
self._msg("Failed to connect. Reason %s" % e)
else:
if 'ID' in conn.capabilities:
self._msg("Server supports ID extension.")
#TODO: Debug and make below working, it hangs Gmail
#res_type, response = conn.id((
# 'name', offlineimap.__productname__,
# 'version', offlineimap.__version__))
#self._msg("Server ID: %s %s" % (res_type, response[0]))
self._msg("Server welcome string: %s" % str(conn.welcome))
self._msg("Server capabilities: %s\n" % str(conn.capabilities))
repository.imapserver.releaseconnection(conn)
if type != 'Status':
folderfilter = repository.getconf('folderfilter', None)
if folderfilter:
self._msg("folderfilter= %s\n" % folderfilter)
:param severity: denoting which operations should be
aborted. E.g. a ERROR.MESSAGE can occur on a faulty
message, but a ERROR.REPO occurs when the server is
offline.
:param errcode: optional number denoting a predefined error
situation (which lets us exit with a predefined exit
value). So far, no errcodes have been defined yet.
:type severity: OfflineImapError.ERROR value"""
self.errcode = errcode
self.severity = severity
# 'reason' is stored in the Exception().args tuple.
super(OfflineImapError, self).__init__(reason)
def __init__(self, reposname, account):
    """Set up the local status repository for *account*.

    Registers the known status backends ('sqlite' and 'plain'), each
    with its folder class and a root directory below the account's
    metadata directory, selects the sqlite backend, and creates the
    selected root directory if it is missing.

    :param reposname: repository name, handed to BaseRepository.
    :param account: account object; must offer getaccountmeta() and
        getconf().
    :raises OfflineImapError: with severity ERROR.REPO when the retired
        'status_backend' option is still set in the configuration."""
    BaseRepository.__init__(self, reposname, account)

    # Class and root for all backends.
    self.backends = {
        'sqlite': {
            'class': LocalStatusSQLiteFolder,
            'root': os.path.join(account.getaccountmeta(),
                                 'LocalStatus-sqlite'),
        },
        'plain': {
            'class': LocalStatusFolder,
            'root': os.path.join(account.getaccountmeta(),
                                 'LocalStatus'),
        },
    }

    # The backend is no longer selectable; refuse the stale option
    # outright rather than silently ignoring it.
    stale_option = self.account.getconf('status_backend', None)
    if stale_option is not None:
        raise OfflineImapError(
            "the 'status_backend' configuration option is not supported"
            " anymore; please, remove this configuration option.",
            OfflineImapError.ERROR.REPO
        )

    # Set class and root for sqlite.
    self.setup_backend('sqlite')

    root_missing = not os.path.exists(self.root)
    if root_missing:
        os.mkdir(self.root, 0o700)

    # self._folders is a dict of name:LocalStatusFolders().
    self._folders = {}
def getfolder(self, foldername):
    """Look up a folder of this repository by its name.

    Candidates are taken from self.getfolders(), which scans and caches
    the folder list *if* necessary, so that only existing folders are
    returned and two lookups of the same name give back the same object.

    :param foldername: name compared against each folder's ``name``.
    :returns: the first folder whose ``name`` equals *foldername*.
    :raises OfflineImapError: with severity ERROR.FOLDER when no folder
        of that name is known."""
    # A private sentinel keeps "nothing matched" distinguishable from
    # any value a folder object could have.
    nothing = object()
    hit = next(
        (folder for folder in self.getfolders()
         if foldername == folder.name),
        nothing)
    if hit is nothing:
        raise OfflineImapError("getfolder() asked for a nonexisting "
                               "folder '%s'."% foldername,
                               OfflineImapError.ERROR.FOLDER)
    return hit
self.have_newmail = True
elif new_uid == 0:
# Message was stored to dstfolder, but we can't find its UID
# This means we can't link current message to the one created
# in IMAP. So we just delete local message and on next run
# we'll sync it back
# XXX This could cause infinite loop on syncing between two
# IMAP servers ...
self.deletemessage(uid)
else:
raise OfflineImapError("Trying to save msg (uid %d) on folder "
"%s returned invalid uid %d"% (uid, dstfolder.getvisiblename(),
new_uid), OfflineImapError.ERROR.MESSAGE)
except (KeyboardInterrupt): # bubble up CTRL-C
raise
except OfflineImapError as e:
if e.severity > OfflineImapError.ERROR.MESSAGE:
raise # bubble severe errors up
self.ui.error(e, exc_info()[2])
except Exception as e:
self.ui.error(e, exc_info()[2],
msg = "Copying message %s [acc: %s]"% (uid, self.accountname))
raise #raise on unknown errors, so we can fix those