latency_var = property(lambda msgs: statistics.pvariance(msgs.bundles))
latency_avg = property(lambda msgs: statistics.mean(msgs.bundles))
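# The two property definitions above expose the population variance and mean of a
# `bundles` attribute. A minimal self-contained sketch of how they could be used;
# the `BundleLatencyStats` class and the sample values are assumptions for illustration,
# not taken from the source:
import statistics

class BundleLatencyStats:
    def __init__(self, bundles):
        self.bundles = list(bundles)  # per-message latency samples

    latency_var = property(lambda msgs: statistics.pvariance(msgs.bundles))
    latency_avg = property(lambda msgs: statistics.mean(msgs.bundles))

stats = BundleLatencyStats([12.1, 11.8, 13.0, 12.4])
print(stats.latency_avg, stats.latency_var)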
def processResults(self):
    timeTaken = self.timeTaken
    avgMotionTime = sum(self.timeTaken) / len(self.timeTaken)
    motionTimeVar = statistics.pvariance(self.timeTaken)
    avgEndPosError = sum(self.posEndError) / len(self.posEndError)
    maxEndPosError = max(self.posEndError)
    avgLineError = sum(self.lineFollowError) / len(self.lineFollowError)
    unitLineError = list(map(truediv, self.lineFollowError, self.timeTaken))  # requires `from operator import truediv`
    lineErrorPerTime = sum(unitLineError) / len(unitLineError)
    rotationalError = sum(self.rotationalFollowError) / len(self.rotationalFollowError)
    unitRotError = list(map(truediv, self.rotationalFollowError, self.timeTaken))
    rotErrorPerTime = sum(unitRotError) / len(unitRotError)
    overshoot = self.maxOvershoot
    perOvershoot = []
    for i in range(len(self.maxOvershoot)):
        perOvershoot.append(self.maxOvershoot[i] / self.distances[i % len(self.distances)])
    avgAbsOvershoot = sum(overshoot) / len(overshoot)
def probability_metric_cluster(G, members):
    """
    Given the members of a cluster and the graph they belong to, finds
    the cluster's mean and population variance.
    Note: n currently defaults to the number of members in the community.
    TODO: testing, to find out whether this is a legitimate normalization.
    """
    nMembers = len(members)  # figure out if this is a good normalization
    # numVertices = G.vcount()
    # normalization = nMembers / numVertices
    data = [p_in_after_n(G, v, nMembers, members) for v in members]
    mean = statistics.mean(data)  # could divide mean by normalization
    var = statistics.pvariance(data, mu=mean)
    return mean, var
def DVOLA(df, n=30, price='Close'):
    """
    Daily Volatility
    """
    dvola_list = []
    for i in range(len(df[price])):
        if i + 1 < n:
            dvola = float('NaN')
        else:
            start = i + 1 - n
            end = i + 1
            pvariance = statistics.pvariance(df[price][start:end])
            dvola = math.sqrt(pvariance)
        dvola_list.append(dvola)
    return dvola_list
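# A hedged usage sketch for DVOLA above, assuming `df` is a pandas DataFrame with a
# 'Close' column; the imports are shown here because the excerpt does not include them:
import math
import statistics
import pandas as pd

df = pd.DataFrame({'Close': [100.0, 101.5, 99.8, 102.3, 103.1, 101.9]})
df['DVOLA_3'] = DVOLA(df, n=3)  # first n-1 rows are NaN, then the rolling population std dev
print(df)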
def variance(lst):
    return statistics.pvariance(lst)
WerteArray = []
if kanal <= 7:
    median_value = median_filter(samples[kanal])
    if (median_value > 15) and (median_value < 4080):
        if (sensorname[kanal] != 'KTYPE'):
            Rtheta = messwiderstand[kanal]*((4096.0/median_value) - 1)
            try:
                Temperatur[kanal] = round(temperatur_sensor(Rtheta, sensortyp[kanal]), 2)
            except exceptions.TypeError:
                Temperatur[kanal] = None
        else:
            # AD595 = 10mV/°C
            Temperatur[kanal] = median_value * 330 / 4096
    else:
        Temperatur[kanal] = None
    variance = statistics.pvariance(samples[kanal])
    if variance > 4:
        warnung = 'Channel:{kanal} variance: {variance} in {iterations}, median @ {median_value}!'.format(
            kanal=kanal,
            variance=variance,
            iterations=iterations,
            median_value=median_value)
        logger.warning(warnung)
    logger.debug(u'Channel {}, MCP3128 {}, temperature {}'.format(kanal, kanal, Temperatur[kanal]))
elif kanal <= 9:
    if maverick is None:
        Temperatur[kanal] = None
        logger.debug(u'Channel {}, disabled or not available'.format(kanal))
    else:
        logger.debug(u'Channel {}, Maverick {}, temperature {}'.format(kanal, kanal - 7, Temperatur[kanal]))
        maverick_value = maverick['temperature_' + str(kanal - 7)]
        if maverick_value == '':
            pvariance = float('NaN')
        else:
            if start is None:
                start = i
            end = i + 1
            pvariance = statistics.pvariance(df[price][start:end], mu)
        pvariance_list.append(pvariance)
        i += 1
else:
    while i < len(df[price]):
        if i + 1 < n:
            pvariance = float('NaN')
        else:
            start = i + 1 - n
            end = i + 1
            pvariance = statistics.pvariance(df[price][start:end], mu)
        pvariance_list.append(pvariance)
        i += 1
return pvariance_list
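# The truncated block above computes statistics.pvariance over a trailing slice of
# df[price] in two branches: one keeps the window anchored at a start index, the other
# uses a fixed n-length window. A self-contained sketch of the fixed-window variant;
# `rolling_pvariance` is an illustrative name, not taken from the source:
import statistics

def rolling_pvariance(values, n, mu=None):
    out = []
    for i in range(len(values)):
        if i + 1 < n:
            out.append(float('NaN'))  # window not yet full
        else:
            window = values[i + 1 - n:i + 1]
            out.append(statistics.pvariance(window, mu))
    return out

print(rolling_pvariance([2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0], n=4))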
conn.create_function('RADIANS', 1, none_guard(math.radians))
conn.create_function('REPEAT', 2, none_guard(operator.mul))
conn.create_function('REVERSE', 1, none_guard(lambda x: x[::-1]))
conn.create_function('RPAD', 3, _sqlite_rpad)
conn.create_function('SHA1', 1, none_guard(lambda x: hashlib.sha1(x.encode()).hexdigest()))
conn.create_function('SHA224', 1, none_guard(lambda x: hashlib.sha224(x.encode()).hexdigest()))
conn.create_function('SHA256', 1, none_guard(lambda x: hashlib.sha256(x.encode()).hexdigest()))
conn.create_function('SHA384', 1, none_guard(lambda x: hashlib.sha384(x.encode()).hexdigest()))
conn.create_function('SHA512', 1, none_guard(lambda x: hashlib.sha512(x.encode()).hexdigest()))
conn.create_function('SIGN', 1, none_guard(lambda x: (x > 0) - (x < 0)))
conn.create_function('SIN', 1, none_guard(math.sin))
conn.create_function('SQRT', 1, none_guard(math.sqrt))
conn.create_function('TAN', 1, none_guard(math.tan))
conn.create_aggregate('STDDEV_POP', 1, list_aggregate(statistics.pstdev))
conn.create_aggregate('STDDEV_SAMP', 1, list_aggregate(statistics.stdev))
conn.create_aggregate('VAR_POP', 1, list_aggregate(statistics.pvariance))
conn.create_aggregate('VAR_SAMP', 1, list_aggregate(statistics.variance))
conn.execute('PRAGMA foreign_keys = ON')
return conn
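# In the registrations above, `list_aggregate` adapts a plain list-reducing function such
# as statistics.pvariance to sqlite3's aggregate protocol (a class exposing step() and
# finalize()). Its definition is not part of this excerpt; a minimal sketch under that
# assumption:
import sqlite3
import statistics

def list_aggregate(function):
    class ListAggregate(list):
        step = list.append          # called once per row with the column value

        def finalize(self):
            return function(self)   # reduce the collected values

    return ListAggregate

conn = sqlite3.connect(':memory:')
conn.create_aggregate('VAR_POP', 1, list_aggregate(statistics.pvariance))
conn.execute('CREATE TABLE t (x REAL)')
conn.executemany('INSERT INTO t VALUES (?)', [(1.0,), (2.0,), (3.0,), (4.0,)])
print(conn.execute('SELECT VAR_POP(x) FROM t').fetchone()[0])  # 1.25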
file_data = {}
for d in data:
    d_plain = [i[0] for i in d['datapoints'] if i[0] is not None]
    d_timestamps = [i[1] for i in d['datapoints'] if i[0] is not None]
    d_duration = args.to_ts - args.from_ts
    d_len = len(d_plain)
    if d_len < 5:
        logging.warning('Very low number of datapoints returned for %s: %s' % (d['target'], d_len))
    if len(d_plain) > 0:
        d_min = min(d_plain)
        d_max = max(d_plain)
        d_mean = statistics.mean(d_plain)
        d_median = statistics.median(d_plain)
        d_integral = scipy.integrate.simps(d_plain, d_timestamps) / d_duration
        d_pstdev = statistics.pstdev(d_plain)
        d_pvariance = statistics.pvariance(d_plain)
        d_hist = get_hist(d_plain)
    else:
        d_min = 0
        d_max = 0
        d_mean = 0
        d_median = 0
        d_integral = 0
        d_pstdev = 0
        d_pvariance = 0
        d_hist = {(0, 0): 0}
    table_row_data = [d_min, d_max, d_mean, d_median, d_integral, d_pstdev, d_pvariance, d_hist, d_duration, d_len]
    file_row = [d['target']] + table_row_data
    table_row = [d['target']] + reformat_number_list(table_row_data)
    table_data.append(table_row)
    file_data[d['target']] = {table_header[i]: file_row[i] for i in range(len(table_header))}
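# The loop above divides a Simpson's-rule integral of the datapoints by the requested
# duration and records the population spread alongside it. A self-contained sketch of
# just those calculations with hypothetical (value, timestamp) pairs; scipy.integrate.simpson
# is the current name of the simps call used above, and the duration here is the covered
# timestamp span rather than args.to_ts - args.from_ts:
import statistics
import scipy.integrate

datapoints = [(3.0, 0), (5.0, 60), (4.0, 120), (None, 180), (6.0, 240)]
vals = [v for v, t in datapoints if v is not None]
ts = [t for v, t in datapoints if v is not None]

time_avg = scipy.integrate.simpson(vals, x=ts) / (ts[-1] - ts[0])  # integral over the covered span
print(time_avg, statistics.pstdev(vals), statistics.pvariance(vals))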