122 lines
4.0 KiB
Python
122 lines
4.0 KiB
Python
import java.io.IOException as IOException
|
|
|
|
# Start-up: refuse to continue when the context reports local mode; the error
# text states that the script must run in exclusive mode.
if get_context().localMode:
    raise Exception("Must run in exclusive mode")

# NOTE(review): set_exec_pars(name=..., open=True) presumably opens the
# execution output under the name "archiver" -- confirm against the host API.
set_exec_pars(name="archiver", open=True)

# Directory that get_data_path() prefixes to every channel file name.
# archive_monitor() reassigns this global when the expanded data path changes.
DATA_PATH = get_exec_pars().path  # expand_path("{data}/{date}/{name}")
|
|
|
|
def get_data_path(channel):
    """Return the text-file path used for *channel* under the current DATA_PATH.

    Args:
        channel: Channel name (string). Every ":" is replaced by "_" because
            file names do not support ":".

    Returns:
        DATA_PATH + "/" + <sanitized channel name> + ".txt". The global
        DATA_PATH is read on every call, so the result follows later
        reassignments of that global.
    """
    base_dir = DATA_PATH
    safe_name = channel.replace(":", "_")
    return base_dir + "/" + safe_name + ".txt"
|
|
|
|
|
|
# If a stop_short_term_archiver function is already present in globals() at
# this point (it is only defined further down in this script, so presumably
# it is left over from an earlier run in the same interpreter -- confirm),
# call it before anything new is started.
if "stop_short_term_archiver" in globals():
    stop_short_term_archiver()
|
|
|
|
def _log(msg):
    """Print *msg* and also pass it to the host-provided log() function.

    Args:
        msg: The message to emit; it is handed unchanged to both sinks.
    """
    # Parenthesized single-argument form prints the same value as the
    # Python 2 statement form `print msg`.
    print(msg)
    log(msg)
|
|
|
|
# Report the data path that was captured into DATA_PATH at start-up.
_log("Data path:" + str(DATA_PATH))
|
|
|
|
|
|
def archive_channel(channel, interval):
    """Attach a file-backed listener to one channel and keep it attached.

    The function loops forever: it opens the channel, opens the channel's
    data file in append mode, registers a ChannelListener built on that file,
    and then idles until get_data_path(channel) no longer matches the open
    file name. At that point both the file and the channel are closed and the
    whole sequence starts again with the new path.

    Args:
        channel: Channel name; passed to Channel() and to get_data_path().
        interval: ``True`` selects monitored mode with polling 0. Any other
            value is multiplied by 1000 and truncated with int() to form the
            polling argument (presumably seconds to milliseconds -- confirm).

    Exceptions matching ``Exception`` are logged with a traceback rather
    than propagated; "Exit: <channel>" is always logged on the way out.
    """
    _log("Enter: " + str(channel))
    try:
        # Derived inside the try block so a failure here is reported by the
        # handlers at the bottom of the function.
        monitored = interval is True
        polling = 0 if monitored else int(interval * 1000)

        while True:
            try:
                with Channel(channel, monitored, polling) as ch:
                    filename = get_data_path(channel)
                    with open(filename, 'a') as data_file:
                        _log("Opened: " + filename + " monitor: " + str(monitored) + " polling: " + str(polling))
                        # NOTE(review): ChannelListener is defined outside this
                        # view; presumably it writes received updates to
                        # data_file -- confirm.
                        ch.addListener(ChannelListener(data_file))
                        ch.update()
                        # Idle while the target file name is unchanged; once it
                        # differs, fall out of both `with` blocks so the outer
                        # loop reopens everything against the new path.
                        while filename == get_data_path(channel):
                            time.sleep(1.0)
                        _log("Closing: " + filename)
            except IOException:
                _log("Error processing channel " + str(channel) + ": " + str(sys.exc_info()[1]))
                # Wait ten minutes before the outer loop retries the channel.
                time.sleep(60.0 * 10)
            # A disabled alternative used to sit here as a string literal: it
            # recorded the channel plus a Timestamp readable through
            # mscan (monitored) or tscan (polled).
    except Exception:
        _log("ERROR: " + str(sys.exc_info()[1]))
        traceback.print_exc()
    finally:
        _log("Exit: " + str(channel))
