Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_daily_plot(day: date) -> Figure:
"""Return matplotlib plot representing a day's plot."""
start, end = get_open_close(day)
desktops = list_desktops(public_only=True)
profiles = UtilizationProfile.from_hostnames(desktops, start, end).values()
desks_count = len(desktops)
now: Any = datetime.now()
latest = min(end, now)
minute = timedelta(minutes=1)
times = [start + i * minute for i in range((latest - start) // minute + 1)]
if now >= end or now <= start:
now = None
sums = []
for t in times:
instant15 = t + timedelta(seconds=15)
instant45 = t + timedelta(seconds=45)
in_use = sum(1 if profile.in_use(instant15) or profile.in_use(instant45) else 0 for profile in profiles)
sums.append(in_use)
def get_daily_plot(day: date) -> Figure:
"""Return matplotlib plot representing a day's plot."""
start, end = get_open_close(day)
desktops = list_desktops(public_only=True)
profiles = UtilizationProfile.from_hostnames(desktops, start, end).values()
desks_count = len(desktops)
now: Any = datetime.now()
latest = min(end, now)
minute = timedelta(minutes=1)
times = [start + i * minute for i in range((latest - start) // minute + 1)]
if now >= end or now <= start:
now = None
sums = []
for t in times:
instant15 = t + timedelta(seconds=15)
instant45 = t + timedelta(seconds=45)
in_use = sum(1 if profile.in_use(instant15) or profile.in_use(instant45) else 0 for profile in profiles)
sums.append(in_use)
if __name__ == '__main__':
start = datetime(2015, 11, 23)
end = start + timedelta(days=1)
print('Testing naive time to create profiles.')
with timeit():
slow_profiles = {
host + '.ocf.berkeley.edu': UtilizationProfile.from_hostname(host, start, end)
for host in list_desktops()
}
print('Testing optimized time to create profiles.')
with timeit():
fast_profiles = UtilizationProfile.from_hostnames(list_desktops(), start, end)
@contextmanager
def timeit():
start = time.time()
yield
print('Time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
start = datetime(2015, 11, 23)
end = start + timedelta(days=1)
print('Testing naive time to create profiles.')
with timeit():
slow_profiles = {
host + '.ocf.berkeley.edu': UtilizationProfile.from_hostname(host, start, end)
for host in list_desktops()
}
print('Testing optimized time to create profiles.')
with timeit():
fast_profiles = UtilizationProfile.from_hostnames(list_desktops(), start, end)
#!/usr/bin/env python3
"""Add a test refund."""
import getpass
import random
import string
from datetime import datetime
from ocflib.printing.quota import add_refund
from ocflib.printing.quota import get_connection
from ocflib.printing.quota import Refund
if __name__ == '__main__':
user = 'ocfprinting'
password = getpass.getpass('{} password: '.format(user))
with get_connection(user=user, password=password) as c:
add_refund(
c,
Refund(
user=input('user: '),
time=datetime.now(),
pages=int(input('pages: ')),
staffer=getpass.getuser(),
reason=''.join(
random.choice(string.ascii_letters) for _ in range(30)
),
"""Print matching OCF usernames."""
search_term = msg.match.group(1).strip()
keywords = search_term.split()
if len(keywords) > 0:
search = '(&{})'.format(
''.join(
# all keywords must match either uid or cn
'(|(uid=*{keyword}*)(cn=*{keyword}*))'.format(
keyword=alphanum(keyword),
)
for keyword in keywords
),
)
with ldap.ldap_ocf() as c:
c.search(
ldap.OCF_LDAP_PEOPLE,
search,
attributes=('uid', 'cn'),
size_limit=5,
)
if len(c.response) > 0:
msg.respond(
', '.join(
sorted(
'{} ({})'.format(
entry['attributes']['uid'][0],
entry['attributes']['cn'][0],
)
for entry in c.response
email=form.cleaned_data['contact_email'],
encrypted_password=encrypt_password(
form.cleaned_data['password'],
RSA.importKey(CREATE_PUBLIC_KEY),
),
handle_warnings=NewAccountRequest.WARNINGS_WARN,
)
if 'warnings-submit' in request.POST:
req = req._replace(
handle_warnings=NewAccountRequest.WARNINGS_SUBMIT,
)
task = validate_then_create_account.delay(req)
task.wait(timeout=5)
if isinstance(task.result, NewAccountResponse):
if task.result.status == NewAccountResponse.REJECTED:
status = 'has_errors'
form.add_error(None, task.result.errors)
elif task.result.status == NewAccountResponse.FLAGGED:
status = 'has_warnings'
form.add_error(None, task.result.errors)
elif task.result.status == NewAccountResponse.PENDING:
return HttpResponseRedirect(reverse('account_pending'))
else:
raise AssertionError('Unexpected state reached')
else:
# validation was successful, the account is being created now
request.session['approve_task_id'] = task.result
return HttpResponseRedirect(reverse('wait_for_account'))
else:
form = ApproveForm()
encrypted_password=encrypt_password(
form.cleaned_data['password'],
RSA.importKey(CREATE_PUBLIC_KEY),
),
handle_warnings=NewAccountRequest.WARNINGS_WARN,
)
if 'warnings-submit' in request.POST:
req = req._replace(
handle_warnings=NewAccountRequest.WARNINGS_SUBMIT,
)
task = validate_then_create_account.delay(req)
task.wait(timeout=5)
if isinstance(task.result, NewAccountResponse):
if task.result.status == NewAccountResponse.REJECTED:
status = 'has_errors'
form.add_error(None, task.result.errors)
elif task.result.status == NewAccountResponse.FLAGGED:
status = 'has_warnings'
form.add_error(None, task.result.errors)
elif task.result.status == NewAccountResponse.PENDING:
return HttpResponseRedirect(reverse('account_pending'))
else:
raise AssertionError('Unexpected state reached')
else:
# validation was successful, the account is being created now
request.session['approve_task_id'] = task.result
return HttpResponseRedirect(reverse('wait_for_account'))
else:
form = ApproveForm()
with get_session() as session:
session.add(stored_request) # TODO: error handling
session.commit()
except sqlalchemy.exc.IntegrityError:
# If there's an integrity constraint, it's okay -- the
# account was already submitted, so we can still return a
# "pending" response.
pass
else:
dispatch_event(
'ocflib.account_submitted',
request=dict(request.to_dict(), reasons=warnings),
)
return NewAccountResponse(
status=NewAccountResponse.PENDING,
errors=warnings,
)
elif request.handle_warnings == NewAccountRequest.WARNINGS_WARN:
return NewAccountResponse(
status=NewAccountResponse.FLAGGED,
errors=warnings,
)
return create_account.delay(request).id
possible for it to fail (just exceedingly unlikely).
"""
# TODO: docstring is not 100% correct
with get_session() as session:
errors, warnings = validate_request(request, credentials, session)
if errors:
# Fatal errors; cannot be bypassed, even with staff approval
return NewAccountResponse(
status=NewAccountResponse.REJECTED,
errors=(errors + warnings),
)
elif warnings:
# Non-fatal errors; the frontend can choose to create the account
# anyway, submit the account for staff approval, or get a response
# with a list of warnings for further inspection.
if request.handle_warnings == NewAccountRequest.WARNINGS_SUBMIT:
stored_request = StoredNewAccountRequest.from_request(request, str(warnings))
try:
with get_session() as session:
session.add(stored_request) # TODO: error handling
session.commit()
except sqlalchemy.exc.IntegrityError:
# If there's an integrity constraint, it's okay -- the
# account was already submitted, so we can still return a
# "pending" response.
pass
else:
dispatch_event(
'ocflib.account_submitted',
request=dict(request.to_dict(), reasons=warnings),
)