Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _login_glances(self):
"""Login to a Glances server"""
client_version = None
try:
client_version = self.client.init()
except socket.error as err:
# Fallback to SNMP
self.client_mode = 'snmp'
logger.error("Connection to Glances server failed ({} {})".format(err.errno, err.strerror))
fallbackmsg = 'No Glances server found on {}. Trying fallback to SNMP...'.format(self.uri)
if not self.return_to_browser:
print(fallbackmsg)
else:
logger.info(fallbackmsg)
except ProtocolError as err:
# Other errors
msg = "Connection to server {} failed".format(self.uri)
if err.errcode == 401:
msg += " (Bad username/password)"
else:
msg += " ({} {})".format(err.errcode, err.errmsg)
self.log_and_exit(msg)
return False
if self.client_mode == 'glances':
def __init__(self, bind_address, bind_port=61209,
             requestHandler=GlancesXMLRPCHandler):
    """Set up the XML-RPC server on (bind_address, bind_port).

    The socket address family is taken from the first entry returned by
    socket.getaddrinfo() for the requested address and port. If that
    lookup raises socket.error, the error is logged and the process
    exits with status 1.
    """
    self.bind_address, self.bind_port = bind_address, bind_port
    endpoint = (bind_address, bind_port)

    try:
        # First getaddrinfo() result; its first field is the family
        first_match = socket.getaddrinfo(*endpoint)[0]
        self.address_family = first_match[0]
    except socket.error as e:
        logger.error("Couldn't open socket: {}".format(e))
        sys.exit(1)

    # The address family must be set before the parent constructor runs
    # NOTE(review): presumably because the parent creates the socket - confirm
    super(GlancesXMLRPCServer, self).__init__(endpoint, requestHandler)
def whitelisted(s,
                whitelist=WHITELIST,
                substitute=SUBSTITUTE):
    """Return s with each character not found in whitelist replaced by substitute."""
    def keep_or_replace(char):
        # Characters present in the whitelist pass through unchanged
        if char in whitelist:
            return char
        return substitute

    return ''.join(map(keep_or_replace, s))
# Publish every value on its own topic:
# <self.topic>/<self.hostname>/<name>/<sensor levels...>
for sensor, value in zip(columns, points):
    try:
        # Split the dotted column name into levels and pass each level
        # through whitelisted().
        # The comprehension variable is deliberately NOT called `name`:
        # on Python 2 a list comprehension leaks its loop variable, which
        # would overwrite the outer `name` used to build the topic below.
        levels = [whitelisted(part) for part in sensor.split('.')]
        topic = '/'.join([self.topic, self.hostname, name] + levels)
        self.client.publish(topic, value)
    except Exception as e:
        # The error is only logged; the loop carries on with the next sensor
        logger.error("Can not export stats to MQTT server (%s)" % e)
# Batinfo library (optional; Linux-only)
# batinfo_tag records whether the import succeeded
try:
    import batinfo
except ImportError:
    logger.debug("batinfo library not found. Fallback to psutil.")
    batinfo_tag = False
else:
    batinfo_tag = True

# psutil battery sensors: psutil_tag records whether a probe call works
# Availability:
# Linux, Windows, FreeBSD (psutil>=5.1.0)
# macOS (psutil>=5.4.2)
try:
    psutil.sensors_battery()
except Exception as e:
    logger.error("Cannot grab battery status {}.".format(e))
    psutil_tag = False
else:
    psutil_tag = True
class Plugin(GlancesPlugin):
"""Glances battery capacity plugin.
stats is a list
"""
def __init__(self, args=None, config=None):
"""Init the plugin."""
super(Plugin, self).__init__(args=args,
config=config,
stats_init_value=[])
# Init the sensor class
# NOTE(review): this fragment appears to belong to a per-plugin loop whose
# header (and any guarding condition) is not visible here - confirm.
# Header: add one '<plugin>_<fieldname>' column for each field name
# obtained from this plugin's stats (presumably the dict keys; iterkeys
# is a helper defined elsewhere).
fieldnames = iterkeys(all_stats[plugin])
csv_header += ('{}_{}'.format(plugin, fieldname)
for fieldname in fieldnames)
# Other lines: stats
# Extend the data row with this plugin's values (presumably in the same
# order as the keys above - verify against itervalues).
csv_data += itervalues(all_stats[plugin])
# Export to CSV
# Manage header (done once, while self.first_line is still set)
if self.first_line:
    if self.old_header is None:
        # New file, write the header on top of the CSV file
        self.writer.writerow(csv_header)
    elif self.old_header != csv_header:
        # `elif`, not a second `if`: with a new file old_header is None,
        # which always differs from csv_header, and a separate `if`
        # logged a spurious "cannot append" error right after writing
        # the header.
        # File already exists and headers are different: log an error
        # and do not write data (old_header stays set, see below)
        logger.error("Cannot append data to existing CSV file. Headers are differents.")
        logger.debug("Old header: {}".format(self.old_header))
        logger.debug("New header: {}".format(csv_header))
    else:
        # File already exists and headers are equal, ready to write data
        self.old_header = None
    # Only do this once
    self.first_line = False
