# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# -*- coding: utf-8 -*-
import os
import sys
import time
import types
import unittest
import spade
from spade.kb import *
host = "127.0.0.1"
class MyAgent(spade.Agent.Agent):
    """Agent subclass whose ``_setup`` only prepares a ``result`` attribute."""

    def _setup(self):
        """Initialise ``self.result`` to ``None``."""
        # Nothing in this class assigns a different value afterwards.
        self.result = None
def sumVec(vec):
    """Add up the items of *vec* after converting each one with ``int()``.

    Args:
        vec: Iterable whose items are accepted by ``int()`` (for example
            integers or numeric strings).

    Returns:
        dict: ``{"sum": total}`` where ``total`` is the integer sum. An
        empty iterable yields ``{"sum": 0}``.

    Raises:
        ValueError, TypeError: Propagated from ``int()`` when an item
            cannot be converted.
    """
    # The built-in sum() starts at 0, matching the original accumulator.
    return {"sum": sum(int(item) for item in vec)}
def CreateService(name, owner, inputs=None, outputs=None):
    """Build a ``spade.DF.Service`` from the given arguments.

    Args:
        name: Passed to ``spade.DF.Service`` as its first positional argument.
        owner: Passed to ``spade.DF.Service`` as its second positional argument.
        inputs: Value for the ``inputs`` keyword; a fresh empty list is used
            when omitted.
        outputs: Value for the ``outputs`` keyword; a fresh empty list is
            used when omitted.

    Returns:
        Whatever ``spade.DF.Service(...)`` returns.
    """
    # A literal [] default is created once and shared by every call, so a
    # list mutated by one service would leak into the next one. Build a new
    # list per call instead.
    if inputs is None:
        inputs = []
    if outputs is None:
        outputs = []
    return spade.DF.Service(name, owner, inputs=inputs, outputs=outputs)
def setUp(self):
    """Start agents ``a`` and ``b`` and drop their entries for each other.

    Stores the two AIDs in ``self.Aaid``/``self.Baid`` and the running
    agents in ``self.a``/``self.b``.
    """
    jid_a = "a@" + host
    jid_b = "b@" + host

    self.Aaid = spade.AID.aid(jid_a, ["xmpp://" + jid_a])
    self.Baid = spade.AID.aid(jid_b, ["xmpp://" + jid_b])

    self.a = spade.Agent.Agent(jid_a, "secret")
    self.a.start()
    #self.a.setDebugToScreen()

    self.b = spade.Agent.Agent(jid_b, "secret")
    self.b.start()

    # Each agent registers the other one and the entry is then deleted
    # directly from the _socialnetwork mapping (the unsubscribe() variant
    # is left disabled).
    self.a.setSocialItem(jid_b)
    #self.a._socialnetwork['b@'+host].unsubscribe()
    del self.a._socialnetwork[jid_b]

    self.b.setSocialItem(jid_a)
    #self.b._socialnetwork['a@'+host].unsubscribe()
    del self.b._socialnetwork[jid_a]
def setUp(self):
    """Start agents ``a`` and ``b`` (with their ``wui``) and subscribe them to each other.

    Stores the two AIDs in ``self.Aaid``/``self.Baid`` and the running
    agents in ``self.a``/``self.b``; finally deletes the "ExistsNode" and
    "NENode" events on both agents.
    """
    jid_a = "a@" + host
    jid_b = "b@" + host

    self.Aaid = spade.AID.aid(jid_a, ["xmpp://" + jid_a])
    self.Baid = spade.AID.aid(jid_b, ["xmpp://" + jid_b])

    # The wui is started before the agent itself, for both agents.
    self.a = spade.Agent.Agent(jid_a, "secret")
    self.a.wui.start()
    self.a.start()

    self.b = spade.Agent.Agent(jid_b, "secret")
    self.b.wui.start()
    self.b.start()

    # Mutual subscription: a -> b first, then b -> a.
    self.a.setSocialItem(jid_b)
    self.a._socialnetwork[jid_b].subscribe()
    self.b.setSocialItem(jid_a)
    self.b._socialnetwork[jid_a].subscribe()

    # Same call order as before: a then b for each event name.
    for event_name in ("ExistsNode", "NENode"):
        for agent in (self.a, self.b):
            agent.deleteEvent(event_name)
import os
import sys
import time
import unittest
sys.path.append('../..')
import spade
host = "127.0.0.1"
class MyAgent(spade.Agent.Agent):
    """Agent subclass whose ``_setup`` only prepares a ``search`` attribute."""

    def _setup(self):
        """Initialise ``self.search`` to ``None``."""
        # NOTE(review): SearchBehav._process assigns ``myAgent.search``;
        # presumably this is the slot it fills - confirm against the caller.
        self.search = None
