Added upload by chanks and lock for resources

This commit is contained in:
Lukáš Plevač 2023-12-29 09:36:25 +01:00
parent 7188f67683
commit aaf501aff1
7 changed files with 98 additions and 52 deletions

View File

@ -1,4 +1,5 @@
masterUrl = "http://10.0.0.8"
apiKey = "6f44206f-6d59-4761-b5a2-07172ecea2e6"
pullInterval = 120 # in sec
planInterval = 1200 # in sec
masterUrl = "http://10.0.0.8"
apiKey = "d0ec2b81-601b-481a-bde9-4e6699fd9297"
pullInterval = 120 # in sec
planInterval = 1200 # in sec
MaxUploadChunk = 5000000 # in bytes

View File

@ -7,11 +7,15 @@ from recorder import recorder
import sys
import traceback
import planner
import threading
def onRecorded(info):
pass
from loguru import logger
assigned = threading.Lock()
#assigned.release()
i = 0
while True:
try:
if (i % config.pullInterval) == 0:
@ -23,29 +27,32 @@ while True:
jobsDeltas.append((job["start"] - datetime.utcnow()).total_seconds())
if job["start"] <= datetime.utcnow() + timedelta(seconds=60):
if job["end"] <= datetime.utcnow():
if job["end"] <= datetime.utcnow() or not assigned.acquire(timeout=10):
puller.setFail(job["id"])
puller.watingJobs.remove(job)
logger.debug("Canceling job {} because is ended lock state is {}".format(job["id"], assigned.locked()))
break
logger.debug("I have lock")
# start record
puller.setRecording(job["id"])
curRecorder = recorder(job, puller.location)
logger.debug("Starting record process for job {}".format(job["id"]))
curRecorder = recorder(job, puller.location, assigned)
curRecorder.start()
puller.watingJobs.remove(job)
if (i % 10) == 0 and len(jobsDeltas):
print(f"Next job in {min(jobsDeltas)}s")
puller.watingJobs.remove(job)
# ask for planeble satellites
if (i % config.planInterval) == 0:
logger.debug("Calling planner")
planner.planAll(puller.location)
i += 1
except Exception as inst:
print(f"[ERROR] main script fail restarting - error {inst}")
logger.error(f"main script fail restarting - error {inst}")
# Get current system exception
ex_type, ex_value, ex_traceback = sys.exc_info()
@ -59,8 +66,8 @@ while True:
for trace in trace_back:
stack_trace.append("File : %s , Line : %d, Func.Name : %s, Message : %s" % (trace[0], trace[1], trace[2], trace[3]))
print("Exception type : %s " % ex_type.__name__)
print("Exception message : %s" %ex_value)
print("Stack trace : %s" %stack_trace)
logger.error("Exception type : %s " % ex_type.__name__)
logger.error("Exception message : %s" %ex_value)
logger.error("Stack trace : %s" %stack_trace)
time.sleep(1)
time.sleep(1)

View File

