Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def sample_poly(self, poly, num_reads=1):
    """Return an all-ones sample set for ``poly`` after a bias range check.

    Args:
        poly: Polynomial-like object; this method reads ``values()``,
            ``variables``, ``vartype`` and ``energies()`` from it.
        num_reads: Number of identical all-ones rows to produce.

    Returns:
        The ``dimod.SampleSet`` built from the all-ones samples, with
        energies computed by ``poly.energies``.

    Raises:
        RuntimeError: If any bias is greater than 1 or less than -1.
    """
    # Upper bound is checked first, lower bound second.
    too_high = any(b > 1 for b in poly.values())
    if too_high or any(b < -1 for b in poly.values()):
        raise RuntimeError

    all_ones = np.ones((num_reads, len(poly.variables)))
    labelled = (all_ones, list(poly.variables))
    return dimod.SampleSet.from_samples(
        labelled,
        vartype=poly.vartype,
        energy=poly.energies(labelled),
    )
def test_triu_spin(self):
    """Compare the formatted table of an upper-triangular SPIN sample set
    against a fixed expected rendering."""
    labels = [0, 1, 'a', 'b', 'c']
    # upper-triangular 0/1 matrix mapped onto -1/+1
    upper = np.triu(np.ones((5, 5)))
    samples = dimod.SampleSet.from_samples(
        (2*upper-1, labels), dimod.SPIN, energy=[4., 3, 2, 1, 0])

    rendered = Formatter(width=79, depth=None).format(samples)

    # NOTE(review): the expected rows below are single-space separated;
    # presumably the formatter pads columns, so confirm these literals
    # against real Formatter output.
    expected_rows = [
        " 0 1 a b c energy num_oc.",
        "4 -1 -1 -1 -1 +1 0.0 1",
        "3 -1 -1 -1 +1 +1 1.0 1",
        "2 -1 -1 +1 +1 +1 2.0 1",
        "1 -1 +1 +1 +1 +1 3.0 1",
        "0 +1 +1 +1 +1 +1 4.0 1",
        "['SPIN', 5 rows, 5 samples, 5 variables]",
    ]
    self.assertEqual(rendered, '\n'.join(expected_rows))
def test_unpacked_bytes(self):
    """Round-trip a sample set through bytes serialization with sample
    packing disabled and check the records match."""
    # dev note: we are using an unsupported back door that allows
    # samplesets to handle integer variables. This support could
    # disappear at any time
    raw = np.arange(25).reshape((5, 5))
    original = dimod.SampleSet.from_samples(raw, 'BINARY', 1)

    payload = original.to_serializable(use_bytes=True, pack_samples=False)
    restored = dimod.SampleSet.from_serializable(payload)

    np.testing.assert_array_equal(original.record, restored.record)
def test_sampleset_triu(self):
    """Check that a 100x100 triangular sample set survives a JSON
    encode/decode round trip unchanged."""
    num_variables = 100
    num_samples = 100

    # -1/+1 samples: triangular pattern shifted four diagonals down
    pattern = np.triu(np.ones((num_samples, num_variables)), -4)
    samples = 2*pattern - 1

    linear = {v: .1*v for v in range(num_variables)}
    bqm = dimod.BinaryQuadraticModel.from_ising(linear, {})
    obj = dimod.SampleSet.from_samples_bqm(samples, bqm)

    encoded = json.dumps(obj, cls=DimodEncoder)
    new = json.loads(encoded, cls=DimodDecoder)
    self.assertEqual(obj, new)
solver_params = dict(chains=list(physical.embedding.values()),
num_reads=samples_list[i],
num_spin_reversal_transforms=spin_rev_list[i],
postprocess=postproc)
solver_params.update(self.extra_solver_params)
if anneal_sched == None:
solver_params["annealing_time"] = anneal_time
else:
solver_params["anneal_schedule"] = anneal_sched
with self.rejected_params_lock:
for p in self.rejected_params:
del solver_params[p]
# Submit the QMI to the solver in a background thread.
future = executor.submit(self._submit_and_block, sampler, bqm, i == 0, **solver_params)
results[i] = SampleSet.from_future(future)
# Wait for the QMIs to finish then return the results.
executor.shutdown(wait=False)
return self.complete_sample_acquisition(verbosity, results, overall_start_time, physical)
# Names in this set skip the key-based attribute lookup and are forwarded to
# the superclass instead, so that protocols such as pickling keep working.
_delegated = frozenset([
    '__reduce_ex__',
    '__reduce__',
    '__getstate__',
    '__setstate__',
    '__getinitargs__',
    '__getnewargs__',
])
def __getattr__(self, name):
    """Resolve an attribute that normal lookup did not find.

    Names listed in ``_delegated`` are forwarded to the superclass's
    ``__getattr__``; every other name yields ``self.get(name)``.
    """
    if name not in self._delegated:
        return self.get(name)
    return super(PliableDict, self).__getattr__(name)
