Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
minute, hour, day_of_month, month, day_of_week, command = line.split(' ', 5)
crons[command] = {
'minute': try_int(minute),
'hour': try_int(hour),
'month': try_int(month),
'day_of_month': try_int(day_of_month),
'day_of_week': try_int(day_of_week),
'comments': current_comments,
}
current_comments = []
return crons
class Users(FactBase):
    '''
    Returns a dictionary of users -> details.

    .. code:: python

        'user_name': {
            'home': '/home/user_name',
            'shell': '/bin/bash',
            'group': 'main_user_group',
            'groups': [
                'other',
                'groups'
            ]
        },
        ...
    '''
class Cpus(FactBase):
    '''
    Returns the number of CPUs on this server.

    The value is ``None`` when the command printed nothing or printed
    something that is not an integer.
    '''

    command = 'getconf NPROCESSORS_ONLN || getconf _NPROCESSORS_ONLN'

    @staticmethod
    def process(output):
        '''
        Convert the first output line into an integer CPU count.

        ``output`` is the list of lines printed by ``command``. Returns the
        count as an ``int``, or ``None`` if there is no first line or it
        cannot be parsed as an integer.
        '''

        try:
            return int(output[0])
        except (IndexError, ValueError):
            # Empty output (both getconf calls failed) or a non-numeric
            # line: report "unknown" explicitly rather than raising.
            return None
class Memory(FactBase):
'''
Returns the memory installed in this server, in MB.
'''
command = 'vmstat -s'
@staticmethod
def process(output):
data = {}
for line in output:
value, key = line.split(' ', 1)
try:
value = int(value)
except ValueError:
command = 'cat /etc/group'
default = list
@staticmethod
def process(output):
    '''
    Collect the text before the first ``:`` of every line that contains one.

    Lines without a ``:`` are ignored; the result keeps the input order.
    '''

    return [
        entry.split(':')[0]
        for entry in output
        if ':' in entry
    ]
class Crontab(FactBase):
    '''
    Returns a dictionary of cron command -> execution time.

    Each key is the command string of a crontab entry and each value holds
    the schedule fields for that command:

    .. code:: python

        '/path/to/command': {
            'minute': '*',
            'hour': '*',
            'month': '*',
            'day_of_month': '*',
            'day_of_week': '*',
        },
        ...
    '''

    # NOTE(review): presumably the factory used to build the fact's value
    # when there is nothing to report - confirm against FactBase.
    default = dict
'state': state,
}
if depends != '-':
module['depends'] = [
value
for value in depends.split(',')
if value
]
modules[name] = module
return modules
class LsbRelease(FactBase):
    '''
    Returns a dictionary of release information using ``lsb_release``.

    .. code:: python

        {
            "id": "Ubuntu",
            "description": "Ubuntu 18.04.2 LTS",
            "release": "18.04",
            "codename": "bionic",
            ...
        }
    '''

    # NOTE(review): ``-a`` presumably already prints every field, which
    # would make ``-c`` redundant - confirm against the lsb_release manual.
    command = 'lsb_release -ca'
'''
Returns the OS version according to ``uname``.
'''
command = 'uname -r'
class Arch(FactBase):
    '''
    Returns the system architecture according to ``uname``.
    '''

    # NOTE(review): GNU coreutils documents ``uname -p`` as non-portable and
    # says it may print "unknown"; ``uname -m`` may be the safer choice, but
    # it can report a different string - confirm before changing.
    command = 'uname -p'
class Command(FactBase):
    '''
    Returns the raw output lines of a given command.
    '''

    @staticmethod
    def command(command):
        '''
        Return the ``command`` argument unchanged.

        Note that the parameter intentionally carries the same name as the
        method; inside the body it refers to the argument.
        '''

        return command
class Which(FactBase):
'''
Returns the path of a given command, if available.
'''
@staticmethod
def command(name):
from pyinfra.api import FactBase
from .util.packaging import parse_packages
# Matches one line of ``brew list --versions`` output.
#   group 1: the formula name - any run of non-whitespace characters, so
#            names containing digits, ``@``, ``.``, ``_`` or ``+`` (for
#            example ``python@3.9`` or ``sqlite3``) are no longer skipped.
#   group 2: the first version listed after the name.
# Every line the previous letters-and-hyphens-only pattern matched still
# matches, with identical groups.
BREW_REGEX = r'^(\S+)\s([0-9\._+a-z\-]+)'
class BrewPackages(FactBase):
    '''
    Returns a dict of installed brew packages:

    .. code:: python

        'package_name': ['version'],
        ...
    '''

    command = 'brew list --versions'
    # Was misspelled ``deafult``, which left the fact without the ``default``
    # attribute that the sibling package facts in this codebase declare.
    default = dict

    def process(self, output):
        '''
        Parse the ``brew list --versions`` output lines with ``BREW_REGEX``.

        Delegates to ``parse_packages`` and returns its result unchanged.
        '''

        return parse_packages(BREW_REGEX, output)
