Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
raise UserMessageError(
"""It looks like you've tried to run a program against a QPU but do
not currently have a reservation on one. To reserve time on Rigetti
QPUs, use the command line interface, qcs, which comes pre-installed
in your QMI. From within your QMI, type:
qcs reserve --lattice
For more information, please see the docs at
https://www.rigetti.com/qcs/docs/reservations or reach out to Rigetti
support at support@rigetti.com."""
)
logger.debug("QPU Client connecting to %s", endpoint)
return Client(endpoint, auth_config=self._get_client_auth_config())
def __init__(self, endpoint=None, timeout=None):
"""
Client to communicate with the benchmarking data endpoint.
:param endpoint: TCP or IPC endpoint of the Compiler Server
"""
self.client = Client(endpoint, timeout=timeout)
:param device: PyQuil Device object to use as compilation target
"""
if not endpoint.startswith("tcp://"):
raise ValueError(
f"PyQuil versions >= 2.4 can only talk to quilc "
f"versions >= 1.4 over network RPCQ. You've supplied the "
f"endpoint '{endpoint}', but this doesn't look like a network "
f"ZeroMQ address, which has the form 'tcp://domain:port'. "
f"You might try clearing (or correcting) your COMPILER_URL "
f"environment variable and removing (or correcting) the "
f"compiler_server_address line from your .forest_config file."
)
self.endpoint = endpoint
self.client = Client(endpoint, timeout=timeout)
self.target_device = TargetDevice(isa=device.get_isa().to_dict(), specs=None)
try:
self.connect()
except QuilcNotRunning as e:
warnings.warn(f"{e}. Compilation using quilc will not be available.")
self.timeout = timeout
_quilc_endpoint = quilc_endpoint or self.session.config.quilc_url
if not _quilc_endpoint.startswith("tcp://"):
raise ValueError(
f"PyQuil versions >= 2.4 can only talk to quilc "
f"versions >= 1.4 over network RPCQ. You've supplied the "
f"endpoint '{quilc_endpoint}', but this doesn't look like a network "
f"ZeroMQ address, which has the form 'tcp://domain:port'. "
f"You might try clearing (or correcting) your COMPILER_URL "
f"environment variable and removing (or correcting) the "
f"compiler_server_address line from your .forest_config file."
)
self.quilc_client = Client(_quilc_endpoint, timeout=timeout)
self.qpu_compiler_endpoint = qpu_compiler_endpoint
self._qpu_compiler_client = None
self._device = device
self.target_device = TargetDevice(isa=device.get_isa().to_dict(), specs=None)
self.name = name
try:
self.connect()
except QuilcNotRunning as e:
warnings.warn(f"{e}. Compilation using quilc will not be available.")
except QPUCompilerNotRunning as e:
warnings.warn(f"{e}. Compilation using the QPU compiler will not be available.")
def qpu_compiler_client(self) -> Client:
if not self._qpu_compiler_client:
endpoint = self.qpu_compiler_endpoint or self.session.config.qpu_compiler_url
if endpoint is not None:
if endpoint.startswith(("http://", "https://")):
device_endpoint = urljoin(
endpoint, f'devices/{self._device._raw["device_name"]}/'
)
self._qpu_compiler_client = HTTPCompilerClient(
endpoint=device_endpoint, session=self.session
)
elif endpoint.startswith("tcp://"):
self._qpu_compiler_client = Client(endpoint, timeout=self.timeout)
else:
raise UserMessageError(
"Invalid endpoint provided to QPUCompiler. Expected protocol in [http://, "
f"https://, tcp://], but received endpoint {endpoint}"
)
return self._qpu_compiler_client
def reset(self) -> None:
"""
Reset the state of the QVMCompiler quilc connection.
"""
timeout = self.client.timeout
self.client.close()
self.client = Client(self.endpoint, timeout=timeout)