summaryrefslogtreecommitdiffstats
path: root/scripts/python
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/python')
-rw-r--r--scripts/python/.env5
-rwxr-xr-xscripts/python/powerdns_mailcow_dkim_patcher.py144
2 files changed, 149 insertions, 0 deletions
diff --git a/scripts/python/.env b/scripts/python/.env
new file mode 100644
index 0000000..10eb8dc
--- /dev/null
+++ b/scripts/python/.env
@@ -0,0 +1,5 @@
+ENDPOINT_PDNS=http://example.com
+APIKEY_PDNS=
+ENDPOINT_MAILCOW=https://example.com
+APIKEY_MAILCOW=
+
diff --git a/scripts/python/powerdns_mailcow_dkim_patcher.py b/scripts/python/powerdns_mailcow_dkim_patcher.py
new file mode 100755
index 0000000..95137e2
--- /dev/null
+++ b/scripts/python/powerdns_mailcow_dkim_patcher.py
@@ -0,0 +1,144 @@
+#!/usr/bin/python3
+"""
+PowerDNS <-> Mailcow DKIM Public Key patching script.
+
+Created and Last modified: 13/09/2021 by Georg Pfuetzenreuter <georg@lysergic.dev>
+
+Example use cases:
+ - Administration failure caused individual zones to contain faulty DKIM records
+ - Automation failure caused individual zones to contain faulty DKIM records
+ - Security incident required replacing DKIM private keys
+ - Security audit required manipulating the DKIM records of selected zones
+
+The public key provided by Mailcow is considered as the source of truth.
+"""
+import requests
+import sys
+import os
+from dotenv import load_dotenv
+
+if len(sys.argv) > 1:
+ domain = sys.argv[1]
+else:
+ print("Specify the domain name")
+ sys.exit(1)
+
+load_dotenv()
+# POWERDNS SETTINGS
+ENDPOINT_PDNS = os.environ.get('ENDPOINT_PDNS')
+APIKEY_PDNS = os.environ.get('APIKEY_PDNS')
+
+# MAILCOW SETTINGS
+ENDPOINT_MAILCOW = os.environ.get('ENDPOINT_MAILCOW')
+APIKEY_MAILCOW = os.environ.get('APIKEY_MAILCOW')
+
+if None in (ENDPOINT_PDNS, APIKEY_PDNS, ENDPOINT_MAILCOW, APIKEY_MAILCOW):
+ print("Could not load environment variables. Please check your .env file.")
+ sys.exit(0)
+
+print("Scanning " + domain)
+
+# QUERY POWERDNS
+URL = ENDPOINT_PDNS + '/api/v1/servers/localhost/zones/' + domain + './export'
+try:
+ response = requests.get(
+ URL,
+ headers = {'accept': 'text/plain', 'X-API-Key': APIKEY_PDNS},
+ )
+ data = response.text
+ #print(data)
+ for record in data.split('\n'):
+ if '._domainkey' in record:
+ #print(record)
+ txtis = record.split('"')[1]
+ try:
+ txtis
+ except NameError:
+ print("No DKIM TXT record found.")
+ #else:
+ #print(txtis)
+except requests.exceptions.ConnectionError as err:
+ print("Connection failed.")
+ sys.exit(1)
+except requests.exceptions.HTTPError as err:
+ print(err)
+ sys.exit(1)
+
+# QUERY MAILCOW (can I put cow emoji in comment?)
+server = ENDPOINT_MAILCOW
+api_key = APIKEY_MAILCOW
+api = '/api/v1'
+get = api + '/get'
+URL = server + get + '/dkim/' + domain
+try:
+ response = requests.get(
+ URL,
+ headers = {'accept': 'application/json', 'X-API-Key': api_key},
+ )
+ data = response.json()
+ selector = data['dkim_selector']
+ txtshould = data['dkim_txt']
+ length = data['length']
+ pubkey = data['pubkey']
+ #print(f"Domain: {domain}\nSelector: {selector}\nTXT: {txtshould}\nPublic Key: {pubkey}")
+ #print(txtshould)
+except KeyError:
+ print("No or faulty DKIM lookup from Mailcow. ABORTING.")
+ sys.exit(1)
+except requests.exceptions.ConnectionError as err:
+ print("Connection failed.")
+ sys.exit(1)
+except requests.exceptions.HTTPError as err:
+ print(err)
+ sys.exit(1)
+# COMPARE
+if txtis == txtshould:
+ print(domain + ": All good!")
+if txtis != txtshould:
+ print("Mismatch!")
+ print("Patching SPF and DMARC ...")
+ URL = ENDPOINT_PDNS + '/api/v1/servers/localhost/zones/' + domain + "."
+ payload = {
+ "rrsets": [{"name": domain + ".", "type": "TXT", "ttl": "3600", "changetype": "REPLACE", "records": [{"content": "\"v=spf1 mx a -all\"", "disabled": False, "name": domain + "."}, {"content": "\"v=DMARC1; p=reject; rua=mailto:system@lysergic.dev\"", "disabled": False, "name": domain + "."}]}]
+ }
+ response = requests.patch(
+ URL,
+ headers = {'accept': 'application/json', 'X-API-Key': APIKEY_PDNS, 'Content-Type': 'application/json'},
+ json = payload,
+ )
+ status = response.status_code
+ if status == 204:
+ print("SPF/DMARC: OK!")
+ elif status == 422:
+ print("SPF/DMARC: Failed:")
+ print(response.json())
+ sys.exit(1)
+ else:
+ print("Unhandled error.")
+ print(status)
+ print(response.json())
+ sys.exit(1)
+ print("Patching DKIM ...")
+ payload = {
+ "rrsets": [{"name": selector + "._domainkey." + domain + ".", "type": "TXT", "ttl": "3600", "changetype": "REPLACE", "records": [{"content": "\""+ txtshould + "\"", "disabled": False, "name": selector + "._domainkey." + domain + "."}]}]
+ }
+ response = requests.patch(
+ URL,
+ headers = {'accept': 'application/json', 'X-API-Key': APIKEY_PDNS, 'Content-Type': 'application/json'},
+ json = payload,
+ )
+ status = response.status_code
+ if status == 204:
+ print("DKIM: OK!")
+ elif status == 422:
+ print("DKIM: Failed:")
+ print(response.json())
+ sys.exit(1)
+ else:
+ print("Unhandled error.")
+ print(status)
+ print(response.json())
+ sys.exit(1)
+ print("Done.")
+ sys.exit(0)
+