Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def main():
    """Look up processes by GUID or by query string and print their names.

    Options registered on top of whatever ``build_cli_parser`` provides:

    * ``-p``  process GUID to look up
    * ``-q``  query string; used in preference to ``-p`` when both are given
    * ``-s``  silent mode, given with a value (``-s true`` / ``-s false``)
    * ``-n``  keep only the first N results (``0`` means "no limit")
    * ``-f`` / ``-of``  output file name and format; parsed here but not
      consumed by the code in this function

    Exits with status 1 when neither ``-p`` nor ``-q`` is supplied.
    """

    def parse_bool(value):
        # ``type=bool`` would turn *any* non-empty string -- including
        # "False" and "0" -- into True.  Recognise the usual "off" spellings
        # explicitly; every other value keeps meaning True, as it did before.
        return value.strip().lower() not in ("", "0", "false", "no", "off")

    parser = build_cli_parser("Query processes")
    parser.add_argument("-p", type=str, help="process guid", default=None)
    parser.add_argument("-q", type=str, help="query string", default=None)
    parser.add_argument("-s", type=parse_bool, help="silent mode", default=False)
    parser.add_argument("-n", type=int, help="only output N events", default=None)
    parser.add_argument("-f", type=str, help="output file name", default=None)
    parser.add_argument("-of", type=str, help="output file format: csv or json", default="json")
    args = parser.parse_args()

    cb = get_cb_threathunter_object(args)

    if not args.p and not args.q:
        # Either selector is sufficient, so the message names both.
        print("Error: Missing Process GUID or query string to search for processes with")
        sys.exit(1)

    if args.q:
        processes = cb.select(Process).where(args.q)
    else:
        processes = cb.select(Process).where(process_guid=args.p)

    if args.n:
        # Materialise the first N results once so later passes reuse them.
        processes = list(processes[0:args.n])

    if not args.s:
        for process in processes:
            print("Process: {}".format(process.process_name))
# Entry point: print the direct children of one process's tree and, when an
# output file is requested, begin writing them out.
# NOTE(review): this excerpt is cut off inside the final loop, so the file
# output logic is not visible here.
def main():
parser = build_cli_parser("Query processes")
parser.add_argument("-p", type=str, help="process guid", default=None)
parser.add_argument("-f", type=str, help="output file name", default=None)
parser.add_argument("-of", type=str, help="output file format: csv or json", default="json")
args = parser.parse_args()
cb = get_cb_threathunter_object(args)
# The process GUID is mandatory for this script; bail out with status 1.
if not args.p:
print("Error: Missing Process GUID to query the process tree with")
sys.exit(1)
# Only the first match for the GUID is used.
# NOTE(review): presumably the [0] lookup fails when nothing matches - confirm.
tree = cb.select(Process).where(process_guid=args.p)[0].tree()
# Summarise each direct child: its name and how many children it has itself.
for idx, child in enumerate(tree.children):
print("Child #{}".format(idx))
print("\tName: {}".format(child.process_name))
print("\tNumber of children: {}".format(len(child.children)))
# File output is attempted only when -f was given; the visible branch handles
# the "json" format.
if args.f is not None:
if args.of == "json":
with open(args.f, 'w') as outfile:
for idx, child in enumerate(tree.children):
def main():
    """Print the events recorded for a single process.

    Options registered on top of whatever ``build_cli_parser`` provides:

    * ``-p``  process GUID whose events are fetched (required)
    * ``-s``  silent mode, given with a value (``-s true`` / ``-s false``)
    * ``-n``  keep only the first N events (``0`` means "no limit")
    * ``-f`` / ``-of``  output file name and format; parsed here but not
      consumed by the code in this function

    Exits with status 1 when ``-p`` is missing.
    """

    def parse_bool(value):
        # ``type=bool`` would turn *any* non-empty string -- including
        # "False" and "0" -- into True.  Recognise the usual "off" spellings
        # explicitly; every other value keeps meaning True, as it did before.
        return value.strip().lower() not in ("", "0", "false", "no", "off")

    parser = build_cli_parser("Query processes")
    parser.add_argument("-p", type=str, help="process guid", default=None)
    parser.add_argument("-s", type=parse_bool, help="silent mode", default=False)
    parser.add_argument("-n", type=int, help="only output N events", default=None)
    parser.add_argument("-f", type=str, help="output file name", default=None)
    parser.add_argument("-of", type=str, help="output file format: csv or json", default="json")
    args = parser.parse_args()

    cb = get_cb_threathunter_object(args)

    if not args.p:
        print("Error: Missing Process GUID to search for events with")
        sys.exit(1)

    events = cb.select(Event).where(process_guid=args.p)
    if args.n:
        events = events[0:args.n]

    if not args.s:
        for event in events:
            print("Event type: {}".format(event.event_type))
            print("\tEvent GUID: {}".format(event.event_guid))
            print("\tEvent Timestamp: {}".format(event.event_timestamp))
