Delete slic/utils/logbook.py
Run CI Tests / test (push) Has been cancelled

This commit is contained in:
2025-08-12 16:17:32 +02:00
parent 511a8cf6b9
commit 93a206beea
-858
View File
@@ -1,858 +0,0 @@
import requests
import urllib.parse
import os
import builtins
import re
import sys
from .logbook_exceptions import *
from datetime import datetime
def open(*args, **kwargs):
"""
Will return a Logbook object. All arguments are passed to the logbook constructor.
:param args:
:param kwargs:
:return: Logbook() instance
"""
return Logbook(*args, **kwargs)
class Logbook(object):
"""
Logbook provides methods to interface with logbook on location: "server:port/subdir/logbook". User can create,
edit, delete logbook messages.
"""
def __init__(self, hostname, logbook='', port=None, user=None, password=None, subdir='', use_ssl=True,
encrypt_pwd=True):
"""
:param hostname: elog server hostname. If whole url is specified here, it will be parsed and arguments:
"logbook, port, subdir, use_ssl" will be overwritten by parsed values.
:param logbook: name of the logbook on the elog server
:param port: elog server port (if not specified will default to '80' if use_ssl=False or '443' if use_ssl=True
:param user: username (if authentication needed)
:param password: password (if authentication needed) Password will be encrypted with sha256 unless
encrypt_pwd=False (default: True)
:param subdir: subdirectory of logbooks locations
:param use_ssl: connect using ssl (ignored if url starts with 'http://'' or 'https://'?
:param encrypt_pwd: To avoid exposing password in the code, this flag can be set to False and password
will then be handled as it is (user needs to provide sha256 encrypted password with
salt= '' and rounds=5000)
:return:
"""
hostname = hostname.strip()
# parse url to see if some parameters are defined with url
parsed_url = urllib.parse.urlsplit(hostname)
# ---- handle SSL -----
# hostname must be modified according to use_ssl flag. If hostname starts with https:// or http://
# the use_ssl flag is ignored
url_scheme = parsed_url.scheme
if url_scheme == 'http':
use_ssl = False
elif url_scheme == 'https':
use_ssl = True
elif not url_scheme:
# add http or https
if use_ssl:
url_scheme = 'https'
else:
url_scheme = 'http'
# ---- handle port -----
# 1) by default use port defined in the url
# 2) remove any 'default' ports such as 80 for http and 443 for https
# 3) if port not defined in url and not 'default' add it to netloc
netloc = parsed_url.netloc
if netloc == "" and "localhost" in hostname:
netloc = 'localhost'
netloc_split = netloc.split(':')
if len(netloc_split) > 1:
# port defined in url --> remove if needed
port = netloc_split[1]
if (port == 80 and not use_ssl) or (port == 443 and use_ssl):
netloc = netloc_split[0]
else:
# add port info if needed
if port is not None and not (port == 80 and not use_ssl) and not (port == 443 and use_ssl):
netloc += ':{}'.format(port)
# ---- handle subdir and logbook -----
# parsed_url.path = /<subdir>/<logbook>/
# Remove last '/' for easier parsing
url_path = parsed_url.path
if url_path.endswith('/'):
url_path = url_path[:-1]
splitted_path = url_path.split('/')
if url_path and len(splitted_path) > 1:
# If here ... then at least some part of path is defined.
# If logbook defined --> treat path current path as subdir and add logbook at the end
# to define the full path. Else treat existing path as <subdir>/<logbook>.
# Put first and last '/' back on its place
if logbook:
url_path += '/{}'.format(logbook)
else:
logbook = splitted_path[-1]
else:
# There is nothing. Use arguments.
url_path = subdir + '/' + logbook
# urllib.parse.quote replaces special characters with %xx escapes
# self._logbook_path = urllib.parse.quote('/' + url_path + '/').replace('//', '/')
self._logbook_path = ('/' + url_path + '/').replace('//', '/')
self._url = url_scheme + '://' + netloc + self._logbook_path
self.logbook = logbook
self._user = user
self._password = _handle_pswd(password, encrypt_pwd)
def post(self, message, msg_id=None, reply=False, attributes=None, attachments=None,
suppress_email_notification=False, encoding=None, timeout=None, **kwargs):
"""
Posts message to the logbook. If msg_id is not specified new message will be created, otherwise existing
message will be edited, or a reply (if reply=True) to it will be created. This method returns the msg_id
of the newly created message.
"""
logbook_directory = "elog_instance/logbooks/demo"
print(f"Checking the existence of the directory {logbook_directory}")
# Check if the directory exists
if not os.path.exists(logbook_directory):
print(f"The directory {logbook_directory} does not exist.")
else:
print(f"The directory {logbook_directory} exists.")
# Check write permissions
if os.access(logbook_directory, os.W_OK):
print(f"The directory {logbook_directory} has write permissions.")
else:
print(f"The directory {logbook_directory} does not have write permissions.")
print("STARTING POST")
# Ajout des impressions pour déboguer
print(f"Message to post: {message}")
print(f"msg_id: {msg_id}")
print(f"Attributes: {attributes}")
print(f"Attachments: {attachments}")
print(f"Encoding: {encoding}")
print(f"Timeout: {timeout}")
print(f"Additional kwargs: {kwargs}")
attributes = attributes or {}
attributes = {**attributes, **kwargs} # kwargs as attributes with higher priority
print(f"Updated attributes: {attributes}")
attachments = attachments or []
print(f"Attachments list: {attachments}")
if encoding is not None:
if encoding not in ['plain', 'HTML', 'ELCode']:
raise LogbookMessageRejected('Invalid message encoding. Valid options: plain, HTML, ELCode.')
attributes['Encoding'] = encoding
if suppress_email_notification:
attributes["suppress"] = 1
# Prepare attachments
if attachments:
new_attachment_list, objects_to_close = self._prepare_attachments(attachments)
print(f"New attachments prepared: {new_attachment_list}")
else:
objects_to_close = []
new_attachment_list = []
attributes_to_edit = dict()
if msg_id:
print(f"Editing message with msg_id: {msg_id}")
if reply:
print(f"Replying to message with msg_id: {msg_id}")
self._check_if_message_on_server(msg_id)
attributes['reply_to'] = str(msg_id)
else:
print("Editing existing message.")
attributes['edit_id'] = str(msg_id)
attributes['skiplock'] = '1'
msg_to_edit, attributes_to_edit, existing_attachments_list = self.read(msg_id)
# Merge new attributes
for attribute, data in attributes.items():
if data is not None:
attributes_to_edit[attribute] = data
print(f"Attributes after merging: {attributes_to_edit}")
# Process existing attachments
i = 0
existing_attachments_filename_list = []
for attachment in existing_attachments_list:
attributes_to_edit[f'attachment{i}'] = os.path.basename(attachment)
existing_attachments_filename_list.append(os.path.basename(attachment)[14:])
i += 1
print(f"Existing attachments: {existing_attachments_filename_list}")
duplicate_attachment_list = []
for new_attachment in new_attachment_list:
new_attachment_filename = new_attachment[1][0]
print(f"Checking new attachment: {new_attachment_filename}")
if new_attachment_filename in existing_attachments_filename_list:
# Same attachment exists on the server, compare content
new_attachment_content = new_attachment[1][1].read()
new_attachment[1][1].seek(0)
attachment_index = existing_attachments_filename_list.index(new_attachment_filename)
existing_attachment_content = self.download_attachment(
url=existing_attachments_list[attachment_index],
timeout=timeout
)
if new_attachment_content == existing_attachment_content:
print(f"Duplicate attachment detected: {new_attachment_filename}")
duplicate_attachment_list.append(new_attachment)
else:
print(f"Attachment content has changed: {new_attachment_filename}")
self.delete_attachment(msg_id, attributes=attributes_to_edit,
attachment_id=attachment_index,
timeout=timeout, text=msg_to_edit)
existing_attachments_filename_list.pop(attachment_index)
existing_attachments_list.pop(attachment_index)
print(f"Duplicate attachments to remove: {duplicate_attachment_list}")
# Remove duplicates
for attach in duplicate_attachment_list:
new_attachment_list.remove(attach)
print(f"Final new attachments list: {new_attachment_list}")
else:
# Creating a new message, add timestamp if not present
if 'When' not in attributes:
attributes['When'] = int(datetime.now().timestamp())
# Final check on attributes
if not attributes_to_edit:
attributes_to_edit = attributes
print(f"Final attributes to send to the server: {attributes_to_edit}")
# Remove reserved attributes
_remove_reserved_attributes(attributes_to_edit)
new_attachment_list.append(('Text', ('', message.encode('iso-8859-1'))))
print(f"Final attachment list including message text: {new_attachment_list}")
# Add base message attributes
self._add_base_msg_attributes(attributes_to_edit)
print(f"Attributes with base message added: {attributes_to_edit}")
# Sanitize attribute keys
attributes_to_edit = _replace_special_characters_in_attribute_keys(attributes_to_edit)
print(f"Attributes after sanitizing keys: {attributes_to_edit}")
# Encode all string values in latin1
attributes_to_edit = _encode_values(attributes_to_edit)
print(f"Attributes after encoding: {attributes_to_edit}")
try:
print("Sending POST request to the server...")
response = requests.post(self._url, data=attributes_to_edit, files=new_attachment_list,
allow_redirects=False, verify=False, timeout=timeout)
print("Response received:", response)
resp_message, resp_headers, resp_msg_id = _validate_response(response)
print(f"Response message: {resp_message}")
print(f"Response headers: {resp_headers}")
print(f"Response msg_id: {resp_msg_id}")
# Close file-like objects if needed
for file_like_object in objects_to_close:
if hasattr(file_like_object, 'close'):
file_like_object.close()
except requests.Timeout as e:
print("Request timed out:", e)
raise LogbookServerTimeout(f"Timeout while posting: {e}")
except requests.RequestException as e:
print("Request exception:", e)
self._check_if_message_on_server(msg_id) # Raises exceptions if no message or no response from server
raise LogbookServerProblem(f"Cannot access logbook server: {e}")
if not resp_msg_id or resp_msg_id < 1:
raise LogbookInvalidMessageID(f"Invalid message ID: {resp_msg_id}")
print(f"Message posted successfully with msg_id: {resp_msg_id}")
return resp_msg_id
def read(self, msg_id, timeout=None):
"""
Reads message from the logbook server and returns tuple of (message, attributes, attachments) where:
message: string with message body
attributes: dictionary of all attributes returned by the logbook
attachments: list of urls to attachments on the logbook server
:param msg_id: ID of the message to be read
:param timeout: The timeout value to be passed to the get request.
:return: message, attributes, attachments
"""
request_headers = dict()
if self._user or self._password:
request_headers['Cookie'] = self._make_user_and_pswd_cookie()
try:
self._check_if_message_on_server(msg_id) # raises exceptions if no message or no response from server
response = requests.get(self._url + str(msg_id) + '?cmd=download', headers=request_headers,
allow_redirects=False, verify=False, timeout=timeout)
# Validate response. If problems Exception will be thrown.
resp_message, resp_headers, resp_msg_id = _validate_response(response)
except requests.Timeout as e:
# Catch here a timeout o the post request.
# Raise the logbook excetion and let the user handle it
raise LogbookServerTimeout('{0} method cannot be completed because of a network timeout:\n' +
'{1}'.format(sys._getframe().f_code.co_name, e))
except requests.RequestException as e:
# If here: message is on server but cannot be downloaded (should never happen)
raise LogbookServerProblem('Cannot access logbook server to read the message with ID: ' + str(msg_id) +
'because of:\n' + '{0}'.format(e))
# Parse message to separate message body, attributes and attachments
attributes = dict()
attachments = list()
returned_msg = resp_message.decode('iso-8859-1', 'ignore').splitlines()
delimiter_idx = returned_msg.index('========================================')
message = '\n'.join(returned_msg[delimiter_idx + 1:])
for line in returned_msg[0:delimiter_idx]:
line = line.split(': ')
data = ''.join(line[1:])
if line[0] == 'Attachment':
if not data:
# Treat the empty string as special case,
# otherwise the split below returns [""] and attachments is [self._url]
attachments = []
else:
attachments = data.split(',')
# Here are only attachment names, make a full url out of it, so they could be
# recognisable by others, and downloaded if needed
attachments = [self._url + '{0}'.format(i) for i in attachments]
else:
attributes[line[0]] = data
return message, attributes, attachments
def delete_attachment(self, msg_id, text, attributes, attachment_id, timeout=None):
attributes[f'delatt{attachment_id}'] = 'Delete'
attributes['cmd'] = 'Update'
attributes['exp'] = self.logbook
if self._user:
attributes['unm'] = self._user
if self._password:
attributes['upwd'] = self._password
just_text = list()
just_text.append(('Text', ('', text.encode('iso-8859-1'))))
try:
response = requests.post(self._url, data=attributes, verify=False, allow_redirects=False,
files=just_text)
except requests.Timeout as e:
# Catch here a timeout o the post request.
# Raise the logbook excetion and let the user handle it
raise LogbookServerTimeout('{0} method cannot be completed because of a network timeout:\n' +
'{1}'.format(sys._getframe().f_code.co_name, e))
except requests.RequestException as e:
# Check if message on server.
self._check_if_message_on_server(msg_id) # raises exceptions if no message or no response from server
# If here: message is on server but cannot be downloaded (should never happen)
raise LogbookServerProblem('Cannot access logbook server to post a message, ' + 'because of:\n' +
'{0}'.format(e))
finally:
del attributes[f'delatt{attachment_id}']
def delete_all_attachments(self, msg_id, timeout=None):
message, attributes, attachments = self.read(msg_id, timeout)
n_attach = len(attachments)
for attachment_id in range(n_attach):
self.delete_attachment(msg_id, message, attributes, attachment_id, timeout)
def delete(self, msg_id, timeout=None):
"""
Deletes message thread (!!!message + all replies!!!) from logbook.
It also deletes all attachments of corresponding messages from the server.
:param msg_id: message to be deleted
:param timeout: timeout value to be passed to the get request
:return:
"""
request_headers = dict()
if self._user or self._password:
request_headers['Cookie'] = self._make_user_and_pswd_cookie()
try:
self._check_if_message_on_server(msg_id) # check if something to delete
response = requests.get(self._url + str(msg_id) + '?cmd=Delete&confirm=Yes', headers=request_headers,
allow_redirects=False, verify=False, timeout=timeout)
_validate_response(response) # raises exception if any other error identified
except requests.Timeout as e:
# Catch here a timeout o the post request.
# Raise the logbook excetion and let the user handle it
raise LogbookServerTimeout('{0} method cannot be completed because of a network timeout:\n' +
'{1}'.format(sys._getframe().f_code.co_name, e))
except requests.RequestException as e:
# If here: message is on server but cannot be downloaded (should never happen)
raise LogbookServerProblem('Cannot access logbook server to delete the message with ID: ' + str(msg_id) +
'because of:\n' + '{0}'.format(e))
# Additional validation: If successfully deleted then status_code = 302. In case command was not executed at
# all (not English language --> no download command supported) status_code = 200 and the content is just a
# html page of this whole message.
if response.status_code == 200:
raise LogbookServerProblem('Cannot process delete command (only logbooks in English supported).')
def search(self, search_term, n_results=20, scope="subtext", timeout=None):
"""
Searches the logbook and returns the message ids.
:param timeout: timeout value to be passed to the get request
"""
request_headers = dict()
if self._user or self._password:
request_headers['Cookie'] = self._make_user_and_pswd_cookie()
# Putting n_results = 0 crashes the elog. also in the web-gui.
n_results = 1 if n_results < 1 else n_results
params = {
"mode": "full",
"reverse": "1",
"npp": n_results
}
if type(search_term) is dict:
params.update(search_term)
else:
params.update({scope: search_term})
# Remove empty entries from params, since ELog will redirect such requests
# and remove them anyway, but the redirect leads to unexpected results
keys = list(params.keys())
for key in keys:
if params[key] == "":
params.pop(key)
try:
response = requests.get(self._url, params=params, headers=request_headers,
allow_redirects=False, verify=False, timeout=timeout)
# Validate response. If problems Exception will be thrown.
_validate_response(response)
resp_message = response
except requests.Timeout as e:
# Catch here a timeout o the post request.
# Raise the logbook excetion and let the user handle it
raise LogbookServerTimeout('{0} method cannot be completed because of a network timeout:\n' +
'{1}'.format(sys._getframe().f_code.co_name, e))
except requests.RequestException as e:
# If here: message is on server but cannot be downloaded (should never happen)
raise LogbookServerProblem('Cannot access logbook server to read message ids '
'because of:\n' + '{0}'.format(e))
from lxml import html
tree = html.fromstring(resp_message.content)
message_ids = tree.xpath('(//tr/td[@class="list1" or @class="list2"][1])/a/@href')
message_ids = [int(m.split("/")[-1]) for m in message_ids]
return message_ids
def get_last_message_id(self, timeout=None):
ids = self.get_message_ids(timeout)
if len(ids) > 0:
return ids[0]
else:
return None
def get_message_ids(self, timeout=None):
request_headers = dict()
if self._user or self._password:
request_headers['Cookie'] = self._make_user_and_pswd_cookie()
try:
response = requests.get(self._url + 'page', headers=request_headers,
allow_redirects=False, verify=False, timeout=timeout)
# Validate response. If problems Exception will be thrown.
_validate_response(response)
resp_message = response
except requests.Timeout as e:
# Catch here a timeout o the post request.
# Raise the logbook exception and let the user handle it
raise LogbookServerTimeout('{0} method cannot be completed because of a network timeout:\n' +
'{1}'.format(sys._getframe().f_code.co_name, e))
except requests.RequestException as e:
# If here: message is on server but cannot be downloaded (should never happen)
raise LogbookServerProblem('Cannot access logbook server to read message ids '
'because of:\n' + '{0}'.format(e))
from lxml import html
tree = html.fromstring(resp_message.content)
message_ids = tree.xpath('(//tr/td[@class="list1" or @class="list2"][1])/a/@href')
message_ids = [int(m.split("/")[-1]) for m in message_ids]
return message_ids
def download_attachment(self, url, timeout=None):
"""
Download an attachment from the specified url.
"""
request_headers = dict()
if self._user or self._password:
request_headers['Cookie'] = self._make_user_and_pswd_cookie()
try:
response = requests.get(url, headers=request_headers, allow_redirects=False,
verify=False, timeout=timeout)
# If there is no message code 200 will be returned (OK) and _validate_response will not recognise it
# but there will be some error in the html code.
resp_message, resp_headers, resp_msg_id = _validate_response(response)
except requests.Timeout as e:
# Catch here a timeout of the get request.
# Raise the logbook exception and let the user handle it
raise LogbookServerTimeout('{0} method cannot be completed because of a network timeout:\n' +
'{1}'.format(sys._getframe().f_code.co_name, e))
return resp_message
def _check_if_message_on_server(self, msg_id, timeout=None):
"""Try to load page for specific message. If there is a html tag like <td class="errormsg"> then there is no
such message.
:param msg_id: ID of message to be checked
:params timeout: The value of timeout to be passed to the get request
:return:
"""
request_headers = dict()
if self._user or self._password:
request_headers['Cookie'] = self._make_user_and_pswd_cookie()
try:
response = requests.get(self._url + str(msg_id), headers=request_headers, allow_redirects=False,
verify=False, timeout=timeout)
# If there is no message code 200 will be returned (OK) and _validate_response will not recognise it
# but there will be some error in the html code.
resp_message, resp_headers, resp_msg_id = _validate_response(response)
# If there is no message, code 200 will be returned (OK) but there will be some error indication in
# the html code.
if re.findall('<td.*?class="errormsg".*?>.*?</td>',
resp_message.decode('utf-8', 'ignore'),
flags=re.DOTALL):
raise LogbookInvalidMessageID('Message with ID: ' + str(msg_id) + ' does not exist on logbook.')
except requests.Timeout as e:
# Catch here a timeout o the post request.
# Raise the logbook exception and let the user handle it
raise LogbookServerTimeout('{0} method cannot be completed because of a network timeout:\n' +
'{1}'.format(sys._getframe().f_code.co_name, e))
except requests.RequestException as e:
raise LogbookServerProblem('No response from the logbook server.\nDetails: ' + '{0}'.format(e))
def _add_base_msg_attributes(self, data):
"""
Adds base message attributes which are used by all messages.
:param data: dict of current attributes
:return: content string
"""
data['cmd'] = 'Submit'
data['exp'] = self.logbook
if self._user:
data['unm'] = self._user
if self._password:
data['upwd'] = self._password
def _prepare_attachments(self, files):
"""
Parses attachments to content objects. Attachments can be:
- file like objects: must have method read() which returns bytes. If it has attribute .name it will be used
for attachment name, otherwise generic attribute<i> name will be used.
- path to the file on disk
Note that if attachment is an url pointing to the existing Logbook server it will be ignored and no
exceptions will be raised. This can happen if attachments returned with read_method are resend.
:param files: list of file like objects or paths
:return: two lists:
- one list of prepared attachment in the form of
[ ('attfileN', ('filename', file_object)) ]
- one list of object to be closed. all files that are passed as string or path are opened by the library
and need to be closed by the library.
"""
prepared = list()
i = 0
objects_to_close = list() # objects that are created (opened) by elog must be later closed
for file_obj in files:
if hasattr(file_obj, 'read'):
attribute_name = f'attfile{i}'
filename = attribute_name # If file like object has no name specified use this one
candidate_filename = os.path.basename(file_obj.name).replace(' ', '_')
if candidate_filename: # use only if not empty string
filename = candidate_filename
i += 1
elif isinstance(file_obj, str):
# Check if it is:
# - a path to the file --> open file and append
# - an url pointing to the existing Logbook server --> ignore
filename = ""
attribute_name = ""
if os.path.isfile(file_obj):
attribute_name = f'attfile{i}'
file_obj = builtins.open(file_obj, 'rb')
filename = os.path.basename(file_obj.name).replace(' ', '_')
objects_to_close.append(file_obj)
i += 1
elif not file_obj.startswith(self._url):
raise LogbookInvalidAttachmentType('Invalid type of attachment: \"' + file_obj + '\".')
else:
raise LogbookInvalidAttachmentType('Invalid type of attachment[' + str(i) + '].')
# prepared.append((attribute_name, (filename, file_obj)))
prepared.append((attribute_name, (filename, file_obj)))
return prepared, objects_to_close
def _make_user_and_pswd_cookie(self):
"""
prepares user name and password cookie. It is sent in header when posting a message.
:return: user name and password value for the Cookie header
"""
cookie = ''
if self._user:
cookie += 'unm=' + self._user + ';'
if self._password:
cookie += 'upwd=' + self._password + ';'
return cookie
def get_parent(self, msg_id, timeout=None):
"""
:return: the message id of the message specify by msg_id
"""
message, attributes, attachments = self.read(msg_id, timeout=timeout)
parent_id = attributes.get('In reply to', None)
if parent_id:
return int(parent_id)
else:
return None
@staticmethod
def from_string_to_list(children_string):
"""
:return: a list of children starting from a comma separated string of numbers
"""
return [int(child) for child in children_string.split(',')]
def get_children(self, msg_id, timeout=None):
"""
:return: a list of children of a message. The list could be empty if the message has no children.
"""
m, attributes, a = self.read(msg_id, timeout=timeout)
children_str = attributes.get('Reply to', None)
if children_str is None:
return []
else:
return self.from_string_to_list(children_str)
def get_descendants(self, msg_id, timeout=None):
"""
:return: a list with all children of a message recursively.
The list could be empty if the message has no descendant.
"""
all_children = []
children = self.get_children(msg_id, timeout)
for child in children:
all_children.append(child)
self._recursive_loop(all_children, child, timeout)
return all_children
def get_siblings(self, msg_id, timeout=None):
"""
:return: the list of siblings of the message specified by msg_id
"""
parent_id = self.get_parent(msg_id, timeout=None)
if parent_id is None:
return None
return self.get_children(parent_id, timeout)
def _recursive_loop(self, cumulative_list, current_child, timeout=None):
"""
Helper function to perform recursive loops
"""
children = self.get_children(current_child, timeout)
for child in children:
cumulative_list.append(child)
self._recursive_loop(cumulative_list, child, timeout)
def get_ancestors(self, msg_id, timeout=None):
"""
:return: the list of all predecessors up to the first element in the series. The list could be empty if the
message correspoonding to msg_id is already the first element in the series.
"""
anchestors = []
parent_id = self.get_parent(msg_id, timeout)
while parent_id is not None:
anchestors.append(parent_id)
parent_id = self.get_parent(parent_id, timeout)
return anchestors
def _remove_reserved_attributes(attributes):
"""
Removes elog reserved attributes (from the attributes dict) that can not be sent.
:param attributes: dictionary of attributes to be cleaned.
:return:
"""
if attributes:
attributes.get('$@MID@$', None)
attributes.pop('Date', None)
attributes.pop('Attachment', None)
attributes.pop('Text', None) # Remove this one because it will be send attachment like
def _encode_values(attributes):
"""
prepares a dictionary of the attributes with latin1 encoded string values.
:param attributes: dictionary of attributes to ve encoded
:return: dictionary with encoded string attributes
"""
encoded_attributes = {}
for key, value in attributes.items():
if isinstance(value, str):
encoded_attributes[key] = value.encode('iso-8859-1')
else:
encoded_attributes[key] = value
return encoded_attributes
def _replace_special_characters_in_attribute_keys(attributes):
"""
Replaces special characters in elog attribute keys by underscore, otherwise attribute values will be erased in
the http request. This is using the same replacement elog itself is using to handle these cases
:param attributes: dictionary of attributes to be cleaned.
:return: attributes with replaced keys
"""
return {re.sub('[^0-9a-zA-Z]', '_', key): value for key, value in attributes.items()}
def _validate_response(response):
""" Validate response of the request."""
msg_id = None
if response.status_code not in [200, 302]:
# 200 --> OK; 302 --> Found
# Html page is returned with error description (handling errors same way as on original client. Looks
# like there is no other way.
err = re.findall('<td.*?class="errormsg".*?>.*?</td>',
response.content.decode('utf-8', 'ignore'),
flags=re.DOTALL)
if len(err) > 0:
# Remove html tags
# If part of the message has: Please go back... remove this part since it is an instruction for
# the user when using browser.
err = re.sub('(?:<.*?>)', '', err[0])
if err:
raise LogbookMessageRejected('Rejected because of: ' + err)
else:
raise LogbookMessageRejected('Rejected because of unknown error.')
# Other unknown errors
raise LogbookMessageRejected('Rejected because of unknown error.')
else:
location = response.headers.get('Location')
if location is not None:
if 'has moved' in location:
raise LogbookServerProblem('Logbook server has moved to another location.')
elif 'fail' in location:
raise LogbookAuthenticationError('Invalid username or password.')
else:
# returned locations is something like: '<host>/<sub_dir>/<logbook>/<msg_id><query>
# with urllib.parse.urlparse returns attribute path=<sub_dir>/<logbook>/<msg_id>
try:
msg_id = int(urllib.parse.urlsplit(location).path.split('/')[-1])
except ValueError as e:
# it was not possible to get the msg_id.
# this may happen when deleting the last entry of a logbook
msg_id = None
if b'type=password' in response.content or b'type="password"' in response.content:
# Not too smart to check this way, but no other indication of this kind of error.
# C client does it the same way
raise LogbookAuthenticationError('Invalid username or password.')
return response.content, response.headers, msg_id
def _handle_pswd(password, encrypt=True):
"""
Takes password string and returns password as needed by elog. If encrypt=True then password will be
sha256 encrypted (salt='', rounds=5000). Before returning password, any trailing $5$$ will be removed
independent off encrypt flag.
:param password: password string
:param encrypt: encrypt password?
:return: elog prepared password
"""
if encrypt and password is not None:
from passlib.hash import sha256_crypt
return sha256_crypt.using(salt='', rounds=5000).hash(password)[4:]
elif password and password.startswith('$5$$'):
return password[4:]
else:
return password