Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
    """Prepare two agents, ``a`` and ``b``, on ``host`` and start them.

    Stores an AID for each agent (``self.Aaid`` / ``self.Baid``), creates and
    starts both ``MyAgent`` instances, then starts each agent's ``wui``.
    """
    jid_a = "a@" + host
    jid_b = "b@" + host

    self.Aaid = spade.AID.aid(jid_a, ["xmpp://" + jid_a])
    self.Baid = spade.AID.aid(jid_b, ["xmpp://" + jid_b])

    # Agent "a" is created and started before agent "b" is even constructed.
    self.a = MyAgent(jid_a, "secret")
    self.a.start()

    # For verbose output while debugging, call setDebugToScreen() on
    # self.a / self.b before starting them.
    self.b = MyAgent(jid_b, "secret")
    self.b.start()

    # The wui components are started only after both agents are running.
    self.a.wui.start()
    self.b.wui.start()
def testCreateAid(self):
    """Check a default-constructed AID and its name/address/resolver/property mutators."""
    address = "address@server.com"
    resolver = "resolver@server.com"

    agent_id = spade.AID.aid()

    # A fresh AID has no name and empty address/resolver/property lists.
    self.assertEqual(agent_id.getName(), None)
    for list_getter in (agent_id.getAddresses,
                        agent_id.getResolvers,
                        agent_id.getProperties):
        self.assertEqual(list_getter(), [])

    agent_id.setName("aidname")
    self.assertEqual(agent_id.getName(), "aidname")

    agent_id.addAddress(address)
    self.assertEqual(len(agent_id.getAddresses()), 1)
    self.assertEqual(agent_id.getAddresses()[0], address)

    agent_id.addResolvers(resolver)
    self.assertEqual(len(agent_id.getResolvers()), 1)
    self.assertEqual(agent_id.getResolvers()[0], resolver)

    # Only the count is checked for properties, not the stored value.
    agent_id.addProperty("prop1")
    self.assertEqual(len(agent_id.getProperties()), 1)
def CreateDAD(s=""):
    """Build a sample ``DfAgentDescription`` whose string fields end in *s*.

    The description gets an AID named ``"aidname" + s``, one protocol, one
    ontology, one language and one scope (each suffixed with *s*), and a
    lease time of 1000.

    Returns the populated description.
    """
    description = spade.DF.DfAgentDescription()

    agent_id = spade.AID.aid()
    agent_id.setName("aidname" + s)
    description.setAID(agent_id)

    description.addProtocol("protocol1" + s)
    description.addOntologies("ontology1" + s)
    description.addLanguage("language1" + s)
    description.setLeaseTime(1000)
    description.addScope("scope1" + s)

    return description
def testCreateDad(self):
    """Check a default ``DfAgentDescription`` and a few of its mutators."""
    description = spade.DF.DfAgentDescription()

    # Defaults: the name compares equal to an empty AID, every collection is
    # empty, and there is no lease time.
    self.assertEqual(description.getName(), spade.AID.aid())
    self.assertEqual(description.getServices(), [])
    self.assertEqual(description.getProtocols(), [])
    self.assertEqual(description.getOntologies(), [])
    self.assertEqual(description.getLanguages(), [])
    self.assertEqual(description.getLeaseTime(), None)
    self.assertEqual(description.getScope(), [])

    # After assigning a named AID, getName() is compared against the string.
    agent_id = spade.AID.aid()
    agent_id.setName("aidname")
    description.setAID(agent_id)
    self.assertEqual(description.getName(), "aidname")

    description.addProtocol("protocol1")
    self.assertEqual(description.getProtocols(), ["protocol1"])

    description.addOntologies("ontology1")
    self.assertEqual(description.getOntologies(), ["ontology1"])
def testMatch(self):
    """Check ``aid.match`` against several candidate AIDs.

    The reference AID has name ``"name1"``, addresses ``"addr1"`` and
    ``"addresses2"``, and resolver ``"resolver1"``. Each case lists the
    keyword arguments for a candidate AID and the expected result of
    ``reference.match(candidate)``.
    """
    reference = spade.AID.aid(
        name="name1",
        addresses=["addr1", "addresses2"],
        resolvers=["resolver1"],
    )

    cases = (
        # Same name, only one of the addresses.
        (dict(name="name1", addresses=["addr1"], resolvers=["resolver1"]), True),
        # Name "nam" with the full address list.
        (dict(name="nam", addresses=["addr1", "addresses2"], resolvers=["resolver1"]), True),
        # Only a single address given.
        (dict(addresses=["addresses2"]), True),
        # Different name.
        (dict(name="other", addresses=["addr1", "addresses2"], resolvers=["resolver1"]), False),
        # Address the reference does not have.
        (dict(name="name1", addresses=["addr2"], resolvers=["resolver1"]), False),
    )

    for candidate_kwargs, expected in cases:
        candidate = spade.AID.aid(**candidate_kwargs)
        self.assertEqual(reference.match(candidate), expected)
