Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def user(self, username):
    """ Returns the :class:`~plexapi.myplex.MyPlexUser` that matches the email or username specified.

        Parameters:
            username (str): Username, email or id of the user to return. A numeric
                id may be passed as an int; it is compared as a string.

        Raises:
            NotFound: If no user returned by ``self.users()`` matches.
    """
    # Coerce once so an integer id does not blow up on ``.lower()``, and so the
    # lowercased needle is not recomputed for every candidate.
    wanted = str(username).lower()
    for candidate in self.users():
        # Home users don't have email, username etc., so the title is tried first.
        if wanted == candidate.title.lower():
            return candidate
        # Only compare against username/email/id when all three are populated;
        # this keeps ``.lower()`` from being called on a missing field.
        if (candidate.username and candidate.email and candidate.id
                and wanted in (candidate.username.lower(),
                               candidate.email.lower(),
                               str(candidate.id))):
            return candidate
    raise NotFound('Unable to find user %s' % username)
title (str): Title of the episode to return
season (int): Season number (default:None; required if title not specified).
episode (int): Episode number (default:None; required if title not specified).
Raises:
:class:`plexapi.exceptions.BadRequest`: If season and episode is missing.
:class:`plexapi.exceptions.NotFound`: If the episode is missing.
"""
if title:
key = '/library/metadata/%s/allLeaves' % self.ratingKey
return self.fetchItem(key, title__iexact=title)
elif season is not None and episode:
results = [i for i in self.episodes() if i.seasonNumber == season and i.index == episode]
if results:
return results[0]
raise NotFound('Couldnt find %s S%s E%s' % (self.title, season, episode))
raise BadRequest('Missing argument: title or season and episode are required')
def user(self, username):
    """ Returns the :class:`~plexapi.myplex.MyPlexUser` that matches the email or username specified.

        Parameters:
            username (str): Username, email or id of the user to return. An int id
                is accepted and compared by its string form.

        Raises:
            NotFound: If none of the users from ``self.users()`` match.
    """
    # Normalise the search key a single time; ``str()`` lets callers pass the
    # numeric id directly instead of crashing on ``int.lower``.
    needle = str(username).lower()
    for account_user in self.users():
        # Home users don't have email, username etc. -- match them by title.
        if needle == account_user.title.lower():
            return account_user
        # The remaining fields are compared only when every one of them is set,
        # which avoids calling ``.lower()`` on an empty/None attribute.
        has_identity = account_user.username and account_user.email and account_user.id
        if has_identity and needle in (account_user.username.lower(),
                                       account_user.email.lower(),
                                       str(account_user.id)):
            return account_user
    raise NotFound('Unable to find user %s' % username)
def _get_library_series(self, series):
    """ Returns the library entry for ``series``, or None when it cannot be pinned down.

        The exact lookup ``self._section.get(series.name)`` is tried first. If it
        raises NotFound, ``self._search_series(series.name)`` is consulted and its
        result is used only when it contains exactly one item.
    """
    try:
        return self._section.get(series.name)
    except NotFound:
        candidates = self._search_series(series.name)
        # Zero hits or several hits both yield None.
        return candidates[0] if len(candidates) == 1 else None
def server(self, name):
    """ Returns the :class:`~plexapi.myplex.MyPlexServerShare` that matches the name specified.

        Parameters:
            name (str): Name of the server to return (compared case-insensitively).

        Raises:
            NotFound: If no entry in ``self.servers`` has that name.
    """
    # Lazily walk the shares and stop at the first case-insensitive name match.
    matching = (share for share in self.servers if name.lower() == share.name.lower())
    found = next(matching, None)
    if found is None:
        raise NotFound('Unable to find server %s' % name)
    return found
def _chooseConnection(ctype, name, results):
    """ Chooses the first (best) connection from the given _connect results.

        Parameters:
            ctype (str): Label for the kind of thing being connected to; used in
                log lines and, lowercased, in the error message.
            name (str): Name reported in the error message when nothing connected.
            results (list): Tuples of (url, token, PlexServer, runtime), or
                (url, token, None, runtime) when a connection could not be established.

        Raises:
            NotFound: If no tuple carries a non-None third element.
    """
    # Report the outcome of every attempt before picking a winner.
    # NOTE(review): the token is written to the log verbatim here.
    for attempt in results:
        url, token, result, runtime = attempt
        okerr = 'ERR' if not result else 'OK'
        log.info('%s connection %s (%ss): %s?X-Plex-Token=%s', ctype, okerr, runtime, url, token)
    # Keep only the attempts that produced a connection object, in original order.
    connected = [attempt[2] for attempt in results if attempt and attempt[2] is not None]
    if not connected:
        raise NotFound('Unable to connect to %s: %s' % (ctype.lower(), name))
    best = connected[0]
    log.info('Connecting to %s: %s?X-Plex-Token=%s', ctype, best._baseurl, best._token)
    return best
def photo(self, title):
    """ Returns the :class:`~plexapi.photo.Photo` that matches the specified title.

        Parameters:
            title (str): Title of the photo to return (compared case-insensitively).

        Raises:
            NotFound: If nothing from ``self.photos()`` carries that title.
    """
    for candidate in self.photos():
        # Skip anything whose title differs once case is ignored.
        if candidate.title.lower() != title.lower():
            continue
        return candidate
    raise NotFound('Unable to find photo: %s' % title)
is_importing = (
self.context["source"] # pylint: disable=no-member
== config_entries.SOURCE_IMPORT
)
plex_server = PlexServer(self.hass, server_config)
try:
await self.hass.async_add_executor_job(plex_server.connect)
except NoServersFound:
_LOGGER.error("No servers linked to Plex account")
errors["base"] = "no_servers"
except (plexapi.exceptions.BadRequest, plexapi.exceptions.Unauthorized):
_LOGGER.error("Invalid credentials provided, config not created")
errors["base"] = "faulty_credentials"
except (plexapi.exceptions.NotFound, requests.exceptions.ConnectionError):
server_identifier = (
server_config.get(CONF_URL) or plex_server.server_choice or "Unknown"
)
_LOGGER.error("Plex server could not be reached: %s", server_identifier)
errors["base"] = "not_found"
except ServerNotSpecified as available_servers:
if is_importing:
_LOGGER.warning(
"Imported configuration has multiple available Plex servers. Specify server in configuration or add a new Integration."
)
return self.async_abort(reason="non-interactive")
self.available_servers = available_servers.args[0]
return await self.async_step_select_server()
except Exception as error: # pylint: disable=broad-except
break
print(u"Created symlinks for {count} new items:".format(count=count))
for item in new_items:
print(u"{title} ({year})".format(title=item.title, year=item.year))
# Check if the new library exists in Plex
print(u"Creating the '{}' library in Plex...".format(
self.recipe['new_library']['name']))
try:
new_library = self.plex.server.library.section(
self.recipe['new_library']['name'])
print(u"Library already exists in Plex. Scanning the library...")
new_library.update()
except plexapi.exceptions.NotFound:
self.plex.create_new_library(
self.recipe['new_library']['name'],
self.recipe['new_library']['folder'],
self.library_type)
new_library = self.plex.server.library.section(
self.recipe['new_library']['name'])
# Wait for metadata to finish downloading before continuing
print(u"Waiting for metadata to finish downloading...")
new_library = self.plex.server.library.section(
self.recipe['new_library']['name'])
while new_library.refreshing:
time.sleep(5)
new_library = self.plex.server.library.section(
self.recipe['new_library']['name'])
def section(self, title=None):
    """ Returns the :class:`~plexapi.library.LibrarySection` that matches the specified title.

        Parameters:
            title (str): Title of the section to return (compared case-insensitively).

        Raises:
            NotFound: If no section from ``self.sections()`` has that title, or if
                ``title`` is left at its default of None.
    """
    # The signature allows ``title=None``; treat it as "no such section" instead
    # of failing with AttributeError on ``None.lower()``.
    if title is not None:
        wanted = title.lower()  # lowercase once rather than per candidate
        for candidate in self.sections():
            if candidate.title.lower() == wanted:
                return candidate
    raise NotFound('Invalid library section: %s' % title)