"""Contains various object definitions needed by the weather utility."""
weather_copyright = """\
-# Copyright (c) 2006-2020 Jeremy Stanley <fungi@yuggoth.org>. Permission to
+# Copyright (c) 2006-2024 Jeremy Stanley <fungi@yuggoth.org>. Permission to
# use, copy, modify, and distribute this software is granted under terms
# provided in the LICENSE file distributed with this software.
#"""
-weather_version = "2.4"
+weather_version = "2.5.0"
radian_to_km = 6372.795484
radian_to_mi = 3959.871528
-def pyversion(ref=None):
- """Determine the Python version and optionally compare to a reference."""
- import platform
- ver = platform.python_version()
- if ref:
- return [
- int(x) for x in ver.split(".")[:2]
- ] >= [
- int(x) for x in ref.split(".")[:2]
- ]
- else: return ver
-
class Selections:
"""An object to contain selection data."""
def __init__(self):
return self.config.get(argument, option)
if option in self.options.__dict__:
return self.options.__dict__[option]
- else:
- import os, sys
- message = "%s error: no URI defined for %s\n" % (
- os.path.basename( sys.argv[0] ),
- option
- )
- sys.stderr.write(message)
- exit(1)
+ import sys
+ message = "WARNING: no URI defined for %s\n" % option
+ sys.stderr.write(message)
+ return None
def get_bool(self, option, argument=None):
"""Get data and coerce to a boolean if necessary."""
- return bool(self.get(option, argument))
+ # Mimic configparser's getboolean() method by treating
+ # false/no/off/0 as False and true/yes/on/1 as True values,
+ # case-insensitively
+ value = self.get(option, argument)
+ if isinstance(value, bool):
+ return value
+ if isinstance(value, str):
+ vlower = value.lower()
+ if vlower in ('false', 'no', 'off', '0'):
+ return False
+ elif vlower in ('true', 'yes', 'on', '1'):
+ return True
+ raise ValueError("Not a boolean: %s" % value)
def getint(self, option, argument=None):
"""Get data and coerce to an integer if necessary."""
value = self.get(option, argument)
# filter lines with both pressures in the form of "X inches (Y hPa)" or
# "X in. Hg (Y hPa)"
dual_p = re.match(
- "(.* )(\d*(\.\d+)? (inches|in\. Hg)) \((\d*(\.\d+)? hPa)\)(.*)",
+ r"(.* )(\d*(\.\d+)? (inches|in\. Hg)) \((\d*(\.\d+)? hPa)\)(.*)",
line
)
if dual_p:
elif units == "metric": line = preamble + hpa + trailer
# filter lines with both temperatures in the form of "X F (Y C)"
dual_t = re.match(
- "(.* )(-?\d*(\.\d+)? F) \((-?\d*(\.\d+)? C)\)(.*)",
+ r"(.* )(-?\d*(\.\d+)? F) \((-?\d*(\.\d+)? C)\)(.*)",
line
)
if dual_t:
# "Y kilometer(s)"
if units == "metric":
imperial_d = re.match(
- "(.* )(\d+)( mile\(s\))(.*)",
+ r"(.* )(\d+)( mile\(s\))(.*)",
line
)
if imperial_d:
# filter speeds in the form of "X MPH (Y KT)" to just "X MPH"; if metric is
# desired, convert to "Z KPH"
imperial_s = re.match(
- "(.* )(\d+)( MPH)( \(\d+ KT\))(.*)",
+ r"(.* )(\d+)( MPH)( \(\d+ KT\))(.*)",
line
)
if imperial_s:
line = preamble + str(int(round(int(mph)*1.609344))) + " KPH" + \
trailer
imperial_s = re.match(
- "(.* )(\d+)( MPH)( \(\d+ KT\))(.*)",
+ r"(.* )(\d+)( MPH)( \(\d+ KT\))(.*)",
line
)
if imperial_s:
# if imperial is desired, qualify given forcast temperatures like "X F"; if
# metric is desired, convert to "Y C"
imperial_t = re.match(
- "(.* )(High |high |Low |low )(\d+)(\.|,)(.*)",
+ r"(.* )(High |high |Low |low )(\d+)(\.|,)(.*)",
line
)
if imperial_t:
cachedir="."
):
"""Return a string containing the results of a URI GET."""
- if pyversion("3"):
- import urllib, urllib.error, urllib.request
- URLError = urllib.error.URLError
- urlopen = urllib.request.urlopen
- else:
- import urllib2 as urllib
- URLError = urllib.URLError
- urlopen = urllib.urlopen
- import os, time
+ import os, time, urllib, urllib.error, urllib.request
if cache_data:
dcachedir = os.path.join( os.path.expanduser(cachedir), "datacache" )
if not os.path.exists(dcachedir):
except (IOError, OSError): pass
dcache_fn = os.path.join(
dcachedir,
- uri.split(":")[1].replace("/","_")
+ uri.split(":",1)[1].replace("/","_")
)
now = time.time()
if cache_data and os.access(dcache_fn, os.R_OK) \
dcache_fd.close()
else:
try:
- if pyversion("3"): data = urlopen(uri).read().decode("utf-8")
- else: data = urlopen(uri).read()
- except URLError:
+ data = urllib.request.urlopen(uri).read().decode("utf-8")
+ except urllib.error.URLError:
if ignore_fail: return ""
- else:
- import os, sys, traceback
- message = "%s error: failed to retrieve\n %s\n %s" % (
- os.path.basename( sys.argv[0] ),
- uri,
- traceback.format_exception_only(
- sys.exc_type,
- sys.exc_value
- )[0]
- )
- sys.stderr.write(message)
- sys.exit(1)
+ import os, sys
+ sys.stderr.write("%s error: failed to retrieve\n %s\n\n" % (
+ os.path.basename( sys.argv[0] ), uri))
+ raise
# Some data sources are HTML with the plain text wrapped in pre tags
if "<pre>" in data:
data = data[data.find("<pre>")+5:data.find("</pre>")]
cacheage=cacheage,
cachedir=cachedir
)
- if pyversion("3") and type(metar) is bytes: metar = metar.decode("utf-8")
+ if type(metar) is bytes: metar = metar.decode("utf-8")
if verbose: return metar
else:
import re
quiet=False,
cache_data=False,
cacheage=900,
- cachedir="."
+ cachedir=".",
+ delay=1
):
"""Return alert notice for the specified URI."""
if not uri:
- import os, sys
- message = "%s error: Alert URI required for alerts\n" % \
- os.path.basename( sys.argv[0] )
- sys.stderr.write(message)
- sys.exit(1)
+ return ""
alert = get_uri(
uri,
ignore_fail=True,
cacheage=cacheage,
cachedir=cachedir
).strip()
- if pyversion("3") and type(alert) is bytes: alert = alert.decode("utf-8")
+ if type(alert) is bytes: alert = alert.decode("utf-8")
if alert:
if verbose: return alert
else:
- if alert.find("\nNATIONAL WEATHER SERVICE") == -1:
- muted = False
- else:
+ import re
+ if re.search(r"\nNational Weather Service", alert):
muted = True
+ else:
+ muted = False
+ expirycheck = re.search(r"Expires:([0-9]{12})", alert)
+ if expirycheck:
+ # only report alerts and forecasts that expired less than delay
+ # hours ago
+ import datetime, zoneinfo
+ expiration = datetime.datetime.fromisoformat(
+ "%s-%s-%sT%s:%s" % (
+ expirycheck[1][:4],
+ expirycheck[1][4:6],
+ expirycheck[1][6:8],
+ expirycheck[1][8:10],
+ expirycheck[1][-2:],
+ )).replace(tzinfo=zoneinfo.ZoneInfo("UTC"))
+ now = datetime.datetime.now(tz=zoneinfo.ZoneInfo("UTC"))
+ if now - expiration > datetime.timedelta(hours=delay):
+ return ""
lines = alert.split("\n")
- import time
- valid_time = time.strftime("%Y%m%d%H%M")
output = []
for line in lines:
- if line.startswith("Expires:") \
- and "Expires:" + valid_time > line:
- return ""
- if muted and line.startswith("NATIONAL WEATHER SERVICE"):
+ if muted and line.startswith("National Weather Service"):
muted = False
line = ""
elif line == "&&":
# the -a/--alert option
if config.has_option("default", "alert"):
- default_alert = bool(config.get("default", "alert"))
+ default_alert = config.getboolean("default", "alert")
else: default_alert = False
option_parser.add_option("-a", "--alert",
dest="alert",
+ "flash_flood_statement," \
+ "flash_flood_warning," \
+ "flash_flood_watch," \
- + "flood_statement," \
+ "flood_warning," \
- + "marine_weather_statement," \
- + "river_statement," \
+ "severe_thunderstorm_warning," \
+ "severe_weather_statement," \
- + "short_term_forecast," \
- + "special_marine_warning," \
+ "special_weather_statement," \
- + "tornado_warning," \
+ + "tornado," \
+ "urgent_weather_message"
option_parser.add_option("--atypes",
dest="atypes",
default=default_cachedir,
help="directory for storing cached searches and data")
+ # the --delay option
+ if config.has_option("default", "delay"):
+ default_delay = config.getint("default", "delay")
+ else: default_delay = 1
+ option_parser.add_option("--delay",
+ dest="delay",
+ type="int",
+ default=default_delay,
+ help="hours to delay alert and forecast expiration")
+
# the -f/--forecast option
if config.has_option("default", "forecast"):
- default_forecast = bool(config.get("default", "forecast"))
+ default_forecast = config.getboolean("default", "forecast")
else: default_forecast = False
option_parser.add_option("-f", "--forecast",
dest="forecast",
# the --imperial option
if config.has_option("default", "imperial"):
- default_imperial = bool(config.get("default", "imperial"))
+ default_imperial = config.getboolean("default", "imperial")
else: default_imperial = False
option_parser.add_option("--imperial",
dest="imperial",
# the -m/--metric option
if config.has_option("default", "metric"):
- default_metric = bool(config.get("default", "metric"))
+ default_metric = config.getboolean("default", "metric")
else: default_metric = False
option_parser.add_option("-m", "--metric",
dest="metric",
# the -n/--no-conditions option
if config.has_option("default", "conditions"):
- default_conditions = bool(config.get("default", "conditions"))
+ default_conditions = config.getboolean("default", "conditions")
else: default_conditions = True
option_parser.add_option("-n", "--no-conditions",
dest="conditions",
# the --no-cache option
if config.has_option("default", "cache"):
- default_cache = bool(config.get("default", "cache"))
+ default_cache = config.getboolean("default", "cache")
else: default_cache = True
option_parser.add_option("--no-cache",
dest="cache",
# the --no-cache-data option
if config.has_option("default", "cache_data"):
- default_cache_data = bool(config.get("default", "cache_data"))
+ default_cache_data = config.getboolean("default", "cache_data")
else: default_cache_data = True
option_parser.add_option("--no-cache-data",
dest="cache_data",
# the --no-cache-search option
if config.has_option("default", "cache_search"):
- default_cache_search = bool(config.get("default", "cache_search"))
+ default_cache_search = config.getboolean("default", "cache_search")
else: default_cache_search = True
option_parser.add_option("--no-cache-search",
dest="cache_search",
# the -q/--quiet option
if config.has_option("default", "quiet"):
- default_quiet = bool(config.get("default", "quiet"))
+ default_quiet = config.getboolean("default", "quiet")
else: default_quiet = False
option_parser.add_option("-q", "--quiet",
dest="quiet",
# the -v/--verbose option
if config.has_option("default", "verbose"):
- default_verbose = bool(config.get("default", "verbose"))
+ default_verbose = config.getboolean("default", "verbose")
else: default_verbose = False
option_parser.add_option("-v", "--verbose",
dest="verbose",
def get_config():
"""Parse the aliases and configuration."""
- if pyversion("3"): import configparser
- else: import ConfigParser as configparser
+ import configparser, os
config = configparser.ConfigParser()
- import os
rcfiles = [
"/etc/weatherrc",
"/etc/weather/weatherrc",
"weatherrc"
]
for rcfile in rcfiles:
- if os.access(rcfile, os.R_OK): config.read(rcfile)
+ if os.access(rcfile, os.R_OK):
+ config.read(rcfile, encoding="utf-8")
for section in config.sections():
if section != section.lower():
if config.has_section(section.lower()):
def integrate_search_cache(config, cachedir, setpath):
"""Add cached search results into the configuration."""
- if pyversion("3"): import configparser
- else: import ConfigParser as configparser
- import os, time
+ import configparser, os, time
scache_fn = os.path.join( os.path.expanduser(cachedir), "searches" )
if not os.access(scache_fn, os.R_OK): return config
scache_fd = open(scache_fn)
pass
return config
scache = configparser.ConfigParser()
- scache.read(scache_fn)
+ scache.read(scache_fn, encoding="utf-8")
for section in scache.sections():
if not config.has_section(section):
config.add_section(section)
quiet=False
):
"""Find URIs using airport, gecos, placename, station, ZCTA/ZIP, zone."""
- import codecs, datetime, time, os, re, sys
- if pyversion("3"): import configparser
- else: import ConfigParser as configparser
+ import codecs, configparser, datetime, time, os, re, sys
datafiles = data_index(path)
if re.match("[A-Za-z]{3}$", expression): searchtype = "airport"
elif re.match("[A-Za-z0-9]{4}$", expression): searchtype = "station"
datafile = datafiles[dataname][0]
if datafile.endswith(".gz"):
import gzip
- if pyversion("3"):
- stations.read_string(
- gzip.open(datafile).read().decode("utf-8") )
- else: stations.readfp( gzip.open(datafile) )
+ stations.read_string( gzip.open(datafile).read().decode("utf-8") )
else:
- stations.read(datafile)
+ stations.read(datafile, encoding="utf-8")
else:
message = "%s error: can't find \"%s\" data file\n" % (
os.path.basename( sys.argv[0] ),
datafile = datafiles[dataname][0]
if datafile.endswith(".gz"):
import gzip
- if pyversion("3"):
- zones.read_string( gzip.open(datafile).read().decode("utf-8") )
- else: zones.readfp( gzip.open(datafile) )
+ zones.read_string( gzip.open(datafile).read().decode("utf-8") )
else:
- zones.read(datafile)
+ zones.read(datafile, encoding="utf-8")
else:
message = "%s error: can't find \"%s\" data file\n" % (
os.path.basename( sys.argv[0] ),
datafile = datafiles[dataname][0]
if datafile.endswith(".gz"):
import gzip
- if pyversion("3"):
- airports.read_string(
- gzip.open(datafile).read().decode("utf-8") )
- else: airports.readfp( gzip.open(datafile) )
+ airports.read_string(
+ gzip.open(datafile).read().decode("utf-8") )
else:
- airports.read(datafile)
+ airports.read(datafile, encoding="utf-8")
else:
message = "%s error: can't find \"%s\" data file\n" % (
os.path.basename( sys.argv[0] ),
datafile = datafiles[dataname][0]
if datafile.endswith(".gz"):
import gzip
- if pyversion("3"):
- zctas.read_string(
- gzip.open(datafile).read().decode("utf-8") )
- else: zctas.readfp( gzip.open(datafile) )
+ zctas.read_string( gzip.open(datafile).read().decode("utf-8") )
else:
- zctas.read(datafile)
+ zctas.read(datafile, encoding="utf-8")
else:
message = "%s error: can't find \"%s\" data file\n" % (
os.path.basename( sys.argv[0] ),
datafile = datafiles[dataname][0]
if datafile.endswith(".gz"):
import gzip
- if pyversion("3"):
- places.read_string(
- gzip.open(datafile).read().decode("utf-8") )
- else: places.readfp( gzip.open(datafile) )
+ places.read_string( gzip.open(datafile).read().decode("utf-8") )
else:
- places.read(datafile)
+ places.read(datafile, encoding="utf-8")
else:
message = "%s error: can't find \"%s\" data file\n" % (
os.path.basename( sys.argv[0] ),
print(
" (proximity %s, %.3gkm, %.3gmi)" % ( score, km, mi )
)
- elif searchtype is "coordinates":
+ elif searchtype == "coordinates":
print( " (%.3gkm, %.3gmi)" % (km, mi) )
if zone[0]:
print(
print(
" (proximity %s, %.3gkm, %.3gmi)" % ( score, km, mi )
)
- elif searchtype is "coordinates" and zone[0]:
+ elif searchtype == "coordinates" and zone[0]:
print( " (%.3gkm, %.3gmi)" % (km, mi) )
if cache_search:
now = time.time()
)
search_cache = ["\n"]
search_cache.append( "[%s]\n" % search[0] )
- search_cache.append( "description = cached %s\n" % nowstamp )
+ search_cache.append( "cached = %s\n" % nowstamp )
for uriname in sorted(uris.keys()):
search_cache.append( "%s = %s\n" % ( uriname, uris[uriname] ) )
real_cachedir = os.path.expanduser(cachedir)
)
try:
scache_existing = configparser.ConfigParser()
- scache_existing.read(scache_fn)
+ scache_existing.read(scache_fn, encoding="utf-8")
if not scache_existing.has_section(search[0]):
scache_fd = codecs.open(scache_fn, "a", "utf-8")
scache_fd.writelines(search_cache)
return tuple(coordinates)
def correlate():
- import codecs, csv, datetime, hashlib, os, re, sys, tarfile, time, zipfile
- if pyversion("3"): import configparser
- else: import ConfigParser as configparser
+ import codecs, configparser, csv, datetime, hashlib, os, re, sys, time
+ import zipfile, zoneinfo
for filename in os.listdir("."):
if re.match("[0-9]{4}_Gaz_counties_national.zip$", filename):
gcounties_an = filename
weather_copyright,
os.path.basename( sys.argv[0] ),
datetime.date.isoformat(
- datetime.datetime.fromtimestamp( time.time() )
+ datetime.datetime.utcfromtimestamp( int(os.environ.get('SOURCE_DATE_EPOCH', time.time())) )
),
hashlib.md5( open(gcounties_an, "rb").read() ).hexdigest(),
datetime.date.isoformat(
- datetime.datetime.fromtimestamp( os.path.getmtime(gcounties_an) )
+ datetime.datetime.utcfromtimestamp( os.path.getmtime(gcounties_an) )
),
gcounties_an,
hashlib.md5( open(gcousubs_an, "rb").read() ).hexdigest(),
datetime.date.isoformat(
- datetime.datetime.fromtimestamp( os.path.getmtime(gcousubs_an) )
+ datetime.datetime.utcfromtimestamp( os.path.getmtime(gcousubs_an) )
),
gcousubs_an,
hashlib.md5( open(gplace_an, "rb").read() ).hexdigest(),
datetime.date.isoformat(
- datetime.datetime.fromtimestamp( os.path.getmtime(gplace_an) )
+ datetime.datetime.utcfromtimestamp( os.path.getmtime(gplace_an) )
),
gplace_an,
hashlib.md5( open(gzcta_an, "rb").read() ).hexdigest(),
datetime.date.isoformat(
- datetime.datetime.fromtimestamp( os.path.getmtime(gzcta_an) )
+ datetime.datetime.utcfromtimestamp( os.path.getmtime(gzcta_an) )
),
gzcta_an,
hashlib.md5( open(cpfzcf_fn, "rb").read() ).hexdigest(),
datetime.date.isoformat(
- datetime.datetime.fromtimestamp( os.path.getmtime(cpfzcf_fn) )
+ datetime.datetime.utcfromtimestamp( os.path.getmtime(cpfzcf_fn) )
),
cpfzcf_fn,
hashlib.md5( open(nsdcccc_fn, "rb").read() ).hexdigest(),
datetime.date.isoformat(
- datetime.datetime.fromtimestamp( os.path.getmtime(nsdcccc_fn) )
+ datetime.datetime.utcfromtimestamp( os.path.getmtime(nsdcccc_fn) )
),
nsdcccc_fn,
hashlib.md5( open(ourairports_fn, "rb").read() ).hexdigest(),
datetime.date.isoformat(
- datetime.datetime.fromtimestamp( os.path.getmtime(ourairports_fn) )
+ datetime.datetime.utcfromtimestamp( os.path.getmtime(ourairports_fn) )
),
ourairports_fn,
hashlib.md5( open(overrides_fn, "rb").read() ).hexdigest(),
datetime.date.isoformat(
- datetime.datetime.fromtimestamp( os.path.getmtime(overrides_fn) )
+ datetime.datetime.utcfromtimestamp( os.path.getmtime(overrides_fn) )
),
overrides_fn,
hashlib.md5( open(slist_fn, "rb").read() ).hexdigest(),
datetime.date.isoformat(
- datetime.datetime.fromtimestamp( os.path.getmtime(slist_fn) )
+ datetime.datetime.utcfromtimestamp( os.path.getmtime(slist_fn) )
),
slist_fn,
hashlib.md5( open(zlist_fn, "rb").read() ).hexdigest(),
datetime.date.isoformat(
- datetime.datetime.fromtimestamp( os.path.getmtime(zlist_fn) )
+ datetime.datetime.utcfromtimestamp( os.path.getmtime(zlist_fn) )
),
zlist_fn
)
sys.stdout.write(message)
sys.stdout.flush()
count = 0
- gcounties = zipfile.ZipFile(gcounties_an).open(gcounties_fn, "rU")
+ gcounties = zipfile.ZipFile(gcounties_an).open(gcounties_fn, "r")
columns = gcounties.readline().decode("utf-8").strip().split("\t")
for line in gcounties:
fields = line.decode("utf-8").strip().split("\t")
sys.stdout.write(message)
sys.stdout.flush()
count = 0
- gcousubs = zipfile.ZipFile(gcousubs_an).open(gcousubs_fn, "rU")
+ gcousubs = zipfile.ZipFile(gcousubs_an).open(gcousubs_fn, "r")
columns = gcousubs.readline().decode("utf-8").strip().split("\t")
for line in gcousubs:
fields = line.decode("utf-8").strip().split("\t")
sys.stdout.write(message)
sys.stdout.flush()
count = 0
- gplace = zipfile.ZipFile(gplace_an).open(gplace_fn, "rU")
+ gplace = zipfile.ZipFile(gplace_an).open(gplace_fn, "r")
columns = gplace.readline().decode("utf-8").strip().split("\t")
for line in gplace:
fields = line.decode("utf-8").strip().split("\t")
sys.stdout.write(message)
sys.stdout.flush()
count = 0
- slist = codecs.open(slist_fn, "rU", "utf-8")
+ slist = codecs.open(slist_fn, "r", "utf-8")
for line in slist:
icao = line.split("#")[0].strip()
if icao:
sys.stdout.write(message)
sys.stdout.flush()
count = 0
- nsdcccc = codecs.open(nsdcccc_fn, "rU", "utf-8")
+ nsdcccc = codecs.open(nsdcccc_fn, "r", "utf-8")
for line in nsdcccc:
line = str(line)
fields = line.split(";")
sys.stdout.write(message)
sys.stdout.flush()
count = 0
- ourairports = open(ourairports_fn, "rU")
+ ourairports = open(ourairports_fn, "r")
for row in csv.reader(ourairports):
- icao = row[12].decode('utf-8').lower()
+ icao = row[12].lower()
if icao in stations:
- iata = row[13].decode('utf-8').lower()
+ iata = row[13].lower()
if len(iata) == 3: airports[iata] = { "station": icao }
if "description" not in stations[icao]:
description = []
- name = row[3].decode('utf-8')
+ name = row[3]
if name: description.append(name)
- municipality = row[10].decode('utf-8')
+ municipality = row[10]
if municipality: description.append(municipality)
- region = row[9].decode('utf-8')
- country = row[8].decode('utf-8')
+ region = row[9]
+ country = row[8]
if region:
if "-" in region:
c,r = region.split("-", 1)
if description:
stations[icao]["description"] = ", ".join(description)
if "location" not in stations[icao]:
- lat = row[4].decode('utf-8')
+ lat = row[4]
if lat:
- lon = row[5].decode('utf-8')
+ lon = row[5]
if lon:
stations[icao]["location"] = gecos(
"%s,%s" % (lat, lon)
sys.stdout.write(message)
sys.stdout.flush()
count = 0
- zlist = codecs.open(zlist_fn, "rU", "utf-8")
+ zlist = codecs.open(zlist_fn, "r", "utf-8")
for line in zlist:
line = line.split("#")[0].strip()
if line:
sys.stdout.flush()
count = 0
cpfz = {}
- cpfzcf = codecs.open(cpfzcf_fn, "rU", "utf-8")
+ cpfzcf = codecs.open(cpfzcf_fn, "r", "utf-8")
for line in cpfzcf:
fields = line.strip().split("|")
if len(fields) == 11 \
zone = "z".join( fields[:2] ).lower()
if zone in zones:
state = fields[0]
+ description = fields[3].strip()
+ fips = "fips%s"%fields[6]
+ countycode = "%sc%s" % (state.lower(), fips[-3:])
if state:
zones[zone]["coastal_flood_statement"] = (
"https://tgftp.nws.noaa.gov/data/watches_warnings/"
zones[zone]["flash_flood_statement"] = (
"https://tgftp.nws.noaa.gov/data/watches_warnings/"
"flash_flood/statement/%s/%s.txt"
- % (state.lower(), zone))
+ % (state.lower(), countycode))
zones[zone]["flash_flood_warning"] = (
"https://tgftp.nws.noaa.gov/data/watches_warnings/"
"flash_flood/warning/%s/%s.txt"
- % (state.lower(), zone))
+ % (state.lower(), countycode))
zones[zone]["flash_flood_watch"] = (
"https://tgftp.nws.noaa.gov/data/watches_warnings/"
"flash_flood/watch/%s/%s.txt" % (state.lower(), zone))
- zones[zone]["flood_statement"] = (
- "https://tgftp.nws.noaa.gov/data/watches_warnings/"
- "flood/statement/%s/%s.txt" % (state.lower(), zone))
zones[zone]["flood_warning"] = (
"https://tgftp.nws.noaa.gov/data/watches_warnings/"
- "flood/warning/%s/%s.txt" % (state.lower(), zone))
+ "flood/warning/%s/%s.txt"
+ % (state.lower(), countycode))
zones[zone]["severe_thunderstorm_warning"] = (
"https://tgftp.nws.noaa.gov/data/watches_warnings/"
- "thunderstorm/%s/%s.txt" % (state.lower(), zone))
+ "thunderstorm/%s/%s.txt" % (state.lower(), countycode))
zones[zone]["severe_weather_statement"] = (
"https://tgftp.nws.noaa.gov/data/watches_warnings/"
"severe_weather_stmt/%s/%s.txt"
- % (state.lower(), zone))
+ % (state.lower(), countycode))
zones[zone]["short_term_forecast"] = (
"https://tgftp.nws.noaa.gov/data/forecasts/nowcast/"
"%s/%s.txt" % (state.lower(), zone))
zones[zone]["state_forecast"] = (
"https://tgftp.nws.noaa.gov/data/forecasts/state/"
"%s/%s.txt" % (state.lower(), zone))
+ zones[zone]["tornado"] = (
+ "https://tgftp.nws.noaa.gov/data/watches_warnings/"
+ "tornado/%s/%s.txt" % (state.lower(), countycode))
zones[zone]["urgent_weather_message"] = (
"https://tgftp.nws.noaa.gov/data/watches_warnings/"
"non_precip/%s/%s.txt" % (state.lower(), zone))
zones[zone]["zone_forecast"] = (
"https://tgftp.nws.noaa.gov/data/forecasts/zone/"
"%s/%s.txt" % (state.lower(), zone))
- description = fields[3].strip()
- fips = "fips%s"%fields[6]
+ tzcode = fields[7]
+ if tzcode == "A":
+ zones[zone]["tz"] = "US/Alaska"
+ elif tzcode == "AH":
+ zones[zone]["tz"] = "US/Aleutian"
+ elif tzcode in ("C", "CE", "CM"):
+ zones[zone]["tz"] = "US/Central"
+ elif tzcode in ("E", "e"):
+ zones[zone]["tz"] = "US/Eastern"
+ elif tzcode == "F":
+ zones[zone]["tz"] = "Pacific/Guadalcanal"
+ elif tzcode == "G":
+ zones[zone]["tz"] = "Pacific/Guam"
+ elif tzcode == "H":
+ zones[zone]["tz"] = "US/Hawaii"
+ elif tzcode == "J":
+ zones[zone]["tz"] = "Japan"
+ elif tzcode == "K":
+ zones[zone]["tz"] = "Pacific/Kwajalein"
+ elif tzcode in ("M", "MC", "MP"):
+ zones[zone]["tz"] = "US/Mountain"
+ elif tzcode == "m":
+ zones[zone]["tz"] = "US/Arizona"
+ elif tzcode == "P":
+ zones[zone]["tz"] = "US/Pacific"
+ elif tzcode == "S":
+ zones[zone]["tz"] = "US/Samoa"
+ elif tzcode == "V":
+ zones[zone]["tz"] = "America/Virgin"
+ else:
+ zones[zone]["tz"] = ""
county = fields[5]
if county:
if description.endswith(county):
sys.stdout.write(message)
sys.stdout.flush()
count = 0
- gzcta = zipfile.ZipFile(gzcta_an).open(gzcta_fn, "rU")
+ gzcta = zipfile.ZipFile(gzcta_an).open(gzcta_fn, "r")
columns = gzcta.readline().decode("utf-8").strip().split("\t")
for line in gzcta:
fields = line.decode("utf-8").strip().split("\t")
removed = 0
changed = 0
overrides = configparser.ConfigParser()
- overrides.readfp( codecs.open(overrides_fn, "r", "utf8") )
+ overrides.read_file( codecs.open(overrides_fn, "r", "utf8") )
overrideslog = []
for section in overrides.sections():
addopt = 0
if os.path.exists(overrideslog_fn):
os.rename(overrideslog_fn, "%s_old"%overrideslog_fn)
overrideslog_fd = codecs.open(overrideslog_fn, "w", "utf8")
+ import time
+ overrideslog_fd.write(
+ '# Copyright (c) %s Jeremy Stanley <fungi@yuggoth.org>. Permission to\n'
+ '# use, copy, modify, and distribute this software is granted under terms\n'
+ '# provided in the LICENSE file distributed with this software.\n\n'
+ % time.gmtime().tm_year)
overrideslog_fd.writelines(overrideslog)
overrideslog_fd.close()
print("done (%s overridden sections: +%s/-%s/!%s)." % (
if type(element) is float: elements.append("%.7f"%element)
else: elements.append( repr(element) )
value = "(%s)"%", ".join(elements)
+ if type(value) is bytes:
+ value = value.decode("utf-8")
stations_fd.write( "\n%s = %s" % (key, value) )
count += 1
stations_fd.write("\n")
sys.stdout.write(message)
sys.stdout.flush()
airports = configparser.ConfigParser()
- airports.read(airports_fn)
+ airports.read(airports_fn, encoding="utf-8")
places = configparser.ConfigParser()
- places.read(places_fn)
+ places.read(places_fn, encoding="utf-8")
stations = configparser.ConfigParser()
- stations.read(stations_fn)
+ stations.read(stations_fn, encoding="utf-8")
zctas = configparser.ConfigParser()
- zctas.read(zctas_fn)
+ zctas.read(zctas_fn, encoding="utf-8")
zones = configparser.ConfigParser()
- zones.read(zones_fn)
+ zones.read(zones_fn, encoding="utf-8")
qalog = []
places_nocentroid = 0
places_nodescription = 0
zctas_nocentroid += 1
zones_nocentroid = 0
zones_nodescription = 0
+ zones_notz = 0
zones_noforecast = 0
zones_overlapping = 0
zonetable = {}
if not zones.has_option(zone, "description"):
qalog.append("%s: no description\n" % zone)
zones_nodescription += 1
+ if not zones.has_option(zone, "tz") or zones.get(
+ zone, "tz") not in zoneinfo.available_timezones():
+ qalog.append("%s: no time zone\n" % zone)
+ zones_notz += 1
if not zones.has_option(zone, "zone_forecast"):
qalog.append("%s: no forecast\n" % zone)
zones_noforecast += 1
if os.path.exists(qalog_fn):
os.rename(qalog_fn, "%s_old"%qalog_fn)
qalog_fd = codecs.open(qalog_fn, "w", "utf8")
+ import time
+ qalog_fd.write(
+ '# Copyright (c) %s Jeremy Stanley <fungi@yuggoth.org>. Permission to\n'
+ '# use, copy, modify, and distribute this software is granted under terms\n'
+ '# provided in the LICENSE file distributed with this software.\n\n'
+ % time.gmtime().tm_year)
qalog_fd.writelines(qalog)
qalog_fd.close()
if qalog:
print(" %s zones with no centroid"%zones_nocentroid)
if zones_nodescription:
print(" %s zones with no description"%zones_nodescription)
+ if zones_notz:
+ print(" %s zones with no time zone"%zones_notz)
if zones_noforecast:
print(" %s zones with no forecast"%zones_noforecast)
if zones_overlapping: