1 id + locks + live : more frequent, last known points, already sent points + ms points + fixed jump time

This commit is contained in:
l_samenv
2024-08-16 09:55:30 +02:00
parent f4cf8fd9a5
commit 9e08f47916
5 changed files with 83 additions and 73 deletions

View File

@ -291,6 +291,7 @@ let graphs = (function (){
let resolution = undefined;
let created = false;
let activateUpdateTimeout = undefined;
let container = document.createElement('div');
container.classList.add("graphs-container");
@ -452,6 +453,14 @@ let graphs = (function (){
showLegends(legendFlag, false);
if (legendFlag) adjustLegends();
if (activateUpdateTimeout !== undefined){
clearTimeout(activateUpdateTimeout)
}
activateUpdateTimeout = setTimeout(function(){
activateUpdates();
}, 1000);
// Was here before
// result = AJAX( "http://" + hostPort +
// "/updategraph?variables=" + variables() +
@ -662,6 +671,7 @@ let graphs = (function (){
}
}
console.log("RELOAD")
activateUpdates();
// Was here before
// AJAX( "http://" + hostPort + "/updategraph?id=" + clientID).getJSON(); // activate updates
// result = AJAX("http://" + hostPort +
@ -796,11 +806,11 @@ let graphs = (function (){
}
/**
* Sets the resolution of the viewing window in seconds
* Sets the resolution of the viewing window in milliseconds
* @param {*} timeDelta - The difference between the maximum time and the minimum time of the window
*/
function setResolution(timeDelta){
resolution = Math.ceil((timeDelta / container.getBoundingClientRect().width)/1000)
resolution = Math.ceil((timeDelta / container.getBoundingClientRect().width))
}
function jumpToDate(dateTimestampMs, timeValueMs, mode){
@ -938,7 +948,6 @@ let graphs = (function (){
ngraphs = 0;
createGraphs();
globalIndicators.getIndicatorsMap()[datesKey].update(currentMinTime);
activateUpdates();
});
}
@ -1037,7 +1046,6 @@ let graphs = (function (){
createGraphs();
activateUpdates();
});
}
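
The hunk above debounces activateUpdates: any pending timeout is cancelled and a new one is armed, so a burst of redraw calls results in a single activation about one second after the last call. A minimal Python sketch of the same debounce pattern, using a hypothetical Debouncer helper that is not part of this codebase:

import threading

class Debouncer:
    # Run a callback once, a fixed delay after the most recent trigger.
    def __init__(self, delay_s, callback):
        self.delay_s = delay_s
        self.callback = callback
        self._timer = None

    def trigger(self):
        if self._timer is not None:
            self._timer.cancel()          # like clearTimeout above
        self._timer = threading.Timer(self.delay_s, self.callback)
        self._timer.start()               # like setTimeout(..., 1000)

# many rapid triggers lead to a single callback invocation
activate = Debouncer(1.0, lambda: print("activateUpdates"))
for _ in range(5):
    activate.trigger()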

View File

@ -1,3 +1,11 @@
Legend:
U <msg> : what to do on an update <msg> on the server-sent event connection
R <reply> : what to do on an AJAX request <reply>
CHAIN <something>: initiate a request <something>
U id : if showMain : CHAIN getblock/main (accept-block, draw, redraw)
R draw: if main: if showConsole: CHAIN console (accept-console)
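
The notation above is read as follows: on an update message <msg> from the server-sent event connection (U) or on an AJAX reply <reply> (R), the client may chain a further request (CHAIN). A schematic Python sketch of such a dispatch, purely illustrative since the real client is JavaScript:

# Illustrative only: mirrors the "U <msg> -> CHAIN <request>" notation above.
def on_update(msg, state, chain):
    if msg == "id" and state.get("showMain"):
        chain("getblock/main")   # U id : if showMain : CHAIN getblock/main

def on_reply(reply, state, chain):
    if reply == "draw" and state.get("main") and state.get("showConsole"):
        chain("console")         # R draw : if main : if showConsole : CHAIN console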

View File

@ -101,13 +101,13 @@ class InfluxDataGetter:
return res
def poll_last_values(self, variables, time):
def poll_last_values(self, variables, lastvalues, end_time):
"""
Polls the latest values for the given variables that are in the range [time[1]-30s, time[1]].
Polls the latest values for the given variables since their last known point up to end_time.
Parameters :
variables ([(str)]) : an array of variable names (Influx) to get the last known values for
time ([int]) : the currently requested time range in the calling polling function (only the second value is used). It consists of two Unix timestamps in seconds, the first included, the second excluded.
end_time (int) : the Unix timestamp in seconds up to which values are included
Returns :
{(str):[[(int), (float)]]} : a dictionary of points. The key is the name of the Influx variable, and the value is an array of pairs (also arrays), the first value being the Unix timestamp in seconds (x), the second being the value (y).
@ -120,8 +120,8 @@ class InfluxDataGetter:
if variable_name_for_query.endswith(".target"):
variable_name_for_query = variable_name_for_query[:-len(".target")]
is_target = True
points = self._get_last_values(variable_name_for_query, is_target, time)
start_time = int(lastvalues[variable_name_for_query][0]) if variable_name_for_query in lastvalues.keys() else None
points = self._get_last_values(variable_name_for_query, is_target, start_time, end_time)
if len(points) > 0 :
res[variable] = points
return res
@ -148,10 +148,10 @@ class InfluxDataGetter:
to_add = []
query = f"""
from(bucket: "{self._bucket}")
|> range(start: {times[0] if all else 0}, stop: {times[1] + 1})
|> range(start: {times[0] if all=="True" else 0}, stop: {times[1] + 1})
|> filter(fn: (r) => r._measurement == "{measurement}")
|> filter(fn: (r) => r._field == "setup_info")
{"" if all else "|> last()"}
{"" if all=="True" else "|> last()"}
|> yield(name: "res")
"""
tables = self._db.query(query)
@ -159,7 +159,7 @@ class InfluxDataGetter:
for record in table.records:
to_add.append(ast.literal_eval(record.get_value()))
if all:
if all == "True":
query = f"""
from(bucket: "{self._bucket}")
|> range(start: 0, stop: {times[0]+1})
@ -356,7 +356,7 @@ class InfluxDataGetter:
tables = self._db.query(query)
for table in tables:
for record in table.records:
t = int(datetime.timestamp(record.get_time()))
t = round(datetime.timestamp(record.get_time()), 3)
value = record.get_value()
try:
value = PrettyFloat(value)
@ -402,14 +402,15 @@ class InfluxDataGetter:
curve.insert(0, [time[0], value])
return curve
def _get_last_values(self, variable, is_target, time):
def _get_last_values(self, variable, is_target, start_time, end_time):
"""
Gets the latest values for the given variable that are in [time[1]-30s, time[1]].
Gets the latest values for the given variable that are in [start_time, end_time].
Parameters :
variable (str) : the name (Influx) of the variable we want the last value of.
is_target (bool) : tells if the given variable is a target, or not (if variable is "nicos/se_t_chip.target", then is_target has to be set to True)
time ([int]) : the currently requested time range in the calling polling function (only the second value is used). It consists of two Unix timestamps in seconds, the first included, the second excluded.
start_time (int|None) : the start of the time range (Unix timestamp in seconds) from which values are included, or None if there is no last known point
end_time (int) : the end of the time range (Unix timestamp in seconds) up to which values are included
Returns :
[[(int), (float)]] : an array of points (also arrays). The first value is the Unix timestamp in seconds (x), the second is the value (y)
@ -418,9 +419,10 @@ class InfluxDataGetter:
res = []
query = f"""
from(bucket: "{self._bucket}")
|> range(start: {time[1]-30}, stop: {time[1]+1})
|> range(start: {start_time if start_time != None else 0}, stop: {end_time+1})
|> filter(fn : (r) => r._measurement == "{variable}")
|> filter(fn : (r) => r._field == "{"target_float" if is_target else "value_float"}")
{"|> last()" if start_time == None else ""}
|> keep(columns: ["_time","_value"])
|> yield(name: "res")
"""
@ -429,7 +431,7 @@ class InfluxDataGetter:
tables = self._db.query(query)
for table in tables:
for record in table.records:
t = int(datetime.timestamp(record.get_time()))
t = round(datetime.timestamp(record.get_time()), 3)
value = record.get_value()
try:
value = PrettyFloat(value)
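
In the hunks above, _get_last_values now takes an explicit start and end: when a variable has no last known point yet (start_time is None), the range is left open at the start and only the last value is kept; otherwise everything from the last known point up to end_time is fetched, and timestamps are rounded to millisecond precision. A condensed sketch of that query construction, assuming a hypothetical standalone helper rather than the class method:

def build_last_values_query(bucket, variable, is_target, start_time, end_time):
    # Mirrors the Flux query built in _get_last_values above.
    field = "target_float" if is_target else "value_float"
    return f"""
        from(bucket: "{bucket}")
            |> range(start: {start_time if start_time is not None else 0}, stop: {end_time + 1})
            |> filter(fn: (r) => r._measurement == "{variable}")
            |> filter(fn: (r) => r._field == "{field}")
            {"|> last()" if start_time is None else ""}
            |> keep(columns: ["_time", "_value"])
            |> yield(name: "res")
    """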

View File

@ -30,19 +30,19 @@ class InfluxGraph:
self.db = InfluxDB()
self.influx_data_getter = InfluxDataGetter(self.db, json.load(open("./graphs/lab4.json", "r"))["influx"])
self.livemode = self.HISTORICAL
self.time = [0, 0]
self.end_query = 0
self.lastvalues = {}
self.variables = []
def seconds_to_nano(self, seconds):
def milliseconds_to_nano(self, milliseconds):
"""
Converts seconds to nanoseconds
Converts milliseconds to nanoseconds
Parameters:
seconds (int)
milliseconds (int)
Returns :
int
"""
return seconds*1000000000
return milliseconds*1000000
def get_abs_time(self, times):
"""
@ -59,23 +59,6 @@ class InfluxGraph:
oneyear = 365 * 24 * 3600
return [t + now if t < oneyear else t for t in times]
def strip_future(self, result):
"""
OLD : strip future points (happens only on dummy test_day)
Removes points more recent than the last requested point in time
Parameters :
result ({(str):[[(int),(float)]]}) : a dictionary with the variable names as key, and an array of points,
which are an array containing the timestamp as their first value, and the y-value in float as their second one.
"""
# if self.livemode == self.LIVE:
for c in result.values():
while c:
lastt, _ = c[-1]
if lastt <= self.time[1]:
break
c.pop()
def complete_to_end(self, result, endtime):
"""
Completes the data until the last requested point in time by adding the last known y-value at the end point.
@ -90,7 +73,7 @@ class InfluxGraph:
lastt, lastx = c[-1]
if lastt < endtime:
c.append((endtime, lastx))
self.lastvalues[var] = (endtime, lastx)
self.lastvalues[var] = (lastt, lastx)
def w_graph(self, variables, time="-1800,0", interval=None):
"""
@ -110,17 +93,15 @@ class InfluxGraph:
time = [float(t) for t in time.split(',')]
start, end, now = self.get_abs_time(time + [0])
start, end, now = int(start), int(end), int(now)
self.time = [start, end]
self.variables = variables.split(',')
self.livemode = self.ACTUAL if end >= now else self.HISTORICAL
queried_time_range = [start, end]
queried_variables = variables.split(',')
self.livemode = self.ACTUAL if end+10 >= now else self.HISTORICAL
logging.info('LIVE %g %g %d %d', end, now, end >= now, self.livemode)
if interval : interval = self.milliseconds_to_nano(int(interval))
if interval : interval = self.seconds_to_nano(int(interval))
result = self.influx_data_getter.get_curves_in_timerange(self.variables, self.time, interval)
self.strip_future(result)
result = self.influx_data_getter.get_curves_in_timerange(queried_variables, queried_time_range, interval)
self.complete_to_end(result, min(end, now))
self.time[0] = self.time[1]
self.end_query = end
# reduction not yet implemented
return dict(type='graph-draw', reduced=False, graph=result)
@ -192,25 +173,32 @@ class InfluxGraph:
for data viewing), and a "graph" dictionary with the variable names as key, and an array of points, which are an array containing the timestamp
as their first value, and the y-value in float as their second one
"""
if self.livemode == self.LIVE:
self.time[1], = self.get_abs_time([0])
else:
self.time[1] = self.time[0] # Do not update (the current requested value is the last)
if self.time[1] > self.time[0]:
result = self.influx_data_getter.poll_last_values(self.variables, self.time)
for variable in self.lastvalues.keys():
if variable in result.keys():
if result[variable][-1][0] > self.lastvalues[variable][0]:
self.lastvalues[variable] = (result[variable][-1][0], result[variable][-1][1])
else:
del result[variable]
if self.livemode != self.LIVE:
return None
now, = self.get_abs_time([0])
if int(self.time[1] / 60) != int(self.time[0] / 60):
# Update unchanged values every plain minute
for var, (_, lastx) in self.lastvalues.items():
if var not in result:
result[var] = [(self.time[1], lastx)]
self.time[0] = self.time[1]
if len(result) > 0:
return dict(type='graph-update', reduced=False, time=self.time[1], graph=result)
result = self.influx_data_getter.poll_last_values(self.variables, self.lastvalues, now)
for variable in self.lastvalues.keys():
if variable in result.keys():
# drop points at or before the last known point (queries are in seconds and may return points already displayed)
while len(result[variable]) > 0:
if result[variable][0][0] <= self.lastvalues[variable][0]:
result[variable].pop(0)
else:
break
if len(result[variable]) > 0 and result[variable][-1][0] > self.lastvalues[variable][0]:
self.lastvalues[variable] = (result[variable][-1][0], result[variable][-1][1])
else:
del result[variable]
if int(now / 60) != int(self.end_query / 60):
# Update unchanged values every plain minute
for var, (_, lastx) in self.lastvalues.items():
if var not in result:
result[var] = [(now, lastx)]
self.end_query = now
if len(result) > 0:
return dict(type='graph-update', reduced=False, time=now, graph=result)
return None
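
The rewritten update handler polls from each variable's last known point up to now, then discards leading points the client has already received, since the second-resolution query range can overlap data that was already sent. A small sketch of that deduplication step, with a hypothetical helper name:

def drop_already_sent(points, last_known_t):
    # points: list of [timestamp, value] pairs, oldest first;
    # drop everything at or before the timestamp the client already has.
    while points and points[0][0] <= last_known_t:
        points.pop(0)
    return points

# hypothetical usage with the structures from the diff above:
# result[var] = drop_already_sent(result[var], self.lastvalues[var][0])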

View File

@ -21,7 +21,7 @@ import seagraph
import traceback
import logging
import circularlog
from gevent.lock import RLock
import os
import signal
@ -81,6 +81,7 @@ def get_update(path=None):
yield to_json_sse(msg)
if messages:
lastmsg = time.time()
gevent.sleep(0.1)
else:
if time.time() > lastmsg + 30:
if not client.info():
@ -292,13 +293,15 @@ class SeaInstrument(Instrument):
self.seacmd = None
self.last_client_remove = time.time()
self.history = deque(maxlen=1000)
self.sea_lock = RLock()
self.init()
gevent.Greenlet.spawn(self.checkconnections)
def init(self):
self.values = {}
self.groups = {}
self.device = sea_request_reply(self.seaspy, "samenv name")[0] # first line
with self.sea_lock:
self.device = sea_request_reply(self.seaspy, "samenv name")[0] # first line
self.consolepos = 0
self.timeStamp = None
self.history.clear()
@ -344,7 +347,8 @@ class SeaInstrument(Instrument):
gobj.lastreq = now
gobj.lastpoll = now
try:
data = sea_request_reply(self.seaspy, 'getgroup '+path)
with self.sea_lock:
data = sea_request_reply(self.seaspy, 'getgroup '+path)
except Exception as e:
logging.error('ERROR (getgroup %s) %s', path, traceback.format_exc())
continue
@ -611,7 +615,7 @@ class SeaParams:
self.consolepos = 0
self.id = uuid.uuid4().hex[0:15]
# SeaGraph.__init__(self)
self.queue = [dict(type='id', id=self.id, instrument=instrument.inst_name, device=instrument.device)]
self.queue = []
def poll(self):
messages = self.queue
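
The last file gains a gevent RLock around sea_request_reply so that concurrent greenlets cannot interleave request/reply pairs on the shared SEA connection. A minimal sketch of that locking pattern, assuming a hypothetical transport object with send/receive methods:

from gevent.lock import RLock

class LockedConnection:
    def __init__(self, transport):
        self.transport = transport   # hypothetical request/reply transport
        self.sea_lock = RLock()      # reentrant: the holding greenlet may acquire it again

    def request(self, command):
        # Only one greenlet at a time sends a command and reads its reply,
        # so replies cannot be attributed to the wrong request.
        with self.sea_lock:
            self.transport.send(command)
            return self.transport.receive()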