Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def reverse_resolve(self, target_ip):
for name, candidate_ip in self.dns.items():
if not isIPAddress(name) and candidate_ip == target_ip:
return defer.succeed(name)
return defer.fail(socket.herror())
for group in groups:
try:
g = self.dbm['groups'][group]
except KeyError:
pass
else:
index = g.maxArticle + 1
g.maxArticle += 1
g.articles[index] = article
xref.append((group, str(index)))
self.dbm['groups'][group] = g
if not xref:
return defer.fail(NewsServerError("No groups carried: " + ' '.join(groups)))
article.putHeader('Xref', '%s %s' % (socket.gethostname().split()[0], ' '.join(map(lambda x: ':'.join(x), xref))))
self.dbm['Message-IDs'][article.getHeader('Message-ID')] = xref
return defer.succeed(None)
@type contextFactory: L{NoneType } or
L{ClientContextFactory }
@param contextFactory: The context factory with which to negotiate TLS.
If not provided, try to create a new one.
@rtype: L{Deferred } which successfully results in
L{dict} mapping L{bytes} to L{list} of L{bytes} and/or L{bytes} to
L{NoneType } or fails with L{TLSError}
@return: A deferred which fires when the transport has been
secured according to the given context factory with the server
capabilities, or which fails with a TLS error if the transport
cannot be secured.
"""
tls = interfaces.ITLSTransport(self.transport, None)
if tls is None:
return defer.fail(TLSError(
"POP3Client transport does not implement "
"interfaces.ITLSTransport"))
if contextFactory is None:
contextFactory = self._getContextFactory()
if contextFactory is None:
return defer.fail(TLSError(
"POP3Client requires a TLS context to "
"initiate the STLS handshake"))
d = self.capabilities()
d.addCallback(self._startTLS, contextFactory, tls)
return d
@param path: the directory or file to check.
@type path: C{str}
@param keys: the list of desired metadata
@type keys: C{list} of C{str}
"""
filePath = self._path(path)
if filePath.isdir():
entries = filePath.listdir()
fileEntries = [filePath.child(p) for p in entries]
elif filePath.isfile():
entries = [os.path.join(*filePath.segmentsFrom(self.filesystemRoot))]
fileEntries = [filePath]
else:
return defer.fail(FileNotFoundError(path))
results = []
for fileName, filePath in zip(entries, fileEntries):
ent = []
results.append((fileName, ent))
if keys:
try:
ent.extend(self._statNode(filePath, keys))
except (IOError, OSError), e:
return errnoToFailure(e.errno, fileName)
except:
return defer.fail()
return defer.succeed(results)
def receive(self, path):
path = self._path(path)
return defer.fail(AnonUserDeniedError())
def claim_renew_all_before_expiration(self, height):
return defer.fail(NotImplementedError())
def ftp_RNTO(self, toName):
fromName = self._fromName
del self._fromName
self.state = self.AUTHED
try:
fromsegs = toSegments(self.workingDirectory, fromName)
tosegs = toSegments(self.workingDirectory, toName)
except InvalidPath:
return defer.fail(FileNotFoundError(fromName))
return self.shell.rename(fromsegs, tosegs).addCallback(lambda ign: (REQ_FILE_ACTN_COMPLETED_OK,))
def rename(self, fromPath, toPath):
return defer.fail(AnonUserDeniedError())
def push(self, namespace):
try:
package = pickle.dumps(namespace, 2)
except:
return defer.fail(failure.Failure())
else:
if isinstance(package, failure.Failure):
return defer.fail(package)
else:
d = self.callRemote('push', package)
return d.addCallback(self.checkReturnForFailure)
def ftp_RNTO(self, toName):
fromName = self._fromName
del self._fromName
self.state = self.AUTHED
try:
fromsegs = toSegments(self.workingDirectory, fromName)
tosegs = toSegments(self.workingDirectory, toName)
except InvalidPath:
return defer.fail(FileNotFoundError(fromName))
return self.shell.rename(fromsegs, tosegs).addCallback(lambda ign: (REQ_FILE_ACTN_COMPLETED_OK,))