Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def runtests(args):
# Get username and password from environment
username = args.username or CONFIG.get('auth.myplex_username')
password = args.password or CONFIG.get('auth.myplex_password')
resource = args.resource
# Register known tests
for loader, name, ispkg in pkgutil.iter_modules([dirname(abspath(__file__))]):
if name.startswith('test_'):
log(0, 'Registering tests from %s.py' % name)
loader.find_module(name).load_module(name)
# Create Account and Plex objects
log(0, 'Logging into MyPlex as %s' % username)
account = MyPlexAccount.signin(username, password)
log(0, 'Signed into MyPlex as %s (%s)' % (account.username, account.email))
if resource:
plex = account.resource(resource).connect()
log(0, 'Connected to PlexServer resource %s' % plex.friendlyName)
else:
plex = PlexServer(args.baseurl, args.token)
log(0, 'Connected to PlexServer %s' % plex.friendlyName)
log(0, '')
# Run all specified tests
tests = {'passed':0, 'failed':0}
for test in itertests(args.query):
starttime = time.time()
log(0, test['name'], 'green')
try:
test['func'](account, plex)
runtime = time.time() - starttime
except Exception:
cur.execute('SELECT setting FROM settings WHERE item LIKE \'PLEXUN\'')
PLEXUN = cur.fetchone()
PLEXUN = str(PLEXUN[0]).strip()
try:
cur.execute('SELECT setting FROM settings WHERE item LIKE \'PLEXPW\'')
PLEXPW = cur.fetchone()
PLEXPW = PLEXPW[0]
import base64
PLEXPW = str(base64.b64decode(PLEXPW))
except Exception:
print ("Your Plex Password is temporarly needed to proceed:\n")
PLEXPW = str(getpass.getpass("Password: "))
print ("Local Fail. Trying cloud access.")
user = MyPlexAccount.signin(PLEXUN, PLEXPW)
plex = user.resource(PLEXSVR).connect()
client = plex.client(PLEXCLIENT)
psess = plex.sessions()
if not psess:
return ("Stopped")
else:
cur.execute('SELECT State FROM States WHERE Option LIKE \'Nowplaying\'')
nowp = cur.fetchone()
nowp = nowp[0]
if "TV Show:" in nowp:
nowp = nowp.split("Episode: ")
nowp = nowp[1]
elif "Movie:" in nowp:
nowp = nowp.split("Movie: ")
nowp = nowp[1]
for sess in psess:
baseurl = 'http://' + PLEXSERVERIP + ':' + PLEXSERVERPORT
#note: May need to add your local network as an network authorized without access to use the local access method.
try:
LOGGEDIN
except Exception:
try:
plex = PlexServer(baseurl)
except Exception:
print ("Local Fail. Trying cloud access.")
#You will need to add your username, password, servername below if your local access fails.
PLEXUN = "MYPLEXUN"
PLEXPW = "MYPLEXPW"
PLEXSVR = "PLEXSERVERNAME"
user = MyPlexAccount.signin(PLEXUN,PLEXPW)
plex = user.resource(PLEXSVR).connect()
LOGGEDIN = "YES"
def fetch_plex_instance(pkmeter, username=None, password=None, host=None):
username = username or pkmeter.config.get('plexserver', 'username', from_keyring=True)
password = password or pkmeter.config.get('plexserver', 'password', from_keyring=True)
host = host or pkmeter.config.get('plexserver', 'host', '')
if username:
log.info('Logging into MyPlex with user %s', username)
user = MyPlexAccount.signin(username, password)
return user.resource(host).connect()
log.info('Connecting to Plex host: %s', host)
return PlexServer(host)
cur.execute('SELECT setting FROM settings WHERE item LIKE \'PLEXSERVERIP\'')
PLEXSERVERIP = cur.fetchone()
PLEXSERVERIP = PLEXSERVERIP[0]
cur.execute('SELECT setting FROM settings WHERE item LIKE \'PLEXSERVERPORT\'')
PLEXSERVERPORT = cur.fetchone()
PLEXSERVERPORT = PLEXSERVERPORT[0]
cur.execute('SELECT setting FROM settings WHERE item LIKE \'PLEXSERVERTOKEN\'')
PLEXSERVERTOKEN = cur.fetchone()
PLEXSERVERTOKEN = PLEXSERVERTOKEN[0]
except Exception:
print ("Local Variables not set. Run setup to use local access.")
from plexapi.myplex import MyPlexAccount
user = MyPlexAccount.signin(PLEXUN,PLEXPW)
try:
LOGGEDIN
except Exception:
try:
from plexapi.server import PlexServer
baseurl = 'http://' + PLEXSERVERIP + ':' + PLEXSERVERPORT
token = PLEXSERVERTOKEN
plex = PlexServer(baseurl, token)
except Exception:
print ("Local Fail. Trying cloud access.")
plex = user.resource(PLEXSVR).connect()
client = plex.client(PLEXCLIENT)
LOGGEDIN = "YES"
except Exception:
cur.execute('SELECT setting FROM settings WHERE item LIKE \'PLEXUN\'')
PLEXUN = cur.fetchone()
PLEXUN = PLEXUN[0]
try:
cur.execute('SELECT setting FROM settings WHERE item LIKE \'PLEXPW\'')
PLEXPW = cur.fetchone()
PLEXPW = PLEXPW[0]
import base64
PLEXPW = str(base64.b64decode(PLEXPW))
except Exception:
print ("Your Plex Password is temporarly needed to proceed:\n")
PLEXPW = str(getpass.getpass("Password: "))
user = MyPlexAccount.signin(PLEXUN, PLEXPW)
print ("Local Fail. Trying cloud access.")
plex = user.resource(PLEXSVR).connect()
client = plex.client(PLEXCLIENT)
pstatus = client.isPlayingMedia()
sql.close()
#print (pstatus)
if pstatus is True:
return ("Playing")
else:
return ("Stopped")
#say = sessionstatus()
required=False, type=int, default=default_args.test)
parser.add_argument('-u', '--update', help='If you want to update the library set to 1, otherwise 0. Default:{0}'
.format(default_args.update),
required=False, type=int, default=default_args.update)
parser.add_argument('-w', '--wait', help='How long to wait, in seconds, after updating the libraries. Default:{0}'
.format(default_args.update_wait),
required=False, type=int, default=default_args.update_wait)
opts = parser.parse_args()
# Grab any passed in args
args = Args(opts.days, opts.notifiers, opts.max_movies, opts.max_tv,
opts.num_detailed, opts.test, opts.update, opts.wait)
# Get the Plex Server object
account = MyPlexAccount.signin(settings.plex_username, settings.plex_password)
plex = account.resource(settings.plex_servername).connect()
library = plex.library
# Get the Movies and TV sections of the library
movies_section = library.section(settings.movie_library)
tvshows_section = library.section(settings.tvshow_library)
if args.update != 0:
movies_section.refresh()
tvshows_section.refresh()
time.sleep(args.update_wait)
# Create the times
TODAY = int(time.time())
LASTDATE = int(TODAY - args.num_days * 24 * 60 * 60)
today_datetime = datetime.datetime.fromtimestamp(TODAY)
plex = PlexServer(baseurl)
except Exception:
print ("Local Fail. Trying cloud access.")
cur.execute('SELECT setting FROM settings WHERE item LIKE \'PLEXUN\'')
PLEXUN = cur.fetchone()
PLEXUN = PLEXUN[0]
cur.execute('SELECT setting FROM settings WHERE item LIKE \'PLEXPW\'')
if not cur.fetchone():
PLEXPW = getpass.getpass("Plex Password: ")
else:
cur.execute('SELECT setting FROM settings WHERE item LIKE \'PLEXPW\'')
PLEXPW = cur.fetchone()
PLEXPW = PLEXPW[0]
import base64
PLEXPW = str(base65.b64decode(PLEXPW))
user = MyPlexAccount.signin(PLEXUN,PLEXPW)
plex = user.resource(PLEXSVR).connect()
client = plex.client(PLEXCLIENT)
LOGGEDIN = "YES"
except IndexError:
print ("Error getting necessary plex api variables. Run system_setup.py.")
@app.route('/login', methods=['GET', 'POST'])
def login():
error = None
if request.method == 'POST':
try:
account = MyPlexAccount.signin(request.form['username'], request.form['password'])
session['username'] = account.username
session['logged_in'] = True
flash('You were logged in')
return redirect(url_for('show_entries'))
except:
print("Login failed")
return render_template('login.html', error="Login Failed")
return render_template('login.html', error=error)