Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
params = {'server_id': machineId, 'shared_server': {'library_section_ids': sectionIds}}
url = self.FRIENDSERVERS.format(machineId=machineId, serverId=serverId)
else:
params = {'server_id': machineId, 'shared_server': {'library_section_ids': sectionIds,
"invited_id": user.id}}
url = self.FRIENDINVITE.format(machineId=machineId)
# Remove share sections, add shares to user without shares, or update shares
if sectionIds:
if removeSections is True:
response_servers = self.query(url, self._session.delete, json=params, headers=headers)
elif 'invited_id' in params.get('shared_server', ''):
response_servers = self.query(url, self._session.post, json=params, headers=headers)
else:
response_servers = self.query(url, self._session.put, json=params, headers=headers)
else:
log.warning('Section name, number of section object is required changing library sections')
# Update friend filters
url = self.FRIENDUPDATE.format(userId=user.id)
params = {}
if isinstance(allowSync, bool):
params['allowSync'] = '1' if allowSync else '0'
if isinstance(allowCameraUpload, bool):
params['allowCameraUpload'] = '1' if allowCameraUpload else '0'
if isinstance(allowChannels, bool):
params['allowChannels'] = '1' if allowChannels else '0'
if isinstance(filterMovies, dict):
params['filterMovies'] = self._filterDictToStr(filterMovies or {}) # '1' if allowChannels else '0'
if isinstance(filterTelevision, dict):
params['filterTelevision'] = self._filterDictToStr(filterTelevision or {})
if isinstance(allowChannels, dict):
params['filterMusic'] = self._filterDictToStr(filterMusic or {})
if params:
if not isinstance(value, (list, tuple)):
value = [value]
# convert list of values to list of keys or ids
result = set()
choices = self.listChoices(category, libtype)
lookup = {c.title.lower(): unquote(unquote(c.key)) for c in choices}
allowed = set(c.key for c in choices)
for item in value:
item = str((item.id or item.tag) if isinstance(item, MediaTag) else item).lower()
# find most logical choice(s) to use in url
if item in allowed: result.add(item); continue
if item in lookup: result.add(lookup[item]); continue
matches = [k for t, k in lookup.items() if item in t]
if matches: map(result.add, matches); continue
# nothing matched; use raw item value
log.warning('Filter value not listed, using raw item value: %s' % item)
result.add(item)
return ','.join(result)
def query(self, key, method=None, headers=None, timeout=None, **kwargs):
    """ Main method used to handle HTTPS requests to the Plex server. This method helps
        by encoding the response to utf-8 and parsing the returned XML into an
        ElementTree object. Returns None if no data exists in the response.

        Parameters:
            key (str): Value handed to ``self.url()`` to build the request URL.
            method (callable): Session method used to send the request
                (default ``self._session.get``).
            headers (dict): Extra headers, passed as keyword arguments to
                ``self._headers()`` to produce the headers actually sent.
            timeout: Request timeout; any falsy value falls back to ``TIMEOUT``.
            **kwargs: Forwarded unchanged to ``method``.

        Raises:
            BadRequest: When the response status code is not 200 or 201.
    """
    url = self.url(key)
    method = method or self._session.get
    timeout = timeout or TIMEOUT
    log.debug('%s %s', method.__name__.upper(), url)
    headers = self._headers(**headers or {})
    response = method(url, headers=headers, timeout=timeout, **kwargs)
    if response.status_code not in (200, 201):
        # A status code missing from the lookup table must still surface as
        # BadRequest; without the default, indexing None raised TypeError.
        codename = codes.get(response.status_code, ('unknown',))[0]
        errtext = response.text.replace('\n', ' ')
        log.warning('BadRequest (%s) %s %s; %s', response.status_code, codename, response.url, errtext)
        raise BadRequest('(%s) %s; %s %s' % (response.status_code, codename, response.url, errtext))
    data = response.text.encode('utf8')
    # Whitespace-only bodies are treated as "no data".
    return ElementTree.fromstring(data) if data.strip() else None
def query(self, url, method=None, headers=None, timeout=None, **kwargs):
    """ Send a request to ``url`` and return the response body parsed as an
        ElementTree object. Returns None if no data exists in the response.

        Parameters:
            url (str): URL to request; it is used as given.
            method (callable): Session method used to send the request
                (default ``self._session.get``).
            headers (dict): Extra headers, passed as keyword arguments to
                ``self._headers()`` to produce the headers actually sent.
            timeout: Request timeout; any falsy value falls back to ``TIMEOUT``.
            **kwargs: Forwarded unchanged to ``method``; a ``json`` entry, if
                present, is included in the debug log line.

        Raises:
            BadRequest: When the response status code is not 200, 201 or 204.
    """
    method = method or self._session.get
    timeout = timeout or TIMEOUT
    log.debug('%s %s %s', method.__name__.upper(), url, kwargs.get('json', ''))
    headers = self._headers(**headers or {})
    response = method(url, headers=headers, timeout=timeout, **kwargs)
    if response.status_code not in (200, 201, 204):  # pragma: no cover
        # A status code missing from the lookup table must still surface as
        # BadRequest; without the default, indexing None raised TypeError.
        codename = codes.get(response.status_code, ('unknown',))[0]
        errtext = response.text.replace('\n', ' ')
        log.warning('BadRequest (%s) %s %s; %s', response.status_code, codename, response.url, errtext)
        raise BadRequest('(%s) %s %s; %s' % (response.status_code, codename, response.url, errtext))
    data = response.text.encode('utf8')
    # Whitespace-only bodies (e.g. a 204 reply) are treated as "no data".
    return ElementTree.fromstring(data) if data.strip() else None
