From aaf501aff18f0e0c91ef238c73a843df93e9ee1a Mon Sep 17 00:00:00 2001 From: Lukas Plevac Date: Fri, 29 Dec 2023 09:36:25 +0100 Subject: [PATCH] Added upload by chanks and lock for resources --- station/config.py | 9 ++++---- station/main.py | 33 +++++++++++++++++------------ station/planner.py | 10 ++++++--- station/puller.py | 51 +++++++++++++++++++++++++++++++++++++-------- station/recorder.py | 33 ++++++++++++++++++----------- station/rotator.py | 10 +-------- web/uploads.ini | 4 ++-- 7 files changed, 98 insertions(+), 52 deletions(-) diff --git a/station/config.py b/station/config.py index 84390b5..8f5c7e4 100644 --- a/station/config.py +++ b/station/config.py @@ -1,4 +1,5 @@ -masterUrl = "http://10.0.0.8" -apiKey = "6f44206f-6d59-4761-b5a2-07172ecea2e6" -pullInterval = 120 # in sec -planInterval = 1200 # in sec \ No newline at end of file +masterUrl = "http://10.0.0.8" +apiKey = "d0ec2b81-601b-481a-bde9-4e6699fd9297" +pullInterval = 120 # in sec +planInterval = 1200 # in sec +MaxUploadChunk = 5000000 # in bytes \ No newline at end of file diff --git a/station/main.py b/station/main.py index f7d4e26..1fbbd9e 100644 --- a/station/main.py +++ b/station/main.py @@ -7,11 +7,15 @@ from recorder import recorder import sys import traceback import planner +import threading -def onRecorded(info): - pass +from loguru import logger + +assigned = threading.Lock() +#assigned.release() i = 0 + while True: try: if (i % config.pullInterval) == 0: @@ -23,29 +27,32 @@ while True: jobsDeltas.append((job["start"] - datetime.utcnow()).total_seconds()) if job["start"] <= datetime.utcnow() + timedelta(seconds=60): - if job["end"] <= datetime.utcnow(): + if job["end"] <= datetime.utcnow() or not assigned.acquire(timeout=10): puller.setFail(job["id"]) puller.watingJobs.remove(job) + logger.debug("Canceling job {} because is ended lock state is {}".format(job["id"], assigned.locked())) + break + + logger.debug("I have lock") # start record puller.setRecording(job["id"]) - curRecorder = recorder(job, puller.location) + logger.debug("Starting record process for job {}".format(job["id"])) + curRecorder = recorder(job, puller.location, assigned) curRecorder.start() - - puller.watingJobs.remove(job) - if (i % 10) == 0 and len(jobsDeltas): - print(f"Next job in {min(jobsDeltas)}s") + puller.watingJobs.remove(job) # ask for planeble satellites if (i % config.planInterval) == 0: + logger.debug("Calling planner") planner.planAll(puller.location) i += 1 except Exception as inst: - print(f"[ERROR] main script fail restarting - error {inst}") + logger.error(f"main script fail restarting - error {inst}") # Get current system exception ex_type, ex_value, ex_traceback = sys.exc_info() @@ -59,8 +66,8 @@ while True: for trace in trace_back: stack_trace.append("File : %s , Line : %d, Func.Name : %s, Message : %s" % (trace[0], trace[1], trace[2], trace[3])) - print("Exception type : %s " % ex_type.__name__) - print("Exception message : %s" %ex_value) - print("Stack trace : %s" %stack_trace) + logger.error("Exception type : %s " % ex_type.__name__) + logger.error("Exception message : %s" %ex_value) + logger.error("Stack trace : %s" %stack_trace) - time.sleep(1) \ No newline at end of file + time.sleep(1) diff --git a/station/planner.py b/station/planner.py index 0a43a90..e29910f 100644 --- a/station/planner.py +++ b/station/planner.py @@ -5,13 +5,15 @@ from operator import itemgetter import puller +from loguru import logger + def plan(lat, lon, alt, tle, transmitter, receiver, priority, name, delta = timedelta(seconds=1800), predictH = 12, horizon = 5): #prevent plan same obsevation last = datetime.utcnow() plans = [] for ob in puller.watingJobs: - last = max(ob["start"], last) + last = max(ob["end"], last) orb = Orbital(name, line1=tle["line1"], line2=tle["line2"]) @@ -30,7 +32,7 @@ def plan(lat, lon, alt, tle, transmitter, receiver, priority, name, delta = time end = ob[1] if start <= (last + timedelta(seconds=60)): # must be minute after last - print(f"[INFO] alredy planed {name} at {start}") + #logger.debug(f"alredy planed {name} at {start} skiping") continue plans.append({ @@ -41,6 +43,8 @@ def plan(lat, lon, alt, tle, transmitter, receiver, priority, name, delta = time "priority": priority }) + logger.debug(f"planed {name} at {start}") + return plans def planAll(location): @@ -70,4 +74,4 @@ def planAll(location): elif plans[i]["priority"] > plans[i + 1]["priority"]: plans.pop(i + 1) else: - i += 1 \ No newline at end of file + i += 1 diff --git a/station/puller.py b/station/puller.py index 85d4b0f..87d585e 100644 --- a/station/puller.py +++ b/station/puller.py @@ -4,8 +4,8 @@ from urllib.request import urlopen import requests import json import os - import pathlib +from loguru import logger watingJobs = [] location = {} @@ -26,7 +26,15 @@ def getPlaneble(): return data_json def apiSend(url, data, files=None): - r = requests.post(url=config.masterUrl + url, data=data, files=files) + try: + r = requests.post(url=config.masterUrl + url, data=data, files=files, timeout=10) + except requests.Timeout: + logger.error("Api send fail timeout {}".format(config.masterUrl + url)) + return None + except requests.ConnectionError: + logger.error("Api send fail connection error {}".format(config.masterUrl + url)) + return None + return r.text def plan(transmitter, receiver, start, end): @@ -55,20 +63,45 @@ def setDecoding(observation): def setSuccess(observation): apiSend("/api/observation/success", {"id": observation}) -def setArtefacts(adir, observation): - ufiles = {} # open('file.txt','rb') +def read_in_chunks(file_object, chunk_size=5000000): + while True: + data = file_object.read(chunk_size) + if not data: + break + yield data - print("Uploading artefacts") +def setArtefacts(adir, observation): + logger.debug("Uploading artefacts") for path, subdirs, files in os.walk(adir): for name in files: afile = os.path.join(path, name) fileName = str(afile).replace(str(adir), "").replace("/", "\\") - print(fileName) - ufiles[fileName] = open(afile, 'rb') + aPath = str(path).replace(str(adir), "").replace("/", "\\") + ufile = open(afile, 'rb') + + index = 0 + offset = 0 + + for chunk in read_in_chunks(ufile): + offset = index + len(chunk) + + logger.debug(f"Sending file {fileName} chunk with offset {index}") + if apiSend("/api/observation/addArtefacts", { + "id": observation, + "fname": name, + "path": aPath, + "offset": index, + "data": chunk + }) is None: + logger.error(f"Sending file {fileName} fail in chunk with offset {index}") + break - apiSend("/api/observation/addArtefacts", {"id": observation}, ufiles) + index = offset + + + return True def parseNewJobs(jobs): @@ -97,4 +130,4 @@ def pull(): info = getInfo() parseInfo(info) jobs = getNewJobs() - parseNewJobs(jobs) \ No newline at end of file + parseNewJobs(jobs) diff --git a/station/recorder.py b/station/recorder.py index 3a74e6d..ab420cc 100644 --- a/station/recorder.py +++ b/station/recorder.py @@ -8,6 +8,8 @@ import config import time import datetime +from loguru import logger + # A recursive function to remove the folder def del_folder(path): for sub in path.iterdir(): @@ -22,13 +24,14 @@ def del_folder(path): path.rmdir() class recorder(threading.Thread): - def __init__(self, job, location): + def __init__(self, job, location, lock): threading.Thread.__init__(self) self.job = job self.location = location + self.lock = lock def run(self): - print(f"Recorder for job {self.job['target']['name']} started") + logger.debug(f"Recorder for job {self.job['target']['name']} started") recordTime = (self.job["end"] - self.job["start"]).total_seconds() @@ -52,6 +55,11 @@ class recorder(threading.Thread): ret = os.system(f"satdump record {baseband} --source {self.job['receiver']['params']['radio']} --samplerate {fs} --frequency {self.job['transmitter']['centerFrequency']} --gain {self.job['receiver']['params']['gain']} --baseband_format s8 --timeout {recordTime}") + rotatorCTR.kill() + self.lock.release() + + logger.debug("Release lock") + if ret != 0: # fail to open sdr puller.setFail(self.job["id"]) return @@ -61,11 +69,6 @@ class recorder(threading.Thread): print(f"Recorder for job {self.job['target']['name']} stoped") puller.setRecorded(self.job["id"]) - rotatorCTR.kill() - - if self.job["proccessPipe"] == []: - return - puller.setDecoding(self.job["id"]) #create artecats dir @@ -88,11 +91,17 @@ class recorder(threading.Thread): for k, v in replacements.items(): pipe = pipe.replace(k, v) - os.system(pipe) + ret = os.system(pipe) + + if ret != 0: + logger.error("Process pipe {} fail".format(pipe)) puller.setSuccess(self.job["id"]) - puller.setArtefacts(adir, self.job["id"]) + logger.debug("Starting upload of artifacts") + + if not puller.setArtefacts(adir, self.job["id"]): + puller.setFail(self.job["id"]) # remove basband record os.remove(str(baseband) + ".s8") @@ -101,8 +110,8 @@ class recorder(threading.Thread): path = Path(adir) try: del_folder(path) - print("Directory removed successfully") + logger.debug("Directory with artifacts removed successfully") except OSError as o: - print(f"Error, {o.strerror}: {path}") - + logger.debug(f"Error, {o.strerror}: {path}") + logger.debug("Recored done") diff --git a/station/rotator.py b/station/rotator.py index d2f2a0d..13049ab 100644 --- a/station/rotator.py +++ b/station/rotator.py @@ -20,7 +20,7 @@ class rotator(threading.Thread): #init pyorbytal orb = Orbital(self.job["target"]["name"], line1=self.job["target"]["locator"]["tle"]["line1"], line2=self.job["target"]["locator"]["tle"]["line2"]) - while (True): + while (not self.killed): az, el = orb.get_observer_look( utc_time=datetime.utcnow() + timedelta(seconds=5), lon=self.station["lon"], @@ -32,17 +32,9 @@ class rotator(threading.Thread): print(f"[INFO] rotator az: {az}, el: {el}") self.driver.set_azel(az, el) - - if (self.killed): - break time.sleep(10) - # home the rotator on end - self.driver.reset() - - time.sleep(60) - def kill(self): diff --git a/web/uploads.ini b/web/uploads.ini index be50262..b5e5f53 100644 --- a/web/uploads.ini +++ b/web/uploads.ini @@ -1,2 +1,2 @@ -upload_max_filesize = 10G -post_max_size = 10G \ No newline at end of file +upload_max_filesize = 10M +post_max_size = 10M \ No newline at end of file