Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def load_epg(self):
try:
os.mkdir(EPG_DIR)
except OSError as e:
if e[0] != 17:
raise(e)
self.trace("Loading epg %s" % self.act_url)
try:
urllib.urlretrieve(self.act_url, EPG_ZIP)
self.lastload = syncTime()
except:
raise APIException("epg download failed")
cmd = "unzip -q -o %s -d %s" % (EPG_ZIP, EPG_DIR)
self.trace(cmd)
os.system(cmd)
def authorize(self):
self.trace("Username is "+self.username)
self.cookiejar.clear()
params = urllib.urlencode({"login" : self.username,
"pass" : md5(md5(self.username).hexdigest()+md5(self.password).hexdigest()).hexdigest(),
"with_cfg" : ''})
self.trace("Authorization started (%s)" % (self.site+"/login?"+ params))
try:
httpstr = self.opener.open(self.site+"/login?"+ params).read()
except IOError as e:
raise APIException(e)
#print httpstr
try:
root = fromstring(httpstr)
except SyntaxError as e:
raise APIException(e)
if root.find('error'):
err = root.find('error')
raise APIException(err.findtext('code').encode('utf-8')+" "+err.findtext('message').encode('utf-8'))
self.sid = root.find('sid').text.encode('utf-8')
self.packet_expire = None #XXX: no info in api..
self.SID = True
settings = root.find('settings')
self.protect_code = settings.findtext('parental_pass').encode("utf-8")
self.trace('protectcode %s' % self.protect_code)
print settings.findtext('time_zone')
self.trace("username = %s" % self.username)
self.cookiejar.clear()
params = urllib.urlencode({"login" : self.username,
"pass" : self.password,
"settings" : "all"})
try:
reply = self.opener.open(self.site+self.apipath+'/xml/login?', params).read()
except IOError as e:
raise APIException(e)
try:
reply = fromstring(reply)
except SyntaxError as e:
raise APIException(e)
if reply.find("error"):
raise APIException(reply.find('error').findtext('message'))
self.packet_expire = datetime.fromtimestamp(int(reply.find('account').findtext('packet_expire')))
#Load settings here, because kartina api is't friendly
self.settings = {}
sett = reply.find("settings")
for s in sett:
if s.tag == "http_caching": continue
value = s.findtext('value')
vallist = []
if s.tag == "stream_server":
for x in s.find('list'):
vallist += [(x.findtext('ip'), x.findtext('descr'))]
elif s.find('list'):
for x in s.find('list'):
vallist += [x.text]
def load_epg(self):
try:
os.mkdir(EPG_DIR)
except OSError as e:
if e[0] != 17:
raise(e)
self.trace("Loading epg %s" % self.act_url)
try:
urllib.urlretrieve(self.act_url, EPG_ZIP)
self.lastload = syncTime()
except:
raise APIException("epg download failed")
cmd = "unzip -q -o %s -d %s" % (EPG_ZIP, EPG_DIR)
self.trace(cmd)
os.system(cmd)
def doget():
self.trace("Getting %s (%s)" % (name, url))
try:
reply = self.opener.open(url).read()
except IOError as e:
raise APIException(e)
try:
root = fromstring(reply)
except SyntaxError as e:
raise APIException(e)
if root.find('error'):
err = root.find('error')
raise APIException(err.find('code').text.encode('utf-8')+" "+err.find('message').text.encode('utf-8'))
self.SID = True
return root
params = urllib.urlencode({"login" : self.username,
"pass" : md5(md5(self.username).hexdigest()+md5(self.password).hexdigest()).hexdigest(),
"with_cfg" : ''})
self.trace("Authorization started (%s)" % (self.site+"/login?"+ params))
try:
httpstr = self.opener.open(self.site+"/login?"+ params).read()
except IOError as e:
raise APIException(e)
#print httpstr
try:
root = fromstring(httpstr)
except SyntaxError as e:
raise APIException(e)
if root.find('error'):
err = root.find('error')
raise APIException(err.findtext('code').encode('utf-8')+" "+err.findtext('message').encode('utf-8'))
self.sid = root.find('sid').text.encode('utf-8')
self.packet_expire = None #XXX: no info in api..
self.SID = True
settings = root.find('settings')
self.protect_code = settings.findtext('parental_pass').encode("utf-8")
self.trace('protectcode %s' % self.protect_code)
print settings.findtext('time_zone')
if Timezone != int(settings.findtext('time_zone')) or self.time_shift*60 != int(settings.findtext('time_shift')):
return 1
return 0
def authorize(self):
self.trace("Username is "+self.username)
self.cookiejar.clear()
params = urllib.urlencode({"login" : self.username,
"pass" : md5(md5(self.username).hexdigest()+md5(self.password).hexdigest()).hexdigest(),
"with_cfg" : ''})
self.trace("Authorization started (%s)" % (self.site+"/login?"+ params))
try:
httpstr = self.opener.open(self.site+"/login?"+ params).read()
except IOError as e:
raise APIException(e)
#print httpstr
try:
root = fromstring(httpstr)
except SyntaxError as e:
raise APIException(e)
if root.find('error'):
err = root.find('error')
raise APIException(err.findtext('code').encode('utf-8')+" "+err.findtext('message').encode('utf-8'))
self.sid = root.find('sid').text.encode('utf-8')
self.packet_expire = None #XXX: no info in api..
self.SID = True
settings = root.find('settings')
self.protect_code = settings.findtext('parental_pass').encode("utf-8")
self.trace('protectcode %s' % self.protect_code)
print settings.findtext('time_zone')
if Timezone != int(settings.findtext('time_zone')) or self.time_shift*60 != int(settings.findtext('time_shift')):
return 1
return 0
def authorize(self):
print "authorizing"
params = urllib.urlencode({"login" : self.username,
"pass" : self.password})
try:
reply = self.opener.open(urllib2.Request('http://tv.team-dobrogea.ru/plugin.php?playlist', params)).readlines()
except Exception as e:
raise APIException(e)
self.readlines = reply
def doget():
self.trace("Getting %s" % (name))
#self.trace("Getting %s (%s)" % (name, url))
try:
reply = self.opener.open(url).read()
print reply
except IOError as e:
raise APIException(e)
try:
root = fromstring(reply)
except SyntaxError as e:
raise APIException(e)
if root.find("error"):
err = root.find("error")
raise APIException(err.findtext('code').encode('utf-8')+" "+err.findtext('message').encode('utf-8'))
self.SID = True
return root