class SearchBehav(spade.Behaviour.OneShotBehaviour):
    """One-shot behaviour that searches for one agent and stores the outcome.

    The outcome of ``searchAgent`` is written to ``self.myAgent.search``.
    """

    def __init__(self, s):
        """Keep *s*, the local part of the JID to search for (``s@host``)."""
        # The attribute is set before the base-class initialiser runs.
        self.s = s
        spade.Behaviour.OneShotBehaviour.__init__(self)

    def _process(self):
        """Build an AMS description for ``<s>@<host>`` and search for it."""
        description = spade.AMS.AmsAgentDescription()
        jid = self.s + "@" + host
        target = spade.AID.aid(jid, ["xmpp://" + jid])
        description.setAID(target)
        self.myAgent.search = self.myAgent.searchAgent(description)
#####################################
# MODIFY EXAMPLE #
#####################################
'''
This file shows a simple agent that just modifies its
own information stored in the AMS agent.
You need to be running a SPADE platform on the same host.
'''
import sys,os
sys.path.append('..'+os.sep+'trunk')
sys.path.append('..')
import spade
class MyAgent(spade.Agent.Agent):
    """Example agent whose behaviour modifies its own AMS description."""

    class MyBehav(spade.Behaviour.OneShotBehaviour):
        """One-shot behaviour: modify the agent's AMS data, then search for it."""

        def onStart(self):
            """Announce that the behaviour is starting."""
            # Single-argument print(...) behaves the same on Python 2 and
            # also parses on Python 3, unlike the bare print statement.
            print("Starting behaviour . . .")

        def _process(self):
            """Set ``ownership`` to "FREE" via ``modifyAgent`` and print a search result."""
            print("I'm going to modify my data")
            aad = spade.AMS.AmsAgentDescription()
            #aad.setAID(spade.AID.aid("agent@127.0.0.1",["xmpp://agent@127.0.0.1"]))
            aad.ownership = "FREE"
            result = self.myAgent.modifyAgent(aad)
            # NOTE(review): the original indentation was lost; only the
            # "Modification OK" message is assumed to depend on the result,
            # the check below is assumed to run unconditionally - confirm.
            if result:
                print("Modification OK")
            print("I'm going to check the modification")
            search = self.myAgent.searchAgent(aad)
            print(search)
# SEARCH AGENT EXAMPLE #
#####################################
'''
This file shows a simple agent that just searches for
an agent and then prints the results to the screen.
You need to be running a SPADE platform on the same host.
'''
import os
import sys
sys.path.append('..'+os.sep+'trunk')
sys.path.append('..')
import spade
class MyAgent(spade.Agent.Agent):
    """Example agent whose behaviour searches the AMS and prints the results."""

    class MyBehav(spade.Behaviour.OneShotBehaviour):
        """One-shot behaviour: search with an empty description, print each hit."""

        def onStart(self):
            """Announce that the behaviour is starting."""
            # Single-argument print(...) behaves the same on Python 2 and
            # also parses on Python 3, unlike the bare print statement.
            print("Starting behaviour . . .")

        def _process(self):
            """Search with a default ``AmsAgentDescription`` and print each result as RDF/XML."""
            print("I'm going to search for an agent")
            aad = spade.AMS.AmsAgentDescription()
            search = self.myAgent.searchAgent(aad)
            for a in search:
                print(a.asRDFXML())

        def onEnd(self):
            """Announce that the behaviour has finished."""
            print("Ending behaviour . . .")

    def _setup(self):
        """Announce that the agent is starting."""
        # NOTE(review): as visible here, _setup only prints; it does not
        # register MyBehav - confirm against the complete example.
        print("MyAgent starting . . .")
'''
This file shows a simple agent that just asks the AMS agent
for the Platform Information (pi) and prints it
to the debug system.
It uses a OneShot Behaviour.
You need to be running a SPADE platform on the same host.
'''
import os
import sys
sys.path.append('..'+os.sep+'trunk')
sys.path.append('..')
import spade
class MyAgent(spade.Agent.Agent):
    """Example agent that sends the platform information to its DEBUG output."""

    class MyBehav(spade.Behaviour.OneShotBehaviour):
        """One-shot behaviour that fetches and logs the platform information."""

        def onStart(self):
            """Log that the behaviour is starting."""
            self.myAgent.DEBUG("Starting behaviour . . .")

        def _process(self):
            """Log a greeting, then the string form of ``getPlatformInfo()``."""
            agent = self.myAgent
            agent.DEBUG("Hello World from a OneShot")
            platform_info = agent.getPlatformInfo()
            agent.DEBUG(str(platform_info))

        def onEnd(self):
            """Log that the behaviour has finished."""
            self.myAgent.DEBUG("Ending behaviour . . .")

    def _setup(self):
        """Log the start-up message and register one ``MyBehav`` instance."""
        self.DEBUG("MyAgent starting . . .")
        behaviour = self.MyBehav()
        # The second argument is passed as None, exactly as in the original call.
        self.addBehaviour(behaviour, None)