"""SaturnArch node auto-discovery daemon.

Broadcasts a JSON description of this node over UDP and listens for the
announcements of other nodes. Every new or changed announcement is stored in
/etc/slurm-llnl/hosts.json and used to regenerate /etc/hosts and the Slurm
configuration, after which the Slurm services are restarted.
"""
import ipaddress
import json
import os
import sys
import socket
from random import randint
from time import sleep, time

HOSTS = {}  # known peers, keyed by their IP address
UDP_IP = "255.255.255.255"
UDP_PORT = 5005
PROT_HDR = "SATURNARCH "  # every datagram of this protocol starts with this
SEND_TIME = None  # epoch second of the next planned announcement, if any
TYPE = "master" if os.path.exists("/etc/slurm-llnl/MASTER") else "slave"
MASTER = None  # info dict of the master node once known

# Largest possible UDP payload, so an announcement is never silently truncated.
_RECV_BUFSIZE = 65535
# Fields that the rest of this script reads from a peer's announcement.
_REQUIRED_KEYS = ("ip", "type", "name", "cpus", "rams", "res", "version")

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.bind((UDP_IP, UDP_PORT))


def nfsDone():
    """Return True when the master's IP already appears in /etc/fstab."""
    if MASTER is None:
        return False
    with open('/etc/fstab') as fstab:
        return MASTER["ip"] in fstab.read()


def get_ip():
    """Return the local address of the outgoing interface.

    Falls back to '127.0.0.1' when no route is available.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    probe.settimeout(0)
    try:
        # UDP connect() sends nothing; the target need not be reachable.
        probe.connect(('8.8.8.8', 1))
        return probe.getsockname()[0]
    except Exception:
        return '127.0.0.1'
    finally:
        probe.close()


def tryReadFile(fname, defval):
    """Return the content of *fname* with newlines removed.

    Returns *defval* unchanged when the file cannot be read.
    """
    try:
        with open(fname, 'r') as file:
            return file.read().replace('\n', '')
    except (OSError, UnicodeDecodeError):
        # Best effort: a missing or unreadable file yields the default.
        return defval


def _readConfVersion():
    """Return the stored configuration version, or 0 if absent or corrupt."""
    try:
        return int(tryReadFile("/etc/slurm-llnl/confVer", 0))
    except ValueError:
        # An empty or garbled version file must not stop the daemon.
        return 0


def selfInfo():
    """Build the dict describing this node, as sent in announcements."""
    return {
        "ip": get_ip(),
        "type": TYPE,
        "name": socket.gethostname(),
        "cpus": os.cpu_count(),
        "rams": os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES'),
        "res": json.loads(tryReadFile("/etc/slurm-llnl/localRes", "[]")),
        "mac": tryReadFile("/etc/slurm-llnl/localMac", "00:00:00:00:00:00"),
        "version": _readConfVersion(),
    }


def loadHosts():
    """Load the persisted peer table into HOSTS and determine MASTER."""
    global HOSTS, MASTER
    try:
        with open('/etc/slurm-llnl/hosts.json', 'r') as file:
            HOSTS = json.load(file)
    except (OSError, ValueError):
        # No table yet, or an unreadable one: start with no known peers.
        HOSTS = {}
    if TYPE == "master":
        MASTER = selfInfo()
    else:
        for host in HOSTS.values():
            if host["type"] == "master":
                MASTER = host


def updateVersion(val):
    """Persist *val* as the local configuration version."""
    with open("/etc/slurm-llnl/confVer", "w") as outfile:
        outfile.write(str(val))


def updateHosts():
    """Persist the HOSTS table to /etc/slurm-llnl/hosts.json."""
    with open("/etc/slurm-llnl/hosts.json", "w") as outfile:
        json.dump(HOSTS, outfile)


def _slurmNodeLine(host):
    """Format one ``NodeName=...`` line for a host info dict."""
    memory = int(host["rams"] / (1024.**2))
    gres = ",".join(host["res"])
    return (
        f"NodeName={host['name']} NodeAddr={host['ip']} CPUs={host['cpus']} "
        f"RealMemory={memory} Gres={gres} State=UNKNOWN\n"
    )


def generateSlurmConfig(source, target):
    """Render the template *source* into *target*.

    Replaces the placeholders ``{%hosts%}``, ``{%noMasterHosts%}``,
    ``{%masterName%}`` and ``{%masterIP%}``. Does nothing while no master
    is known.
    """
    if MASTER is None:
        return
    peers = list(HOSTS.values())
    # This node is listed first, followed by every known peer.
    hosts = "".join(_slurmNodeLine(node) for node in [selfInfo(), *peers])
    # NOTE(review): this lists every entry of HOSTS, whatever its type.
    noMasterHosts = ",".join(peer["name"] for peer in peers)
    with open(source) as f:
        template = f.read()
    newText = (
        template.replace('{%hosts%}', hosts)
        .replace('{%noMasterHosts%}', noMasterHosts)
        .replace('{%masterName%}', MASTER["name"])
        .replace('{%masterIP%}', MASTER["ip"])
    )
    with open(target, "w") as f:
        f.write(newText)


def generateHosts(target):
    """Write a hosts file to *target* listing this node and all peers."""
    lines = [
        "# Auto generated by SaturnArch\n",
        "127.0.0.1\tlocalhost\n",
        "::1\tlocalhost ip6-localhost ip6-loopback\n",
        "ff02::1\tip6-allnodes\n",
        "ff02::2\tip6-allrouters\n",
        f"{get_ip()}\t{socket.gethostname()}\n",  # this node comes first
    ]
    lines.extend(f"{host['ip']}\t{host['name']}\n" for host in HOSTS.values())
    with open(target, "w") as outfile:
        outfile.write("".join(lines))


def self_announcement():
    """Broadcast this node's info dict, prefixed with PROT_HDR."""
    MESSAGE = (PROT_HDR + json.dumps(selfInfo())).encode("ASCII")
    sock.sendto(MESSAGE, (UDP_IP, UDP_PORT))


