Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@exportRpc("delete-extdirectremote")
def deleteExtDirectRemote(self, extDirectUri):
    """
    Delete an Ext.Direct remote.

    Hands `self._deleteExtDirectRemote` to the protocol's database pool
    via `runInteraction`, passing `extDirectUri` through as its argument.

    Result:
    Whatever `runInteraction` returns, unchanged.
    NOTE(review): presumably a Twisted Deferred - confirm against the pool type.
    """
    pool = self.proto.dbpool
    return pool.runInteraction(self._deleteExtDirectRemote, extDirectUri)
@exportRpc("scratch-database")
def scratchDatabase(self, restart=True):
    """
    Scratch the database, optionally restarting the hub afterwards.

    Parameters:
    restart: When truthy (the default), `self.restartHub()` is called
    once the "database" service has finished `scratchDatabase()`.
    """
    database_service = self.proto.factory.services["database"]
    database_service.scratchDatabase()

    # Nothing more to do when the caller opted out of the restart.
    if not restart:
        return
    self.restartHub()
@exportRpc("getWebGLData")
def getWebGLData(self, view_id, object_id, part):
    """
    Fetch the WebGL binary data for one part of an object in a view.

    Parameters:
    view_id: Identifier resolved to a view through `self.getView`.
    object_id: Object identifier; converted with `str()` before use.
    part: Part number; one is subtracted before it is passed on.
    NOTE(review): looks like callers count parts from 1 - confirm.

    Result:
    The value returned by the application's `GetWebGLBinaryData`.
    """
    target_view = self.getView(view_id)
    fetch_binary = self.getApplication().GetWebGLBinaryData
    return fetch_binary(target_view.SMProxy, str(object_id), part - 1)
@exportRpc("delete-pgconnect")
def deletePgConnect(self, pgConnectUri, cascade=False):
    """
    Delete a PostgreSQL database connect.

    Hands `self._deletePgConnect` to the protocol's database pool via
    `runInteraction`, forwarding both arguments untouched.

    Parameters:
    pgConnectUri: Identifies the connect to delete.
    cascade: Forwarded as-is to `_deletePgConnect`; defaults to False.

    Result:
    Whatever `runInteraction` returns, unchanged.
    """
    pool = self.proto.dbpool
    return pool.runInteraction(self._deletePgConnect, pgConnectUri, cascade)
@exportRpc("get-eula-accepted")
def getEulaAccepted(self):
    """
    Run `self._getEulaAccepted` as an interaction on `self.dbpool`.

    Result:
    Whatever `runInteraction` returns, unchanged.
    """
    pool = self.dbpool
    return pool.runInteraction(self._getEulaAccepted)
@exportRpc("query-oraapi-by-appkey")
def queryOraApiByAppKey(self, appkey):
    """
    Query Oracle remoted API for authentication key.

    Parameters:
    appkey: Passed straight to the "oraremoter" service's `getRemotes`.

    Result:
    Whatever `getRemotes` returns, unchanged.
    """
    ora_remoter = self.proto.factory.services["oraremoter"]
    return ora_remoter.getRemotes(appkey)
@exportRpc("get-license-options")
def getLicenseOptions(self):
    """
    Return the license options reported by the 'database' service.

    The call is delegated to `getLicenseOptions()` on the service
    registered under 'database' in `self.factory.services`.
    """
    database_service = self.factory.services['database']
    return database_service.getLicenseOptions()
@exportRpc("modify-appcred")
def modifyAppCred(self, appCredUri, specDelta):
"""
Modify existing application credential.
Parameters:
appCredUri: URI or CURIE of application credential to modify.
specDelta: Application credential change specification, a dictionary.
specDelta[]:
label: Label, a string, not necessarily unique.
key: Key, a string, must be unique.
secret: Secret, a string.
Result:
{"uri": ,
@exportRpc("create-restremote")
def createRestRemote(self, spec):
    """
    Run `self._createRestRemote` as a database pool interaction.

    Parameters:
    spec: Forwarded untouched to `_createRestRemote`.

    Result:
    Whatever `runInteraction` returns, unchanged.
    """
    pool = self.proto.dbpool
    return pool.runInteraction(self._createRestRemote, spec)
@exportRpc
def data (self):
if self.monitor is None:
raise Exception(BASE + "/error#not_monitoring", "You are not connected to an experiment")
if len(self.monitor_streams) == 0:
return None
# make sure the updates come in consistent steps.
time_now = now()
frequency = self.monitor_frequency
time_now -= time_now % frequency
# make sure we have a decent time-step (at least one frequency)
start = self.last_update + frequency
interval = time_now - start