Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _exec_page_plus_read(self, page, cmd, data_len, attempts=_READ_ATTEMPTS):
    """Run `cmd` on `page` through PAGE_PLUS_READ and return its data bytes.

    The request is sent up to `attempts` times, stopping at the first reply
    whose three header bytes match what is expected for `data_len` bytes of
    data.  The `data_len` bytes that follow that header are returned.

    Raises AssertionError if none of the attempts produced a valid reply
    (including when `attempts` is zero).
    """
    for _ in range(attempts):
        self._wait()
        self._write([0xad, 0, data_len + 2, 4, 0x60, CMD.PAGE_PLUS_READ, 2, page, cmd])
        ret = self._read()
        # in the captured traffic ret[2] == 0xff appears to signal an
        # invalid data condition (probably related to the device being
        # busy; see PMBus spec); TODO check PEC byte as well
        if ret[0] == 0xaa and ret[1] == data_len + 2 and ret[2] == data_len:
            return ret[3:(3 + data_len)]
    # Every attempt was rejected: never hand the last, invalid reply back to
    # the caller.  Raised explicitly (instead of using an assert statement)
    # so that the check is not stripped when running with `python -O`.
    raise AssertionError(f'invalid response (attempts={attempts})')
import logging
import time
from liquidctl.driver.usb import UsbHidDriver
from liquidctl.pmbus import CommandCode as CMD
from liquidctl.pmbus import linear_to_float
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# NOTE(review): presumably the sizes, in bytes, of the reports read from and
# written to the device; their use is not visible in this excerpt -- confirm.
_READ_LENGTH = 64
_WRITE_LENGTH = 64
# NOTE(review): presumably a minimum delay in seconds between commands
# (see the _wait() helper) -- confirm against its implementation.
_MIN_DELAY = 0.0025
# Default value of the `attempts` parameter of _exec_page_plus_read().
_READ_ATTEMPTS = 3
# Alias for the manufacturer-specific command code MFR_SPECIFIC_FC; going by
# its name it is used to read the firmware version -- TODO confirm.
_SEASONIC_READ_FIRMWARE_VERSION = CMD.MFR_SPECIFIC_FC
# Display names of the output rails.
# NOTE(review): presumably ordered by PMBus page number -- confirm.
_RAILS = ['+12V #1', '+12V #2', '+12V #3', '+5V', '+3.3V']
class SeasonicEDriver(UsbHidDriver):
"""liquidctl driver for Seasonic E-series PSUs."""
SUPPORTED_DEVICES = [
(0x7793, 0x5911, None, 'NZXT E500 (experimental)', {}),
(0x7793, 0x5912, None, 'NZXT E650 (experimental)', {}),
(0x7793, 0x2500, None, 'NZXT E850 (experimental)', {}),
]
def initialize(self, **kwargs):
"""Initialize the device.
Apparently not required.
def _get_vout(self, rail):
    """Read the output voltage of `rail` and return it converted to a float.

    VOUT_MODE is read first (one byte), then READ_VOUT (two bytes); both
    are fetched for the page given by `rail`.  The low five bits of the
    mode byte are passed to linear_to_float() together with the raw
    READ_VOUT bytes.
    """
    mode_reply = self._exec_page_plus_read(rail, CMD.VOUT_MODE, 1)
    vout_mode = mode_reply[0]
    # the three high bits must be zero: vout_mode is assumed to always be
    # ulinear16
    assert vout_mode >> 5 == 0
    raw_vout = self._exec_page_plus_read(rail, CMD.READ_VOUT, 2)
    low_bits = vout_mode & 0x1f
    return linear_to_float(raw_vout, low_bits)
# NOTE(review): fragment of a status-report routine; the enclosing `def` and
# the original indentation are not part of this excerpt.
# Each entry of `status` is a (description, value, unit) tuple.
status = [
('Current uptime', self._get_timedelta(_CORSAIR_READ_UPTIME), ''),
('Total uptime', self._get_timedelta(_CORSAIR_READ_TOTAL_UPTIME), ''),
('Temperature 1', self._get_float(CMD.READ_TEMPERATURE_1), '°C'),
('Temperature 2', self._get_float(CMD.READ_TEMPERATURE_2), '°C'),
('Fan control mode', self._get_fan_control_mode(), ''),
('Fan speed', self._get_float(CMD.READ_FAN_SPEED_1), 'rpm'),
('Input voltage', self._get_float(CMD.READ_VIN), 'V'),
('Total power', self._get_float(_CORSAIR_READ_INPUT_POWER), 'W'),
('+12V OCP mode', self._get_12v_ocp_mode(), ''),
]
# For every rail: write the rail number to the PAGE command, then append that
# rail's output voltage, current and power readings.
# NOTE(review): presumably the PAGE write selects which rail the following
# READ_* commands refer to -- confirm against the PMBus spec.
for rail in [_RAIL_12V, _RAIL_5V, _RAIL_3P3V]:
name = _RAIL_NAMES[rail]
self._exec(WriteBit.WRITE, CMD.PAGE, [rail])
status.append((f'{name} output voltage', self._get_float(CMD.READ_VOUT), 'V'))
status.append((f'{name} output current', self._get_float(CMD.READ_IOUT), 'A'))
status.append((f'{name} output power', self._get_float(CMD.READ_POUT), 'W'))
# NOTE(review): presumably runs once after the loop to switch back to page 0;
# the lost indentation makes the nesting impossible to tell here -- confirm.
self._exec(WriteBit.WRITE, CMD.PAGE, [0])
self.device.release()
LOGGER.warning('reading the +12V OCP mode is an experimental feature')
return status
import logging
from datetime import timedelta
from enum import Enum
from liquidctl.driver.usb import UsbHidDriver
from liquidctl.pmbus import CommandCode as CMD
from liquidctl.pmbus import WriteBit, linear_to_float
from liquidctl.util import clamp
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# NOTE(review): presumably the sizes, in bytes, of the reports read from and
# written to the device; their use is not visible in this excerpt -- confirm.
_READ_LENGTH = 64
_WRITE_LENGTH = 64
# NOTE(review): presumably the address byte placed in outgoing messages;
# its use is not visible in this excerpt -- confirm.
_SLAVE_ADDRESS = 0x02
# Aliases for the manufacturer-specific command codes used by this driver.
# The uptime and input power aliases are read when building the status report;
# the uses of the OCP mode and fan control mode aliases are not visible here.
_CORSAIR_READ_TOTAL_UPTIME = CMD.MFR_SPECIFIC_D1
_CORSAIR_READ_UPTIME = CMD.MFR_SPECIFIC_D2
_CORSAIR_12V_OCP_MODE = CMD.MFR_SPECIFIC_D8
_CORSAIR_READ_INPUT_POWER = CMD.MFR_SPECIFIC_EE
_CORSAIR_FAN_CONTROL_MODE = CMD.MFR_SPECIFIC_F0
# Rail identifiers; these are the values written to the PAGE command before
# reading a rail's output voltage, current and power.
_RAIL_12V = 0x0
_RAIL_5V = 0x1
_RAIL_3P3V = 0x2
# Display name of each rail, used as the prefix of its status entries.
_RAIL_NAMES = {_RAIL_12V : '+12V', _RAIL_5V : '+5V', _RAIL_3P3V : '+3.3V'}
# NOTE(review): presumably the lowest accepted fan duty value (units not
# visible in this excerpt) -- confirm.
_MIN_FAN_DUTY = 0
class OCPMode(Enum):
"""Overcurrent protection mode."""
SINGLE_RAIL = 0x1