#!/usr/bin/env python3
"""Publish the addresses of macvlan-attached Docker containers to a hosts file.

Every ``UPDATE_INTERVAL`` seconds the script asks the Docker daemon for its
macvlan networks and containers, and writes one ``<ip> <name>.<DOMAIN>`` line
per container address to ``HOSTS_FILE``.
"""

import ipaddress
import logging
import os
import time
from collections import defaultdict
from pathlib import Path  # not referenced below; kept from the original import list
from xml.etree import ElementTree as ET  # not referenced below; kept as above

import docker

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Configuration
DOMAIN = os.environ.get("DOMAIN", "home.arpa")
HOSTS_FILE = "/etc/docker_hosts"
UPDATE_INTERVAL = 2  # seconds


class SubnetBasedIPs:
    """The IP addresses of one host, each paired with the subnet it lives on."""

    def __init__(self):
        self.networks = []  # List of (network, ip) tuples, in insertion order

    def add_ip(self, ip, subnet):
        """Record *ip* as this host's address on *subnet* (CIDR string).

        Host bits in *subnet* are tolerated (``strict=False``). A subnet that
        cannot be parsed is logged and ignored.
        """
        try:
            network = ipaddress.ip_network(subnet, strict=False)
        except ValueError as e:
            logging.error(f"Invalid subnet {subnet} for IP {ip}: {e}")
            return
        self.networks.append((network, ip))

    def get_best_ip(self, client_ip):
        """Return the address that shares a subnet with *client_ip*.

        The first recorded address whose subnet contains *client_ip* wins.
        If none matches, or *client_ip* is not a valid address (logged), the
        first recorded address is returned. Returns ``None`` when no address
        has been recorded at all.
        """
        fallback = self.networks[0][1] if self.networks else None
        try:
            client = ipaddress.ip_address(client_ip)
        except ValueError as e:
            logging.error(f"Invalid client IP {client_ip}: {e}")
            return fallback
        for network, ip in self.networks:
            if client in network:
                return ip
        # If no matching subnet found, return the first IP
        return fallback


def get_macvlan_networks(client=None):
    """Get all Docker networks that use the macvlan driver.

    Args:
        client: Optional Docker client to reuse. When omitted, a client is
            created with ``docker.from_env()`` and closed before returning.

    Returns:
        A list of ``(network_name, subnet)`` tuples, one per IPAM config
        entry that declares a ``Subnet``.
    """
    owns_client = client is None
    if owns_client:
        client = docker.from_env()
    try:
        macvlan_networks = []
        for network in client.networks.list():
            if network.attrs.get('Driver') != 'macvlan':
                continue
            # 'IPAM' or 'Config' may be present but null; treat that as empty.
            ipam = network.attrs.get('IPAM') or {}
            for cfg in ipam.get('Config') or []:
                subnet = cfg.get('Subnet')
                if subnet:
                    macvlan_networks.append((network.name, subnet))
                    logging.info(f"Found macvlan network: {network.name} with subnet: {subnet}")
        return macvlan_networks
    finally:
        # Only close a client this call created; a caller-supplied one is
        # still in use by the caller.
        if owns_client:
            client.close()


def get_container_ips():
    """Collect the macvlan addresses of the containers Docker lists.

    Returns:
        A ``defaultdict`` mapping ``"<container-name>.<DOMAIN>"`` to a
        ``SubnetBasedIPs`` holding that container's address on each macvlan
        network it is attached to. Empty when no macvlan network exists.
    """
    container_ips = defaultdict(SubnetBasedIPs)
    client = docker.from_env()
    try:
        macvlan_networks = get_macvlan_networks(client)

        if not macvlan_networks:
            logging.warning("No macvlan networks found!")
            return container_ips

        logging.info(f"Monitoring macvlan networks: {macvlan_networks}")

        # Create network name to subnet mapping
        network_subnets = dict(macvlan_networks)

        for container in client.containers.list():
            hostname = container.attrs['Config'].get('Hostname')
            container_name = container.name
            networks = container.attrs['NetworkSettings']['Networks'] or {}

            logging.debug(f"Checking container: {container_name} (hostname: {hostname})")

            for network_name, network_config in networks.items():
                if network_name not in network_subnets:
                    continue
                ip = network_config.get('IPAddress')
                subnet = network_subnets[network_name]
                logging.debug(f"Found IP for {network_name} ({subnet}): {ip}")
                if ip and container_name:
                    container_ips[f"{container_name}.{DOMAIN}"].add_ip(ip, subnet)

        return container_ips
    finally:
        # A new client is created on every poll; close it so its connections
        # do not pile up over the lifetime of the process.
        client.close()


