Code refactoring

This commit is contained in:
2020-08-12 16:38:57 +02:00
parent 9fa68037da
commit 6db2f863d8
+56 -70
View File
@@ -1,48 +1,41 @@
import datetime
import time
import logging
import requests
# NOTE(review): diff residue — the first assignment is the pre-refactor line;
# the refactor switched to the conventional module-qualified logger name.
logger = logging.getLogger("logger")
logger = logging.getLogger(__name__)
# Base URL of the PSI SwissFEL data-api query endpoint, used by all requests
# in this module after the refactor removed the caller-supplied base_url.
DATA_API_QUERY_URL = "https://data-api.psi.ch/sf/query"
def write_epics_pvs(output_file, start_pulse_id, stop_pulse_id, metadata, epics_pvs):
    """Retrieve EPICS PV data for a pulse-id interval and persist it to HDF5.

    Args:
        output_file: Path of the output HDF5 file; on failure an empty marker
            file "<output_file>_NO_DATA" is created instead.
        start_pulse_id: First pulse id of the interval.
        stop_pulse_id: Last pulse id of the interval.
        metadata: Additional metadata (not yet merged into the data — see TODO).
        epics_pvs: List of EPICS channel names to retrieve.
    """
    logger.info("Retrieve pulse-id / data mapping for pulse ids")
    # Resolve both pulse ids to their global timestamps via the data API.
    start_date = get_pulse_id_date_mapping(start_pulse_id)
    stop_date = get_pulse_id_date_mapping(stop_pulse_id)

    data = get_data(epics_pvs, start=start_date, stop=stop_date)

    # TODO: Merge metadata to data.

    if data:
        logger.info("Persist data to hdf5 file")
        # BUG FIX: the refactored code still wrote to `new_filename`, which is
        # no longer defined after the signature changed — the output path is
        # the `output_file` parameter.
        data_api.to_hdf5(data, output_file, overwrite=True, compression=None, shuffle=False)
    else:
        logger.error("No data retrieved")
        # Leave an empty marker file so downstream tooling can detect the failure.
        open(output_file + "_NO_DATA", 'a').close()
# ---------------------------------------------------------------------------
# NOTE(review): this span is diff residue — pre- and post-refactor lines are
# interleaved, and the two "@@" hunk headers below mark lines elided from this
# view, so the function body is incomplete here. Code left byte-identical.
# ---------------------------------------------------------------------------
# Pre-refactor signature and logging (removed lines):
def get_data(channel_list, start=None, end=None, base_url=None):
logger.info("Requesting range %s to %s." % (start, end))
logger.info("Retrieve data for channels: %s" % channel_list)
# Post-refactor signature: `end` renamed to `stop`; `base_url` retained for
# backward compatibility but no longer used (replaced by DATA_API_QUERY_URL).
def get_data(channel_list, start=None, stop=None, base_url=None):
# BUG(review): two %s placeholders but three format arguments — this raises
# "TypeError: not all arguments converted" at runtime and drops the channel
# list from the message. Should read
# "Requesting range %s to %s for channels: %s" % (start, stop, channel_list).
logger.info("Requesting range %s to %s for channels: " % (start, stop, channel_list))
# Data-api query: ISO-8601 time interval plus the fields to return per channel.
query = {"range": {"startDate": datetime.datetime.isoformat(start),
"endDate": datetime.datetime.isoformat(end),
"endDate": datetime.datetime.isoformat(stop),
"startExpansion": True},
"channels": channel_list,
"fields": ["pulseId", "globalSeconds", "globalDate", "value",
"eventCount"]}
logger.debug(query)
# Refactor: POST to the module constant instead of caller-supplied base_url.
response = requests.post(base_url + '/query', json=query)
response = requests.post(DATA_API_QUERY_URL, json=query)
# Check for successful return of data
if response.status_code != 200:
@@ -52,9 +45,10 @@ def get_data(channel_list, start=None, end=None, base_url=None):
# Retry loop: up to 5 further attempts, 60 s apart (`itry` is presumably
# initialised in the hunk elided above — confirm against the full file).
while itry < 5:
itry += 1
time.sleep(60)
response = requests.post(base_url + '/query', json=query)
response = requests.post(DATA_API_QUERY_URL, json=query)
if response.status_code == 200:
break
logger.info("Data retrieval failed, post attempt %d" % itry)
if response.status_code != 200:
@@ -67,67 +61,59 @@ def get_data(channel_list, start=None, end=None, base_url=None):
# Build a pandas DataFrame indexed by globalDate; the error raise and the
# response.json() parsing that defines `data` are in the elided hunk above.
return data_api.client._build_pandas_data_frame(data, index_field="globalDate")
def get_pulse_id_date_mapping(pulse_id):
    """Resolve a single pulse id to its global date via the SwissFEL data API.

    See https://jira.psi.ch/browse/ATEST-897 for more details.

    Args:
        pulse_id: Pulse id to resolve.

    Returns:
        A timezone-aware datetime parsed from the API's "globalDate" field.

    Raises:
        RuntimeError: If the mapping cannot be retrieved after all retries.
    """
    logger.info("Retrieve pulse-id/date mapping for pulse_id %s" % pulse_id)
    try:
        # Latest BUNCH-1-OK event at or before the requested pulse id.
        query = {"range": {"startPulseId": 0,
                           "endPulseId": pulse_id},
                 "limit": 1,
                 "ordering": "desc",
                 "channels": ["SIN-CVME-TIFGUN-EVR0:BUNCH-1-OK"],
                 "fields": ["pulseId", "globalDate"]}

        for c in range(10):
            logger.info("Retrieve mapping for pulse_id %d" % pulse_id)

            # Query server
            response = requests.post(DATA_API_QUERY_URL, json=query)

            # Check for successful return of data
            if response.status_code != 200:
                raise RuntimeError("Unable to retrieve data from server: ", response)

            data = response.json()

            if len(data[0]["data"]) == 0 or "pulseId" not in data[0]["data"][0]:
                raise RuntimeError(
                    "Didn't get good responce from data_api : %s " % data)

            if pulse_id != data[0]["data"][0]["pulseId"]:
                # Mapping not yet available on the server: wait until ~24 s
                # after the first-seen reference date, then retry.
                logger.info("retrieval failed")
                if c == 0:
                    ref_date = data[0]["data"][0]["globalDate"]
                    ref_date = dateutil.parser.parse(ref_date)

                now_date = datetime.datetime.now()
                now_date = pytz.timezone('Europe/Zurich').localize(now_date)

                check_date = ref_date + datetime.timedelta(
                    seconds=24)  # 20 seconds should be enough
                delta_date = check_date - now_date

                s = delta_date.seconds
                logger.info("retry in " + str(s) + " seconds ")
                if not s <= 0:
                    time.sleep(s)
                    continue

                # Waiting budget exhausted for this attempt.
                raise RuntimeError('Unable to retrieve mapping')

            # BUG FIX: the refactored single-pulse-id version still appended to
            # the removed `dates` list (undefined name) and never returned a
            # value, although callers bind the result — return the parsed date.
            date = data[0]["data"][0]["globalDate"]
            return dateutil.parser.parse(date)

        # All retries failed without a matching pulse id.
        raise RuntimeError('Unable to retrieve mapping')
    except Exception as e:
        raise RuntimeError('Unable to retrieve pulse_id date mapping') from e