Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def Register(agent, param):
s = spade.DF.Service()
s.setName("unittest_name_1_"+param)
s.setOwner(agent.getAID())
s.addP("service_precondition")
s.addQ("service_postcondition")
s.setInputs(['login','password'])
s.setOutputs(['account_id'])
s.setDescription("Login Service")
s.setOntology("account_managing")
agent.result = agent.registerService(s)
def CreateService(name, owner, inputs=[], outputs=[]):
return spade.DF.Service(name, owner, inputs=inputs, outputs=outputs)
def DeRegisterService(agent):
service = spade.DF.DfAgentDescription()
service.setAID(agent.getAID())
#service = CreateService("VecSum",agent.getAID())
agent.result = agent.deregisterService(service)
def DeRegister_DAD(agent, param):
dad = spade.DF.DfAgentDescription()
dad.setAID(agent.getAID())
agent.result = agent.deregisterService(dad)
def Modify_DAD(agent, param):
sd = spade.DF.ServiceDescription()
sd.setName("unittest_name_1_"+param)
sd.setType("unittest_type_1_modified_"+param)
dad = spade.DF.DfAgentDescription()
dad.addService(sd)
dad.setAID(agent.getAID())
agent.result = agent.modifyService(dad)
(property :name P :value valueP1)
)
)
)
:lease-time 1000
:scope scope11
)
"""
content = parser.parse(sl0)
self.assertNotEqual(content,None)
dad1 = CreateDAD("1")
sd = CreateSD("1")
dad1.addService(sd)
dad2 = spade.DF.DfAgentDescription()
dad2.loadSL0(content)
assert dad1.match(dad2)
assert sl0 == "("+dad2.asSL0()+")"
assert dad1.asSL0() == dad2.asSL0()
def DeRegisterService(agent):
service = spade.DF.DfAgentDescription()
service.setAID(agent.getAID())
#service = CreateService("VecSum",agent.getAID())
agent.result = agent.deregisterService(service)
def CreateSD(s=""):
sd = spade.DF.ServiceDescription()
sd.setName("servicename1"+s)
sd.setType("type1"+s)
sd.addProtocol("sdprotocol1"+s)
sd.addOntologies("sdontology1"+s)
sd.addLanguage("sdlanguage1"+s)
sd.setOwnership("agent1"+s)
sd.addProperty("P","valueP"+s)
sd.addProperty("Q","valueQ"+s)
return sd
def onStart(self):
self.myAgent.DEBUG("Starting behaviour . . .",'ok')
s = spade.DF.Service(name="RPCmethod",owner=self.myAgent.getAID(),P="P")
self.myAgent.registerService(s,self.myAgent.RPCmethod)
self.myAgent.DEBUG("Service registered "+str(s),'ok')
def start(self):
#TODO: this should be configurable
self.acc = self.runAgent(self.config, "acc", Platform.SpadePlatform)
#self.acc.setDebugToScreen()
self.ams = self.runAgent(self.config, "ams", AMS.AMS)
self.ams.DEBUG = self.acc.DEBUG
#self.ams.setDebugToScreen()
self.df = self.runAgent(self.config, "df", DF.DF)
self.df.DEBUG = self.acc.DEBUG
#self.df.setDebugToScreen()