Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import geoip2.database
import sys
from collections import defaultdict
# Tag each '|'-separated stdin record with the country of its first field (an
# IP address) and write "<country>|<original record>" to stdout.
# The country database is opened once and shared by every record.
reader = geoip2.database.Reader('./GeoLite2-Country.mmdb')
try:
    with sys.stdin as stream:
        for rec in stream:
            try:
                parts = rec.strip().split('|')
                ip = parts[0]
                try:
                    from_country = reader.country(ip).country.name
                except geoip2.errors.AddressNotFoundError:
                    # Address is absent from the database.
                    from_country = 'Unknown'
                # Single-argument print() emits the same text on Python 2 and 3.
                print('{}|{}'.format(from_country, '|'.join(parts)))
            except Exception:
                # Best effort: skip any record that cannot be processed.
                # Catching Exception (not a bare except) lets Ctrl-C reach the
                # KeyboardInterrupt handler below instead of being swallowed.
                pass
except KeyboardInterrupt:
    sys.exit(0)
finally:
    # Release the database file on every exit path.
    reader.close()
# Echo every input row with one extra tab-separated column holding the GeoIP
# city name for the address found in column `ipColumnFromZero`. The extra
# column is left empty when the address is blank, unknown to the database, or
# has no city name.
lines = f.readlines()
for raw_line in lines:
    line = raw_line.rstrip('\n').rstrip('\r')
    parts = re.split(r'\t', line)
    ip = parts[ipColumnFromZero]
    city = None
    try:
        if len(ip) != 0:
            city = reader.city(ip).city.name
    except geoip2.errors.AddressNotFoundError:
        city = None
    suffix = city if city is not None else ""
    print(line + "\t" + suffix)
"to get the latest version.")
return None
else:
db_age = datetime.now() - datetime.fromtimestamp(
os.stat(db_path).st_mtime)
if db_age > timedelta(days=7):
logger.warning("GeoLite2-Country.mmdb is more than a week old")
db_path = db_path
db_reader = geoip2.database.Reader(db_path)
country = None
try:
country = db_reader.country(ip_address).country.iso_code
except geoip2.errors.AddressNotFoundError:
pass
return country
def country(self, address: str) -> str:
    """
    Look up an IP address in the db and return its country code.

    :param address: The IP address to look up.
    :return: The ISO country code, or an empty string when no database is
        loaded, the address is not in the database, the record carries no
        country code, or the lookup raises ValueError.
    """
    default_value = ''
    if self.db is None:
        return default_value
    try:
        iso_code = self.db.country(address).country.iso_code
    except geoip2.errors.AddressNotFoundError:
        return default_value
    except ValueError as e:  # pragma: no cover
        self._logger.exception("ValueError: %s", e)
        return default_value
    # A record without a country has iso_code None; str(None) would hand the
    # caller the literal text 'None' instead of the documented default.
    if iso_code is None:
        return default_value
    return str(iso_code)
def geoip(ipaddr):
    """
    Look up the geographic location of an IP address.

    Reads 'data/GeoLite2-City.mmdb' and builds a "<country> <subdivision>
    <city>" string from the zh-CN localized names; a missing subdivision or
    city name becomes an empty string. The result and the queried address are
    also reported through ``console``.

    :param ipaddr: The IP address to look up.
    :return: The location string, or a fixed placeholder message when the
        database file is missing, the address is unknown, or any other error
        occurs (the latter is logged).
    """
    reader = None
    try:
        reader = geoip2.database.Reader('data/GeoLite2-City.mmdb')
        response = reader.city(ipaddr)
        # Indexing (not .get) on purpose: a country without a zh-CN name
        # raises KeyError, which is reported as "not in databases" below.
        country = response.country.names["zh-CN"]
        site = response.subdivisions.most_specific.names.get("zh-CN") or ''
        city = response.city.names.get("zh-CN") or ''
        address = '{} {} {}'.format(country, site, city)
    except FileNotFoundError:
        address = 'Geoip File Not Found'
    except (KeyError, geoip2.errors.AddressNotFoundError):
        address = 'Address Not In Databases'
    except Exception as e:
        logging.exception(e)
        address = 'None'
    finally:
        # A new reader is opened on every call; close it on every path so the
        # database file handle is not leaked.
        if reader is not None:
            reader.close()
    console('GeoIP', ipaddr, 'Address: {}\n'.format(address))
    console('GeoIP', ipaddr, 'Ipaddr: {}\n'.format(ipaddr))
    return address
