#
#
"""
Dynamic DNS updater for Gandi LiveDNS.

Looks up the machine's public IP address through several independent
"what is my IP" services, checks that every answer agrees and, when the
address differs from the one stored in a local history file, overwrites
an ``A`` record through the Gandi LiveDNS v5 API.

Usage: ``python <script> CONFIG_PATH`` where CONFIG_PATH is a JSON object
with the keys ``api_key``, ``fqdn``, ``record_name`` and ``history_path``.

@author:
@date: 2021-04-10
"""
import os
import sys
import json
import socket
import typing
import logging
import argparse
import ipaddress
import http.client


logger = logging.getLogger(__name__)

# Seconds to wait on any single network operation. Without an explicit
# timeout http.client blocks forever on a stalled peer and the
# ``socket.timeout`` handlers below could never run.
_TIMEOUT = 10.0

# Keys that must be present (and not null) in the configuration file.
_REQUIRED_FIELDS = ('api_key', 'fqdn', 'record_name', 'history_path')


def _fetch_plain_text(host: str, path: str
                      ) -> typing.Tuple[bool, typing.Optional[bytes]]:
    """Issue ``GET https://<host><path>`` and return the raw body.

    Returns ``(True, body)`` on an HTTP 200 answer and ``(False, None)``
    on any network, TLS or protocol failure or any other status code, so
    that a misbehaving service is skipped instead of aborting the run.
    """
    conn = http.client.HTTPSConnection(host, timeout=_TIMEOUT)  # nosec
    try:
        conn.request('GET', path)
        response = conn.getresponse()
        status = response.status
        data = response.read()
        response.close()
    except socket.gaierror:
        logger.error('%s unavailable', host)
        return False, None
    except socket.timeout:
        logger.error('%s timeout', host)
        return False, None
    except (OSError, http.client.HTTPException) as err:
        # Connection refused/reset, TLS failure, malformed response, ...
        # Must come after the two more specific OSError subclasses above.
        logger.error('%s request failed: %s', host, err)
        return False, None
    finally:
        conn.close()

    if status != 200:
        # An error page body must not be mistaken for an IP address.
        logger.error('%s returned HTTP status %d', host, status)
        return False, None
    return True, data


def get_ip_from_ifconfig_me(
        ) -> typing.Tuple[bool, typing.Optional[bytes]]:
    """Fetch the public IP address from ``https://ifconfig.me/``.

    Returns ``(ok, body)``; ``body`` is the undecoded response and is
    ``None`` whenever ``ok`` is ``False``.
    """
    return _fetch_plain_text('ifconfig.me', '/')


def get_ip_from_ipecho_net(
        ) -> typing.Tuple[bool, typing.Optional[bytes]]:
    """Fetch the public IP address from ``https://ipecho.net/plain``.

    Returns ``(ok, body)``; ``body`` is the undecoded response and is
    ``None`` whenever ``ok`` is ``False``.
    """
    return _fetch_plain_text('ipecho.net', '/plain')


def get_ip_from_ipinfo_io(
        ) -> typing.Tuple[bool, typing.Optional[bytes]]:
    """Fetch the public IP address from ``https://ipinfo.io/ip``.

    Returns ``(ok, body)``; ``body`` is the undecoded response and is
    ``None`` whenever ``ok`` is ``False``.
    """
    return _fetch_plain_text('ipinfo.io', '/ip')


def update_record(api_key: str,
                  fqdn: str,
                  r_name: str,
                  r_type: str,
                  ip_address: str
                  ) -> typing.Tuple[bool, typing.Optional[bytes]]:
    """Overwrite one DNS record set through the Gandi LiveDNS v5 API.

    Sends ``PUT /v5/livedns/domains/<fqdn>/records/<r_name>/<r_type>``
    with ``ip_address`` as the only value and a TTL of 300 seconds.

    Returns ``(ok, body)``. ``ok`` is ``True`` only when the API answers
    201; ``body`` is the raw response (useful for diagnostics when the
    API rejects the request) or ``None`` when the request itself failed.
    """
    headers = {
        'Authorization': f'Apikey {api_key}',
        # The body is JSON; say so explicitly rather than rely on the
        # server guessing the media type.
        'Content-Type': 'application/json',
    }
    payload = json.dumps({'rrset_values': [ip_address], 'rrset_ttl': 300})

    conn = http.client.HTTPSConnection(
        'api.gandi.net', timeout=_TIMEOUT)  # nosec
    try:
        conn.request('PUT',
                     f'/v5/livedns/domains/{fqdn}/records/{r_name}/{r_type}',
                     headers=headers,
                     body=payload)
        response = conn.getresponse()
        ok = (201 == response.status)
        data = response.read()
        response.close()
    except socket.gaierror:
        logger.error('api.gandi.net unavailable')
        return False, None
    except socket.timeout:
        logger.error('api.gandi.net timeout')
        return False, None
    except (OSError, http.client.HTTPException) as err:
        logger.error('api.gandi.net request failed: %s', err)
        return False, None
    finally:
        conn.close()

    return ok, data