def CreateCO(s=""):
# Builds sample ContentObjects whose string values all end with the suffix `s`.
# NOTE(review): as visible here the function never attaches `sdco` to `co` and
# has no return statement — the definition looks cut off; confirm against the
# complete file before relying on it.
co = spade.content.ContentObject()
# Agent-level entries: name (an AID converted to a ContentObject), lease time,
# protocols, ontologies, languages and scope.
co["name"] = spade.AID.aid(name="aidname"+s).asContentObject()
co["lease-time"] = 1000
co["protocols"] = ["protocol1"+s,"sdprotocol1"+s]
co["ontologies"] = ["ontology1"+s,"sdontology1"+s]
co["languages"] = ["language1"+s, "sdlanguage1"+s]
co["scope"] = "scope1"+s
# Second ContentObject holding service-level entries (presumably a service
# description, judging by the "sd" prefix — TODO confirm).
sdco = spade.content.ContentObject()
sdco["name"] = "servicename1"+s
sdco["type"] = "type1"+s
sdco["protocols"] = ["sdprotocol1"+s]
sdco["languages"] = ["sdlanguage1"+s]
sdco["ontologies"] = ["sdontology1"+s]
sdco["ownership"] = "agent1"+s
# Nested ContentObject with two properties, P and Q.
sdco["properties"] = spade.content.ContentObject()
sdco["properties"]["P"] = "valueP"+s
sdco["properties"]["Q"] = "valueQ"+s
def setUp(self):
    """Prepare two agents, ``a`` and ``b``, on ``host`` and start them.

    Stores an AID for each agent (``self.Aaid`` / ``self.Baid``) and creates
    and starts both ``MyAgent`` instances. No ``wui`` is started here.
    """
    jid_a = "a@" + host
    jid_b = "b@" + host

    self.Aaid = spade.AID.aid(jid_a, ["xmpp://" + jid_a])
    self.Baid = spade.AID.aid(jid_b, ["xmpp://" + jid_b])

    # Agent "a" is created and started before agent "b" is constructed.
    self.a = MyAgent(jid_a, "secret")
    self.a.start()

    # For verbose output while debugging, call setDebugToScreen() on
    # self.a / self.b before starting them.
    self.b = MyAgent(jid_b, "secret")
    self.b.start()
def setUp(self):
    """Prepare two agents, ``a`` and ``b``, on ``host`` with ``p2p=True``.

    Stores an AID for each agent (``self.Aaid`` / ``self.Baid``), then
    creates each ``MyAgent`` with ``p2p=True``, assigns it to ``self.a`` /
    ``self.b`` and starts it.
    """
    jid_a = "a@" + host
    jid_b = "b@" + host

    self.Aaid = spade.AID.aid(jid_a, ["xmpp://" + jid_a])
    self.Baid = spade.AID.aid(jid_b, ["xmpp://" + jid_b])

    # Each agent is created, stored on the test case, and started before the
    # next one is constructed. Set `agent._debug = True` before start() when
    # debugging.
    for attr_name, jid in (("a", jid_a), ("b", jid_b)):
        agent = MyAgent(jid, "secret", p2p=True)
        setattr(self, attr_name, agent)
        agent.start()
# Required constants: the protocol name and the transport object bound to it.
# `http` is defined elsewhere in this file (not visible in this section).
PROTOCOL = "http"
INSTANCE = http

if __name__ == "__main__":
    # Manual smoke test: build one ACL message plus its envelope and hand
    # both to the transport's send().
    import time

    http_mtp = http("SpadeHttpSever", None, None)
    time.sleep(5)

    receiver_jid = "b@localhost"
    receiver_addresses = ["xmpp://b@localhost", "http://localhost:2099/b"]
    sender_jid = "a@localhost"
    sender_addresses = ["xmpp://a@localhost", "http://localhost:2099/a"]

    # The message itself.
    msg = ACLMessage.ACLMessage()
    msg.setPerformative("inform")
    # A fresh address list is passed to every AID so none of them share one.
    msg.addReceiver(AID.aid(receiver_jid, list(receiver_addresses)))
    msg.setContent("testACLMessage")
    msg.setLanguage(ACLMessage.FIPA_ACL_REP_XML)

    # The envelope carrying it.
    env = Envelope.Envelope()
    env.addTo(AID.aid(receiver_jid, list(receiver_addresses)))
    env.setFrom(AID.aid(sender_jid, list(sender_addresses)))
    env.setAclRepresentation(ACLMessage.FIPA_ACL_REP_XML)
    env.setPayloadEncoding("US-ASCII")
    env.setDate("20121105T134259368626")

    http_mtp.send(env, msg)
"""
TODO
- send must return an error code when it fails
"""