Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def raisesParserError(code):
    """Assert that building a Metar from ``code`` raises ``Metar.ParserError``."""
    expected_error = Metar.ParserError
    with pytest.raises(expected_error):
        Metar.Metar(code)
# NOTE(review): indentation was lost in this excerpt and the function is cut
# off after the final `else:`; code lines below are left untouched.
def to_metar(textprod, text):
"""Create a METAR object, if possible"""
# Do some cleaning and whitespace trimming
text = sanitize(text)
# Sanitized text shorter than 10 characters is not parsed at all.
if len(text) < 10:
return
attempt = 1
mtr = None
original_text = text
# Month and year for the parser come from the product's `valid` attribute.
valid = textprod.valid
# Loop while attempt < 6 and no report has been built; the place where
# `attempt` is incremented is not visible in this excerpt.
while attempt < 6 and mtr is None:
try:
mtr = METARReport(text, month=valid.month, year=valid.year)
except MetarParserError as inst:
# Pull candidate bad tokens out of the error message with ERROR_RE.
tokens = ERROR_RE.findall(str(inst))
if tokens:
# When the first match is the entire text or a prefix of it, the function
# bails out; the abort message is printed only if SA_RE does not match the
# text (exact nesting of the `return` cannot be confirmed without indentation).
if tokens[0] == text or text.startswith(tokens[0]):
if not SA_RE.match(text):
print(
("%s Aborting due to non-replace %s")
% (textprod.get_product_id(), str(inst))
)
return
# So tokens contains a series of groups that needs updated
newtext = text
# Remove each whitespace-separated piece of the first match, together with
# the single space in front of it, from the text.
for token in tokens[0].split():
newtext = newtext.replace(" %s" % (token,), "")
# Only adopt the edited text if the removal actually changed something.
if newtext != text:
text = newtext
else:
# NOTE(review): indentation was lost in this excerpt and the function is cut
# off after the final `else:`; code lines below are left untouched.
def process_metar(mstr, now):
""" Do the METAR Processing """
mtr = None
# Keep re-parsing until a Metar object is built; every failure path below
# either changes `mstr`/`now` before retrying or returns None.
while mtr is None:
try:
mtr = Metar(mstr, now.month, now.year)
except MetarParserError as exp:
# Converting the exception to text may itself fail; give up in that case.
try:
msg = str(exp)
except Exception:
return None
# On the first day of the month a "day is out of range" error triggers a
# retry with `now` moved back one day, i.e. the previous month (and year).
# NOTE(review): `> 0` ignores a match at index 0 of the message - confirm
# that is intended rather than `>= 0`.
if msg.find("day is out of range for month") > 0 and now.day == 1:
now -= datetime.timedelta(days=1)
continue
# Pull candidate bad tokens out of the error message with ERROR_RE.
tokens = ERROR_RE.findall(str(exp))
orig_mstr = mstr
if tokens:
# Remove each whitespace-separated piece of the first match, together with
# the single space in front of it.
for token in tokens[0].split():
mstr = mstr.replace(" %s" % (token,), "")
# Nothing was removed, so another attempt would parse the same text; give up.
if orig_mstr == mstr:
print("Can't fix badly formatted metar: " + mstr)
return None
else:
# NOTE(review): indentation was lost in this excerpt and the function is cut
# off at the trailing `except Exception as exp:`; code lines are left untouched.
def process_metar(mstr, now):
""" Do the METAR Processing """
mtr = None
# Keep re-parsing until a Metar object is built; every visible failure path
# either edits `mstr` before retrying or returns None.
while mtr is None:
try:
mtr = Metar(mstr, now.month, now.year)
except MetarParserError as exp:
# Converting the exception to text may itself fail; give up in that case.
try:
msg = str(exp)
except Exception:
return None
# Pull candidate bad tokens out of the error message with ERROR_RE.
tokens = ERROR_RE.findall(str(exp))
orig_mstr = mstr
if tokens:
# Remove each whitespace-separated piece of the first match, together with
# the single space in front of it.
for token in tokens[0].split():
mstr = mstr.replace(" %s" % (token,), "")
# Nothing was removed, so another attempt would parse the same text; give up.
if orig_mstr == mstr:
LOG.info("Can't fix badly formatted metar: %s", mstr)
return None
else:
# No token could be extracted from the error message: log it and give up.
LOG.info("MetarParserError: %s", msg)
return None
except Exception as exp:
# NOTE(review): this excerpt is Python 2 (print statements, `except X, err`),
# has lost its indentation, and begins inside a `try` whose header and
# enclosing loops are not visible; code lines are left untouched.
# Copy the fields of the parsed observation into the per-cycle record.
data['time'] = obs.time.isoformat()
data['temp'] = obs.temp.value("C")
data['wind_speed'] = obs.wind_speed.value("KMH")
data['wind_deg'] = obs.wind_dir.value()
# First observation for this station: create its entry with the name and
# coordinates looked up in `stations`, plus an empty 'cycles' mapping.
if not obs.station_id in database:
database[obs.station_id] = {}
database[obs.station_id]['name'] = stations[obs.station_id].name
database[obs.station_id]['lat'] = stations[obs.station_id].lat
database[obs.station_id]['lng'] = stations[obs.station_id].lng
database[obs.station_id]['cycles'] = {}
# Store the record under the key captured by group 1 of the `cycle` match.
database[obs.station_id]['cycles'][cycle.group(1)] = data
# print "Adding ",stations[obs.station_id].name
except Metar.ParserError, err:
print "Fail to parse METAR: ",line
# break
# break
# break
# print database
print 'Total entries', len(database)
# The output file is named '<today's date>-<current hour>.json'.
with open(str(datetime.date.today()) + '-' + str(datetime.datetime.now().hour)+'.json', 'w') as outfile:
# NOTE(review): `outfile` is passed to json.dumps as its second positional
# argument, which is not an output stream (dumps returns a string) -
# json.dump(database, outfile, ...) was probably intended; confirm.
outfile.write(json.dumps(database, outfile, indent=4, separators=(',', ': ')))
# NOTE(review): redundant - the `with` statement already closes the file.
outfile.close()
# Deletes every file matching *.TXT in the current working directory via the shell.
os.system("rm *.TXT")
def process_line(line):
    """Parse one input line as a METAR report and print the outcome.

    Lines that are empty after stripping, or whose first character is
    changed by ``upper()``, are ignored. The decoded report is printed when
    the module-level ``report`` flag is set; a parse failure is printed
    unless the module-level ``silent`` flag is set.
    """
    stripped = line.strip()
    if not stripped:
        return
    first_char = stripped[0]
    if first_char != first_char.upper():
        return
    try:
        obs = Metar.Metar(stripped)
        if report:
            print("--------------------")
            print(obs.string())
    except Metar.ParserError as exc:
        if silent:
            return
        print("--------------------")
        print("METAR code: ", stripped)
        print(", ".join(exc.args))