def __setattr__(self, name, value):
    """Store an attribute assignment as an item: ``obj.name = value``
    becomes ``obj[name] = value``."""
    self.__setitem__(name, value)
class SampleSet(dimod.SampleSet):
    """`dimod.SampleSet` with a few extra helper methods.

    Note: this acts as a staging area for new `dimod.SampleSet` features
    until they are merged upstream.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the sample set.

        With any positional or keyword argument, everything is passed
        straight to `dimod.SampleSet`. With no arguments at all, the
        instance is initialised from the parts of ``self.empty()``.
        """
        if args or kwargs:
            super(SampleSet, self).__init__(*args, **kwargs)
            return

        # construct empty SampleSet
        blank = self.empty()
        super(SampleSet, self).__init__(
            blank.record, blank.variables, blank.info, blank.vartype)
@classmethod
# NOTE(review): fragment of a method body; the enclosing `def` and the origin
# of `inp1`/`inp2` are outside this view.
# Only the first input's problem is used as the reference model.
bqm = inp1.problem
# Work on BINARY conversions requested with inplace=False
# (presumably so the XOR below operates on 0/1 values -- confirm).
ss1 = inp1.samples.change_vartype(dimod.BINARY, inplace=False)
ss2 = inp2.samples.change_vartype(dimod.BINARY, inplace=False)
# sanity check: we operate on the same set of variables
# (a non-empty symmetric difference means some variable is in only one input)
if ss1.variables ^ ss2.variables:
raise ValueError("input samples not over the same set of variables")
# reorder variables, if necessary
# (use sequence comparison, not set)
variables = list(ss1.variables)
if ss2.variables != variables:
# positions, within ss2, of the variables in ss1's order
reorder = [ss2.variables.index(v) for v in variables]
# NOTE(review): `record[:, reorder]` needs a record that accepts 2-D indexing;
# if `record` is a 1-D structured array this presumably has to index
# `record.sample` instead -- confirm.
record = ss2.record[:, reorder]
ss2 = dimod.SampleSet(record, variables, ss2.info, ss2.vartype)
# samples' symmetric difference (XOR)
# (form clusters of input variables with opposite values)
# Only the first sample (index 0) of each sample set is compared.
sample1 = ss1.record.sample[0]
sample2 = ss2.record.sample[0]
symdiff = sample1 ^ sample2
# for cluster detection we'll use a reduced problem graph
graph = bqm.to_networkx_graph()
# note: instead of numpy mask indexing of `notcluster`, we enumerate
# non-cluster variables manually to avoid conversion of potentially
# unhashable variable names to numpy types
# (a zero in `symdiff` marks a variable on which both samples agree)
notcluster = [v for v, d in zip(variables, symdiff) if d == 0]
graph.remove_nodes_from(notcluster)
# pick a random variable that belongs to a cluster, then select the cluster
# NOTE(review): tail of a sampling routine; the `if` that pairs with the
# `else` below, and the definitions of the names used here, are outside this
# view.
# interpolate a geometric beta schedule
# (`num_betas` points, geometrically spaced between the ends of `beta_range`)
beta_schedule = np.geomspace(*beta_range, num=num_betas)
else:
raise ValueError("Beta schedule type {} not implemented".format(beta_schedule_type))
# run the simulated annealing algorithm
# (the call returns the sampled states together with their energies)
samples, energies = sa.simulated_annealing(
num_reads, ldata, irow, icol, qdata,
num_sweeps_per_beta, beta_schedule,
seed, initial_states_array, interrupt_function)
# Schedule settings are attached to the result as metadata.
info = {
"beta_range": beta_range,
"beta_schedule_type": beta_schedule_type
}
# The sample set is built with the SPIN vartype.
response = dimod.SampleSet.from_samples(
(samples, variable_order),
energy=energies+bqm.offset, # add back in the offset
info=info,
vartype=dimod.SPIN
)
# Convert in place to `original_vartype` (presumably the vartype of the
# caller's model -- confirm against the part of the routine not shown).
response.change_vartype(original_vartype, inplace=True)
return response
def sample(self, bqm, initial_states=None, initial_states_generator='random',
num_reads=None, tenure=None, timeout=20, scale_factor=1, **kwargs):
# validate/initialize initial_states
if initial_states is None:
initial_states = dimod.SampleSet.from_samples(
(np.empty((0, num_variables)), bqm.variables),
energy=0, vartype=bqm.vartype)
if not isinstance(initial_states, dimod.SampleSet):
raise TypeError("'initial_states' is not 'dimod.SampleSet' instance")
# validate num_reads and/or infer them from initial_states
if num_reads is None:
num_reads = len(initial_states) or 1
if not isinstance(num_reads, Integral):
raise TypeError("'num_reads' should be a positive integer")
if num_reads < 1:
raise ValueError("'num_reads' should be a positive integer")
# initial states generators
_generators = {