# Short-term archiver script (Jython / Python 2 syntax).
#
# Forks one task per configured channel; each task appends the channel's
# updates to "<DATA_PATH>/<channel>.txt".  A separate monitor task watches the
# configured data path and re-opens the execution context when it changes, so
# that the channel tasks roll over to new files.
#
# Names such as get_context, set_exec_pars, get_exec_pars, expand_path, log,
# fork, join, Channel, ChannelListener and parse_short_term_archiver_config are
# not defined in this file and must already be available in the namespace.

# Imported explicitly so the script does not rely on these modules having been
# pre-loaded into the namespace by something else.
import sys
import time
import traceback

import java.io.IOException as IOException

if get_context().localMode:
    raise Exception("Must run in exclusive mode")

set_exec_pars(name="archiver", open=True)
DATA_PATH = get_exec_pars().path  #expand_path("{data}/{date}/{name}")


def get_data_path(channel):
    """Return the text file path used to archive `channel`.

    The file lives directly under the current DATA_PATH; ':' in the channel
    name is replaced by '_' because file names do not support ':'.
    """
    safe_name = channel.replace(":", "_")
    return DATA_PATH + "/" + safe_name + ".txt"


# If a previous run left its stop function in the namespace, use it to cancel
# that run's tasks before starting again.  This must stay ahead of the
# definitions below so that the *previous* function and task list are used.
if "stop_short_term_archiver" in globals():
    stop_short_term_archiver()


def _log(msg):
    """Print `msg` and also forward it to log()."""
    # Parenthesised single-argument form: prints the same text under the
    # Python 2 print statement and is also valid Python 3 syntax.
    print(msg)
    log(msg)


_log("Data path:" + str(DATA_PATH))


def archive_channel(channel, interval):
    """Archive one channel until the task is cancelled or fails.

    Args:
        channel: channel name; also determines the output file name
            (see get_data_path).
        interval: True to open the channel in monitored mode (polling 0),
            otherwise a number that is multiplied by 1000 and truncated to an
            int to obtain the polling value passed to Channel.

    The channel and its output file are re-opened whenever get_data_path()
    starts returning a different file name.  I/O errors are logged and retried
    after 10 minutes; any other exception is logged with a traceback and ends
    the function.  "Enter"/"Exit" messages are always logged.
    """
    _log("Enter: " + str(channel))
    try:
        monitored = interval is True
        polling = 0 if monitored else int(interval * 1000)
        while True:
            try:
                with Channel(channel, monitored, polling) as ch:
                    filename = get_data_path(channel)
                    with open(filename, 'a') as data_file:
                        _log("Opened: " + filename + " monitor: " + str(monitored) + " polling: " + str(polling))
                        # ChannelListener is defined elsewhere; presumably it
                        # writes each update to data_file -- TODO confirm.
                        channel_listener = ChannelListener(data_file)
                        ch.addListener(channel_listener)
                        ch.update()
                        # Check once per second whether the target file name
                        # changed (DATA_PATH is updated by archive_monitor).
                        while filename == get_data_path(channel):
                            time.sleep(1.0)
                        _log("Closing: " + filename)
                        # NOTE(review): the file is closed before the channel;
                        # confirm a listener callback arriving in between
                        # cannot hit the closed file.
            except (IOException, IOError):
                # IOException is java.io.IOException; IOError additionally
                # covers failures raised by the Python-level open(), which
                # would otherwise skip this retry path and end the task.
                _log("Error processing channel " + str(channel) + ": " + str(sys.exc_info()[1]))
                time.sleep(60.0 * 10)  # wait 10 minutes before retrying
    except Exception:
        _log("ERROR: " + str(sys.exc_info()[1]))
        traceback.print_exc()
    finally:
        _log("Exit: " + str(channel))


def archive_monitor():
    """Watch the configured data path and roll DATA_PATH over when it changes.

    Once per second the expanded value of get_context().config.dataPath is
    compared with DATA_PATH.  On a difference the execution context is closed
    and re-opened through set_exec_pars and DATA_PATH is updated, which makes
    get_data_path() return new file names.  Runs until interrupted.
    """
    global DATA_PATH
    _log("Start Monitor")
    try:
        while True:
            path = expand_path(get_context().config.dataPath)
            if path != DATA_PATH:
                _log("Day change")
                set_exec_pars(open=False)
                set_exec_pars(name="archiver", open=True)
                DATA_PATH = path  #get_exec_pars().path
                _log("Data path:" + str(DATA_PATH))
                #os.makedirs(path)
            time.sleep(1.0)
    finally:
        _log("Stop Monitor")


# State of the current archiver run, shared by the start/wait/stop functions.
short_term_archiver_cfg = {}
short_term_archiver_tasks = []
short_term_archiver_monitor = None


def start_short_term_archiver(metadata=False):
    """Parse the archiver configuration and fork the archiving tasks.

    Args:
        metadata: forwarded unchanged to parse_short_term_archiver_config.

    One archive_channel task is forked per (channel, interval) item of the
    parsed configuration, plus a single archive_monitor task.  The results are
    stored in the module-level short_term_archiver_* variables.
    """
    global short_term_archiver_cfg
    global short_term_archiver_tasks
    global short_term_archiver_monitor
    print("Starting Short Term Archiver")
    short_term_archiver_cfg = parse_short_term_archiver_config(metadata)
    # fork() takes one [function, [args]] entry per task.
    jobs = [[archive_channel, [channel, interval]]
            for channel, interval in short_term_archiver_cfg.items()]
    short_term_archiver_tasks = fork(*jobs)
    short_term_archiver_monitor = fork(archive_monitor)[0]


def wait_short_term_archiver():
    """Join the channel tasks and then the monitor task."""
    join(short_term_archiver_tasks)
    join(short_term_archiver_monitor)


def stop_short_term_archiver():
    """Cancel every channel task and the monitor task (best effort).

    A failure to cancel one task is ignored so that the remaining tasks are
    still cancelled and so that this function never raises from the cleanup
    path at the end of the script.
    """
    print("Stopping Short Term Archiver")
    for task in short_term_archiver_tasks:
        try:
            task.cancel(True)
        except:
            # Deliberately broad: cancellation is best-effort.
            pass
    # The monitor is None when the archiver was never (fully) started.
    if short_term_archiver_monitor is not None:
        try:
            short_term_archiver_monitor.cancel(True)
        except:
            # Deliberately broad: cancellation is best-effort.
            pass


# Run the archiver until the tasks finish or the script is aborted, and always
# cancel whatever is still running on the way out.
try:
    start_short_term_archiver()
    wait_short_term_archiver()
finally:
    stop_short_term_archiver()