Added expired=True support
This commit is contained in:
40
influxdb.py
40
influxdb.py
@ -116,9 +116,10 @@ class InfluxDataGetter:
|
||||
var_param = variable.split(".")
|
||||
variable_name_for_query = var_param[0]
|
||||
parameter = "value" if len(var_param) == 1 else var_param[1]
|
||||
start_time = int(lastvalues[variable_name_for_query][0]) if variable_name_for_query in lastvalues.keys() else None
|
||||
start_time = start_time if start_time != None and start_time <= end_time else None # to prevent self.lastvalues being changed by w_graph between entrance of graphpoll and poll_last_values
|
||||
points = self._get_last_values(variable_name_for_query,parameter,start_time, end_time)
|
||||
start_time = int(lastvalues[variable][0]) if variable in lastvalues.keys() else None #if the call to poll_last_values is more recent than getgraph, we trigger only one value, which is the last one available from 0 to end_time
|
||||
points = []
|
||||
if start_time == None or start_time < end_time: # start_time might be more recent than end_time if getgraph is called between graphpoll and pol_last_values (influxgraph.py)
|
||||
points = self._get_last_values(variable_name_for_query,parameter,start_time, end_time)
|
||||
if len(points) > 0 :
|
||||
res[variable] = points
|
||||
return res
|
||||
@ -325,14 +326,14 @@ class InfluxDataGetter:
|
||||
Returns :
|
||||
[[(int), (float)]] : an array of pairs (also arrays), the first value being the Unix timestamp in second (x), the seconds being the value (y)
|
||||
"""
|
||||
res = []
|
||||
raw = []
|
||||
query = f"""
|
||||
from(bucket: "{self._bucket}")
|
||||
|> range(start: {time[0]}, stop: {time[1] + 1})
|
||||
|> filter(fn : (r) => r._measurement == "{variable}")
|
||||
|> filter(fn : (r) => r._field == "{parameter+"_float"}")
|
||||
{"|> aggregateWindow(every: duration(v:"+str(interval)+"), fn: last, createEmpty:false)" if interval else ""}
|
||||
|> keep(columns: ["_time","_value"])
|
||||
|> keep(columns: ["_time","_value","expired"])
|
||||
|> yield(name: "res")
|
||||
"""
|
||||
tables = self._db.query(query)
|
||||
@ -344,7 +345,15 @@ class InfluxDataGetter:
|
||||
value = PrettyFloat(value)
|
||||
except:
|
||||
value = None
|
||||
res.append([t, value])
|
||||
raw.append([t, value, record["expired"]])
|
||||
sorted_raw = sorted(raw, key=lambda pair: pair[0]) #expired=True and expired=False are in two Influx tables, so they need to be synchronized
|
||||
res = []
|
||||
for pair in sorted_raw:
|
||||
if pair[2] == "True":
|
||||
res.append([pair[0], pair[1]]) # So the user can know precisely when a curve is expired
|
||||
res.append([pair[0], None]) # So chartJS will cut the curve from this point (which is expired)
|
||||
else:
|
||||
res.append([pair[0], pair[1]])
|
||||
|
||||
return self._insert_last_known_value(variable, parameter, res, time)
|
||||
|
||||
@ -367,7 +376,7 @@ class InfluxDataGetter:
|
||||
from(bucket: "{self._bucket}")
|
||||
|> range(start: 0, stop: {time[0]+1})
|
||||
|> filter(fn : (r) => r._measurement == "{variable}")
|
||||
|> filter(fn : (r) => r._field == "{parameter+"_float"}")
|
||||
|> filter(fn : (r) => r._field == "{parameter+"_float"}" and r.expired == "False")
|
||||
|> last()
|
||||
|> keep(columns: ["_value"])
|
||||
|> yield(name: "res")
|
||||
@ -387,6 +396,7 @@ class InfluxDataGetter:
|
||||
def _get_last_values(self, variable, parameter, start_time, end_time):
|
||||
"""
|
||||
Gets the lastest values for the given variable and parameter that are in [start_time, end_time].
|
||||
The process is the same as _get_curve.
|
||||
|
||||
Parameters :
|
||||
variable (str) : the name (Influx) of the variable we want the last value of.
|
||||
@ -398,14 +408,14 @@ class InfluxDataGetter:
|
||||
[[(int), (float)]] : an array of points (also arrays). The first value is the Unix timestamp in second (x), the seconds is the value (y)
|
||||
"""
|
||||
|
||||
res = []
|
||||
raw = []
|
||||
query = f"""
|
||||
from(bucket: "{self._bucket}")
|
||||
|> range(start: {start_time if start_time != None else 0}, stop: {end_time+1})
|
||||
|> filter(fn : (r) => r._measurement == "{variable}")
|
||||
|> filter(fn : (r) => r._field == "{parameter+ "_float"}")
|
||||
{"|> last()" if start_time == None else ""}
|
||||
|> keep(columns: ["_time","_value"])
|
||||
|> keep(columns: ["_time","_value", "expired"])
|
||||
|> yield(name: "res")
|
||||
"""
|
||||
|
||||
@ -419,7 +429,17 @@ class InfluxDataGetter:
|
||||
value = PrettyFloat(value)
|
||||
except:
|
||||
value = None
|
||||
res.append([t, value])
|
||||
raw.append([t, value, record["expired"]])
|
||||
|
||||
sorted_raw = sorted(raw, key=lambda pair: pair[0])
|
||||
res = []
|
||||
for pair in sorted_raw:
|
||||
if pair[2] == "True":
|
||||
res.append([pair[0], pair[1]])
|
||||
res.append([pair[0], None])
|
||||
else:
|
||||
res.append([pair[0], pair[1]])
|
||||
|
||||
return res
|
||||
|
||||
def _get_device_name_components(self, time):
|
||||
|
Reference in New Issue
Block a user