def _parse_ip(raw: typing.Optional[bytes]) -> typing.Optional[str]:
    """Decode a service response into a canonical IP address string.

    Surrounding whitespace (e.g. a trailing newline) is ignored. Returns
    ``None`` when the payload is missing, not UTF-8 or not an IP address.
    """
    if raw is None:
        return None
    try:
        # UnicodeDecodeError is a ValueError subclass, so one handler
        # covers both the decode and the address validation.
        return str(ipaddress.ip_address(raw.decode('utf-8').strip()))
    except ValueError:
        return None


def _collect_public_ips() -> typing.List[str]:
    """Query every IP source and return the valid answers, in order."""
    sources = (
        get_ip_from_ifconfig_me,
        get_ip_from_ipecho_net,
        get_ip_from_ipinfo_io,
    )
    addresses = []
    for fetch in sources:
        ok, raw = fetch()
        if not ok:
            continue
        address = _parse_ip(raw)
        if address is None:
            logger.error('%s did not return a valid IP address',
                         fetch.__name__)
            continue
        addresses.append(address)
    return addresses


def _load_config(cfg_path: str
                 ) -> typing.Optional[typing.Dict[str, typing.Any]]:
    """Load and validate the JSON configuration file.

    Returns the configuration dict, or ``None`` (after logging the
    reason) when the file is missing, unreadable, not a JSON object or
    lacks one of the required fields.
    """
    if not os.path.exists(cfg_path):
        logger.error('"%s" configuration file not found', cfg_path)
        return None

    cfg = None
    try:
        with open(cfg_path, 'r', encoding='utf-8') as f:
            cfg = json.load(f)
    except ValueError:
        logger.error('%s is not a valid JSON file', cfg_path)
    except OSError as err:
        logger.error('%s could not be read: %s', cfg_path, err)

    # A JSON list/string/number parses fine but has no ``.get``.
    if not isinstance(cfg, dict):
        logger.error('Impossible to load configuration')
        return None

    if any(cfg.get(key) is None for key in _REQUIRED_FIELDS):
        logger.error('Configuration is missing a required field')
        return None
    return cfg


def _read_last_ip(history_path: str) -> typing.Optional[str]:
    """Return the address stored by the previous successful update.

    Returns ``None`` when no history file exists yet.
    """
    try:
        with open(history_path, 'r') as f:
            # Tolerate a trailing newline added by an editor or `echo`.
            return f.read().strip()
    except FileNotFoundError:
        return None


def main(argv: typing.Optional[typing.List[str]] = None) -> int:
    """Run one update cycle and return the process exit code.

    ``argv`` defaults to ``sys.argv[1:]``. Returns 0 when the record is
    already up to date or was updated, 1 on any failure.
    """
    # Setup logging
    main_logger = logging.getLogger()
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    main_logger.addHandler(handler)
    main_logger.setLevel(logging.DEBUG)

    parser = argparse.ArgumentParser()
    parser.add_argument('config_path', type=str)
    args = parser.parse_args(argv)

    # Read configuration first: a broken configuration should fail
    # before any network round-trip is made.
    cfg = _load_config(args.config_path)
    if cfg is None:
        return 1
    history_path = cfg['history_path']

    # Get public IP address from various sources
    ip_addresses = _collect_public_ips()
    if not ip_addresses:
        logger.error('Could not get public IP address from any source')
        return 1

    # Make sure the collected addresses match
    if len(set(ip_addresses)) > 1:
        logger.error('Public IP addresses do not match')
        return 1
    ip = ip_addresses[0]

    if ip == _read_last_ip(history_path):
        logger.info(
            'IP address has not changed since last DNS record update.')
        return 0

    # Update DNS record
    ok, result = update_record(
        cfg['api_key'], cfg['fqdn'], cfg['record_name'], 'A', ip)
    if not ok:
        logger.error('Failed to update DNS record using api.gandi.net')
        logger.debug(result)
        return 1

    try:
        with open(history_path, 'w') as f:
            f.write(ip)
    except OSError as err:
        logger.error('Could not write history file %s: %s',
                     history_path, err)
        return 1
    return 0


if '__main__' == __name__:
    sys.exit(main())