from pyinfra.api import FactBase
from .util.packaging import parse_packages
# Matches a whole line of the form ``name (version)``.
#   group 1: the gem name - letters, digits, ``-``, ``+`` and ``_``.
#   group 2: the version - digits and dots only.
# The pattern is anchored at both ends, so a line whose parentheses hold
# anything other than a single digits-and-dots version does not match.
GEM_REGEX = r'^([a-zA-Z0-9\-\+\_]+)\s\(([0-9\.]+)\)$'
class GemPackages(FactBase):
    '''
    Returns a dict of installed gem packages:

    .. code:: python

        'package_name': ['version'],
        ...
    '''

    command = 'gem list --local'
    default = dict

    def process(self, output):
        '''
        Parse the ``gem list --local`` output lines with ``GEM_REGEX``.

        Delegates to ``parse_packages`` and returns its result unchanged.
        '''

        return parse_packages(GEM_REGEX, output)
key = bit
else:
args.append(bit)
if key:
add_args()
if 'extras' in definition:
definition['extras'] = set(definition['extras'])
return definition
class IptablesRules(FactBase):
    '''
    Returns a list of iptables rules for a specific table:

    .. code:: python

        {
            'chain': 'PREROUTING',
            'jump': 'DNAT'
        },
        ...
    '''

    default = list

    def command(self, table='filter'):
        '''
        Build the ``iptables-save`` command line for ``table``.

        ``table`` defaults to ``'filter'`` and is substituted into the
        command as-is.
        '''

        template = 'iptables-save -t {0}'
        return template.format(table)
command = 'cat /etc/apt/sources.list /etc/apt/sources.list.d/*.list || true'
default = list
def process(self, output):
    '''
    Run every output line through ``parse_apt_repo`` and keep the truthy
    results, in input order.
    '''

    parsed = (parse_apt_repo(entry) for entry in output)
    return [source for source in parsed if source]
class DebPackages(FactBase):
    '''
    Returns a dict of installed dpkg packages:

    .. code:: python

        'package_name': ['version'],
        ...
    '''

    command = 'dpkg -l'
    # Only lines starting with the ``ii`` status pair are matched.
    #   group 1: the package name; an optional ``:suffix`` following the
    #            name is consumed outside the group.
    #   group 2: the version column.
    regex = r'^ii\s+([a-zA-Z0-9\+\-\.]+):?[a-zA-Z0-9]*\s+([a-zA-Z0-9:~\.\-\+]+).+$'
    default = dict

    def process(self, output):
        '''
        Parse the ``dpkg -l`` output lines with ``self.regex``.

        Delegates to ``parse_packages`` and returns its result unchanged.
        '''

        return parse_packages(self.regex, output)
# pyinfra
# File: pyinfra/facts/packages.py
# Desc: various package management system facts
import re
from pyinfra.api import FactBase
class DebPackages(FactBase):
    '''
    Returns a dict of installed dpkg packages -> version.

    Only rows whose dpkg *current state* flag is ``i`` (installed) are
    reported. Removed packages that merely left configuration files behind
    (status ``rc``) carry a version too, but are not installed and are
    therefore skipped.
    '''

    command = 'dpkg -l'
    # The first column of ``dpkg -l`` is "<desired action><current state>".
    # Any lowercase desired action is accepted (so held packages, ``hi``,
    # are kept) but the current state must be ``i``.
    #   group 1: the package name; an optional ``:arch`` suffix is consumed
    #            outside the group.
    #   group 2: the version column.
    _regex = r'^[a-z]i\s+([a-zA-Z0-9\+\-\.]+):?[a-zA-Z0-9]*\s+([a-zA-Z0-9:~\.\-\+]+).+$'

    def process(self, output):
        '''
        Build a ``{name: version}`` dict from the ``dpkg -l`` output lines.

        Lines that do not match ``_regex`` (headers, separators, packages
        that are not installed) are ignored. Names are lower-cased.
        '''

        # Compile once instead of re-resolving the pattern for every line.
        pattern = re.compile(self._regex)

        packages = {}
        for line in output:
            matches = pattern.match(line)
            if matches:
                # apt packages are case-insensitive
                packages[matches.group(1).lower()] = matches.group(2)

        return packages