Files
miniarchiver/script/archiver/short_term_archiver.py
gobbo_a 8837d77401
2024-05-23 08:55:33 +02:00

122 lines
4.0 KiB
Python

import java.io.IOException as IOException
# Refuse to run when the context reports local mode.
if get_context().localMode:
    raise Exception ("Must run in exclusive mode")
# Apply the execution parameters name="archiver", open=True, then remember the
# resulting path: get_data_path() builds every per-channel file name from it.
set_exec_pars(name="archiver", open=True)
DATA_PATH = get_exec_pars().path  # alternative left by the author: expand_path("{data}/{date}/{name}")
def get_data_path(channel):
    """Return the path of the text file that stores samples of *channel*.

    The file sits directly under the current DATA_PATH and is named after the
    channel, with every ':' turned into '_' because file names do not
    support ':'.
    """
    safe_name = "_".join(channel.split(":"))
    return DATA_PATH + "/" + safe_name + ".txt"
# If a stop_short_term_archiver function is already present in the globals
# (presumably left by an earlier execution of this script), call it before
# anything new is started.
if "stop_short_term_archiver" in globals():
    stop_short_term_archiver()
def _log(msg):
    """Echo *msg* on standard output and also hand it to log()."""
    print(msg)
    log(msg)
# Report the folder the per-channel files will initially be written to.
_log("Data path:" + str(DATA_PATH))
def archive_channel(channel, interval):
    """Archive a single channel into its data file until the task ends.

    Args:
        channel: channel name; also used to derive the output file name.
        interval: the value True selects monitored mode (polling 0); any
            other value is multiplied by 1000 and truncated to int to form
            the polling argument given to Channel (presumably seconds to
            milliseconds -- confirm against Channel).

    The channel and the output file are (re)opened in a loop: whenever
    get_data_path(channel) stops matching the file currently open, both are
    closed and opened again with the new path. A Java IOException is logged
    and followed by a 10 minute pause before retrying. Any other Exception is
    logged with its traceback and ends the function. "Exit: <channel>" is
    always logged on the way out.
    """
    _log("Enter: " + str(channel))
    try:
        if interval is True:
            monitored, polling = True, 0
        else:
            monitored, polling = False, int(interval * 1000)
        # Unused device URL form kept from the original:
        # dev = "ca://" + channel + ("?monitored=true" if monitored else "")

        while True:
            try:
                with Channel(channel, monitored, polling) as ch:
                    filename = get_data_path(channel)
                    with open(filename, 'a') as data_file:
                        _log("Opened: " + filename + " monitor: " + str(monitored) + " polling: " + str(polling))
                        # ChannelListener is defined outside this view;
                        # presumably it writes received values to data_file.
                        channel_listener = ChannelListener(data_file)
                        ch.addListener(channel_listener)
                        ch.update()
                        # Idle, checking once per second, for as long as the
                        # target file name is unchanged.
                        while filename == get_data_path(channel):
                            time.sleep(1.0)
                        _log("Closing: " + filename)
            except IOException:
                _log("Error processing channel " + str(channel) + ": " + str(sys.exc_info()[1]))
                # Back off for 10 minutes before trying the channel again.
                time.sleep(60.0 * 10)

        # Disabled scan-based implementation, kept for reference only:
        #
        #   class Timestamp(Readable):
        #       def read(self):
        #           timestamp = ch.takeTimestamped().timestamp
        #           return timestamp
        #   timestamp = Timestamp()
        #   devices = (ch, timestamp)
        #   if monitored:
        #       mscan(channel, devices, take_initial=True, display=False, keep=False, tag=channel)
        #   else:
        #       tscan(devices, -1, interval*100, display=False, keep=False, tag=channel)
    except Exception:
        _log("ERROR: " + str(sys.exc_info()[1]))
        traceback.print_exc()
    finally:
        _log("Exit: " + str(channel))
def archive_monitor():
    """Poll the configured data path once per second and follow its changes.

    When the expanded config.dataPath differs from DATA_PATH, a "Day change"
    is logged, the execution parameters are re-applied (open=False, then
    name="archiver" with open=True) and the global DATA_PATH is updated, so
    get_data_path() starts returning names under the new folder.
    "Stop Monitor" is always logged when the loop is left.
    """
    global DATA_PATH
    _log("Start Monitor")
    try:
        while True:
            current_path = expand_path(get_context().config.dataPath)
            if current_path != DATA_PATH:
                _log("Day change")
                set_exec_pars(open=False)
                set_exec_pars(name="archiver", open=True)
                # The expanded config path is stored rather than
                # get_exec_pars().path; os.makedirs(path) is not called here.
                DATA_PATH = current_path
                _log("Data path:" + str(DATA_PATH))
            time.sleep(1.0)
    finally:
        _log("Stop Monitor")
# Module-level state used by start/wait/stop_short_term_archiver.
# Parsed configuration; iterated with .items() as (channel, interval) pairs.
short_term_archiver_cfg = {}
# Value returned by fork() for the archive_channel tasks.
short_term_archiver_tasks=[]
# First element returned by fork(archive_monitor); None until started.
short_term_archiver_monitor=None
def start_short_term_archiver(metadata=False):
    """Parse the configuration and fork the archiving tasks.

    Args:
        metadata: passed unchanged to parse_short_term_archiver_config().

    One archive_channel(channel, interval) task is forked per entry of the
    parsed configuration, plus a single archive_monitor task. The parsed
    configuration and the values returned by fork() are stored in the
    module-level short_term_archiver_* variables.
    """
    global short_term_archiver_cfg
    global short_term_archiver_tasks
    global short_term_archiver_monitor

    print("Starting Short Term Archiver")
    short_term_archiver_cfg = parse_short_term_archiver_config(metadata)

    # Each job is [function, [arguments]], the form handed to fork().
    jobs = []
    for channel, interval in short_term_archiver_cfg.items():
        jobs.append([archive_channel, [channel, interval]])
    short_term_archiver_tasks = fork(*jobs)

    short_term_archiver_monitor = fork(archive_monitor)[0]
def wait_short_term_archiver():
    """Join the forked channel tasks first, then the monitor task."""
    # Both names are only read here, so no global declaration is needed.
    join(short_term_archiver_tasks)
    join(short_term_archiver_monitor)
def stop_short_term_archiver():
    """Cancel every channel task and the monitor task, best effort.

    Each task is cancelled with cancel(True). A failure while cancelling one
    task is reported on standard output instead of being silently discarded,
    and never prevents the remaining tasks from being cancelled. The monitor
    is skipped when it has not been started (it is still None).
    """
    import sys  # local import: only needed to report cancellation failures

    print("Stopping Short Term Archiver")

    pending = list(short_term_archiver_tasks)
    if short_term_archiver_monitor is not None:
        pending.append(short_term_archiver_monitor)

    for task in pending:
        try:
            task.cancel(True)
        except:
            # Deliberately catch-all: stopping must go on to the next task
            # whatever went wrong with this one; the error is made visible.
            print("Error cancelling task: " + str(sys.exc_info()[1]))
# Script entry point: start the archiver and wait on its tasks; however the
# wait ends, stop_short_term_archiver() is called on the way out.
try:
    start_short_term_archiver()
    wait_short_term_archiver()
finally:
    stop_short_term_archiver()