Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _applyCommand(self, command, callback, commandType = None):
try:
if commandType is None:
self.__commandsQueue.put_nowait((command, callback))
else:
self.__commandsQueue.put_nowait((_bchr(commandType) + command, callback))
if not self.__conf.appendEntriesUseBatch:
self.__pipeNotifier.notify()
except queue.Full:
self.__callErrCallback(FAIL_REASON.QUEUE_FULL, callback)
if callback is not None:
self.__commandsWaitingCommit[idx].append((term, callback))
else:
self.__send(requestNode, {
'type': 'apply_command_response',
'request_id': requestID,
'log_idx': idx,
'log_term': term,
})
if not self.__conf.appendEntriesUseBatch:
self.__sendAppendEntries()
else:
if requestNode is None:
if callback is not None:
callback(None, FAIL_REASON.REQUEST_DENIED)
else:
self.__send(requestNode, {
'type': 'apply_command_response',
'request_id': requestID,
'error': FAIL_REASON.REQUEST_DENIED,
})
elif self.__raftLeader is not None:
if requestNode is None:
message = {
'type': 'apply_command',
'command': command,
}
if callback is not None:
self.__commandsLocalCounter += 1
def __onLeaderChanged(self):
for id in sorted(self.__commandsWaitingReply):
self.__commandsWaitingReply[id](None, FAIL_REASON.LEADER_CHANGED)
self.__commandsWaitingReply = {}
self.__leaderCommitIndex = self.__raftCommitIndex
needSendAppendEntries = False
if self.__raftCommitIndex > self.__raftLastApplied:
count = self.__raftCommitIndex - self.__raftLastApplied
entries = self.__getEntries(self.__raftLastApplied + 1, count)
for entry in entries:
currentTermID = entry[2]
subscribers = self.__commandsWaitingCommit.pop(entry[1], [])
res = self.__doApplyCommand(entry[0])
for subscribeTermID, callback in subscribers:
if subscribeTermID == currentTermID:
callback(res, FAIL_REASON.SUCCESS)
else:
callback(None, FAIL_REASON.DISCARDED)
self.__raftLastApplied += 1
if not self.__conf.appendEntriesUseBatch:
needSendAppendEntries = True
if self.__raftState == _RAFT_STATE.LEADER:
if time.time() > self.__newAppendEntriesTime or needSendAppendEntries:
self.__sendAppendEntries()
if not self.__onReadyCalled and self.__raftLastApplied == self.__leaderCommitIndex:
if self.__conf.onReady:
self.__conf.onReady()
self.__onReadyCalled = True
self._checkCommandsToApply()
self.__tryLogCompaction()
message = {
'type': 'apply_command',
'command': command,
}
if callback is not None:
self.__commandsLocalCounter += 1
self.__commandsWaitingReply[self.__commandsLocalCounter] = callback
message['request_id'] = self.__commandsLocalCounter
self.__send(self.__raftLeader, message)
else:
self.__send(requestNode, {
'type': 'apply_command_response',
'request_id': requestID,
'error': FAIL_REASON.NOT_LEADER,
})
else:
self.__callErrCallback(FAIL_REASON.MISSING_LEADER, callback)
else:
break
self.__leaderCommitIndex = self.__raftCommitIndex
needSendAppendEntries = False
if self.__raftCommitIndex > self.__raftLastApplied:
count = self.__raftCommitIndex - self.__raftLastApplied
entries = self.__getEntries(self.__raftLastApplied + 1, count)
for entry in entries:
currentTermID = entry[2]
subscribers = self.__commandsWaitingCommit.pop(entry[1], [])
res = self.__doApplyCommand(entry[0])
for subscribeTermID, callback in subscribers:
if subscribeTermID == currentTermID:
callback(res, FAIL_REASON.SUCCESS)
else:
callback(None, FAIL_REASON.DISCARDED)
self.__raftLastApplied += 1
if not self.__conf.appendEntriesUseBatch:
needSendAppendEntries = True
if self.__raftState == _RAFT_STATE.LEADER:
if time.time() > self.__newAppendEntriesTime or needSendAppendEntries:
self.__sendAppendEntries()
if not self.__onReadyCalled and self.__raftLastApplied == self.__leaderCommitIndex:
if self.__conf.onReady:
self.__conf.onReady()
self.__onReadyCalled = True
}
if callback is not None:
self.__commandsLocalCounter += 1
self.__commandsWaitingReply[self.__commandsLocalCounter] = callback
message['request_id'] = self.__commandsLocalCounter
self.__send(self.__raftLeader, message)
else:
self.__send(requestNode, {
'type': 'apply_command_response',
'request_id': requestID,
'error': FAIL_REASON.NOT_LEADER,
})
else:
self.__callErrCallback(FAIL_REASON.MISSING_LEADER, callback)