# Manage data: rows are written only while old_header is None
if self.old_header is None:
    self.writer.writerow(csv_data)
self.csv_file.flush()
# The plugin name is the script name without its first len(self.header)
# characters and its last three characters, in lower case.
# For example, the file glances_xxx.py
# generates self._plugins["xxx"] = ...
name = plugin_script[len(self.header):-3].lower()
try:
    # Import the plugin (module name = script name minus last 3 chars)
    plugin = __import__(plugin_script[:-3])
    # Only these plugins also receive the configuration object
    init_kwargs = {'args': args}
    if name in ('help', 'amps', 'ports'):
        init_kwargs['config'] = config
    # Init and add the plugin to the dictionary
    self._plugins[name] = plugin.Plugin(**init_kwargs)
except Exception as e:
    # If a plugin can not be loaded, display a critical message
    # on the console but do not crash
    logger.critical("Error while initializing the {} plugin ({})".format(name, e))
    logger.error(traceback.format_exc())
stats.extend(temperature)
# Get the FAN speed
# NOTE(review): __set_type is defined elsewhere; presumably it tags each
# entry with the type label passed as second argument - confirm.
# Each grab below is independent: on any exception the error is logged
# and nothing is appended to stats for that source.
try:
fan_speed = self.__set_type(self.glancesgrabsensors.get('fan_speed'),
'fan_speed')
except Exception as e:
logger.error("Cannot grab FAN speed (%s)" % e)
else:
# Append FAN speed (only reached when the grab did not raise)
stats.extend(fan_speed)
# Update HDDtemp stats
# Values come from the hddtemp plugin's own update() call
try:
hddtemp = self.__set_type(self.hddtemp_plugin.update(),
'temperature_hdd')
except Exception as e:
logger.error("Cannot grab HDD temperature (%s)" % e)
else:
# Append HDD temperature
stats.extend(hddtemp)
# Update batteries stats
# Values come from the batpercent plugin's own update() call
try:
batpercent = self.__set_type(self.batpercent_plugin.update(),
'battery')
except Exception as e:
logger.error("Cannot grab battery percent (%s)" % e)
else:
# Append Batteries %
stats.extend(batpercent)
elif self.input_method == 'snmp':
# Update stats using SNMP
# No standard:
except psutil.NoSuchProcess:
pass
except (psutil.AccessDenied, NotImplementedError):
# NotImplementedError: /proc/${PID}/smaps file doesn't exist
# on kernel < 2.6.14 or CONFIG_MMU kernel configuration option
# is not enabled (see psutil #533/glances #413).
extended['memory_swap'] = None
try:
extended['tcp'] = len(top_process.connections(kind="tcp"))
extended['udp'] = len(top_process.connections(kind="udp"))
except (psutil.AccessDenied, psutil.NoSuchProcess):
# Manage issue1283 (psutil.AccessDenied)
extended['tcp'] = None
extended['udp'] = None
except (psutil.NoSuchProcess, ValueError, AttributeError) as e:
logger.error('Can not grab extended stats ({})'.format(e))
extended['extended_stats'] = False
else:
logger.debug('Grab extended stats for process {}'.format(proc['pid']))
extended['extended_stats'] = True
proc.update(extended)
first = False
# /End of extended stats
# Time since last update (for disk_io rate computation)
proc['time_since_update'] = time_since_update
# Process status (only keep the first char)
proc['status'] = str(proc['status'])[:1].upper()
# Process IO
# procstat['io_counters'] is a list:
if item is None:
return self._json_dumps(s)
if isinstance(s, dict):
try:
return self._json_dumps({item: s[item]})
except KeyError as e:
logger.error("Cannot get item history {} ({})".format(item, e))
return None
elif isinstance(s, list):
try:
# Source:
# http://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list
return self._json_dumps({item: map(itemgetter(item), s)})
except (KeyError, ValueError) as e:
logger.error("Cannot get item history {} ({})".format(item, e))
return None
else:
return None