Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@handler("test", filter=True)
def on_test(self, event, *args, **kwargs):
self.events_executed.append('test_filter')
return True
@handler("values", priority=1.0)
def _value2(self):
return "bar"
@handler("response_complete")
def _on_response_complete(self, e, value):
response = e.args[0]
request = response.request
if request.sock in self._codecs:
self.fire(
connect(
request.sock,
*request.sock.getpeername()
),
self._wschannel
)
@handler("load")
def onLOAD(self):
"""C.onLOAD()
Load the configuration file.
"""
self.read(self.filename)
self._p = Popen(
command, shell=True, stdout=PIPE, stderr=PIPE,
close_fds=True, preexec_fn=os.setsid
)
self._stdin = None
if self._p.stdin is not None:
self._stdin = File(self._p.stdin, channel="%s.stdin" % channel)
self._stdin.register(self)
self._stdout = File(self._p.stdout, channel="%s.stdout" % channel)
self.addHandler(
handler("eof", channel="%s.stdout" % channel)(self._on_stdout_eof)
)
self.addHandler(
handler("read", channel="%s.stdout" % channel)(
self._on_stdout_read
)
)
self._stdout.register(self)
@handler("read", target="stdin")
def stdin_read(self, data):
self.write(self.dest, data)
@handler("read")
def _on_read(self, data):
self.__protocol.add_buffer(data)
@handler("read")
def on_read(self, sock, data):
"""Read Event Handler
This is fired by the underlying Socket Component when there has been
new data read from the connected client.
..note :: By simply returning, client/server socket components listen
to ValueChagned events (feedback) to determine if a handler
returned some data and fires a subsequent Write event with
the value returned.
"""
return data
@handler("email.header", "email.header.to")
def _lookup_email_sender(self, event, *args, **kwargs):
# event.artifact is a ThreatServiceArtifactDTO
artifact_type = event.artifact['type']
artifact_value = event.artifact['value']
LOG.debug("_lookup_email_sender started for Artifact Type {0} - Artifact Value {1}".format(
artifact_type, artifact_value))
hits = self._query_hibp_api(artifact_value)
yield hits
@handler("write", target="ws")
def _on_write(self, sock, data):
payload = chr(0x00) + data.encode("utf-8") + chr(0xFF)
self.push(Write(sock, payload))