def _parseAnnouncement(raw):
    """Decode and validate one received datagram.

    Returns the announced info dict, or None when the datagram does not
    belong to this protocol or is malformed. The data comes from the network
    and ends up in /etc/hosts, /etc/fstab and the Slurm configuration, so it
    is checked before use.
    """
    try:
        text = raw.decode("ASCII")
    except UnicodeDecodeError:
        return None
    if not text.startswith(PROT_HDR):
        return None
    try:
        info = json.loads(text[len(PROT_HDR):])
    except (ValueError, RecursionError):
        return None
    if not isinstance(info, dict):
        return None
    if any(key not in info for key in _REQUIRED_KEYS):
        return None
    ip, name = info["ip"], info["name"]
    if not isinstance(ip, str) or not isinstance(name, str):
        return None
    try:
        ipaddress.ip_address(ip)
    except ValueError:
        return None
    # The name becomes a field of whitespace-separated config lines.
    if not name or any(ch.isspace() for ch in name):
        return None
    if not isinstance(info["version"], int):
        return None
    return info


def _configureNetworkDisks():
    """Register and mount the master's NFS exports, then copy shared keys."""
    # Appended directly rather than through a shell, so that data received
    # from the network is never interpreted as a command.
    with open("/etc/fstab", "a") as fstab:
        fstab.write(f"{MASTER['ip']}:/clusterfs /clusterfs nfs defaults 0 0\n")
        fstab.write(f"{MASTER['ip']}:/home /home nfs defaults 0 0\n")
    os.system("mount -a")
    os.system("cp -f /clusterfs/config/munge.key /etc/munge/munge.key")
    os.system("cat /clusterfs/config/maintenance.pub >> /home/maintenance/.ssh/authorized_keys")


## Start program
loadHosts()
self_announcement()

while True:
    if SEND_TIME is not None and SEND_TIME < int(time()):
        print("Sending self announcement")
        self_announcement()
        SEND_TIME = None
        # Nothing is planned any more: block until the next datagram.
        sock.settimeout(None)

    try:
        packet, _addr = sock.recvfrom(_RECV_BUFSIZE)
    except socket.timeout:
        continue

    data = _parseAnnouncement(packet)
    if data is None:
        continue
    if data["ip"] == get_ip():
        continue  # our own broadcast
    if HOSTS.get(data["ip"]) == data:
        continue  # already known and unchanged

    print(f"Discover new HOST {data}")
    if data["type"] == "master":
        MASTER = data

    HOSTS[data["ip"]] = data
    updateHosts()
    generateHosts("/etc/hosts")

    # configure network disks
    if TYPE == "slave" and MASTER is not None and not nfsDone():
        _configureNetworkDisks()

    generateSlurmConfig("/clusterfs/config/slurm.conf.template", "/etc/slurm-llnl/slurm.conf")
    if _readConfVersion() < data["version"]:
        updateVersion(data["version"])

    # reset all services
    os.system("systemctl restart munge")
    os.system("systemctl restart slurmd")
    if TYPE == "master":
        os.system("systemctl restart slurmctld")

    # plan next send
    waitTime = randint(10, 100)
    print(f"Plan self announcement at T+{waitTime}s")
    SEND_TIME = int(time()) + waitTime
    # Wake up regularly so the planned announcement is sent without traffic.
    sock.settimeout(waitTime / 2)