Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from devlib.module import Module
from devlib.exception import TargetStableError
from devlib.utils.misc import memoized
class GpufreqModule(Module):
name = 'gpufreq'
path = ''
def __init__(self, target):
    '''
    Read the GPU frequency table and the available governors from sysfs.

    Sets ``self.frequencies`` to the integers listed in
    ``/sys/kernel/gpu/gpu_freq_table``, sorted in ascending order, and
    ``self.governors`` to the names listed in
    ``/sys/kernel/gpu/gpu_available_governor``.

    :param target: target the module is attached to; handed to the base
                   class constructor.
    '''
    super(GpufreqModule, self).__init__(target)
    # split() without an argument tolerates repeated or trailing whitespace
    # (and an empty file); split(" ") would yield empty fields, which int()
    # rejects with ValueError.
    frequencies_str = self.target.read_value("/sys/kernel/gpu/gpu_freq_table")
    self.frequencies = sorted(int(freq) for freq in frequencies_str.split())
    governors_str = self.target.read_value("/sys/kernel/gpu/gpu_available_governor")
    self.governors = governors_str.split()
@staticmethod
def probe(target):
# kgsl/Adreno
probe_path = '/sys/kernel/gpu/'
if target.file_exists(probe_path):
def start(self):
    '''
    Issue ``m5 roistart <number>`` on the target and mark this object as
    running.

    :returns: ``True`` when the command was issued by this call, ``False``
              when the object was already running (nothing is executed).
    '''
    if not self.running:
        command = 'm5 roistart {}'.format(self.number)
        self.target.execute(command)
        self.running = True
        return True
    return False
def stop(self):
    '''
    Issue ``m5 roiend <number>`` on the target and clear the running flag.

    :returns: ``True`` when the command was issued by this call, ``False``
              when the object was not running (nothing is executed).
    '''
    if self.running:
        command = 'm5 roiend {}'.format(self.number)
        self.target.execute(command)
        self.running = False
        return True
    return False
class Gem5StatsModule(Module):
'''
Module controlling Region of Interest (ROIs) markers, statistics dump
frequency and parsing statistics log file when using gem5 platforms.
ROIs are identified by user-defined labels and need to be booked prior to
use. The translation of labels into gem5 ROI numbers will be performed
internally in order to avoid conflicts between multiple clients.
'''
name = 'gem5stats'
@staticmethod
def probe(target):
    '''
    Return whether the platform of *target* is a ``Gem5SimulationPlatform``.
    '''
    platform = target.platform
    return isinstance(platform, Gem5SimulationPlatform)
def __init__(self, target):
super(Gem5StatsModule, self).__init__(target)
def start(self):
    '''
    Send ``motor_<fan_pin>_1`` over the serial port.

    The connection is opened as a context manager using this object's
    ``timeout``, ``port`` and ``baud`` settings.
    '''
    serial_settings = dict(timeout=self.timeout,
                           port=self.port,
                           baudrate=self.baud)
    with open_serial_connection(**serial_settings) as connection:
        # pylint: disable=no-member
        connection.sendline('motor_{}_1'.format(self.fan_pin))
def stop(self):
    '''
    Send ``motor_<fan_pin>_0`` over the serial port.

    The connection is opened as a context manager using this object's
    ``timeout``, ``port`` and ``baud`` settings.
    '''
    serial_settings = dict(timeout=self.timeout,
                           port=self.port,
                           baudrate=self.baud)
    with open_serial_connection(**serial_settings) as connection:
        # pylint: disable=no-member
        connection.sendline('motor_{}_0'.format(self.fan_pin))
