YAGS/station/recorder.py

127 lines
4.0 KiB
Python

import os
import puller
import threading
from rotator import rotator
from simplecom import simplecom
from pathlib import Path
import config
import time
import datetime
from loguru import logger
# A recursive function to remove the folder
def del_folder(path):
for sub in path.iterdir():
if sub.is_dir():
# Delete directory if it's a subdirectory
del_folder(sub)
else :
# Delete file if it is a file:
sub.unlink()
# This removes the top-level folder:
path.rmdir()
class recorder(threading.Thread):
def __init__(self, job, location, lock):
threading.Thread.__init__(self)
self.job = job
self.location = location
self.lock = lock
def run(self):
logger.debug(f"Recorder for job {self.job['target']['name']} started")
recordTime = (self.job["end"] - self.job["start"]).total_seconds()
#init rotator
rotatorDriver = simplecom("/dev/ttyUSB0")
rotatorCTR = rotator(rotatorDriver, self.job, self.location)
rotatorCTR.start()
baseband = f"records/{self.job['id']}"
fs = max(self.job["receiver"]["params"]["fs"])
# find supported FS
for sample in self.job["receiver"]["params"]["fs"]:
if (sample > int(self.job['transmitter']['bandwidth'])) and (sample < fs):
fs = sample
time.sleep(50)
realStart = int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp())
ret = os.system(f"satdump record {baseband} --source {self.job['receiver']['params']['radio']} --samplerate {fs} --frequency {self.job['transmitter']['centerFrequency']} --gain {self.job['receiver']['params']['gain']} --baseband_format s8 --timeout {recordTime}")
rotatorCTR.kill()
self.lock.release()
logger.debug("Release lock")
if ret != 0: # fail to open sdr
puller.setFail(self.job["id"])
return
realEnd = int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp())
print(f"Recorder for job {self.job['target']['name']} stoped")
puller.setRecorded(self.job["id"])
puller.setDecoding(self.job["id"])
#create artecats dir
adir = f"artefacts/{self.job['id']}"
os.makedirs(adir)
replacements = {
"{baseband}": str(baseband) + ".s8",
"{fs}": str(fs),
"{artefactDir}": str(adir),
"{freq}": str(self.job['transmitter']['centerFrequency']),
"{targetNum}": ''.join(x for x in self.job['target']['name'] if x.isdigit()),
"{target}": self.job['target']['name'],
"{start}": str(realStart),
"{end}": str(realEnd)
}
for pipe in self.job["proccessPipe"]:
#ok now replace
for k, v in replacements.items():
pipe = pipe.replace(k, v)
ret = os.system(pipe)
if ret != 0:
logger.error("Process pipe {} fail".format(pipe))
puller.setSuccess(self.job["id"])
# compress artifacts
if config.compress:
logger.debug("Compressing artifacts")
os.system("find " + adir + " -name '*.png' -exec mogrify -format jpg -quality " + str(config.compressJpgQ) + " {} +")
os.system(f"rm -f {adir}/*.png")
os.system(f"tar -czvf {adir}/baseband.tar.gz {adir}/*.s8")
os.system(f"rm -f {adir}/*.s8")
logger.debug("Starting upload of artifacts")
if not puller.setArtefacts(adir, self.job["id"]):
puller.setFail(self.job["id"])
# remove basband record
os.remove(str(baseband) + ".s8")
# remove artefacts
path = Path(adir)
try:
del_folder(path)
logger.debug("Directory with artifacts removed successfully")
except OSError as o:
logger.debug(f"Error, {o.strerror}: {path}")
logger.debug("Recored done")