Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
    """Install the HTTP request cache and open a live-response session.

    When ``use_golden`` is set, responses are replayed from the cache and
    outbound connections are denied, so dummy connection details suffice.
    """
    requests_cache.install_cache(
        cache_file_name,
        allowable_methods=('GET', 'POST'),
        deny_outbound=use_golden,
        ignored_parameters=["active_only"],
    )
    # Golden (replay) mode never reaches a real server, hence the bogus URL/token.
    self.c = (CbResponseAPI(url="https://localhost", token="N/A", ssl_verify=False)
              if use_golden else CbResponseAPI())
    self.sensor = self.c.select(Sensor, 1)
    self.lr_session = self.sensor.lr_session()
def export_sensors(cb, export_file_name, export_fields, query):
    """Export sensors matching *query* (or all sensors) to a CSV file.

    Args:
        cb: CbResponseAPI connection used to run the sensor query.
        export_file_name: path of the CSV file to create or overwrite.
        export_fields: sensor attribute names; also written as the header row.
        query: optional Carbon Black query string; falsy exports every sensor.
    """
    print("Starting CbR Sensor Export")
    if query:
        sensors = list(cb.select(Sensor).where(query).all())
    else:
        sensors = list(cb.select(Sensor))
    with open(export_file_name, "w", encoding="utf8") as csv_file:
        csv_writer = csv.writer(csv_file, delimiter=',', lineterminator='\n')
        csv_writer.writerow(export_fields)
        for sensor in sensors:
            try:
                row = [getattr(sensor, field) for field in export_fields]
                csv_writer.writerow(row)
            except Exception as e:
                print("Exception {1} caused sensor export to fail for {0}".format(sensor.hostname, str(e)))
                # BUG FIX: traceback.format_exc() returns a string; the
                # original call discarded it, so no traceback was ever shown.
                print(traceback.format_exc())
    print("Export finished, exported {0} sensors to {1}".format(len(sensors), export_file_name))
def process_sensors(cb, query_base=None, update=False, max_threads=None,
                    debug=False, ignore_hosts=None):
    """Fetch all sensor objects associated with the cb server instance, and
    keep basic state as they are processed.
    """
    if query_base is None:
        query_result = cb.select(Sensor)
    else:
        query_result = cb.select(Sensor).where(query_base)
    # NOTE: taking len() forces the query to execute; kept for that reason
    # even though the value itself is not read below.
    query_result_len = len(query_result)
    q = Queue()
    # The paginated results of cb.select(Sensor) can repeat a sensor ID,
    # so remember the IDs already queued and enqueue each sensor only once.
    seen_ids = set()
    for sensor in query_result:
        if sensor.id not in seen_ids:
            seen_ids.add(sensor.id)
            q.put(sensor)
def execute_analysis(self, ipv4):
    """Search CbR for sensors with the given IP and record their hostnames."""
    analysis = self.create_analysis(ipv4)
    from cbapi.response.models import Sensor
    sensor_query = self.cb.select(Sensor).where('ip:{}'.format(ipv4.value))
    for sensor in sensor_query:
        analysis.search_results.append(str(sensor))
        host = sensor.hostname
        if host not in analysis.discovered_hostnames:
            analysis.discovered_hostnames.append(host)
            logging.info("found hostname {} for {}".format(host, ipv4.value))
    # Promote at most hostname_limit discovered hostnames to observables.
    for host in analysis.discovered_hostnames[:self.hostname_limit]:
        analysis.add_observable(F_HOSTNAME, host)
    return True
def jobrunner(callable, cb, sensor_id):
    """Run *callable* inside a live-response session on the given sensor.

    NOTE(review): the first parameter shadows the builtin ``callable()``;
    kept as-is because renaming it would break keyword callers.
    """
    sensor = cb.select(Sensor, sensor_id)
    with sensor.lr_session() as session:
        return callable(session)
# NOTE(review): fragment — the opening of the header_row list (and the
# `writer`, `args`, `cb`, and `log_info` it relies on) is defined earlier
# in the file, outside this excerpt.
              'clock_delta',
              'checkin_ip']
writer.writerow(header_row)
# Build an optional sensor-query filter from the mutually exclusive
# command-line selectors (group id, hostname, or IP address).
query_base = None
if args.group_id:
    query_base = 'groupid:{0}'.format(args.group_id)
elif args.hostname:
    query_base = 'hostname:{0}'.format(args.hostname)
elif args.ip:
    query_base = 'ip:{0}'.format(args.ip)
if query_base is None:
    sensors = cb.select(Sensor)
else:
    sensors = cb.select(Sensor).where(query_base)
num_sensors = len(sensors)
log_info("Found {0} sensors".format(num_sensors))
counter = 1
for sensor in sensors:
    # Progress indicator every 10 sensors.
    if counter % 10 == 0:
        print("{0} of {1}".format(counter, num_sensors))
    # resource_status may be empty (e.g. sensor never checked in); the
    # /1024/1024 scaling presumably converts a byte count to MB — TODO confirm.
    if len(sensor.resource_status) > 0:
        commit_charge = "{0:.2f}".format(float(sensor.resource_status[0]['commit_charge'])/1024/1024)
    else:
        commit_charge = ''
    # Same presumed bytes-to-MB conversion for the remaining counters.
    num_eventlog_mb = "{0:.2f}".format(float(sensor.num_eventlog_bytes)/1024/1024)
    num_storefiles_mb = "{0:.2f}".format(float(sensor.num_storefiles_bytes)/1024/1024)
    systemvolume_free_size = "{0:.2f}".format(float(sensor.systemvolume_free_size)/1024/1024)
    # NOTE(review): loop body continues past this excerpt (the CSV row is
    # presumably written and `counter` incremented below).