def query(self, path, method=None, headers=None, timeout=None, **kwargs):
    """ Main method used to handle HTTPS requests to the Plex client. This method helps
        by encoding the response to utf-8 and parsing the returned XML into an
        ElementTree object. Returns None if no data exists in the response.

        Parameters:
            path (str): Value handed to ``self.url()`` to build the request URL.
            method (callable): Session method used to send the request
                (default ``self._session.get``).
            headers (dict): Extra headers, passed as keyword arguments to
                ``self._headers()`` to produce the headers actually sent.
            timeout: Request timeout; any falsy value falls back to ``TIMEOUT``.
            **kwargs: Forwarded unchanged to ``method``.

        Raises:
            BadRequest: When the response status code is not 200 or 201.
    """
    url = self.url(path)
    method = method or self._session.get
    timeout = timeout or TIMEOUT
    log.debug('%s %s', method.__name__.upper(), url)
    headers = self._headers(**headers or {})
    response = method(url, headers=headers, timeout=timeout, **kwargs)
    if response.status_code not in (200, 201):
        # A status code missing from the lookup table must still surface as
        # BadRequest; without the default, indexing None raised TypeError.
        codename = codes.get(response.status_code, ('unknown',))[0]
        errtext = response.text.replace('\n', ' ')
        log.warning('BadRequest (%s) %s %s; %s', response.status_code, codename, response.url, errtext)
        raise BadRequest('(%s) %s; %s %s' % (response.status_code, codename, response.url, errtext))
    data = response.text.encode('utf8')
    # Whitespace-only bodies are treated as "no data".
    return ElementTree.fromstring(data) if data.strip() else None
def query(self, path, method=None, headers=None, timeout=None, **kwargs):
    """ Main method used to handle HTTPS requests to the Plex client. This method helps
        by encoding the response to utf-8 and parsing the returned XML into an
        ElementTree object. Returns None if no data exists in the response.

        Parameters:
            path (str): Value handed to ``self.url()`` to build the request URL.
            method (callable): Session method used to send the request
                (default ``self._session.get``).
            headers (dict): Extra headers, passed as keyword arguments to
                ``self._headers()`` to produce the headers actually sent.
            timeout: Request timeout; any falsy value falls back to ``TIMEOUT``.
            **kwargs: Forwarded unchanged to ``method``.

        Raises:
            BadRequest: When the response status code is not 200 or 201.
    """
    url = self.url(path)
    method = method or self._session.get
    timeout = timeout or TIMEOUT
    log.debug('%s %s', method.__name__.upper(), url)
    headers = self._headers(**headers or {})
    response = method(url, headers=headers, timeout=timeout, **kwargs)
    if response.status_code not in (200, 201):
        # A status code missing from the lookup table must still surface as
        # BadRequest; without the default, indexing None raised TypeError.
        codename = codes.get(response.status_code, ('unknown',))[0]
        errtext = response.text.replace('\n', ' ')
        log.warning('BadRequest (%s) %s %s; %s', response.status_code, codename, response.url, errtext)
        raise BadRequest('(%s) %s; %s %s' % (response.status_code, codename, response.url, errtext))
    data = response.text.encode('utf8')
    # Whitespace-only bodies are treated as "no data".
    return ElementTree.fromstring(data) if data.strip() else None
if not isinstance(value, (list, tuple)):
value = [value]
# convert list of values to list of keys or ids
result = set()
choices = self.listChoices(category, libtype)
lookup = {c.title.lower(): unquote(unquote(c.key)) for c in choices}
allowed = set(c.key for c in choices)
for item in value:
item = str((item.id or item.tag) if isinstance(item, MediaTag) else item).lower()
# find most logical choice(s) to use in url
if item in allowed: result.add(item); continue
if item in lookup: result.add(lookup[item]); continue
matches = [k for t, k in lookup.items() if item in t]
if matches: map(result.add, matches); continue
# nothing matched; use raw item value
log.warning('Filter value not listed, using raw item value: %s' % item)
result.add(item)
return ','.join(result)
def clients(self):
    """ Returns list of all :class:`~plexapi.client.PlexClient` objects connected to server.

        Each client is built with ``connect=False`` from the elements returned
        by querying ``/clients``. When an element carries no ``port``
        attribute, the port is looked up by ``machineIdentifier`` in the
        mapping returned by ``self._myPlexClientPorts()``, which is fetched at
        most once per call.
    """
    items = []
    data = self.query('/clients')
    if data is None:
        # query() returns None for an empty response body; there is nothing
        # to iterate in that case.
        return items
    ports = None  # lazily populated fallback mapping of machineIdentifier -> port
    for elem in data:
        port = elem.attrib.get('port')
        if not port:
            log.warning('%s did not advertise a port, checking plex.tv.', elem.attrib.get('name'))
            ports = self._myPlexClientPorts() if ports is None else ports
            port = ports.get(elem.attrib.get('machineIdentifier'))
        baseurl = 'http://%s:%s' % (elem.attrib['host'], port)
        items.append(PlexClient(baseurl=baseurl, server=self,
                                token=self._token, data=elem, connect=False))
    return items