X-Git-Url: https://www.yuggoth.org/gitweb?p=weather.git;a=blobdiff_plain;f=weather.py;h=4920a3ae632c7e3eb7dea951fd0fd47f54ee3ef7;hp=a93f1ed4903c27d49d191b2c4dda836da1249b87;hb=4d25a49d5a5ec5415f8e83ba26fea5adf4e5512a;hpb=b755668b64a70bb7b0efbf1bd0aff8c310471ee1 diff --git a/weather.py b/weather.py index a93f1ed..4920a3a 100644 --- a/weather.py +++ b/weather.py @@ -1,206 +1,564 @@ -# weather.py version 1.1, http://fungi.yuggoth.org/weather/ -# Copyright (c) 2006 Jeremy Stanley , all rights reserved. -# Licensed per terms in the LICENSE file distributed with this software. +# Copyright (c) 2006-2010 Jeremy Stanley . Permission to +# use, copy, modify, and distribute this software is granted under terms +# provided in the LICENSE file distributed with this software. """Contains various object definitions needed by the weather utility.""" -version = "1.1" +version = "1.5" class Selections: - """An object to contain selection data.""" - def __init__(self): - """Store the config, options and arguments.""" - self.config = get_config() - self.options, self.arguments = get_options(self.config) - if self.arguments: - self.arguments = [(x.lower()) for x in self.arguments] - else: self.arguments = [ None ] - def get(self, option, argument=None): - """Retrieve data from the config or options.""" - if not argument: return self.options.__dict__[option] - elif not self.config.has_section(argument): - import sys - sys.stderr.write("ERROR: no alias defined for " \ - + argument + "\n") - sys.exit(1) - elif self.config.has_option(argument, option): - return self.config.get(argument, option) - else: return self.options.__dict__[option] - def get_bool(self, option, argument=None): - """Get data and coerce to a boolean if necessary.""" - return bool(self.get(option, argument)) + """An object to contain selection data.""" + def __init__(self): + """Store the config, options and arguments.""" + self.config = get_config() + self.options, self.arguments = get_options(self.config) + if self.arguments: + self.arguments = [(x.lower()) for x in self.arguments] + else: self.arguments = [ None ] + def get(self, option, argument=None): + """Retrieve data from the config or options.""" + if not argument: return self.options.__dict__[option] + elif not self.config.has_section(argument): + import sys + sys.stderr.write("weather: error: no alias defined for " \ + + argument + "\n") + sys.exit(1) + elif self.config.has_option(argument, option): + return self.config.get(argument, option) + else: return self.options.__dict__[option] + def get_bool(self, option, argument=None): + """Get data and coerce to a boolean if necessary.""" + return bool(self.get(option, argument)) def bool(data): - """Coerce data to a boolean value.""" - if type(data) is str: - if eval(data): return True - else: return False - else: - if data: return True - else: return False + """Coerce data to a boolean value.""" + if type(data) is str: + if eval(data): return True + else: return False + else: + if data: return True + else: return False def quote(words): - """Wrap a string in quotes if it contains spaces.""" - if words.find(" ") != -1: words = "\"" + words + "\"" - return words + """Wrap a string in quotes if it contains spaces.""" + if words.find(" ") != -1: words = "\"" + words + "\"" + return words + +def titlecap(words): + """Perform English-language title capitalization.""" + words = words.lower().strip() + for separator in [" ", "-", "'"]: + newwords = [] + wordlist = words.split(separator) + for word in wordlist: + if word: + newwords.append(word[0].upper() + word[1:]) + words = separator.join(newwords) + end = len(words) + for prefix in ["Mac", "Mc"]: + position = 0 + offset = len(prefix) + while position < end: + position = words.find(prefix, position) + if position == -1: + position = end + position += offset + import string + if position < end and words[position] in string.letters: + words = words[:position] \ + + words[position].upper() \ + + words[position+1:] + return words + +def filter_units(line, units="imperial"): + """Filter or convert units in a line of text between US/UK and metric.""" + import re + # filter lines with both pressures in the form of "X inches (Y hPa)" or + # "X in. Hg (Y hPa)" + dual_p = re.match( + "(.* )(\d*(\.\d+)? (inches|in\. Hg)) \((\d*(\.\d+)? hPa)\)(.*)", + line + ) + if dual_p: + preamble, in_hg, i_fr, i_un, hpa, h_fr, trailer = dual_p.groups() + if units == "imperial": line = preamble + in_hg + trailer + elif units == "metric": line = preamble + hpa + trailer + # filter lines with both temperatures in the form of "X F (Y C)" + dual_t = re.match( + "(.* )(\d*(\.\d+)? F) \((\d*(\.\d+)? C)\)(.*)", + line + ) + if dual_t: + preamble, fahrenheit, f_fr, celsius, c_fr, trailer = dual_t.groups() + if units == "imperial": line = preamble + fahrenheit + trailer + elif units == "metric": line = preamble + celsius + trailer + # if metric is desired, convert distances in the form of "X mile(s)" to + # "Y kilometer(s)" + if units == "metric": + imperial_d = re.match( + "(.* )(\d+)( mile\(s\))(.*)", + line + ) + if imperial_d: + preamble, mi, m_u, trailer = imperial_d.groups() + line = preamble + str(int(round(int(mi)*1.609344))) \ + + " kilometer(s)" + trailer + # filter speeds in the form of "X MPH (Y KT)" to just "X MPH"; if metric is + # desired, convert to "Z KPH" + imperial_s = re.match( + "(.* )(\d+)( MPH)( \(\d+ KT\))(.*)", + line + ) + if imperial_s: + preamble, mph, m_u, kt, trailer = imperial_s.groups() + if units == "imperial": line = preamble + mph + m_u + trailer + elif units == "metric": + line = preamble + str(int(round(int(mph)*1.609344))) + " KPH" + \ + trailer + # if imperial is desired, qualify given forcast temperatures like "X F"; if + # metric is desired, convert to "Y C" + imperial_t = re.match( + "(.* )(High |high |Low |low )(\d+)(\.|,)(.*)", + line + ) + if imperial_t: + preamble, parameter, fahrenheit, sep, trailer = imperial_t.groups() + if units == "imperial": + line = preamble + parameter + fahrenheit + " F" + sep + trailer + elif units == "metric": + line = preamble + parameter \ + + str(int(round((int(fahrenheit)-32)*5/9))) + " C" + sep + trailer + # hand off the resulting line + return line def sorted(data): - """Return a sorted copy of a list.""" - new_copy = data[:] - new_copy.sort() - return new_copy - -def get_url(url): - """Return a string containing the results of a URL GET.""" - import urllib - return urllib.urlopen(url).read() - -def get_metar(id, verbose=False): - """Return a summarized METAR for the specified station.""" - metar = get_url( - "http://weather.noaa.gov/pub/data/observations/metar/decoded/" \ - + id.upper() + ".TXT") - if verbose: return metar - else: - lines = metar.split("\n") - headings = [ - "Relative Humidity", - "Precipitation last hour", - "Sky conditions", - "Temperature", - "Weather", - "Wind" - ] - output = [] - output.append("Current conditions at " \ - + lines[0].split(", ")[1] + " (" \ - + id.upper() +")") - output.append("Last updated " + lines[1]) - for line in lines: - for heading in headings: - if line.startswith(heading + ":"): - if line.endswith(":0"): - line = line[:-2] - output.append(" " + line) - return "\n".join(output) - -def get_forecast(city, st, verbose=False): - """Return the forecast for a specified city/st combination.""" - forecast = get_url("http://weather.noaa.gov/pub/data/forecasts/city/" \ - + st.lower() + "/" + city.lower().replace(" ", "_") \ - + ".txt") - if verbose: return forecast - else: - lines = forecast.split("\n") - output = [] - output.append(lines[2]) - output.append(lines[3]) - for line in lines: - if line.startswith("."): - output.append(line.replace(".", " ", 1)) - return "\n".join(output) + """Return a sorted copy of a list.""" + new_copy = data[:] + new_copy.sort() + return new_copy + +def get_url(url, ignore_fail=False): + """Return a string containing the results of a URL GET.""" + import urllib2 + try: return urllib2.urlopen(url).read() + except urllib2.URLError: + if ignore_fail: return "" + else: + import sys, traceback + sys.stderr.write("weather: error: failed to retrieve\n " \ + + url + "\n " + \ + traceback.format_exception_only(sys.exc_type, sys.exc_value)[0]) + sys.exit(1) + +def get_metar( + id, + verbose=False, + quiet=False, + headers=None, + murl=None, + imperial=False, + metric=False +): + """Return a summarized METAR for the specified station.""" + if not id: + import sys + sys.stderr.write("weather: error: id required for conditions\n") + sys.exit(1) + if not murl: + murl = \ + "http://weather.noaa.gov/pub/data/observations/metar/decoded/%ID%.TXT" + murl = murl.replace("%ID%", id.upper()) + murl = murl.replace("%Id%", id.capitalize()) + murl = murl.replace("%iD%", id) + murl = murl.replace("%id%", id.lower()) + murl = murl.replace(" ", "_") + metar = get_url(murl) + if verbose: return metar + else: + lines = metar.split("\n") + if not headers: + headers = \ + "relative_humidity," \ + + "precipitation_last_hour," \ + + "sky conditions," \ + + "temperature," \ + + "weather," \ + + "wind" + headerlist = headers.lower().replace("_"," ").split(",") + output = [] + if not quiet: + title = "Current conditions at %s" + place = lines[0].split(", ") + if len(place) > 1: + place = "%s, %s (%s)" % (titlecap(place[0]), place[1], id.upper()) + else: place = id.upper() + output.append(title%place) + output.append("Last updated " + lines[1]) + for header in headerlist: + for line in lines: + if line.lower().startswith(header + ":"): + if line.endswith(":0") or line.endswith(":1"): + line = line[:-2] + if imperial: line = filter_units(line, units="imperial") + elif metric: line = filter_units(line, units="metric") + if quiet: output.append(line) + else: output.append(" " + line) + return "\n".join(output) + +def get_alert( + zone, + verbose=False, + quiet=False, + atype=None, + aurl=None, + imperial=False, + metric=False +): + """Return alert notice for the specified zone and type.""" + if not zone: + import sys + sys.stderr.write("weather: error: zone required for alerts\n") + sys.exit(1) + if not atype: atype = "severe_weather_stmt" + if not aurl: + aurl = \ + "http://weather.noaa.gov/pub/data/watches_warnings/%atype%/%zone%.txt" + aurl = aurl.replace("%ATYPE%", atype.upper()) + aurl = aurl.replace("%Atype%", atype.capitalize()) + aurl = aurl.replace("%atypE%", atype) + aurl = aurl.replace("%atype%", atype.lower()) + aurl = aurl.replace("%ZONE%", zone.upper()) + aurl = aurl.replace("%Zone%", zone.capitalize()) + aurl = aurl.replace("%zonE%", zone) + aurl = aurl.replace("%zone%", zone.lower()) + aurl = aurl.replace(" ", "_") + alert = get_url(aurl, ignore_fail=True).strip() + if alert: + if verbose: return alert + else: + lines = alert.split("\n") + muted = True + import calendar, re, time + valid_time = time.strftime("%Y%m%d%H%M") + #if not quiet: output = [ lines[3], lines[5] ] + #if not quiet: output = [ lines[8], lines[10] ] + #else: output = [] + output = [] + for line in lines: + if line.startswith("Expires:") and "Expires:"+valid_time > line: + return "" + if muted and line.find("...") != -1: + muted = False + if line == "$$" \ + or line.startswith("LAT...LON") \ + or line.startswith("TIME...MOT...LOC"): + muted = True + if line and not ( + muted \ + or line == "&&" + or re.match("^/.*/$", line) \ + or re.match("^"+zone.split("/")[1][:3].upper()+".*", line) + ): + if quiet: output.append(line) + else: output.append(" " + line) + return "\n".join(output) + +def get_forecast( + city, + st, + verbose=False, + quiet=False, + flines="0", + furl=None, + imperial=False, + metric=False +): + """Return the forecast for a specified city/st combination.""" + if not city or not st: + import sys + sys.stderr.write("weather: error: city and st required for forecast\n") + sys.exit(1) + if not furl: + furl = "http://weather.noaa.gov/pub/data/forecasts/city/%st%/%city%.txt" + furl = furl.replace("%CITY%", city.upper()) + furl = furl.replace("%City%", city.capitalize()) + furl = furl.replace("%citY%", city) + furl = furl.replace("%city%", city.lower()) + furl = furl.replace("%ST%", st.upper()) + furl = furl.replace("%St%", st.capitalize()) + furl = furl.replace("%sT%", st) + furl = furl.replace("%st%", st.lower()) + furl = furl.replace(" ", "_") + forecast = get_url(furl) + if verbose: return forecast + else: + lines = forecast.split("\n") + output = [] + if not quiet: output += lines[2:4] + flines = int(flines) + if not flines: flines = len(lines) - 5 + for line in lines[5:flines+5]: + if imperial: line = filter_units(line, units="imperial") + elif metric: line = filter_units(line, units="metric") + if line.startswith("."): + if quiet: output.append(line.replace(".", "", 1)) + else: output.append(line.replace(".", " ", 1)) + return "\n".join(output) def get_options(config): - """Parse the options passed on the command line.""" - import optparse - usage = "usage: %prog [ options ] [ alias [ alias [...] ] ]" - verstring = "%prog " + version - option_parser = optparse.OptionParser(usage=usage, version=verstring) - if config.has_option("default", "city"): - default_city = config.get("default", "city") - else: default_city = "Raleigh Durham" - option_parser.add_option("-c", "--city", - dest="city", - default=default_city, - help="the city name (ex: \"Raleigh Durham\")") - if config.has_option("default", "forecast"): - default_forecast = bool(config.get("default", "forecast")) - else: default_forecast = False - option_parser.add_option("-f", "--forecast", - dest="forecast", - action="store_true", - default=default_forecast, - help="include a local forecast") - if config.has_option("default", "id"): - default_id = config.get("default", "id") - else: default_id = "KRDU" - option_parser.add_option("-i", "--id", - dest="id", - default=default_id, - help="the METAR station ID (ex: KRDU)") - option_parser.add_option("-l", "--list", - dest="list", - action="store_true", - default=False, - help="print a list of configured aliases") - if config.has_option("default", "conditions"): - default_conditions = bool(config.get("default", "conditions")) - else: default_conditions = True - option_parser.add_option("-n", "--no-conditions", - dest="conditions", - action="store_false", - default=default_conditions, - help="disable output of current conditions (forces -f)") - option_parser.add_option("-o", "--omit-forecast", - dest="forecast", - action="store_false", - default=default_forecast, - help="omit the local forecast (cancels -f)") - if config.has_option("default", "st"): - default_st = config.get("default", "st") - else: default_st = "NC" - option_parser.add_option("-s", "--st", - dest="st", - default=default_st, - help="the state abbreviation (ex: NC)") - if config.has_option("default", "verbose"): - default_verbose = bool(config.get("default", "verbose")) - else: default_verbose = False - option_parser.add_option("-v", "--verbose", - dest="verbose", - action="store_true", - default=default_verbose, - help="show full decoded feeds") - options, arguments = option_parser.parse_args() - return options, arguments + """Parse the options passed on the command line.""" + + # for optparse's builtin -h/--help option + usage = "usage: %prog [ options ] [ alias [ alias [...] ] ]" + + # for optparse's builtin --version option + verstring = "%prog " + version + + # create the parser + import optparse + option_parser = optparse.OptionParser(usage=usage, version=verstring) + + # the -a/--alert option + if config.has_option("default", "alert"): + default_alert = bool(config.get("default", "alert")) + else: default_alert = False + option_parser.add_option("-a", "--alert", + dest="alert", + action="store_true", + default=default_alert, + help="include local alert notices") + + # the --atypes option + if config.has_option("default", "atypes"): + default_atypes = config.get("default", "atypes") + else: + default_atypes = \ + "flash_flood/statement," \ + + "flash_flood/warning," \ + + "flash_flood/watch," \ + + "flood/coastal," \ + + "flood/statement," \ + + "flood/warning," \ + + "non_precip," \ + + "severe_weather_stmt," \ + + "special_weather_stmt," \ + + "thunderstorm," \ + + "tornado," \ + + "urgent_weather_message" + option_parser.add_option("--atypes", + dest="atypes", + default=default_atypes, + help="alert notification types to display") + + # the --aurl option + if config.has_option("default", "aurl"): + default_aurl = config.get("default", "aurl") + else: + default_aurl = \ + "http://weather.noaa.gov/pub/data/watches_warnings/%atype%/%zone%.txt" + option_parser.add_option("--aurl", + dest="aurl", + default=default_aurl, + help="alert URL (including %atype% and %zone%)") + + # separate options object from list of arguments and return both + # the -c/--city option + if config.has_option("default", "city"): + default_city = config.get("default", "city") + else: default_city = "" + option_parser.add_option("-c", "--city", + dest="city", + default=default_city, + help="the city name (ex: \"Raleigh Durham\")") + + # the --flines option + if config.has_option("default", "flines"): + default_flines = config.get("default", "flines") + else: default_flines = "0" + option_parser.add_option("--flines", + dest="flines", + default=default_flines, + help="maximum number of forecast lines to show") + + # the -f/--forecast option + if config.has_option("default", "forecast"): + default_forecast = bool(config.get("default", "forecast")) + else: default_forecast = False + option_parser.add_option("-f", "--forecast", + dest="forecast", + action="store_true", + default=default_forecast, + help="include a local forecast") + + # the --furl option + if config.has_option("default", "furl"): + default_furl = config.get("default", "furl") + else: + default_furl = \ + "http://weather.noaa.gov/pub/data/forecasts/city/%st%/%city%.txt" + option_parser.add_option("--furl", + dest="furl", + default=default_furl, + help="forecast URL (including %city% and %st%)") + + # the --headers option + if config.has_option("default", "headers"): + default_headers = config.get("default", "headers") + else: + default_headers = \ + "temperature," \ + + "relative_humidity," \ + + "wind," \ + + "weather," \ + + "sky_conditions," \ + + "precipitation_last_hour" + option_parser.add_option("--headers", + dest="headers", + default=default_headers, + help="the conditions headers to display") + + # the -i/--id option + if config.has_option("default", "id"): + default_id = config.get("default", "id") + else: default_id = "" + option_parser.add_option("-i", "--id", + dest="id", + default=default_id, + help="the METAR station ID (ex: KRDU)") + + # the --imperial option + if config.has_option("default", "imperial"): + default_imperial = bool(config.get("default", "imperial")) + else: default_imperial = False + option_parser.add_option("--imperial", + dest="imperial", + action="store_true", + default=default_imperial, + help="filter/convert for US/UK units") + + # the -l/--list option + option_parser.add_option("-l", "--list", + dest="list", + action="store_true", + default=False, + help="print a list of configured aliases") + + # the -m/--metric option + if config.has_option("default", "metric"): + default_metric = bool(config.get("default", "metric")) + else: default_metric = False + option_parser.add_option("-m", "--metric", + dest="metric", + action="store_true", + default=default_metric, + help="filter/convert for metric units") + + # the --murl option + if config.has_option("default", "murl"): + default_murl = config.get("default", "murl") + else: + default_murl = \ + "http://weather.noaa.gov/pub/data/observations/metar/decoded/%ID%.TXT" + option_parser.add_option("--murl", + dest="murl", + default=default_murl, + help="METAR URL (including %id%)") + + # the -n/--no-conditions option + if config.has_option("default", "conditions"): + default_conditions = bool(config.get("default", "conditions")) + else: default_conditions = True + option_parser.add_option("-n", "--no-conditions", + dest="conditions", + action="store_false", + default=default_conditions, + help="disable output of current conditions (forces -f)") + + # the -o/--omit-forecast option + option_parser.add_option("-o", "--omit-forecast", + dest="forecast", + action="store_false", + default=default_forecast, + help="omit the local forecast (cancels -f)") + + # the -q/--quiet option + if config.has_option("default", "quiet"): + default_quiet = bool(config.get("default", "quiet")) + else: default_quiet = False + option_parser.add_option("-q", "--quiet", + dest="quiet", + action="store_true", + default=default_quiet, + help="skip preambles and don't indent") + + # the -s/--st option + if config.has_option("default", "st"): + default_st = config.get("default", "st") + else: default_st = "" + option_parser.add_option("-s", "--st", + dest="st", + default=default_st, + help="the state abbreviation (ex: NC)") + + # the -v/--verbose option + if config.has_option("default", "verbose"): + default_verbose = bool(config.get("default", "verbose")) + else: default_verbose = False + option_parser.add_option("-v", "--verbose", + dest="verbose", + action="store_true", + default=default_verbose, + help="show full decoded feeds (cancels -q)") + + # the -z/--zones option + if config.has_option("default", "zones"): + default_zones = config.get("default", "zones") + else: default_zones = "" + option_parser.add_option("-z", "--zones", + dest="zones", + default=default_zones, + help="alert zones (ex: nc/ncc183,nc/ncz041)") + + options, arguments = option_parser.parse_args() + return options, arguments def get_config(): - """Parse the aliases and configuration.""" - import ConfigParser - config = ConfigParser.ConfigParser() - import os.path - rcfiles = [ - "/etc/weatherrc", - os.path.expanduser("~/.weatherrc"), - "weatherrc" - ] - import os - for rcfile in rcfiles: - if os.access(rcfile, os.R_OK): config.read(rcfile) - for section in config.sections(): - if section != section.lower(): - if config.has_section(section.lower()): - config.remove_section(section.lower()) - config.add_section(section.lower()) - for option,value in config.items(section): - config.set(section.lower(), option, value) - return config + """Parse the aliases and configuration.""" + import ConfigParser + config = ConfigParser.ConfigParser() + import os.path + rcfiles = [ + "/etc/weatherrc", + os.path.expanduser("~/.weatherrc"), + "weatherrc" + ] + import os + for rcfile in rcfiles: + if os.access(rcfile, os.R_OK): config.read(rcfile) + for section in config.sections(): + if section != section.lower(): + if config.has_section(section.lower()): + config.remove_section(section.lower()) + config.add_section(section.lower()) + for option,value in config.items(section): + config.set(section.lower(), option, value) + return config def list_aliases(config): - """Return a formatted list of aliases defined in the config.""" - sections = [] - for section in sorted(config.sections()): - if section.lower() not in sections and section != "default": - sections.append(section.lower()) - output = "configured aliases..." - for section in sorted(sections): - output += "\n " \ - + section \ - + ": --id=" \ - + quote(config.get(section, "id")) \ - + " --city=" \ - + quote(config.get(section, "city")) \ - + " --st=" \ - + quote(config.get(section, "st")) - return output + """Return a formatted list of aliases defined in the config.""" + sections = [] + for section in sorted(config.sections()): + if section.lower() not in sections and section != "default": + sections.append(section.lower()) + output = "configured aliases..." + for section in sorted(sections): + output += "\n " \ + + section \ + + ": --id=" \ + + quote(config.get(section, "id")) \ + + " --city=" \ + + quote(config.get(section, "city")) \ + + " --st=" \ + + quote(config.get(section, "st")) + return output