Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# first do general GBS compilation to make sure
# Fock measurements are correct
# ---------------------------------------------
seq = GBSSpecs().compile(seq, registers)
A, B, C = group_operations(seq, lambda x: isinstance(x, ops.MeasureFock))
if len(B[0].reg) != self.modes:
raise CircuitError("All modes must be measured.")
# Check circuit begins with two mode squeezers
# --------------------------------------------
A, B, C = group_operations(seq, lambda x: isinstance(x, ops.S2gate))
if A:
raise CircuitError("Circuits must start with two S2gates.")
# get circuit registers
regrefs = {q for cmd in B for q in cmd.reg}
if len(regrefs) != self.modes:
raise CircuitError("S2gates do not appear on the correct modes.")
# Compile the unitary: combine and then decompose all unitaries
# -------------------------------------------------------------
A, B, C = group_operations(seq, lambda x: isinstance(x, (ops.Rgate, ops.BSgate)))
# begin the unitary lists for modes [0, 1] and modes [2, 3] with
# two identity matrices. This is because multi_dot requires
# at least two matrices in the list.
U_list01 = [np.identity(self.modes // 2, dtype=np.complex128)] * 2
U_list23 = [np.identity(self.modes // 2, dtype=np.complex128)] * 2
def test_not_all_modes_measured(self):
    """Test that an exception is raised if not all modes are measured.

    A program created with ``sf.Program(4)`` applies S2gates and two
    interferometers, but its only Fock measurement acts on ``q[0]`` and
    ``q[1]``. Compiling it for the "chip0" target must raise a
    ``CircuitError`` whose message matches "All modes must be measured".
    """
    prog = sf.Program(4)
    U = random_interferometer(2)

    with prog.context as q:
        ops.S2gate(0.5) | (q[0], q[2])
        ops.S2gate(0.5) | (q[1], q[3])
        # the same matrix U is used for both interferometers
        ops.Interferometer(U) | (q[0], q[1])
        ops.Interferometer(U) | (q[2], q[3])
        # deliberately measure only q[0] and q[1]
        ops.MeasureFock() | (q[0], q[1])

    with pytest.raises(CircuitError, match="All modes must be measured"):
        # compile() is expected to raise, so its return value is not bound
        prog.compile("chip0")
# clear the program for reuse
prog.locked = False
prog.circuit = []
with prog.context as r:
# fake a measurement for speed
r[0].val = 0.1
r[1].val = 0.2
if G.ns == 1:
G | r[0]
else:
G | (r[0], r[1])
try:
eng.run(prog, **kwargs)
except pu.CircuitError as err:
# record CircuitSpecs-forbidden op/backend combinations here
warnings.warn(str(err))
eng.reset()
kwargs = self.decompositions[op_name]
temp = cmd.op.decompose(cmd.reg, **kwargs)
# now compile the decomposition
temp = self.decompose(temp)
compiled.extend(temp)
except NotImplementedError as err:
# Operation does not have _decompose() method defined!
# simplify the error message by suppressing the previous exception
raise err from None
elif op_name in self.primitives:
# target can handle the op natively
compiled.append(cmd)
else:
raise pu.CircuitError("The operation {} cannot be used with the target '{}'.".format(cmd.op.__class__.__name__, self.short_name))
return compiled
def append(self, op, reg):
    """Add a new command, built from an operation and its targets, to the circuit.

    Args:
        op (Operation): quantum operation
        reg (list[int, RegRef]): register subsystem(s) to apply it to

    Returns:
        list[RegRef]: subsystem list as RegRefs

    Raises:
        CircuitError: if the program is locked
    """
    if self.locked:
        raise CircuitError('The Program is locked, no more Commands can be appended to it.')

    # validate the target subsystems first, then any subsystems the
    # operation depends on through its measurement dependencies
    targets = self._test_regrefs(reg)
    self._test_regrefs(op.measurement_deps)

    # every target subsystem now counts as used
    for target in targets:
        self.unused_indices.discard(target.ind)

    command = Command(op, targets)
    self.circuit.append(command)
    return targets
m, n = modes
t = np.cos(params[0])
r = np.exp(1j * params[1]) * np.sin(params[0])
U[m % 2, m % 2] = t
U[m % 2, n % 2] = -np.conj(r)
U[n % 2, m % 2] = r
U[n % 2, n % 2] = t
if set(modes).issubset({0, 1}):
U_list01.insert(0, U)
elif set(modes).issubset({2, 3}):
U_list23.insert(0, U)
else:
raise CircuitError(
"Unitary must be applied separately to modes [0, 1] and modes [2, 3]."
)
# multiply all unitaries together, to get the final
# unitary representation on modes [0, 1] and [2, 3].
U01 = multi_dot(U_list01)
U23 = multi_dot(U_list23)
# check unitaries are equal
if not np.allclose(U01, U23):
raise CircuitError(
"Interferometer on modes [0, 1] must be identical to interferometer on modes [2, 3]."
)
U = block_diag(U01, U23)
# (but this is already guaranteed by group_operations() and our primitive set)
# without Fock measurements GBS is pointless
if not B:
raise CircuitError('GBS circuits must contain Fock measurements.')
# there should be only Fock measurements in B
measured = set()
for cmd in B:
if not isinstance(cmd.op, ops.MeasureFock):
raise CircuitError('The Fock measurements are not consecutive.')
# combine the Fock measurements
temp = set(cmd.reg)
if measured & temp:
raise CircuitError('Measuring the same mode more than once.')
measured |= temp
# replace B with a single Fock measurement
B = [Command(ops.MeasureFock(), sorted(list(measured), key=lambda x: x.ind))]
return super().compile(A + B, registers)
U_list0.insert(0, U)
elif set(modes).issubset({4, 5, 6, 7}):
U_list4.insert(0, U)
else:
raise CircuitError(
"Unitary must be applied separately to modes [0, 1, 2, 3] and modes [4, 5, 6, 7]."
)
# multiply all unitaries together, to get the final
# unitary representation on modes [0, 1, 2, 3] and [4, 5, 6, 7].
U0 = multi_dot(U_list0)
U4 = multi_dot(U_list4)
# check unitaries are equal
if not np.allclose(U0, U4):
raise CircuitError(
"Interferometer on modes [0, 1, 2, 3] must be identical to interferometer on modes [4, 5, 6, 7]."
)
U = block_diag(U0, U4)
# replace B with an interferometer
B = [
Command(ops.Interferometer(U0), registers[:4]),
Command(ops.Interferometer(U4), registers[4:]),
]
# decompose the interferometer, using Mach-Zehnder interferometers
B = self.decompose(B)
# Do a final circuit topology check
# ---------------------------------
# get set of circuit registers as a tuple for each S2gate
regrefs = {(cmd.reg[0].ind, cmd.reg[1].ind) for cmd in B}
# the set of allowed mode-tuples the S2gates must have
allowed_modes = set(zip(range(0, 4), range(4, 8)))
if not regrefs.issubset(allowed_modes):
raise CircuitError("S2gates do not appear on the correct modes.")
# ensure provided S2gates all have the allowed squeezing values
allowed_sq_value = {(0.0, 0.0), (self.sq_amplitude, 0.0)}
sq_params = {(float(np.round(cmd.op.p[0], 3)), float(cmd.op.p[1])) for cmd in B}
if not sq_params.issubset(allowed_sq_value):
wrong_params = sq_params - allowed_sq_value
raise CircuitError(
"Incorrect squeezing value(s) (r, phi)={}. Allowed squeezing "
"value(s) are (r, phi)={}.".format(wrong_params, allowed_sq_value)
)
# determine which modes do not have input S2gates specified
missing = allowed_modes - regrefs
for i, j in missing:
# insert S2gates with 0 squeezing
seq.insert(0, Command(ops.S2gate(0, 0), [registers[i], registers[j]]))
# Check whether the circuit matches the circuit template
# --------------------------------------------
# This will avoid superfluous unitary compilation.
try:
seq = super().compile(seq, registers)
def _add_subsystems(self, n):
"""Create new subsystem references, add them to the reg_ref dictionary.
To avoid discrepancies with the backend this method must not be called directly,
but rather indirectly by using :func:`~strawberryfields.ops.New`
in a Program context.
.. note:: This is the only place where :class:`RegRef` instances are constructed.
Args:
n (int): number of subsystems to add
Returns:
tuple[RegRef]: tuple of the newly added subsystem references
"""
if self.locked:
raise CircuitError('The Program is locked, no new subsystems can be created.')
if not isinstance(n, numbers.Integral) or n < 1:
raise ValueError('Number of added subsystems {} is not a positive integer.'.format(n))
first_unassigned_index = len(self.reg_refs)
# create a list of RegRefs
inds = [first_unassigned_index+i for i in range(n)]
refs = tuple(RegRef(i) for i in inds)
# add them to the index map
for r in refs:
self.reg_refs[r.ind] = r
# all the newly reserved indices are unused for now
self.unused_indices.update(inds)
return refs