def generate_hosts_entries(container_ips):
    """Generate all possible host entries for CoreDNS to use.

    For every subnet present in *container_ips*, a representative address of
    that subnet is resolved against each host with ``get_best_ip`` and the
    result is emitted as ``"<ip> <hostname>"``.

    Args:
        container_ips: Mapping of hostname to ``SubnetBasedIPs``.

    Returns:
        A list of hosts-file lines. It may contain duplicates; callers are
        expected to de-duplicate.
    """
    # Distinct subnets in first-seen order. These were already parsed by
    # SubnetBasedIPs.add_ip, so no re-parsing and no extra Docker query.
    subnets = []
    for subnet_ips in container_ips.values():
        for network, _ in subnet_ips.networks:
            if network not in subnets:
                subnets.append(network)

    logging.info(f"Generated entries for subnets: {[str(s) for s in subnets]}")

    # One representative address per subnet. hosts() returns a list rather
    # than an iterator for single-address networks, hence iter(); the network
    # address is the fallback should hosts() be empty.
    probes = [
        str(next(iter(network.hosts()), network.network_address))
        for network in subnets
    ]

    entries = []
    for hostname, subnet_ips in container_ips.items():
        for probe in probes:
            best_ip = subnet_ips.get_best_ip(probe)
            if best_ip:
                entries.append(f"{best_ip} {hostname}")
    return entries


def update_hosts_file(container_ips):
    """Write the hosts file for *container_ips* to ``HOSTS_FILE``.

    Entries are de-duplicated and sorted so the output is stable between
    polls. The file is left untouched when it already holds the same content.

    Raises:
        OSError: If the directory cannot be created or the file cannot be
            written (the write failure is logged before re-raising).
    """
    # Add entries for each subnet and remove duplicates
    entries = sorted(set(generate_hosts_entries(container_ips)))

    # Create hosts file content
    hosts_content = (
        "# Auto-generated by docker-hosts-provider\n"
        "127.0.0.1 localhost\n\n"
        + "\n".join(entries)
    )
    if entries:
        hosts_content += "\n"

    # Skip the rewrite when nothing changed: opening with 'w' truncates the
    # file, which would briefly expose an empty file to readers on every poll.
    try:
        with open(HOSTS_FILE, encoding="utf-8") as f:
            if f.read() == hosts_content:
                logging.debug(f"{HOSTS_FILE} is already up to date")
                return
    except (OSError, UnicodeDecodeError):
        pass  # Missing or unreadable file: fall through and (re)write it.

    logging.info("Writing hosts file with content:")
    logging.info(hosts_content)

    # Ensure directory exists (dirname is '' for a bare file name)
    directory = os.path.dirname(HOSTS_FILE)
    if directory:
        os.makedirs(directory, exist_ok=True)

    try:
        with open(HOSTS_FILE, 'w', encoding="utf-8") as f:
            f.write(hosts_content)
    except OSError as e:
        logging.error(f"Error writing hosts file: {e}")
        raise
    logging.info(f"Successfully updated {HOSTS_FILE}")


def main():
    """Poll Docker forever, refreshing the hosts file after each poll.

    Any exception raised during a poll is logged and the loop continues after
    the usual ``UPDATE_INTERVAL`` pause.
    """
    logging.info("Starting Docker DNS monitor...")
    # avahi_publisher = AvahiPublisher()

    while True:
        try:
            current_ips = get_container_ips()
            if current_ips:
                logging.info("Current DNS mappings:")
                for hostname, subnet_ips in sorted(current_ips.items()):
                    for network, ip in subnet_ips.networks:
                        logging.info(f"  {hostname} -> {ip} (subnet: {network})")
                update_hosts_file(current_ips)
                # avahi_publisher.publish_hosts(current_ips)
            else:
                # NOTE(review): the hosts file is left as-is here, so entries
                # written by an earlier poll stay in place - confirm intended.
                logging.warning("No containers found on macvlan networks.")
        except Exception as e:
            logging.error(f"Error: {e}")
        time.sleep(UPDATE_INTERVAL)


if __name__ == "__main__":
    main()