Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
raise AlreadyExecutingError("Cannot execute through the IQ# client while another execution is completing.")
self._busy = True
reply = self.kernel_client.execute_interactive(input, output_hook=_output_hook, **kwargs)
finally:
self._busy = False
logger.debug(f"received:\n{reply}")
# There should be either zero or one execute_result messages.
if errors:
raise IQSharpError(errors)
if results:
assert len(results) == 1
content = results[0]['content']
if 'application/json' in content['data']:
obj = unmap_tuples(json.loads(content['data']['application/json']))
else:
obj = None
return (obj, content) if return_full_result else obj
else:
return None
def capture(msg):
    """Output hook that records component versions from a version table.

    Messages whose ``msg_type`` is not ``"display_data"`` are ignored. For a
    ``display_data`` message, the ``application/json`` payload is decoded
    with ``json.loads``, passed through ``unmap_tuples``, and every
    ``(component, version)`` pair found under its ``"rows"`` key is stored
    in the ``versions`` mapping (defined outside this function) as a
    ``LooseVersion``.
    """
    # Only a display_data message is expected to carry the version table.
    if msg["msg_type"] != "display_data":
        return
    payload = msg["content"]["data"]["application/json"]
    table = unmap_tuples(json.loads(payload))
    for name, raw_version in table["rows"]:
        versions[name] = LooseVersion(raw_version)
self._execute("%version", output_hook=capture, _quiet_=True, **kwargs)