Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
statuses["fail"].append(ruleid)
elif result:
if rule.startswith("IGNORE"):
if not rulecategory.lower().startswith("comment"):
statuses["ignore"].append(ruleid)
print("IGNORE: %s ID %s %s" % (rulecategory, ruleid, rule), file=sys.stderr)
else:
statuses["pass"].append(ruleid)
print("PASS: %s ID %s %s" % (rulecategory, ruleid, rule), file=sys.stderr)
else:
print("FAIL: %s ID %s %s" % (rulecategory, ruleid, rule), file=sys.stderr)
statuses["fail"].append(ruleid)
return statuses
@BestPractices.register("proc_sys")
@SystemNode.add_json_processor
class BestPracticesCMA(BestPractices):
"Security Best Practices which are evaluated against various discovery modules"
application = "os"
discovery_name = "JSON_proc_sys"
def __init__(self, config, packetio, store, log, debug):
BestPractices.__init__(self, config, packetio, store, log, debug)
def fetch_rules(self, drone, _unusedsrcaddr, discovertype):
"""Evaluate our rules given the current/changed data.
Note that fetch_rules is separate from rule evaluation to
simplify testing.
In our case, we ask our Drone to provide us with the merged rule
sets for the current kind of incoming packet.
"""
def decorator(cls):
"""Register our class with the packet types given to 'register' above.
Return value: Class that we registered.
"""
for pkttype in pkttypes:
BestPractices.register_sensitivity(cls, pkttype)
return cls
"net.ipv6.conf.all.accept_redirects": 1,
"net.ipv6.conf.all.accept_source_route": 0
}}"""
rulefile = None
dummydrone = DummyDrone()
for dirname in (".", "..", "../..", "../../.."):
rulefile = "%s/best_practices/proc_sys.json" % dirname
if os.access(rulefile, os.R_OK):
break
with open(rulefile, "r") as procsys_file:
testrules = pyConfigContext(procsys_file.read())
testjsonobj = pyConfigContext(JSON_data)
logger = logging.getLogger("BestPracticesTest")
logger.addHandler(logging.StreamHandler(sys.stderr))
testconfig = {"allbpdiscoverytypes": ["login_defs", "pam", "proc_sys", "sshd"]}
bpobj = BestPractices(testconfig, None, None, logger, False)
for procsys in BestPractices.eval_classes["proc_sys"]:
ourstats = procsys.evaluate("testdrone", None, testjsonobj, testrules, "proc_sys")
size = sum([len(ourstats[st]) for st in ourstats.keys() if st != "score"])
# print size, len(testrules)
assert size == len(testrules) - 1 # One rule is an IGNOREd comment
assert ourstats["fail"] == ["itbp-00001", "nist_V-38526", "nist_V-38601"]
assert len(ourstats["NA"]) >= 13
assert len(ourstats["pass"]) >= 3
assert len(ourstats["ignore"]) == 0
score, tstdiffs = bpobj.compute_score_updates(
testjsonobj, dummydrone, testrules, ourstats, {}
)
assert str(pyConfigContext(score)) == '{"networking":1.0,"security":4.0}'
# pylint: disable=E1101
assert dummydrone.bp_category_networking_score == 1.0 # should be OK for integer values
assert dummydrone.bp_category_security_score == 4.0 # should be OK for integer values
oldstats = pyConfigContext(
{"pass": [], "fail": [], "ignore": [], "NA": [], "score": 0.0}
)
for stat in ("pass", "fail", "ignore", "NA"):
logmethod = self.log.info if stat == "pass" else self.log.warning
for ruleid in results[stat]:
oldstat = None
for statold in ("pass", "fail", "ignore", "NA"):
if ruleid in oldstats[statold]:
oldstat = statold
break
if oldstat == stat or stat == "NA":
# No change
continue
url = self.url(drone, ruleid, rulesobj[ruleid])
BestPractices.send_rule_event(oldstat, stat, drone, ruleid, rulesobj, url)
thisrule = rulesobj[ruleid]
rulecategory = thisrule["category"]
logmethod(
"%s %sED %s rule %s: %s [%s]"
% (drone, stat.upper(), rulecategory, ruleid, url, thisrule["rule"])
)
self.compute_score_updates(discoveryobj, drone, rulesobj, results, oldstats)
setattr(drone, status_name, str(results))
if rule.startswith("IGNORE"):
if not rulecategory.lower().startswith("comment"):
statuses["ignore"].append(ruleid)
print("IGNORE: %s ID %s %s" % (rulecategory, ruleid, rule), file=sys.stderr)
else:
statuses["pass"].append(ruleid)
print("PASS: %s ID %s %s" % (rulecategory, ruleid, rule), file=sys.stderr)
else:
print("FAIL: %s ID %s %s" % (rulecategory, ruleid, rule), file=sys.stderr)
statuses["fail"].append(ruleid)
return statuses
@BestPractices.register("proc_sys")
@SystemNode.add_json_processor
class BestPracticesCMA(BestPractices):
"Security Best Practices which are evaluated against various discovery modules"
application = "os"
discovery_name = "JSON_proc_sys"
def __init__(self, config, packetio, store, log, debug):
BestPractices.__init__(self, config, packetio, store, log, debug)
def fetch_rules(self, drone, _unusedsrcaddr, discovertype):
"""Evaluate our rules given the current/changed data.
Note that fetch_rules is separate from rule evaluation to
simplify testing.
In our case, we ask our Drone to provide us with the merged rule
sets for the current kind of incoming packet.
"""
return drone.get_merged_bp_rules(discovertype)
but the reverse cannot be true.
It's perfectly normal for a rule set to not contain all the rules that
a basis rule set specifies, which means they aren't overridden.
It's also perfectly OK for a dependent rule set to have rules not
present in the basis rule set.
"""
store.load_or_create(BPRuleSet, rulesetname=rulesetname, basisrules=basedon)
files = sorted(os.listdir(directoryname))
for filename in files:
if filename.startswith("."):
continue
path = os.path.join(directoryname, filename)
classname = filename.replace(".json", "")
yield BestPractices.load_from_file(store, path, classname, rulesetname, basedon)
def _processpkt_by_type(self, drone, srcaddr, evaltype, jsonobj):
"""process a discovery object against its set of rules"""
# print >> sys.stderr, 'IN PROCESSPKT_BY_TYPE for %s: %s %s' % \
# (drone, evaltype, BestPractices.eval_objects[evaltype])
for rule_obj in BestPractices.eval_objects[evaltype]:
# print >> sys.stderr, 'Fetching %s rules for %s' % (evaltype, drone)
rulesobj = rule_obj.fetch_rules(drone, srcaddr, evaltype)
# print >> sys.stderr, 'RULES ARE:', rulesobj
statuses = pyConfigContext(
rule_obj.evaluate(drone, srcaddr, jsonobj, rulesobj, evaltype)
)
# print >> sys.stderr, 'RESULTS ARE:', statuses
self.log_rule_results(statuses, drone, srcaddr, jsonobj, evaltype, rulesobj)
def __init__(self, config, packetio, store, log, debug):
BestPractices.__init__(self, config, packetio, store, log, debug)
def __init__(self, config, packetio, store=None, log=None, debug=False):
"""Initialize our BestPractices object"""
DiscoveryListener.__init__(self, config, packetio, store, log, debug)
if self.__class__ != BestPractices:
return
for pkttype in config["allbpdiscoverytypes"]:
BestPractices.register_sensitivity(BestPracticesCMA, pkttype)
for pkttype in BestPractices.eval_classes:
if pkttype not in BestPractices.eval_objects:
BestPractices.eval_objects[pkttype] = []
if pkttype not in BestPractices.evaled_classes:
BestPractices.evaled_classes[pkttype] = {}
for bpcls in BestPractices.eval_classes[pkttype]:
if bpcls not in BestPractices.evaled_classes[pkttype]:
BestPractices.eval_objects[pkttype].append(
bpcls(config, packetio, store, log, debug)
)
BestPractices.evaled_classes[pkttype][bpcls] = True