Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
send simple commands to the client. Returns an ElementTree object containing
the response.
Parameters:
command (str): Command to be sent, in the format '<controller>/<command>'.
proxy (bool): Set True to proxy this command through the PlexServer.
**params (dict): Additional GET parameters to include with the command.
Raises:
:class:`plexapi.exceptions.Unsupported`: When we detect the client doesn't support this capability.
"""
command = command.strip('/')
controller = command.split('/')[0]
headers = {'X-Plex-Target-Client-Identifier': self.machineIdentifier}
if controller not in self.protocolCapabilities:
log.debug('Client %s doesnt support %s controller.'
'What your trying might not work' % (self.title, controller))
# Workaround for ptp. See https://github.com/pkkid/python-plexapi/issues/244
t = time.time()
if t - self._last_call >= 80 and self.product in ('ptp', 'Plex Media Player'):
url = '/player/timeline/poll?wait=0&commandID=%s' % self._nextCommandId()
if proxy:
self._server.query(url, headers=headers)
else:
self.query(url, headers=headers)
self._last_call = t
params['commandID'] = self._nextCommandId()
key = '/player/%s%s' % (command, utils.joinArgs(params))
proxy = self._proxyThroughServer if proxy is None else proxy
if not filename and response.headers.get('Content-Disposition'):
filename = re.findall(r'filename=\"(.+)\"', response.headers.get('Content-Disposition'))
filename = filename[0] if filename[0] else None
filename = os.path.basename(filename)
fullpath = os.path.join(savepath, filename)
# append file.ext from content-type if not already there
extension = os.path.splitext(fullpath)[-1]
if not extension:
contenttype = response.headers.get('content-type')
if contenttype and 'image' in contenttype:
fullpath += contenttype.split('/')[1]
# check this is a mocked download (testing)
if mocked:
log.debug('Mocked download %s', fullpath)
return fullpath
# save the file to disk
log.info('Downloading: %s', fullpath)
if showstatus: # pragma: no cover
total = int(response.headers.get('content-length', 0))
bar = tqdm(unit='B', unit_scale=True, total=total, desc=filename)
with open(fullpath, 'wb') as handle:
for chunk in response.iter_content(chunk_size=chunksize):
handle.write(chunk)
if showstatus:
bar.update(len(chunk))
if showstatus: # pragma: no cover
bar.close()
if not filename and response.headers.get('Content-Disposition'):
filename = re.findall(r'filename=\"(.+)\"', response.headers.get('Content-Disposition'))
filename = filename[0] if filename[0] else None
filename = os.path.basename(filename)
fullpath = os.path.join(savepath, filename)
# append file.ext from content-type if not already there
extension = os.path.splitext(fullpath)[-1]
if not extension:
contenttype = response.headers.get('content-type')
if contenttype and 'image' in contenttype:
fullpath += contenttype.split('/')[1]
# check this is a mocked download (testing)
if mocked:
log.debug('Mocked download %s', fullpath)
return fullpath
# save the file to disk
log.info('Downloading: %s', fullpath)
if showstatus: # pragma: no cover
total = int(response.headers.get('content-length', 0))
bar = tqdm(unit='B', unit_scale=True, total=total, desc=filename)
with open(fullpath, 'wb') as handle:
for chunk in response.iter_content(chunk_size=chunksize):
handle.write(chunk)
if showstatus:
bar.update(len(chunk))
if showstatus: # pragma: no cover
bar.close()
send simple commands to the client. Returns an ElementTree object containing
the response.
Parameters:
command (str): Command to be sent, in the format '<controller>/<command>'.
proxy (bool): Set True to proxy this command through the PlexServer.
**params (dict): Additional GET parameters to include with the command.
Raises:
:class:`plexapi.exceptions.Unsupported`: When we detect the client doesn't support this capability.
"""
command = command.strip('/')
controller = command.split('/')[0]
headers = {'X-Plex-Target-Client-Identifier': self.machineIdentifier}
if controller not in self.protocolCapabilities:
log.debug('Client %s doesnt support %s controller.'
'What your trying might not work' % (self.title, controller))
proxy = self._proxyThroughServer if proxy is None else proxy
query = self._server.query if proxy else self.query
# Workaround for ptp. See https://github.com/pkkid/python-plexapi/issues/244
t = time.time()
if t - self._last_call >= 80 and self.product in ('ptp', 'Plex Media Player'):
url = '/player/timeline/poll?wait=0&commandID=%s' % self._nextCommandId()
query(url, headers=headers)
self._last_call = t
params['commandID'] = self._nextCommandId()
key = '/player/%s%s' % (command, utils.joinArgs(params))
try:
send simple commands to the client. Returns an ElementTree object containing
the response.
Parameters:
command (str): Command to be sent, in the format '<controller>/<command>'.
proxy (bool): Set True to proxy this command through the PlexServer.
**params (dict): Additional GET parameters to include with the command.
Raises:
:class:`~plexapi.exceptions.Unsupported`: When we detect the client
doesn't support this capability.
"""
command = command.strip('/')
controller = command.split('/')[0]
if controller not in self.protocolCapabilities:
log.debug('Client %s doesnt support %s controller.'
'What your trying might not work' % (self.title, controller))
params['commandID'] = self._nextCommandId()
key = '/player/%s%s' % (command, utils.joinArgs(params))
headers = {'X-Plex-Target-Client-Identifier': self.machineIdentifier}
proxy = self._proxyThroughServer if proxy is None else proxy
if proxy:
return self._server.query(key, headers=headers)
return self.query(key, headers=headers)
def _getSectionIds(self, server, sections):
    """ Translate section objects or section names into the section ids used for library sharing.

        Parameters:
            server: Either a PlexServer instance (its ``machineIdentifier`` is used) or the
                machine identifier string itself.
            sections (list): LibrarySection objects (matched by their ``key``) and/or strings.
                Strings are matched case-insensitively against each section's id, title or key.

        Returns:
            list: One section id per entry of ``sections``, in the same order. An empty list
            is returned without issuing any request when ``sections`` is empty or None.

        Raises:
            KeyError: When an entry does not match any id, title or key returned for the server.
    """
    if not sections:
        return []
    # Work out which server to ask about.
    if isinstance(server, PlexServer):
        machineId = server.machineIdentifier
    else:
        machineId = server
    response = self.query(self.PLEXSERVERS.replace('{machineId}', machineId), self._session.get)
    # Index every section of the first child element by its lower-cased id, title and key.
    lookup = {}
    for node in response[0]:
        sectionId = node.attrib.get('id')
        for attr in ('id', 'title', 'key'):
            lookup[node.attrib.get(attr, '').lower()] = sectionId
    log.debug(lookup)
    # Resolve each requested section through the lookup built above.
    return [
        lookup[(item.key if isinstance(item, LibrarySection) else item).lower()]
        for item in sections
    ]
def query(self, url, method=None, headers=None, timeout=None, **kwargs):
    """ Send an HTTP request to ``url`` and parse the XML body of the response.

        Parameters:
            url (str): URL handed unchanged to ``method``.
            method (callable): Request callable invoked as
                ``method(url, headers=..., timeout=..., **kwargs)``. Defaults to ``self._session.get``.
            headers (dict): Extra headers, forwarded as keyword arguments to ``self._headers()``.
            timeout: Timeout handed to ``method``. Any falsy value falls back to ``TIMEOUT``.
            **kwargs: Additional keyword arguments forwarded to ``method``. The ``json`` entry,
                if present, is included in the debug log line.

        Returns:
            The result of ``ElementTree.fromstring`` on the utf8-encoded response text, or
            None when the response text is empty or whitespace only.

        Raises:
            BadRequest: When the response status code is not 200, 201 or 204.
    """
    method = method or self._session.get
    timeout = timeout or TIMEOUT
    log.debug('%s %s %s', method.__name__.upper(), url, kwargs.get('json', ''))
    headers = self._headers(**headers or {})
    response = method(url, headers=headers, timeout=timeout, **kwargs)
    if response.status_code not in (200, 201, 204):  # pragma: no cover
        # codes.get() yields None for a status code missing from the table; subscripting
        # that would raise TypeError and hide the BadRequest we actually want to report.
        codename = (codes.get(response.status_code) or ('unknown',))[0]
        errtext = response.text.replace('\n', ' ')
        message = '(%s) %s %s; %s' % (response.status_code, codename, response.url, errtext)
        log.warning('BadRequest %s', message)
        raise BadRequest(message)
    data = response.text.encode('utf8')
    return ElementTree.fromstring(data) if data.strip() else None
def query(self, key, method=None, headers=None, timeout=None, **kwargs):
    """ Main method used to handle HTTPS requests to the Plex server. This method helps
        by encoding the response to utf-8 and parsing the returned XML into an
        ElementTree object. Returns None if no data exists in the response.

        Parameters:
            key (str): Value passed to ``self.url()`` to build the request URL.
            method (callable): Request callable invoked as
                ``method(url, headers=..., timeout=..., **kwargs)``. Defaults to ``self._session.get``.
            headers (dict): Extra headers, forwarded as keyword arguments to ``self._headers()``.
            timeout: Timeout handed to ``method``. Any falsy value falls back to ``TIMEOUT``.
            **kwargs: Additional keyword arguments forwarded to ``method``.

        Raises:
            BadRequest: When the response status code is not 200, 201 or 204.
    """
    url = self.url(key)
    method = method or self._session.get
    timeout = timeout or TIMEOUT
    log.debug('%s %s', method.__name__.upper(), url)
    headers = self._headers(**headers or {})
    response = method(url, headers=headers, timeout=timeout, **kwargs)
    # 204 (No Content) is a success without a body; it falls through to the
    # empty-response path below and yields None instead of raising.
    if response.status_code not in (200, 201, 204):
        # codes.get() yields None for a status code missing from the table; subscripting
        # that would raise TypeError and hide the BadRequest we actually want to report.
        codename = (codes.get(response.status_code) or ('unknown',))[0]
        errtext = response.text.replace('\n', ' ')
        log.warning('BadRequest (%s) %s %s; %s', response.status_code, codename, response.url, errtext)
        raise BadRequest('(%s) %s; %s %s' % (response.status_code, codename, response.url, errtext))
    data = response.text.encode('utf8')
    return ElementTree.fromstring(data) if data.strip() else None