class OdroidXU3ctiveCoolingModule(Module):
    '''
    Module driving the fan exposed under ``/sys/devices/odroid_fan.15``.

    The module is reported as available when the ``fan_mode`` node exists on
    the target. ``start`` and ``stop`` both write ``0`` to ``fan_mode`` and
    then set ``pwm_duty`` to ``255`` and ``1`` respectively.
    '''

    name = 'odroidxu3-fan'

    # sysfs nodes used by this module, defined once so that probe(), start()
    # and stop() cannot drift apart.
    _fan_dir = '/sys/devices/odroid_fan.15'
    _fan_mode_path = _fan_dir + '/fan_mode'
    _pwm_duty_path = _fan_dir + '/pwm_duty'

    @staticmethod
    def probe(target):
        '''Return whether the ``fan_mode`` node exists on *target*.'''
        return target.file_exists(OdroidXU3ctiveCoolingModule._fan_mode_path)

    def start(self):
        '''Write ``fan_mode`` = 0, then ``pwm_duty`` = 255.'''
        self._set_pwm_duty(255)

    def stop(self):
        '''Write ``fan_mode`` = 0, then ``pwm_duty`` = 1.'''
        self._set_pwm_duty(1)

    def _set_pwm_duty(self, duty):
        '''Write ``fan_mode`` = 0 followed by *duty* to ``pwm_duty``.'''
        # NOTE(review): fan_mode 0 presumably selects manual control so that
        # the pwm_duty value takes effect -- confirm against the driver.
        self.target.write_value(self._fan_mode_path, 0, verify=False)
        self.target.write_value(self._pwm_duty_path, duty, verify=False)
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from devlib.module import Module
from devlib.exception import TargetStableError
from devlib.utils.misc import memoized
class DevfreqModule(Module):
name = 'devfreq'
@staticmethod
def probe(target):
    '''
    Return ``True`` when ``/sys/class/devfreq/`` exists on *target* and
    lists at least one entry, ``False`` otherwise.
    '''
    devfreq_root = '/sys/class/devfreq/'
    # The directory existing is not enough: at least one policy must be
    # implemented, i.e. the directory must not be empty.
    has_policy = (target.file_exists(devfreq_root)
                  and target.list_directory(devfreq_root))
    return bool(has_policy)
@memoized
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from devlib.module import Module
class HotplugModule(Module):
name = 'hotplug'
base_path = '/sys/devices/system/cpu'
@classmethod
def probe(cls, target):  # pylint: disable=arguments-differ
    '''
    Report whether CPU hotplug can be used on *target*.

    The result is the value of ``file_exists(<CPU1 path>) and
    target.is_rooted``.
    '''
    # A system with a single CPU has nothing to hotplug, and on systems with
    # several CPUs, CPU0 may be configured as not hotpluggable. Hotplug
    # support is therefore detected by looking at CPU1.
    cpu1_path = cls._cpu_path(target, 1)
    return target.file_exists(cpu1_path) and target.is_rooted
@classmethod
def _cpu_path(cls, target, cpu):
if isinstance(cpu, int):
cpu = 'cpu{}'.format(cpu)
# Make sure some data is there
for cpu in cpus:
if target.file_exists(target.path.join(path, cpu, "domain0", "flags")):
return True
return False
def __init__(self, target, path=None):
    '''
    Read the tree of values below *path* from *target* and hand it to the
    base class constructor.

    :param target: object providing ``read_tree_values``.
    :param path: root of the tree to read; ``self.sched_domain_root`` is
                 used when it is ``None``.
    '''
    root = self.sched_domain_root if path is None else path
    tree = target.read_tree_values(root, depth=self._read_depth)
    super(SchedProcFSData, self).__init__(tree)
class SchedModule(Module):
name = 'sched'
cpu_sysfs_root = '/sys/devices/system/cpu'
@staticmethod
def probe(target):
logger = logging.getLogger(SchedModule.name)
SchedDomainFlag.check_version(target, logger)
# It makes sense to load this module if at least one of those
# functionalities is enabled
schedproc = SchedProcFSData.available(target)
debug = SchedModule.target_has_debug(target)
dmips = any([target.file_exists(SchedModule.cpu_dmips_capacity_path(target, cpu))
for cpu in target.list_online_cpus()])
return (self.name == other.name) and (self.desc == other.desc)
elif isinstance(other, basestring):
return (self.name == other) or (self.desc == other)
else:
return False
def __ne__(self, other):
    '''
    Return the negation of ``self.__eq__(other)``.
    '''
    # NOTE(review): presumably spelled out for Python 2, which does not
    # derive __ne__ from __eq__ -- confirm before removing.
    is_equal = self.__eq__(other)
    return not is_equal
