Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_all_collections(connection: DBusConnection) -> Iterator[Collection]:
    """Lazily yields a :class:`Collection` for every path listed in the
    service's ``Collections`` property."""
    secret_service = DBusAddressWrapper(SS_PATH, SERVICE_IFACE, connection)
    known_paths = secret_service.get_property('Collections')
    yield from (Collection(connection, path) for path in known_paths)
def unlock_objects(connection: DBusConnection, paths: List[str]) -> bool:
    """Asks the service to unlock the objects listed in `paths`.

    The return value tells whether the operation was dismissed. It is
    ``False`` when the prompt path returned by the service is at most one
    character long, in which case no prompt is executed.

    .. versionadded:: 2.1.2"""
    secret_service = DBusAddressWrapper(SS_PATH, SERVICE_IFACE, connection)
    _already_unlocked, prompt_path = secret_service.call('Unlock', 'ao', paths)
    if len(prompt_path) <= 1:
        # No prompt to run, so nothing could have been dismissed.
        return False
    was_dismissed, prompt_result = exec_prompt(connection, prompt_path)
    result_signature, _newly_unlocked = prompt_result
    assert result_signature == 'ao'
    return was_dismissed
def create_collection(connection: DBusConnection, label: str, alias: str = '',
                      session: Optional[Session] = None) -> Collection:
    """Makes a new :class:`Collection` labelled `label` (optionally
    registered under `alias`) and returns it. A session is opened with
    :func:`open_session` when `session` is not supplied. The service may
    require a prompt to complete the action.

    :raises: :exc:`~secretstorage.exceptions.PromptDismissedException`
             if the prompt is dismissed.
    """
    session = session or open_session(connection)
    label_property = {SS_PREFIX + 'Collection.Label': ('s', label)}
    secret_service = DBusAddressWrapper(SS_PATH, SERVICE_IFACE, connection)
    new_path, prompt_path = secret_service.call(
        'CreateCollection', 'a{sv}s', label_property, alias)
    if len(new_path) <= 1:
        # The service did not hand back a usable path directly; the real
        # path comes from the prompt result instead.
        was_dismissed, prompt_result = exec_prompt(connection, prompt_path)
        if was_dismissed:
            raise PromptDismissedException('Prompt dismissed.')
        result_signature, new_path = prompt_result
        assert result_signature == 'o'
    return Collection(connection, new_path, session=session)
def open_session(connection: DBusConnection) -> Session:
    """Returns a new Secret Service session.

    Calls ``OpenSession`` on the service with ``ALGORITHM_DH`` and the
    public key of a freshly created :class:`Session`. If the service
    answers with a ``DBUS_NOT_SUPPORTED`` error, the call is repeated with
    ``ALGORITHM_PLAIN`` and the session is marked as not encrypted. Any
    other D-Bus error response is re-raised.
    """
    service = DBusAddressWrapper(SS_PATH, SERVICE_IFACE, connection)
    session = Session()
    try:
        # First attempt: negotiate with our public key sent as a byte array.
        output, result = service.call('OpenSession', 'sv',
                                      ALGORITHM_DH,
                                      ('ay', int_to_bytes(session.my_public_key)))
    except DBusErrorResponse as resp:
        # Only the "not supported" error triggers the plain-text fallback.
        if resp.name != DBUS_NOT_SUPPORTED:
            raise
        output, result = service.call('OpenSession', 'sv',
                                      ALGORITHM_PLAIN,
                                      ('s', ''))
        session.encrypted = False
    else:
        # The first call succeeded: `output` carries a byte-array value
        # that is decoded as a big-endian integer.
        signature, value = output
        assert signature == 'ay'
        key = int_from_bytes(value, 'big')
    # NOTE(review): the excerpt appears to be truncated here -- in the
    # visible code `key` and `result` are never used and nothing is
    # returned, despite the docstring. Confirm against the full source.
def get_all_collections(bus):
    """Lazily yields a :class:`Collection` for every path found in the
    service's ``Collections`` property."""
    properties = dbus.Interface(bus_get_object(bus, SS_PATH),
                                dbus.PROPERTIES_IFACE)
    known_paths = properties.Get(SERVICE_IFACE, 'Collections',
                                 signature='ss')
    for path in known_paths:
        yield Collection(bus, path)
def lock(self):
    """Asks the service to lock this collection."""
    service = InterfaceWrapper(bus_get_object(self.bus, SS_PATH),
                               SERVICE_IFACE)
    # The Lock method takes an array of object paths; we pass just ours.
    service.Lock([self.collection_path], signature='ao')
def open_session(bus):
    """Returns a new Secret Service session.

    Calls ``OpenSession`` on the service with ``ALGORITHM_DH`` and the
    public key of a freshly created :class:`Session`. If the service
    raises a D-Bus exception named ``DBUS_NOT_SUPPORTED``, the call is
    repeated with ``ALGORITHM_PLAIN`` and an empty string. Any other D-Bus
    exception is re-raised.
    """
    service_obj = bus_get_object(bus, SS_PATH)
    service_iface = dbus.Interface(service_obj, SS_PREFIX+'Service')
    session = Session()
    try:
        # First attempt: negotiate with our public key sent as a byte array.
        output, result = service_iface.OpenSession(
            ALGORITHM_DH,
            dbus.ByteArray(int_to_bytes(session.my_public_key)),
            signature='sv'
        )
    except dbus.exceptions.DBusException as e:
        # Only the "not supported" error triggers the plain-text fallback.
        if e.get_dbus_name() != DBUS_NOT_SUPPORTED:
            raise
        output, result = service_iface.OpenSession(
            ALGORITHM_PLAIN,
            '',
            signature='sv'
        )
    # NOTE(review): the excerpt appears to be truncated here -- in the
    # visible code `output` and `result` are never used and nothing is
    # returned, despite the docstring. Confirm against the full source.
def unlock_objects(bus, paths, callback=None):
    """Requests unlocking objects specified in `paths`.

    If `callback` is specified, calls it when unlocking is complete (see
    :func:`exec_prompt` description for details) and returns ``None``.
    Otherwise, uses the loop from GLib API and returns a boolean
    representing whether the operation was dismissed; this is ``False``
    when the service did not require a prompt at all.

    .. versionadded:: 2.1.2"""
    service_obj = bus_get_object(bus, SS_PATH)
    service_iface = InterfaceWrapper(service_obj, SERVICE_IFACE)
    unlocked_paths, prompt = service_iface.Unlock(paths, signature='ao')
    unlocked_paths = list(unlocked_paths)  # Convert from dbus.Array
    if len(prompt) > 1:
        # A prompt path longer than one character is treated as a real
        # prompt that has to be executed (presumably "/" means "no prompt"
        # -- confirm against the Secret Service specification).
        if callback:
            exec_prompt(bus, prompt, callback)
            return None
        return exec_prompt_glib(bus, prompt)[0]
    if callback:
        # We still need to call it.
        callback(False, unlocked_paths)
        return None
    # No prompt and no callback: nothing was dismissed. Return an explicit
    # boolean, as documented, instead of falling through with None.
    return False