aboutsummaryrefslogtreecommitdiffstats
path: root/gandi_dns_update.py
diff options
context:
space:
mode:
Diffstat (limited to 'gandi_dns_update.py')
-rw-r--r--gandi_dns_update.py203
1 files changed, 203 insertions, 0 deletions
diff --git a/gandi_dns_update.py b/gandi_dns_update.py
new file mode 100644
index 0000000..6bce6fb
--- /dev/null
+++ b/gandi_dns_update.py
@@ -0,0 +1,203 @@
+#
+#
+
+"""
+@author: <sylvain.herledan@hrafnagud.info>
+@date: 2021-04-10
+"""
+
+import os
+import sys
+import json
+import socket
+import typing
+import logging
+import argparse
+import http.client
+
+logger = logging.getLogger(__name__)
+
+
+def get_ip_from_ifconfig_me(
+ ) -> typing.Tuple[bool, typing.Optional[bytes]]:
+ """"""
+ conn = http.client.HTTPSConnection('ifconfig.me') # nosec
+
+ try:
+ conn.request('GET', '/')
+ r = conn.getresponse()
+ data = r.read()
+ r.close()
+ except socket.gaierror:
+ logger.error('ifconfig.me unavailable')
+ return False, None
+ except socket.timeout:
+ logger.error('ifconfig.me timeout')
+ return False, None
+ finally:
+ conn.close()
+
+ return True, data
+
+
+def get_ip_from_ipecho_net(
+ ) -> typing.Tuple[bool, typing.Optional[bytes]]:
+ """"""
+ conn = http.client.HTTPSConnection('ipecho.net') # nosec
+
+ try:
+ conn.request('GET', '/plain')
+ r = conn.getresponse()
+ data = r.read()
+ r.close()
+ except socket.gaierror:
+ logger.error('ipecho.net unavailable')
+ return False, None
+ except socket.timeout:
+ logger.error('ipecho.net timeout')
+ return False, None
+ finally:
+ conn.close()
+
+ return True, data
+
+
+def get_ip_from_ipinfo_io(
+ ) -> typing.Tuple[bool, typing.Optional[bytes]]:
+ """"""
+ conn = http.client.HTTPSConnection('ipinfo.io') # nosec
+ try:
+ conn.request('GET', '/ip')
+ r = conn.getresponse()
+ data = r.read()
+ r.close()
+ except socket.gaierror:
+ logger.error('ipinfo.io unavailable')
+ return False, None
+ except socket.timeout:
+ logger.error('ipinfo.io timeout')
+ return False, None
+ finally:
+ conn.close()
+
+ return True, data
+
+
+def update_record(api_key: str,
+ fqdn: str,
+ r_name: str,
+ r_type: str,
+ ip_address: str
+ ) -> typing.Tuple[bool, typing.Optional[bytes]]:
+ """"""
+ headers = {'Authorization': f'Apikey {api_key}'}
+ _payload = {'rrset_values': [ip_address, ],
+ 'rrset_ttl': 300}
+ payload = json.dumps(_payload)
+
+ ok = False
+ conn = http.client.HTTPSConnection('api.gandi.net') # nosec
+ try:
+ conn.request('PUT',
+ f'/v5/livedns/domains/{fqdn}/records/{r_name}/{r_type}',
+ headers=headers, body=payload)
+ r = conn.getresponse()
+ ok = (201 == r.status)
+ data = r.read()
+ r.close()
+ except socket.gaierror:
+ logger.error('api.gandi.net unavailable')
+ return False, None
+ except socket.timeout:
+ logger.error('api.gandi.net timeout')
+ return False, None
+ finally:
+ conn.close()
+
+ return ok, data
+
+
+if '__main__' == __name__:
+ # Setup logging
+ main_logger = logging.getLogger()
+ handler = logging.StreamHandler()
+ handler.setLevel(logging.DEBUG)
+ main_logger.addHandler(handler)
+ main_logger.setLevel(logging.DEBUG)
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('config_path', type=str)
+ args = parser.parse_args()
+
+ cfg_path = args.config_path
+ if not os.path.exists(cfg_path):
+ logger.error(f'"{cfg_path}" configuration file not found')
+ sys.exit(1)
+
+ # Get public IP address from various sources
+ ip_addresses = []
+
+ ok, ip1 = get_ip_from_ifconfig_me()
+ if (ok is True) and (ip1 is not None):
+ ip_addresses.append(ip1.decode('utf-8'))
+
+ ok, ip2 = get_ip_from_ipecho_net()
+ if (ok is True) and (ip2 is not None):
+ ip_addresses.append(ip2.decode('utf-8'))
+
+ ok, ip3 = get_ip_from_ipinfo_io()
+ if (ok is True) and (ip3 is not None):
+ ip_addresses.append(ip3.decode('utf-8'))
+
+ if 0 >= len(ip_addresses):
+ logger.error('Could not get public IP address from any source')
+ sys.exit(1)
+
+ # Make sure the collected addresses match
+ ip_checks = [_ for _ in ip_addresses if _ == ip_addresses[0]]
+ if len(ip_checks) < len(ip_addresses):
+ logger.error('Public IP addresses do not match')
+ sys.exit(1)
+
+ ip = ip_addresses[0]
+
+ # Read configuration
+ cfg = None
+ with open(cfg_path, 'r') as f:
+ try:
+ cfg = json.load(f)
+ except ValueError:
+ logger.error(f'{cfg_path} is not a valid JSON file')
+
+ if cfg is None:
+ logger.error('Impossible to load configuration')
+ sys.exit(1)
+
+ api_key = cfg.get('api_key', None)
+ fqdn = cfg.get('fqdn', None)
+ record_name = cfg.get('record_name', None)
+ history_path = cfg.get('history_path', None)
+
+ if None in (api_key, fqdn, record_name, history_path):
+ logger.error('Configuration is missing a required field')
+ sys.exit(1)
+
+ old_ip = None
+ if os.path.exists(history_path):
+ with open(history_path, 'r') as f:
+ old_ip = f.read()
+
+ if ip == old_ip:
+ logger.info('IP address has not changed since last DNS record update.')
+ sys.exit(0)
+
+ # Update DNS record
+ ok, result = update_record(api_key, fqdn, record_name, 'A', ip)
+
+ if ok is False:
+ logger.error('Failed to update DNS record using api.gandi.net')
+ logger.debug(result)
+ sys.exit(1)
+
+ with open(history_path, 'w') as f:
+ f.write(ip)