Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def auto_delete():
    """Apply the per-collection keep rules to every TV library.

    For each library section of type ``'show'``, every ``tag -> keep`` pair in
    ``TAGS`` is processed: shows found in the collection named ``tag`` are
    passed to ``keep_season`` (when ``keep == 'season'``) or to
    ``keep_episodes`` (otherwise). The values returned by those calls are
    summed (presumably a count of removed items; verify against the helpers),
    and the section is updated only when the total is non-zero.
    """
    print('\n%s Starting plex-autodelete script..' % datestr())
    server = PlexServer()
    for library in server.library.sections():
        # Anything that is not a TV-show library is left untouched.
        if library.type not in ('show',):
            continue
        removed = 0
        for collection_tag, keep in TAGS.items():
            if keep == 'season':
                prune = keep_season
            else:
                prune = keep_episodes
            removed += sum(
                prune(show, keep)
                for show in library.search(collection=collection_tag)
            )
        # Only trigger a library update when something was actually reported.
        if removed:
            library.update()
def notify_plex():
    """Ask the configured Plex server to update its library.

    Returns immediately when ``config`` has no ``'plex_server'`` entry, when
    that entry is ``None``, or when ``dry_run`` is truthy. Problems are
    reported on stdout and never propagated to the caller.
    """
    if 'plex_server' not in config or config['plex_server'] is None or dry_run:
        return

    plex_server = config['plex_server']
    try:
        # Imported lazily so a missing plexapi package is reported below
        # instead of breaking the whole script at import time.
        from plexapi.server import PlexServer
        server = PlexServer('http://{}'.format(plex_server))
        server.library.update()
    except ModuleNotFoundError:
        print(
            'Library not installed. To use Plex notifications please install the Python 3 Plex API ' +
            '("pip3 install plexapi")')
    except Exception:
        # Any other failure (connection or update) is reported with the
        # same message.
        print(f'Unable to connect to Plex server at {plex_server}')
def _test_servers(servers):
    """Connect to every server and list the distinct Plex clients they report.

    Args:
        servers: Mapping whose keys and values are handed to ``_connect``
            together with ``PlexServer`` (presumably server URL -> token,
            matching how the results are unpacked below; verify against
            ``_connect``).

    Returns:
        A list of ``FORMAT``-rendered rows, one per client base URL that has
        not been seen before.
    """
    items, seen = [], set()
    print('Finding Plex clients..')
    # The trailing 5 is forwarded to _connect unchanged
    # (NOTE(review): looks like a timeout in seconds; confirm).
    listargs = [[PlexServer, url, token, 5] for url, token in servers.items()]
    results = utils.threaded(_connect, listargs)
    for _url, token, plex, _runtime in results:
        if not plex:
            continue
        # Fetch the client list once per server; the previous code requested
        # it a second time just to iterate over it.
        for client in plex.clients():
            if client._baseurl in seen:
                continue
            extras = [plex.friendlyName] + client.protocolCapabilities
            items.append(FORMAT % ('Client', '--', client.product, client.title, token, client._baseurl, ','.join(extras)))
            seen.add(client._baseurl)
    return items
# When PLEX_TOKEN is empty, fall back to the server token from CONFIG.
PLEX_TOKEN = PLEX_TOKEN or CONFIG.data['auth'].get('server_token', '')

sess = requests.Session()
# SSL certificate verification is switched off. To enable it, assign a
# certificate path (e.g. '/path/to/certfile') instead of False.
# If verify is set to a path to a directory,
# the directory must have been processed using the c_rehash utility supplied
# with OpenSSL.
sess.verify = False
if sess.verify is False:
    # Unverified requests are a deliberate choice here, so silence the
    # warning urllib3 would otherwise emit for each of them.
    import urllib3
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

plex = PlexServer(PLEX_URL, PLEX_TOKEN, session=sess)

# Titles of all library sections on the server.
sections_lst = [section.title for section in plex.library.sections()]
ratings_lst = [
    'G', 'PG', 'PG-13', 'R', 'NC-17',
    'TV-G', 'TV-Y', 'TV-Y7', 'TV-14', 'TV-PG', 'TV-MA',
]
def invite(user, sections, allowSync, camera, channels, filterMovies, filterTelevision, filterMusic):
    """Invite ``user`` to the server's libraries and print what was granted.

    All arguments are forwarded to ``inviteFriend`` on the account behind the
    module-level ``plex`` connection; ``camera`` and ``channels`` are passed
    as ``allowCameraUpload`` and ``allowChannels``. A line is printed for each
    of ``allowSync``, ``camera`` and ``channels`` that is exactly ``True``.
    """
    account = plex.myPlexAccount()
    account.inviteFriend(
        user=user,
        server=plex,
        sections=sections,
        allowSync=allowSync,
        allowCameraUpload=camera,
        allowChannels=channels,
        filterMovies=filterMovies,
        filterTelevision=filterTelevision,
        filterMusic=filterMusic,
    )
    print('Invited {user} to share libraries: \n{sections}'.format(sections=sections, user=user))

    # (flag, notice) pairs, reported in this order.
    enabled_notices = (
        (allowSync, 'Sync: Enabled'),
        (camera, 'Camera Upload: Enabled'),
        (channels, 'Plugins: Enabled'),
    )
    for flag, notice in enabled_notices:
        # Identity check on purpose: only a literal True counts as enabled.
        if flag is True:
            print(notice)