|
|
|
|
|
|
def archive_monitor():
    """Poll the configured data path once per second and roll over on change.

    Each iteration expands get_context().config.dataPath and compares it with
    the global DATA_PATH. When they differ (logged as "Day change") the
    execution parameters are closed and reopened under the name "archiver",
    and DATA_PATH is rebound to the newly expanded path. get_data_path()
    reads DATA_PATH on every call, so later calls return paths under the new
    location.

    The loop has no exit condition of its own; "Stop Monitor" is logged from
    the ``finally`` clause whenever the loop is left through an exception.
    """
    global DATA_PATH
    _log("Start Monitor")
    try:
        while True:
            expanded = expand_path(get_context().config.dataPath)
            if expanded != DATA_PATH:
                _log("Day change")
                # Close the current execution output, then open a new one.
                set_exec_pars(open=False)
                set_exec_pars(name="archiver", open=True)
                DATA_PATH = expanded  # get_exec_pars().path
                _log("Data path:" + str(DATA_PATH))
                # os.makedirs(path)
            time.sleep(1.0)
    finally:
        _log("Stop Monitor")
|
|
|
|
# Module-level state shared by start/wait/stop_short_term_archiver.

# Result of parse_short_term_archiver_config(); iterated with .items() as
# (channel, interval) pairs.
short_term_archiver_cfg = {}

# Handles returned by fork() for the per-channel archive_channel tasks.
short_term_archiver_tasks = []

# Handle of the forked archive_monitor task; None until the archiver starts.
short_term_archiver_monitor = None
|
|
|
|
def start_short_term_archiver(metadata=False):
    """Parse the archiver configuration and fork all archiving tasks.

    Args:
        metadata: Forwarded unchanged to parse_short_term_archiver_config().

    Side effects:
        Rebinds the three module globals: the parsed configuration, the
        handles returned by fork() for one archive_channel call per
        configured channel, and the first handle returned by forking
        archive_monitor.
    """
    global short_term_archiver_cfg, short_term_archiver_tasks, short_term_archiver_monitor

    print("Starting Short Term Archiver")
    short_term_archiver_cfg = parse_short_term_archiver_config(metadata)

    # One [function, [args]] entry per configured channel, in the layout the
    # original passes to fork().
    jobs = []
    for channel, interval in short_term_archiver_cfg.items():
        jobs.append([archive_channel, [channel, interval]])

    short_term_archiver_tasks = fork(*jobs)
    short_term_archiver_monitor = fork(archive_monitor)[0]
|
|
|
|
def wait_short_term_archiver():
    """Join the channel tasks first, then the monitor task.

    Both handles are read from the module globals set by
    start_short_term_archiver() and passed to join() in that order.
    """
    global short_term_archiver_tasks
    for pending in (short_term_archiver_tasks, short_term_archiver_monitor):
        join(pending)
|
|
|
|
def stop_short_term_archiver():
    """Cancel every channel task and then the monitor task, best effort.

    Each cancel(True) call is attempted independently; any error it raises
    (including the monitor handle still being None) is deliberately ignored
    so that one failure does not prevent the remaining cancellations.
    """
    global short_term_archiver_tasks, short_term_archiver_monitor

    def _cancel_quietly(handle):
        # Bare except kept on purpose: this is best-effort shutdown, and the
        # original swallows everything raised by cancel().
        try:
            handle.cancel(True)
        except:
            pass

    print("Stopping Short Term Archiver")
    for worker in short_term_archiver_tasks:
        _cancel_quietly(worker)
    _cancel_quietly(short_term_archiver_monitor)
|
|
# Script entry point: start the archiver and block on its tasks. The finally
# clause cancels the tasks whether the wait returns normally or an exception
# propagates out of start/wait.
try:
    start_short_term_archiver()
    wait_short_term_archiver()
finally:
    stop_short_term_archiver()