def geoip(text, reply):
    """
    Resolve ``text`` to an IP address and reply with its GeoIP location.

    :param text: A hostname or IP address; resolved with socket.gethostbyname.
    :param reply: Callable that receives the formatted location message.
    :return: An error message string when the database is not loaded, the
        input cannot be resolved, or the address is not in the database;
        otherwise None, with the result delivered through ``reply``.
    """
    if not geoip_reader:
        return "GeoIP database is updating, please wait a minute"

    try:
        ip = socket.gethostbyname(text)
    except socket.gaierror:
        return "Invalid input."

    try:
        location_data = geoip_reader.city(ip)
    except geoip2.errors.AddressNotFoundError:
        return "Sorry, I can't locate that in my database."

    # `", " + name or ""` parses as `(", " + name) or ""`, which raises
    # TypeError when the subdivision name is None and can never fall back to
    # "". Test the name first, then add the separator.
    region_name = location_data.subdivisions.most_specific.name
    data = {
        "cc": location_data.country.iso_code or "N/A",
        "country": location_data.country.name or "Unknown",
        "city": location_data.city.name or "Unknown",
        "region": (", " + region_name) if region_name else "",
    }

    reply("\x02Country:\x02 {country} ({cc}), \x02City:\x02 {city}{region}".format(**data))
# check the DB
update_db()
Attempts to return resolved information about the specified IP Address.
If such an attempt fails, returns None.
"""
if not ip_address:
return None
try:
parsed_ip = IPAddress(ip_address)
except AddrFormatError:
return ResolvedLocation("invalid_ip", None, self.sync_token, None)
# Try geoip classification
try:
geoinfo = self.geoip_db.country(ip_address)
except geoip2.errors.AddressNotFoundError:
geoinfo = None
if self.amazon_ranges is None or parsed_ip not in self.amazon_ranges:
if geoinfo:
return ResolvedLocation(
"internet", geoinfo.country.iso_code, self.sync_token, geoinfo.country.iso_code,
)
return ResolvedLocation("internet", None, self.sync_token, None)
return ResolvedLocation(
"aws", None, self.sync_token, geoinfo.country.iso_code if geoinfo else None
)
def find_location(ip):
    """
    Look up city-level location details for an IP address.

    The database path is read from TannerConfig ('DATA', 'geo_db').

    :param ip: The IP address to look up.
    :return: A dict with the keys ``country``, ``country_code``, ``city`` and
        ``zip_code``, or the string "NA" (not available) when the address is
        not in the database.
    """
    reader = Reader(TannerConfig.get('DATA', 'geo_db'))
    try:
        location = reader.city(ip)
        info = dict(
            country=location.country.name,
            country_code=location.country.iso_code,
            city=location.city.name,
            zip_code=location.postal.code,
        )
    except geoip2.errors.AddressNotFoundError:
        # The IP does not exist in the database.
        info = "NA"
    finally:
        # A new reader is opened on every call; close it on every path so the
        # database file handle is not leaked.
        reader.close()
    return info
from king_phisher import errors
from king_phisher import find
from king_phisher import ipaddress
import geoip2.database
import geoip2.errors
import maxminddb.errors
import requests
# Names exported by a star-import of this module.
__all__ = ('init_database', 'lookup', 'GeoLocation')
# Location of a GeoLite2-City tarball.
# NOTE(review): presumably fetched by the download helper in this module; confirm.
DB_DOWNLOAD_URL = 'https://zerosteiner.s3.amazonaws.com/data/GeoLite2-City.tar.gz'
DB_RESULT_FIELDS = ('city', 'continent', 'coordinates', 'country', 'postal_code', 'time_zone')
"""A tuple listing the fields that are required in database results."""
# Module-level alias of geoip2's "address not found" exception.
AddressNotFoundError = geoip2.errors.AddressNotFoundError
# Module-level database handle; None until something assigns it.
# NOTE(review): presumably set by init_database; confirm.
_geoip_db = None
# NOTE(review): presumably serializes access to _geoip_db; confirm against its users.
_geoip_db_lock = threading.Lock()
# Logger for this module, registered under the name 'KingPhisher.GeoIP'.
logger = logging.getLogger('KingPhisher.GeoIP')
Coordinates = collections.namedtuple('Coordinates', ('latitude', 'longitude'))
"""A named tuple for representing GPS coordinates."""
def _normalize_encoding(word):
if sys.version_info[0] == 2 and isinstance(word, unicode):
word = word.encode('ascii', 'ignore')
return word
def download_geolite2_city_db(dest, license=None, date=None):
"""
Download the GeoLite2 database and save it to disk.