@ -5,13 +5,15 @@ from operator import itemgetter
import puller
from loguru import logger
def plan(lat, lon, alt, tle, transmitter, receiver, priority, name, delta = timedelta(seconds=1800), predictH = 12, horizon = 5):
#prevent plan same obsevation
last = datetime.utcnow()
plans = []
for ob in puller.watingJobs:
last = max(ob["start"], last)
last = max(ob["end"], last)
orb = Orbital(name, line1=tle["line1"], line2=tle["line2"])
@ -30,7 +32,7 @@ def plan(lat, lon, alt, tle, transmitter, receiver, priority, name, delta = time
end = ob[1]
if start <= (last + timedelta(seconds=60)): # must be minute after last
print(f"[INFO] alredy planed {name} at {start}")
#logger.debug(f"alredy planed {name} at {start} skiping")
continue
plans.append({
@ -41,6 +43,8 @@ def plan(lat, lon, alt, tle, transmitter, receiver, priority, name, delta = time
"priority": priority
})
logger.debug(f"planed {name} at {start}")
return plans
def planAll(location):
@ -70,4 +74,4 @@ def planAll(location):
elif plans[i]["priority"] > plans[i + 1]["priority"]:
plans.pop(i + 1)
else:
i += 1
i += 1

View File

@ -4,8 +4,8 @@ from urllib.request import urlopen
import requests
import json
import os
import pathlib
from loguru import logger
watingJobs = []
location = {}
@ -26,7 +26,15 @@ def getPlaneble():
return data_json
def apiSend(url, data, files=None):
r = requests.post(url=config.masterUrl + url, data=data, files=files)
try:
r = requests.post(url=config.masterUrl + url, data=data, files=files, timeout=10)
except requests.Timeout:
logger.error("Api send fail timeout {}".format(config.masterUrl + url))
return None
except requests.ConnectionError:
logger.error("Api send fail connection error {}".format(config.masterUrl + url))
return None
return r.text
def plan(transmitter, receiver, start, end):
@ -55,20 +63,45 @@ def setDecoding(observation):
def setSuccess(observation):
apiSend("/api/observation/success", {"id": observation})
def setArtefacts(adir, observation):
ufiles = {} # open('file.txt','rb')
def read_in_chunks(file_object, chunk_size=5000000):
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
print("Uploading artefacts")
def setArtefacts(adir, observation):
logger.debug("Uploading artefacts")
for path, subdirs, files in os.walk(adir):
for name in files:
afile = os.path.join(path, name)
fileName = str(afile).replace(str(adir), "").replace("/", "\\")
print(fileName)
ufiles[fileName] = open(afile, 'rb')
aPath = str(path).replace(str(adir), "").replace("/", "\\")
ufile = open(afile, 'rb')
index = 0
offset = 0
for chunk in read_in_chunks(ufile):
offset = index + len(chunk)
logger.debug(f"Sending file {fileName} chunk with offset {index}")
if apiSend("/api/observation/addArtefacts", {
"id": observation,
"fname": name,
"path": aPath,
"offset": index,
"data": chunk
}) is None:
logger.error(f"Sending file {fileName} fail in chunk with offset {index}")
break
apiSend("/api/observation/addArtefacts", {"id": observation}, ufiles)
index = offset
return True
def parseNewJobs(jobs):
@ -97,4 +130,4 @@ def pull():
info = getInfo()
parseInfo(info)
jobs = getNewJobs()
parseNewJobs(jobs)
parseNewJobs(jobs)

View File

@ -8,6 +8,8 @@ import config
import time
import datetime
from loguru import logger
# A recursive function to remove the folder
def del_folder(path):
for sub in path.iterdir():
@ -22,13 +24,14 @@ def del_folder(path):
path.rmdir()
class recorder(threading.Thread):
def __init__(self, job, location):
def __init__(self, job, location, lock):
threading.Thread.__init__(self)
self.job = job
self.location = location
self.lock = lock
def run(self):
print(f"Recorder for job {self.job['target']['name']} started")
logger.debug(f"Recorder for job {self.job['target']['name']} started")
recordTime = (self.job["end"] - self.job["start"]).total_seconds()
@ -52,6 +55,11 @@ class recorder(threading.Thread):
ret = os.system(f"satdump record {baseband} --source {self.job['receiver']['params']['radio']} --samplerate {fs} --frequency {self.job['transmitter']['centerFrequency']} --gain {self.job['receiver']['params']['gain']} --baseband_format s8 --timeout {recordTime}")
rotatorCTR.kill()
self.lock.release()
logger.debug("Release lock")
if ret != 0: # fail to open sdr
puller.setFail(self.job["id"])
return
@ -61,11 +69,6 @@ class recorder(threading.Thread):
print(f"Recorder for job {self.job['target']['name']} stoped")
puller.setRecorded(self.job["id"])
rotatorCTR.kill()
if self.job["proccessPipe"] == []:
return
puller.setDecoding(self.job["id"])
#create artecats dir
@ -88,11 +91,17 @@ class recorder(threading.Thread):
for k, v in replacements.items():
pipe = pipe.replace(k, v)
os.system(pipe)
ret = os.system(pipe)
if ret != 0:
logger.error("Process pipe {} fail".format(pipe))
puller.setSuccess(self.job["id"])
puller.setArtefacts(adir, self.job["id"])
logger.debug("Starting upload of artifacts")
if not puller.setArtefacts(adir, self.job["id"]):
puller.setFail(self.job["id"])
# remove basband record
os.remove(str(baseband) + ".s8")
@ -101,8 +110,8 @@ class recorder(threading.Thread):
path = Path(adir)
try:
del_folder(path)
print("Directory removed successfully")
logger.debug("Directory with artifacts removed successfully")
except OSError as o:
print(f"Error, {o.strerror}: {path}")
logger.debug(f"Error, {o.strerror}: {path}")
logger.debug("Recored done")

View File

@ -20,7 +20,7 @@ class rotator(threading.Thread):
#init pyorbytal
orb = Orbital(self.job["target"]["name"], line1=self.job["target"]["locator"]["tle"]["line1"], line2=self.job["target"]["locator"]["tle"]["line2"])
while (True):
while (not self.killed):
az, el = orb.get_observer_look(
utc_time=datetime.utcnow() + timedelta(seconds=5),
lon=self.station["lon"],
@ -32,17 +32,9 @@ class rotator(threading.Thread):
print(f"[INFO] rotator az: {az}, el: {el}")
self.driver.set_azel(az, el)
if (self.killed):
break
time.sleep(10)
# home the rotator on end
self.driver.reset()
time.sleep(60)
def kill(self):

View File

@ -1,2 +1,2 @@
upload_max_filesize = 10G
post_max_size = 10G
upload_max_filesize = 10M
post_max_size = 10M