Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def receive(self, command_list):
for cmd in command_list:
if not cmd.gate == FlushGate():
self.cache_cmd(cmd)
if not self.is_last_engine:
self.send(command_list)
def _handle(self, cmd):
if isinstance(cmd.gate, FlushGate):
return
if cmd.gate == Measure:
for qr in cmd.qubits:
for qb in qr:
self.main_engine.set_measurement_result(qb,
self.read_bit(qb))
return
if cmd.gate == Allocate:
new_id = cmd.qubits[0][0].id
self._bit_positions[new_id] = len(self._bit_positions)
return
if cmd.gate == Deallocate:
old_id = cmd.qubits[0][0].id
def flush(self, deallocate_qubits=False):
"""
Flush the entire circuit down the pipeline, clearing potential buffers
(of, e.g., optimizers).
Args:
deallocate_qubits (bool): If True, deallocates all qubits that are
still alive (invalidating references to them by setting their
id to -1).
"""
if deallocate_qubits:
for qb in self.active_qubits:
qb.__del__()
self.active_qubits = weakref.WeakSet()
self.receive([Command(self, FlushGate(), ([WeakQubitRef(self, -1)],))])
def _call_cluster_scheduler(self):
self._prepare_ctrlz()
local_qubits = self._get_local_ids_list_from_backend()
global_qubits = self._get_global_ids_list_from_backend()
while True:
gate, gate_ctrl, gate_diag = self._get_commands()
cluster_scheduler = ClusterScheduler(gate, gate_ctrl, gate_diag, local_qubits, global_qubits,
self.CLUSTER_SIZE)
avail = cluster_scheduler.ScheduleCluster()
if len(avail) == 0:
return
for i in avail:
self.send([self._cmd_list[i]])
self.send([Command(self, FlushGate(), ([WeakQubitRef(self, -1)],))])
for i in reversed(sorted(avail)):
del self._cmd_list[i]
# print("in", len(self._cmd_list))
def _store(self, cmd):
"""
Store a command and handle CNOTs.
Args:
cmd (Command): A command to store
"""
if not cmd.gate == FlushGate():
apply_to = cmd.qubits[0][0].id
if apply_to not in self._interactions:
self._interactions[apply_to] = set()
self._num_cnot_target[apply_to] = 0
if self._is_cnot(cmd):
# CNOT encountered
ctrl = cmd.control_qubits[0].id
if ctrl not in self._interactions:
self._interactions[ctrl] = set()
self._num_cnot_target[ctrl] = 0
self._interactions[ctrl].add(apply_to)
self._interactions[apply_to].add(ctrl)
self._num_cnot_target[apply_to] += 1
self._cmds.append(cmd)
Receives a command list and, for each command, stores it until
completion.
Args:
command_list (list of Command objects): list of commands to
receive.
Raises:
Exception: If mapping the CNOT gates to 1 qubit would require
Swaps. The current version only supports remapping of CNOT
gates without performing any Swaps due to the large costs
associated with Swapping given the CNOT constraints.
"""
for cmd in command_list:
self._store(cmd)
if isinstance(cmd.gate, FlushGate):
self._run()
self._reset()
def _store(self, cmd):
"""
Store a command and handle CNOTs.
Args:
cmd (Command): A command to store
"""
if not cmd.gate == FlushGate():
target = cmd.qubits[0][0].id
if self._is_cnot(cmd):
# CNOT encountered
ctrl = cmd.control_qubits[0].id
if not (ctrl, target) in self._interactions:
self._interactions[(ctrl, target)] = 0
self._interactions[(ctrl, target)] += 1
elif cmd.gate == Allocate:
if target not in self.current_mapping:
new_max = 0
if len(self.current_mapping) > 0:
new_max = max(self.current_mapping.values()) + 1
self._current_mapping[target] = new_max
self._cmds.append(cmd)
def receive(self, command_list):
"""
Receive a list of commands from the previous engine and handle them
(simulate them classically) prior to sending them on to the next
engine.
Args:
command_list (list): List of commands to execute on the
simulator.
"""
for cmd in command_list:
if isinstance(cmd.gate, FlushGate) or isinstance(cmd.gate, FastForwardingGate):
self._simulator.run() # flush gate --> run all saved gates
self._handle(cmd)
if not self.is_last_engine:
self.send([cmd])