Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# -*- coding: utf-8 -*-
from .basicaction import BasicAction
class GenericAction(BasicAction):
"""Action to execute a custom function"""
name = "GenericAction"
def __init__(self, func):
super().__init__()
self.func = func
def perform(self, paste, analyzer_name=None, matches=None):
self.func(paste)
# -*- coding: utf-8 -*-
import logging
import re
from pastepwn.util import Request, TemplatingEngine
from .basicaction import BasicAction
class TelegramAction(BasicAction):
"""Action to send a Telegram message to a certain user or group/channel"""
name = "TelegramAction"
def __init__(self, token, receiver, template=None):
"""
Action to send a Telegram message to a certain user or group/channel
:param token: The Telegram API token for your bot obtained by @BotFather
:param receiver: The userID/groupID or channelID of the receiving entity
:param template: A template string describing how the paste variables should be filled in
"""
super().__init__()
self.logger = logging.getLogger(__name__)
if not re.match(r"[0-9]+:[a-zA-Z0-9\-_]+", token) or token is None:
raise ValueError("Bot token not correct or None!")
# -*- coding: utf-8 -*-
from .basicaction import BasicAction
class DatabaseAction(BasicAction):
"""Action to save a paste to a database"""
name = "DatabaseAction"
def __init__(self, database):
super().__init__()
self.database = database
def perform(self, paste, analyzer_name=None):
self.database.store(paste)
# -*- coding: utf-8 -*-
import json
import logging
import time
from pastepwn.util import Request
from .basicaction import BasicAction
class MISPAction(BasicAction):
"""
Action to add an event to a MISP instance on a matched paste
Documentation for adding events:
https://www.circl.lu/doc/misp/automation/#post-events
The MISPAction objects can take a `transformer` function as a constructor parameter.
This function (by default MISPAction.default_transformer) should take a Paste and an
optional analyzer name as parameters (just like BasicAction.perform), and return a
dictionary representing a MISP event, which will then be sent to the API.
Additional attributes can be sent with each event, specified by the `attributes`
parameter. Here is the documentation regarding types and categories:
https://www.circl.lu/doc/misp/categories-and-types/
"""
name = "MISPAction"
# -*- coding: utf-8 -*-
import os
from pastepwn.util import TemplatingEngine
from .basicaction import BasicAction
class SaveFileAction(BasicAction):
"""Action to save each paste as a file named '.txt'"""
name = "SaveFileAction"
def __init__(self, path, file_ending=".txt", template=None):
"""
Action to save each paste as a file named '.txt'
If you want to store metadata within the file, use template strings
> https://github.com/d-Rickyy-b/pastepwn/wiki/Templating-in-actions
:param path: The directory in which the file(s) should be stored
:param template: A template string describing how the paste variables should be filled in
"""
super().__init__()
self.path = path
self.file_ending = file_ending
self.template = template or "${body}"
# -*- coding: utf-8 -*-
from pastepwn.util import Request
from .basicaction import BasicAction
class WebhookAction(BasicAction):
"""Action to perform a Webhook on a matched paste"""
name = "WebhookAction"
def __init__(self, url, post_data=None):
"""
Init method for the WebhookAction
:param url: string, URL to POST against
:param post_data: boolean, to decide wheather a paste should be sent in the body
"""
super().__init__()
self.url = url
self.post_data = post_data
def perform(self, paste, analyzer_name=None, matches=None):
"""
Trigger the webhook
# -*- coding: utf-8 -*-
import logging
from .basicaction import BasicAction
class LogAction(BasicAction):
"""Action to log a paste to console"""
name = "LogAction"
def __init__(self):
super().__init__()
self.logger = logging.getLogger(__name__)
def perform(self, paste, analyzer_name=None, matches=None):
self.logger.debug("New Paste matched: {0}".format(paste))
# -*- coding: utf-8 -*-
import asyncio
import json
import logging
import sys
from string import Template
from pastepwn.util import Request, DictWrapper
from .basicaction import BasicAction
class DiscordAction(BasicAction):
"""Action to send a Discord message to a certain webhook or channel."""
name = "DiscordAction"
def __init__(self, webhook=None, token=None, channel_id=None, template=None):
super().__init__()
self.logger = logging.getLogger(__name__)
self.bot_available = True
try:
import websockets
except ImportError:
self.logger.warning("Could not import 'websockets' module. So you can only use webhooks for discord.")
self.bot_available = False
self.webhook = webhook
if webhook is None:
# -*- coding: utf-8 -*-
import logging.handlers
from pastepwn.util import TemplatingEngine
from .basicaction import BasicAction
class SyslogAction(BasicAction):
"""Action to log a paste to the syslog"""
name = "SyslogAction"
def __init__(self, syslog_address="/dev/log", template=None):
"""
This sets up a syslogger, which defaults to /dev/log.
That means that it will work on most linux systems,
but will not work on for example OSX.
For OSX, the correct syslog address would be
/var/run/syslog
"""
super().__init__()
self.template = template
self.logger = logging.getLogger('SyslogLogger')
self.logger.setLevel(logging.DEBUG)
# -*- coding: utf-8 -*-
import socket
from pastepwn.util import TemplatingEngine
from .basicaction import BasicAction
class IrcAction(BasicAction):
"""Action to send an irc message to a certain channel"""
name = "IrcAction"
irc = socket.socket()
def __init__(
self=None,
server=None,
channel=None,
port=6667,
nick="pastepwn",
template=None
):
super().__init__()
self.ircsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server = server