def __str__(self):
    '''
    Render the object as ``CpuidleState(<name>, <desc>)``.
    '''
    fields = (self.name, self.desc)
    return 'CpuidleState({}, {})'.format(*fields)
__repr__ = __str__
class Cpuidle(Module):
name = 'cpuidle'
root_path = '/sys/devices/system/cpu/cpuidle'
@staticmethod
def probe(target):
    '''
    Return the result of ``target.file_exists`` for ``Cpuidle.root_path``.
    '''
    cpuidle_root = Cpuidle.root_path
    return target.file_exists(cpuidle_root)
def __init__(self, target):
super(Cpuidle, self).__init__(target)
basepath = '/sys/devices/system/cpu/'
values_tree = self.target.read_tree_values(basepath, depth=4, check_exit_code=False)
self._states = {
cpu_name: sorted(
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from devlib.module import Module
class BigLittleModule(Module):
    '''
    Expose the indices of the cores whose name matches the platform's
    ``big_core`` or ``little_core``.
    '''

    name = 'bl'

    @staticmethod
    def probe(target):
        '''Return ``True`` when ``target.big_core`` is not ``None``.'''
        return target.big_core is not None

    @property
    def bigs(self):
        '''Indices into ``platform.core_names`` equal to ``platform.big_core``.'''
        platform = self.target.platform
        return [index
                for index, core_name in enumerate(platform.core_names)
                if core_name == platform.big_core]

    @property
    def littles(self):
        '''Indices into ``platform.core_names`` equal to ``platform.little_core``.'''
        platform = self.target.platform
        return [index
                for index, core_name in enumerate(platform.core_names)
                if core_name == platform.little_core]
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from devlib.module import Module
from devlib.utils.serial_port import open_serial_connection
class MbedFanActiveCoolingModule(Module):
name = 'mbed-fan'
timeout = 30
@staticmethod
def probe(target):
    '''
    Always return ``True``; *target* is not inspected.
    '''
    return True
def __init__(self, target, port='/dev/ttyACM0', baud=115200, fan_pin=0):
    '''
    Store the serial settings on the instance.

    :param target: target the module is attached to; handed to the base
                   class constructor.
    :param port: stored as ``self.port``.
    :param baud: stored as ``self.baud``.
    :param fan_pin: stored as ``self.fan_pin``.
    '''
    super(MbedFanActiveCoolingModule, self).__init__(target)
    self.port, self.baud, self.fan_pin = port, baud, fan_pin
def start(self):
with open_serial_connection(timeout=self.timeout,
self.logger.debug('Tasks: %s', task_ids)
return list(map(int, task_ids))
def add_task(self, tid):
    '''
    Write *tid* to ``self.tasks_file`` on the target, passing
    ``verify=False``.
    '''
    write = self.target.write_value
    write(self.tasks_file, tid, verify=False)
def add_tasks(self, tasks):
    '''
    Call ``self.add_task`` once for every item of *tasks*, in iteration
    order.
    '''
    for task_id in tasks:
        self.add_task(task_id)
def add_proc(self, pid):
    '''
    Write *pid* to ``self.procs_file`` on the target, passing
    ``verify=False``.
    '''
    write = self.target.write_value
    write(self.procs_file, pid, verify=False)
CgroupSubsystemEntry = namedtuple('CgroupSubsystemEntry', 'name hierarchy num_cgroups enabled')
class CgroupsModule(Module):
name = 'cgroups'
stage = 'setup'
@staticmethod
def probe(target):
    '''
    Decide whether the module can be used on *target*.

    :returns: ``False`` when the target is not rooted; ``True`` when
              ``/proc/cgroups`` exists; otherwise the result of
              ``target.config.has('cgroups')``.
    '''
    if target.is_rooted:
        if target.file_exists('/proc/cgroups'):
            return True
        # No /proc/cgroups: defer to the target configuration.
        return target.config.has('cgroups')
    return False
def __init__(self, target):
super(CgroupsModule, self).__init__(target)
self.logger = logging.getLogger('CGroups')