Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
try:
rsp = session.get(
'http://www.pwsweather.com/pwsupdate/pwsupdate.php',
params=prepared_data, timeout=60)
except Exception as ex:
return False, repr(ex)
if rsp.status_code != 200:
return False, 'http status: {:d}'.format(rsp.status_code)
text = rsp.text.strip()
if text:
return True, 'server response "{:s}"'.format(text)
return True, 'OK'
# Script entry point: when this module is executed directly, pass ToService
# to pywws.service.main and exit the interpreter with the value it returns.
if __name__ == "__main__":
    sys.exit(pywws.service.main(ToService))
for header in literal_eval(self.params['http headers']):
session.headers.update({header[0]: header[1]})
rsp = session.get(self.params['api url'], params=prepared_data,
timeout=60)
except Exception as ex:
return False, repr(ex)
if rsp.status_code != 200:
return False, 'http status: {:d}'.format(rsp.status_code)
rsp = rsp.json()
if rsp:
return True, 'server response "{!r}"'.format(rsp)
return True, 'OK'
# Script entry point: when this module is executed directly, pass ToService
# to pywws.service.main and exit the interpreter with the value it returns.
if __name__ == "__main__":
    sys.exit(pywws.service.main(ToService))
mode = 'r'
else:
mode = 'rb'
try:
with open(path, mode) as f:
if text_file:
session.storlines('STOR %s' % (target), f)
else:
session.storbinary('STOR %s' % (target), f)
except Exception as ex:
return False, repr(ex)
return True, 'OK'
# Script entry point: when this module is executed directly, pass ToService
# to pywws.service.main and exit the interpreter with the value it returns.
if __name__ == "__main__":
    sys.exit(pywws.service.main(ToService))
for key, value in prepared_data.items():
if value == '':
value = 'None'
try:
session.publish(self.params['topic'] + "/" + key, value,
retain=self.params['retain'])
except Exception as ex:
return False, repr(ex)
# Need to make sure the messages have been flushed to the
# server.
session.loop(timeout=0.5)
return True, 'OK'
# Script entry point: when this module is executed directly, pass ToService
# to pywws.service.main and exit the interpreter with the value it returns.
if __name__ == "__main__":
    sys.exit(pywws.service.main(ToService))
with paramiko.SFTPClient.from_transport(transport) as session:
session.get_channel().settimeout(20)
session.chdir(self.params['directory'])
yield session
def upload_file(self, session, path):
    """Send one local file through ``session.put``.

    The destination name handed to ``session.put`` is the final path
    component of ``path`` (its base name); the local path is passed
    through unchanged as the source.

    :param session: object providing ``put(local_path, remote_name)``.
    :param path: path of the local file to upload.
    :return: ``(True, 'OK')`` when ``put`` completes, otherwise
        ``(False, repr(exception))`` for any exception it raised.
    """
    remote_name = os.path.basename(path)
    try:
        session.put(path, remote_name)
    except Exception as ex:
        # Failures are reported to the caller rather than raised.
        outcome = False, repr(ex)
    else:
        outcome = True, 'OK'
    return outcome
# Script entry point: when this module is executed directly, pass ToService
# to pywws.service.main and exit the interpreter with the value it returns.
if __name__ == "__main__":
    sys.exit(pywws.service.main(ToService))
url = 'https://rtupdate.wunderground.com/'
else:
url = 'https://weatherstation.wunderground.com/'
url += 'weatherstation/updateweatherstation.php'
try:
rsp = session.get(url, params=prepared_data, timeout=60)
except Exception as ex:
return False, repr(ex)
if rsp.status_code != 200:
return False, 'http status: {:d}'.format(rsp.status_code)
text = rsp.text.strip()
return text == 'success', 'server response "{:s}"'.format(text)
# Script entry point: when this module is executed directly, pass ToService
# to pywws.service.main and exit the interpreter with the value it returns.
if __name__ == "__main__":
    sys.exit(pywws.service.main(ToService))
def upload_data(self, session, prepared_data={}):
    """POST one prepared record to wetterarchiv.de and report the outcome.

    :param session: object providing a ``post(url, data=..., timeout=...)``
        method (presumably a ``requests.Session`` -- confirm against the
        session factory of this service).
    :param prepared_data: mapping sent as the request body. It is only
        read here, never modified, so the shared default is harmless.
    :return: ``(success, message)``. ``success`` is False when the request
        raises, when the HTTP status is not 200, or when the reply body
        cannot be decoded as JSON. Otherwise it is True and ``message``
        is either the decoded reply or ``'OK'`` for an empty reply.
    """
    try:
        rsp = session.post('http://interface.wetterarchiv.de/weather/',
                           data=prepared_data, timeout=60)
    except Exception as ex:
        return False, repr(ex)
    if rsp.status_code != 200:
        return False, 'http status: {:d}'.format(rsp.status_code)
    # A 200 reply whose body is not valid JSON makes json() raise a
    # ValueError (subclass); report it like every other failure instead of
    # letting the exception escape to the caller.
    try:
        payload = rsp.json()
    except ValueError as ex:
        return False, repr(ex)
    if payload:
        return True, 'server response "{!r}"'.format(payload)
    return True, 'OK'
# Script entry point: when this module is executed directly, pass ToService
# to pywws.service.main and exit the interpreter with the value it returns.
if __name__ == "__main__":
    sys.exit(pywws.service.main(ToService))
if not webbrowser.open(auth_request_url, new=2, autoraise=0):
print('Please use a web browser to open the following URL')
print(auth_request_url)
if sys.version_info[0] >= 3:
input_ = input
else:
input_ = raw_input
code = input_('Please enter the auth code shown in your web browser: ')
code = code.strip()
# log in
access_token = api.log_in(code=code, scopes=['write'])
self.context.params.set(service_name, 'access_token', access_token)
# Script entry point: when this module is executed directly, pass ToService
# to pywws.service.main and exit the interpreter with the value it returns.
if __name__ == "__main__":
    sys.exit(pywws.service.main(ToService))
yield session
def upload_data(self, session, prepared_data={}):
    """Send one prepared record to the Windy PWS update endpoint.

    The request URL is the fixed update path followed by
    ``self.params['api_key']``; ``prepared_data`` is passed as the query
    parameters of a GET request with a 60 second timeout.

    :param session: object providing ``get(url, params=..., timeout=...)``.
    :param prepared_data: mapping of query parameters; only read here.
    :return: ``(success, message)``. A raised exception or a non-200
        status gives ``False`` with a description. Otherwise ``success``
        is whether the stripped reply text contains ``'SUCCESS'`` and
        ``message`` quotes that text.
    """
    endpoint = 'https://stations.windy.com/pws/update/' + self.params['api_key']
    try:
        reply = session.get(endpoint, params=prepared_data, timeout=60)
    except Exception as ex:
        return False, repr(ex)
    status = reply.status_code
    if status != 200:
        return False, 'http status: {:d}'.format(status)
    body = reply.text.strip()
    accepted = 'SUCCESS' in body
    return accepted, 'server response "{:s}"'.format(body)
# Script entry point: when this module is executed directly, pass ToService
# to pywws.service.main and exit the interpreter with the value it returns.
if __name__ == "__main__":
    sys.exit(pywws.service.main(ToService))