docker-hosts-provider/monitor.py
2024-11-06 14:53:01 +01:00

194 lines
No EOL
6.6 KiB
Python

#!/usr/bin/env python3
# dbus-1-devel glib2-devel
import docker
import time
import os
import logging
import ipaddress
from pathlib import Path
from collections import defaultdict
from xml.etree import ElementTree as ET
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Configuration
DOMAIN = os.environ.get("DOMAIN", "home.arpa")
HOSTS_FILE = "/etc/docker_hosts"
UPDATE_INTERVAL = 2 # seconds
# Define subnet to IP mapping structure
class SubnetBasedIPs:
def __init__(self):
self.networks = [] # List of (network, ip) tuples
def add_ip(self, ip, subnet):
try:
network = ipaddress.ip_network(subnet, strict=False)
self.networks.append((network, ip))
except ValueError as e:
logging.error(f"Invalid subnet {subnet} for IP {ip}: {e}")
def get_best_ip(self, client_ip):
try:
client = ipaddress.ip_address(client_ip)
for network, ip in self.networks:
if client in network:
return ip
# If no matching subnet found, return the first IP
return self.networks[0][1] if self.networks else None
except ValueError as e:
logging.error(f"Invalid client IP {client_ip}: {e}")
return self.networks[0][1] if self.networks else None
def get_macvlan_networks():
"""Get all Docker networks that use the macvlan driver."""
client = docker.from_env()
networks = client.networks.list()
macvlan_networks = []
for network in networks:
if network.attrs.get('Driver') == 'macvlan':
config = network.attrs.get('IPAM', {}).get('Config', [])
for cfg in config:
subnet = cfg.get('Subnet')
if subnet:
macvlan_networks.append((network.name, subnet))
return macvlan_networks
def get_container_ips():
client = docker.from_env()
container_ips = defaultdict(SubnetBasedIPs)
macvlan_networks = get_macvlan_networks()
if not macvlan_networks:
logging.warning("No macvlan networks found!")
return container_ips
logging.info(f"Monitoring macvlan networks: {macvlan_networks}")
# Create network name to subnet mapping
network_subnets = {name: subnet for name, subnet in macvlan_networks}
for container in client.containers.list():
hostname = container.attrs['Config'].get('Hostname')
container_name = container.name
networks = container.attrs['NetworkSettings']['Networks']
logging.debug(f"Checking container: {container_name} (hostname: {hostname})")
for network_name, network_config in networks.items():
if network_name in network_subnets:
ip = network_config.get('IPAddress')
subnet = network_subnets[network_name]
logging.debug(f"Found IP for {network_name} ({subnet}): {ip}")
if ip and container_name:
container_ips[f"{container_name}.{DOMAIN}"].add_ip(ip, subnet)
return container_ips
def get_macvlan_networks():
"""Get all Docker networks that use the macvlan driver."""
client = docker.from_env()
networks = client.networks.list()
macvlan_networks = []
for network in networks:
if network.attrs.get('Driver') == 'macvlan':
config = network.attrs.get('IPAM', {}).get('Config', [])
for cfg in config:
subnet = cfg.get('Subnet')
if subnet:
macvlan_networks.append((network.name, subnet))
logging.info(f"Found macvlan network: {network.name} with subnet: {subnet}")
return macvlan_networks
def generate_hosts_entries(container_ips):
"""Generate all possible host entries for CoreDNS to use."""
entries = []
# Get all unique subnets from macvlan networks
test_subnets = [subnet for _, subnet in get_macvlan_networks()]
logging.info(f"Generated entries for subnets: {test_subnets}")
for hostname, subnet_ips in container_ips.items():
# Generate entries for each detected subnet
for subnet in test_subnets:
try:
# Use a representative IP from each subnet for testing
test_ip = str(next(ipaddress.ip_network(subnet).hosts()))
best_ip = subnet_ips.get_best_ip(test_ip)
if best_ip:
entries.append(f"{best_ip} {hostname}")
except Exception as e:
logging.error(f"Error generating entry for {hostname} subnet {subnet}: {e}")
return entries
def update_hosts_file(container_ips):
# Create hosts file content
hosts_content = "# Auto-generated by docker-dns-monitor\n"
hosts_content += "127.0.0.1 localhost\n\n"
# Add entries for each subnet and remove duplicates
entries = list(set(generate_hosts_entries(container_ips)))
hosts_content += "\n".join(entries)
logging.info("Writing hosts file with content:")
logging.info(hosts_content)
# Ensure directory exists
os.makedirs(os.path.dirname(HOSTS_FILE), exist_ok=True)
try:
# Write to temporary file first
temp_file = f"{HOSTS_FILE}.tmp"
with open(HOSTS_FILE, 'w') as f:
f.write(hosts_content)
# Check file permissions
# os.chmod(temp_file, 0o644)
# Atomic replace
# os.rename(temp_file, HOSTS_FILE)
logging.info(f"Successfully updated {HOSTS_FILE}")
except Exception as e:
logging.error(f"Error writing hosts file: {e}")
raise
def main():
logging.info("Starting Docker DNS monitor...")
# avahi_publisher = AvahiPublisher()
while True:
try:
current_ips = get_container_ips()
if current_ips:
logging.info("Current DNS mappings:")
for hostname, subnet_ips in sorted(current_ips.items()):
for network, ip in subnet_ips.networks:
logging.info(f" {hostname} -> {ip} (subnet: {network})")
update_hosts_file(current_ips)
# avahi_publisher.publish_hosts(current_ips)
else:
logging.warning("No containers found on macvlan networks.")
time.sleep(UPDATE_INTERVAL)
except Exception as e:
logging.error(f"Error: {e}")
time.sleep(UPDATE_INTERVAL)
if __name__ == "__main__":
main()