# NOTE(review): this excerpt starts part-way through a function; `parser` and
# the --name/--owner/--url arguments read below are defined outside it.
# Feed metadata arguments.
parser.add_argument("--summary", type=str, help="Feed summary", required=True)
parser.add_argument("--category", type=str, help="Feed category", required=True)
parser.add_argument("--source_label", type=str, help="Feed source label", required=True)
parser.add_argument("--access", type=str, help="Feed access scope", default="private")
# Report metadata arguments.
# The timestamp default is computed once, when this argument is registered.
parser.add_argument("--rep_timestamp", type=int, help="Report timestamp", default=int(time.time()))
parser.add_argument("--rep_title", type=str, help="Report title", required=True)
parser.add_argument("--rep_desc", type=str, help="Report description", required=True)
parser.add_argument("--rep_severity", type=int, help="Report severity", default=1)
parser.add_argument("--rep_link", type=str, help="Report link")
parser.add_argument("--rep_tags", type=str, help="Report tags, comma separated")
parser.add_argument("--rep_visibility", type=str, help="Report visibility")
args = parser.parse_args()
cb = get_cb_threathunter_object(args)
# Feed description assembled from the parsed arguments.
# NOTE(review): --source_label is required above but is not copied into this
# dict in the visible lines - confirm whether that is intentional.
feed_info = {
"name": args.name,
"owner": args.owner,
"provider_url": args.url,
"summary": args.summary,
"category": args.category,
"access": args.access,
}
# Tags arrive as a single comma-separated string; no tags yields an empty list.
rep_tags = []
if args.rep_tags:
rep_tags = args.rep_tags.split(",")
# Report description; the excerpt ends inside this dict literal.
report = {
"timestamp": args.rep_timestamp,
def main():
    """List the events of one process, optionally capped at N entries.

    Reads ``-p`` (process GUID, required) and ``-n`` (maximum number of
    events to show) from the command line, then prints the type, GUID and
    timestamp of each selected event.  Exits with status 1 when no GUID is
    supplied.
    """
    cli = build_cli_parser("Query processes")
    cli.add_argument("-p", type=str, help="process guid", default=None)
    cli.add_argument("-n", type=int, help="only output N events", default=None)
    options = cli.parse_args()

    # The API handle is created before validation, as in the original flow.
    api = get_cb_threathunter_object(options)

    guid = options.p
    if not guid:
        print("Error: Missing Process GUID to search for events with")
        sys.exit(1)

    matches = api.select(Event).where(process_guid=guid)
    limit = options.n
    # A missing or zero limit means "show everything".
    selected = matches[0:limit] if limit else matches

    for record in selected:
        print("Event type: {}".format(record.event_type))
        print("\tEvent GUID: {}".format(record.event_guid))
        print("\tEvent Timestamp: {}".format(record.event_timestamp))
# Entry point: run a process search, optionally sorted and capped at N
# results, then iterate over the matches.
# NOTE(review): this excerpt is cut off at the final loop header, so the
# per-process output (and the use of -f/-e/-c/-p/-t) is not visible here.
def main():
parser = build_cli_parser("Search processes")
parser.add_argument("-q", type=str, help="process query", default="process_name:notepad.exe")
parser.add_argument("-f", help="show full objects", action="store_true", default=False)
parser.add_argument("-n", type=int, help="only output N processes", default=None)
parser.add_argument("-e", help="show events for query results", action="store_true", default=False)
parser.add_argument("-c", help="show children for query results", action="store_true", default=False)
parser.add_argument("-p", help="show parents for query results", action="store_true", default=False)
parser.add_argument("-t", help="show tree for query results", action="store_true", default=False)
# NOTE(review): the help text below reads "sory"; presumably "sort" was meant.
parser.add_argument("-S", type=str, help="sory by this field", required=False)
parser.add_argument("-D", help="return results in descending order", action="store_true")
args = parser.parse_args()
cb = get_cb_threathunter_object(args)
processes = cb.select(Process).where(args.q)
# Ascending order unless -D was given; the direction only matters with -S.
direction = "ASC"
if args.D:
direction = "DESC"
if args.S:
# The return value of sort_by is not used.
# NOTE(review): presumably it updates the query in place - confirm.
processes.sort_by(args.S, direction=direction)
# The count is printed before the -n cap is applied.
print("Number of processes: {}".format(len(processes)))
if args.n:
processes = processes[0:args.n]
for process in processes:
# NOTE(review): this excerpt starts part-way through a function; `parser`,
# `commands` and `delete_report_command` are defined outside it.
# delete-report: exactly one of feed ID / feed name must be given ...
specifier = delete_report_command.add_mutually_exclusive_group(required=True)
specifier.add_argument("-i", "--id", type=str, help="Feed ID")
specifier.add_argument("-f", "--feedname", type=str, help="Feed Name")
# ... and exactly one of report ID / report name.
specifier = delete_report_command.add_mutually_exclusive_group(required=True)
specifier.add_argument("-I", "--reportid", type=str, help="Report ID")
specifier.add_argument("-r", "--reportname", type=str, help="Report Name")
# replace-report: the feed is identified by exactly one of ID or name.
replace_report_command = commands.add_parser(
"replace-report", help="Replace a feed's report"
)
specifier = replace_report_command.add_mutually_exclusive_group(required=True)
specifier.add_argument("-i", "--id", type=str, help="Feed ID")
specifier.add_argument("-f", "--feedname", type=str, help="Feed Name")
args = parser.parse_args()
cb = get_cb_threathunter_object(args)
# Dispatch on the chosen sub-command; each handler receives (cb, parser, args)
# and its result is returned to the caller.  The excerpt ends before the
# "delete-report" branch body.
if args.command_name == "list":
return list_feeds(cb, parser, args)
elif args.command_name == "list-iocs":
return list_iocs(cb, parser, args)
elif args.command_name == "export":
return export_feed(cb, parser, args)
elif args.command_name == "import":
return import_feed(cb, parser, args)
elif args.command_name == "delete":
return delete_feed(cb, parser, args)
elif args.command_name == "export-report":
return export_report(cb, parser, args)
elif args.command_name == "import-report":
return import_report(cb, parser, args)
elif args.command_name == "delete-report":
# NOTE(review): this excerpt starts part-way through a function; `parser`,
# `commands` and `alter_ioc_command` are defined outside it.
# alter-ioc: watchlist, report and IOC IDs are all required.
alter_ioc_command.add_argument("-i", "--watchlist_id", type=str, help="Watchlist ID", required=True)
alter_ioc_command.add_argument("-r", "--report_id", type=str, help="Report ID", required=True)
alter_ioc_command.add_argument("-I", "--ioc_id", type=str, help="IOC ID", required=True)
# Activation and deactivation are mutually exclusive, and neither is required.
# NOTE(review): the help text below reads "Deactive"; presumably "Deactivate".
specifier = alter_ioc_command.add_mutually_exclusive_group(required=False)
specifier.add_argument("-d", "--deactivate", action="store_true", help="Deactive alerts for this IOC")
specifier.add_argument("-a", "--activate", action="store_true", help="Activate alerts for this IOC")
# export: the watchlist is identified by exactly one of ID or name.
export_command = commands.add_parser("export", help="Export a watchlist into an importable format")
specifier = export_command.add_mutually_exclusive_group(required=True)
specifier.add_argument("-i", "--watchlist_id", type=str, help="Watchlist ID")
specifier.add_argument("-w", "--watchlist_name", type=str, help="Watchlist name")
# import: no extra arguments are registered here.
commands.add_parser("import", help="Import a previously exported watchlist")
args = parser.parse_args()
cb = get_cb_threathunter_object(args)
# Dispatch on the chosen sub-command; each handler receives (cb, parser, args)
# and its result is returned to the caller.  The excerpt ends before the
# "import" branch body.
if args.command_name == "list":
return list_watchlists(cb, parser, args)
elif args.command_name == "subscribe":
return subscribe_watchlist(cb, parser, args)
elif args.command_name == "create":
return create_watchlist(cb, parser, args)
elif args.command_name == "delete":
return delete_watchlist(cb, parser, args)
elif args.command_name == "alter-report":
return alter_report(cb, parser, args)
elif args.command_name == "alter-ioc":
return alter_ioc(cb, parser, args)
elif args.command_name == "export":
return export_watchlist(cb, parser, args)
elif args.command_name == "import":