# Defining libraries
# Result of exclusions() applied to the library options and sections_dict.
libraries = exclusions(opts.allLibraries, opts.libraries, sections_dict)
# Defining selected playlists
# Same helper, applied to the playlist options and the admin's playlist_lst.
selected_playlists = exclusions(opts.allPlaylists, opts.playlists, playlist_lst)
# Create user server objects
# For every requested user, open a PlexServer connection with that user's
# token for this server (looked up via plex.machineIdentifier) and record
# the user's playlists in playlist_dict['data'].
if users:
for user in users:
# TODO: smart playlists will have to be recreated in the user's server instance
# NOTE(review): share_playlists() receives the full `users` list although it
# is called from inside the per-user loop; confirm the repeated call is intended.
if opts.action == 'share' and selected_playlists:
logger.info("Sharing playlist(s)...")
share_playlists(selected_playlists, users)
user_acct = account.user(user)
user_server = PlexServer(PLEX_URL, user_acct.get_token(plex.machineIdentifier))
# Materialise the user's playlists as a list; it is both filtered and stored.
all_playlists = [pl for pl in user_server.playlists()]
user_selected = exclusions(opts.allPlaylists, opts.playlists, all_playlists)
playlist_dict['data'].append({
'server': user_server,
'user': user,
'user_selected': user_selected,
'all_playlists': all_playlists})
# Add an entry for the main `plex` connection, labelled 'admin', when
# opts.self is set or when no users were given.
if opts.self or not users:
playlist_dict['data'].append({
'server': plex,
'user': 'admin',
'user_selected': selected_playlists,
'all_playlists': playlist_lst})
if not opts.jbop and opts.action == 'show':
def _do_search():
    """Search the Plex server for ``search_term`` and record the first hit.

    On a match, ``self.searches`` gets the first result's ``unwatched()``
    value stored under that result's title. With no match, a warning is
    logged and nothing is stored. ``self`` and ``search_term`` come from the
    enclosing scope.
    """
    from plexapi.server import PlexServer

    server = PlexServer(self._server, None)
    results = server.search(search_term)
    if not results:
        _LOGGER.warning('No matches for %s', search_term)
        return

    # Only the first search result is used.
    first = results[0]
    self.searches[first.title] = first.unwatched()
def list_remover(plex, shared_users, playlist_name):
    """Remove ``playlist_name`` for the script user and, optionally, shared users.

    Args:
        plex: Server connection of the script user.
        shared_users: Container iterated for user names and indexed by user
            name to obtain that user's token.
        playlist_name: Name of the playlist to remove.

    Shared users are only processed when ``SYNC_WITH_SHARED_USERS`` is truthy;
    otherwise a notice is printed and the function returns.
    """
    # The script user's own copy goes first.
    print("{}: removing playlist for script user".format(playlist_name))
    remove_playlist(plex, playlist_name)

    if not SYNC_WITH_SHARED_USERS:
        print("Skipping removal from shared users")
        return

    # Each shared user needs a connection authenticated with their own token.
    for username in shared_users:
        print("{0}: removing playlist for user {1}".format(playlist_name, username))
        shared_server = PlexServer(
            baseurl=PLEX_URL,
            token=shared_users[username],
            timeout=PLEX_TIMEOUT,
        )
        remove_playlist(shared_server, playlist_name)
import requests
from plexapi.server import PlexServer
## EDIT THESE SETTINGS ##
PLEX_TOKEN = 'xxxxx'
PLEX_URL = 'http://localhost:32400'
# Reason passed to the client when a stream is stopped.
MESSAGE = "You are not allowed to stream above 4 Mbps."
# Usernames that must never be stopped. This has to be a real tuple:
# ('name') without a trailing comma is just the string 'name', and a
# membership test against a string matches substrings (e.g. 'am' in 'name').
# Write a single entry as ('name',) and several as ('name1', 'name2').
ignore_lst = ()
##/EDIT THESE SETTINGS ##

# Session with certificate verification disabled, used for the connection.
sess = requests.Session()
sess.verify = False
plex = PlexServer(PLEX_URL, PLEX_TOKEN, session=sess)
def kill_session(max_bitrate=4000):
    """Stop every active stream whose bitrate exceeds ``max_bitrate``.

    Args:
        max_bitrate: Threshold compared against the bitrate of the first
            stream of the first part of the session's first media item.
            Defaults to 4000 (presumably kbps, matching the 4 Mbps quoted in
            ``MESSAGE``; confirm against the Plex API).

    Sessions whose first username is in ``ignore_lst`` are left alone, as are
    sessions that report no bitrate at all.
    """
    for session in plex.sessions():
        bitrate = session.media[0].parts[0].streams[0].bitrate
        if bitrate is None:
            # No bitrate reported: there is nothing to compare, and
            # int(None) would raise TypeError and abort the whole run.
            continue

        user = session.usernames[0]
        if user in ignore_lst or int(bitrate) <= max_bitrate:
            continue

        # Episodes are reported as "<show> - <episode title>".
        if session.type == 'episode':
            title = session.grandparentTitle + ' - ' + session.title
        else:
            title = session.title
        # NOTE(review): the wording below mentions sleeping although the
        # stream is stopped for its bitrate; confirm the intended message.
        print('{user} is watching {title} and they might be asleep.'.format(user=user, title=title))
        session.stop(reason=MESSAGE)


kill_session()
def _connect_with_url():
    """Connect to the server at ``self._url`` and keep it in ``self._plex_server``.

    For an URL starting with "https" while ``self._verify_ssl`` is falsy, a
    session with certificate verification turned off is passed to the server;
    in every other case ``None`` is passed as the session. ``self`` comes from
    the enclosing scope.
    """
    skip_verification = self._url.startswith("https") and not self._verify_ssl
    if skip_verification:
        session = Session()
        session.verify = False
    else:
        session = None
    self._plex_server = plexapi.server.PlexServer(self._url, self._token, session)