# Fetch the report file for one station and print its decoded form.
# Relies on `url`, `name`, `debug`, `urlopen` and `Metar` from the enclosing
# scope (the surrounding loop header is not part of this excerpt).
if debug:
    sys.stderr.write("[ " + url + " ]")
try:
    urlh = urlopen(url)
    report = ""
    for line in urlh:
        if not isinstance(line, str):
            line = line.decode()  # convert Python3 bytes buffer to string
        # The first line that starts with the station name is the report.
        if line.startswith(name):
            report = line.strip()
            obs = Metar.Metar(line)
            print(obs.string())
            break
    if not report:
        print("No data for ", name, "\n\n")
except Metar.ParserError as exc:
    print("METAR code: ", line)
    # str.join works on both Python 2 and 3; string.join() no longer exists
    # in Python 3 and would raise AttributeError inside this handler.
    print(", ".join(exc.args), "\n")
except Exception:
    # Catch-all for retrieval problems; KeyboardInterrupt and SystemExit are
    # deliberately allowed to propagate.
    import traceback

    print(traceback.format_exc())
    print("Error retrieving", name, "data", "\n")
# NOTE(review): excerpt from inside a method (it uses `self`); indentation was
# lost and the enclosing `try` header and loops are not visible. Code lines are
# left untouched.
m = pattern.match(code)
# On a match, pass the named groups to the handler and strip the first
# occurrence of the pattern from `code`.
if m:
if debug:
_report_match(handler, m.group())
handler(self, m.groupdict())
code = pattern.sub("", code, 1)
break
except Exception as err:
# Build a message naming the failing handler and the code being processed.
# NOTE(review): "\n\t".join(err.args) requires every element of err.args to
# be a string - confirm handlers never raise with non-string args.
message = ("%s failed while processing '%s'\n\t%s") % (
handler.__name__,
code,
"\n\t".join(err.args),
)
# Strict mode raises ParserError; otherwise only a RuntimeWarning is emitted.
if strict:
raise ParserError(message)
else:
warnings.warn(message, RuntimeWarning)
# Report any groups collected in self._unparsed_groups, using the same
# strict/non-strict handling as above.
if self._unparsed_groups:
code = " ".join(self._unparsed_groups)
message = "Unparsed groups in body '%s' while processing '%s'" % (
code,
metarcode,
)
if strict:
raise ParserError(message)
else:
warnings.warn(message, RuntimeWarning)