From 93a206beeae4d4ec45f0d4d01fdd1feb29ea2f73 Mon Sep 17 00:00:00 2001 From: tligui_y Date: Tue, 12 Aug 2025 16:17:32 +0200 Subject: [PATCH] Delete slic/utils/logbook.py --- slic/utils/logbook.py | 858 ------------------------------------------ 1 file changed, 858 deletions(-) delete mode 100644 slic/utils/logbook.py diff --git a/slic/utils/logbook.py b/slic/utils/logbook.py deleted file mode 100644 index 61e56bf4a..000000000 --- a/slic/utils/logbook.py +++ /dev/null @@ -1,858 +0,0 @@ -import requests -import urllib.parse -import os -import builtins -import re -import sys -from .logbook_exceptions import * -from datetime import datetime - -def open(*args, **kwargs): - """ - Will return a Logbook object. All arguments are passed to the logbook constructor. - :param args: - :param kwargs: - :return: Logbook() instance - """ - return Logbook(*args, **kwargs) - - -class Logbook(object): - """ - Logbook provides methods to interface with logbook on location: "server:port/subdir/logbook". User can create, - edit, delete logbook messages. - """ - - def __init__(self, hostname, logbook='', port=None, user=None, password=None, subdir='', use_ssl=True, - encrypt_pwd=True): - """ - :param hostname: elog server hostname. If whole url is specified here, it will be parsed and arguments: - "logbook, port, subdir, use_ssl" will be overwritten by parsed values. - :param logbook: name of the logbook on the elog server - :param port: elog server port (if not specified will default to '80' if use_ssl=False or '443' if use_ssl=True - :param user: username (if authentication needed) - :param password: password (if authentication needed) Password will be encrypted with sha256 unless - encrypt_pwd=False (default: True) - :param subdir: subdirectory of logbooks locations - :param use_ssl: connect using ssl (ignored if url starts with 'http://'' or 'https://'? - :param encrypt_pwd: To avoid exposing password in the code, this flag can be set to False and password - will then be handled as it is (user needs to provide sha256 encrypted password with - salt= '' and rounds=5000) - :return: - """ - hostname = hostname.strip() - - # parse url to see if some parameters are defined with url - parsed_url = urllib.parse.urlsplit(hostname) - - # ---- handle SSL ----- - # hostname must be modified according to use_ssl flag. If hostname starts with https:// or http:// - # the use_ssl flag is ignored - url_scheme = parsed_url.scheme - if url_scheme == 'http': - use_ssl = False - - elif url_scheme == 'https': - use_ssl = True - - elif not url_scheme: - # add http or https - if use_ssl: - url_scheme = 'https' - else: - url_scheme = 'http' - - # ---- handle port ----- - # 1) by default use port defined in the url - # 2) remove any 'default' ports such as 80 for http and 443 for https - # 3) if port not defined in url and not 'default' add it to netloc - - netloc = parsed_url.netloc - if netloc == "" and "localhost" in hostname: - netloc = 'localhost' - netloc_split = netloc.split(':') - if len(netloc_split) > 1: - # port defined in url --> remove if needed - port = netloc_split[1] - if (port == 80 and not use_ssl) or (port == 443 and use_ssl): - netloc = netloc_split[0] - - else: - # add port info if needed - if port is not None and not (port == 80 and not use_ssl) and not (port == 443 and use_ssl): - netloc += ':{}'.format(port) - - # ---- handle subdir and logbook ----- - # parsed_url.path = /// - - # Remove last '/' for easier parsing - url_path = parsed_url.path - if url_path.endswith('/'): - url_path = url_path[:-1] - - splitted_path = url_path.split('/') - if url_path and len(splitted_path) > 1: - # If here ... then at least some part of path is defined. - - # If logbook defined --> treat path current path as subdir and add logbook at the end - # to define the full path. Else treat existing path as /. - # Put first and last '/' back on its place - if logbook: - url_path += '/{}'.format(logbook) - else: - logbook = splitted_path[-1] - - else: - # There is nothing. Use arguments. - url_path = subdir + '/' + logbook - - # urllib.parse.quote replaces special characters with %xx escapes - # self._logbook_path = urllib.parse.quote('/' + url_path + '/').replace('//', '/') - self._logbook_path = ('/' + url_path + '/').replace('//', '/') - - self._url = url_scheme + '://' + netloc + self._logbook_path - self.logbook = logbook - self._user = user - self._password = _handle_pswd(password, encrypt_pwd) - - def post(self, message, msg_id=None, reply=False, attributes=None, attachments=None, - suppress_email_notification=False, encoding=None, timeout=None, **kwargs): - """ - Posts message to the logbook. If msg_id is not specified new message will be created, otherwise existing - message will be edited, or a reply (if reply=True) to it will be created. This method returns the msg_id - of the newly created message. - """ - - logbook_directory = "elog_instance/logbooks/demo" - print(f"Checking the existence of the directory {logbook_directory}") - - # Check if the directory exists - if not os.path.exists(logbook_directory): - print(f"The directory {logbook_directory} does not exist.") - else: - print(f"The directory {logbook_directory} exists.") - - # Check write permissions - if os.access(logbook_directory, os.W_OK): - print(f"The directory {logbook_directory} has write permissions.") - else: - print(f"The directory {logbook_directory} does not have write permissions.") - - - print("STARTING POST") - # Ajout des impressions pour déboguer - print(f"Message to post: {message}") - print(f"msg_id: {msg_id}") - print(f"Attributes: {attributes}") - print(f"Attachments: {attachments}") - print(f"Encoding: {encoding}") - print(f"Timeout: {timeout}") - print(f"Additional kwargs: {kwargs}") - - attributes = attributes or {} - attributes = {**attributes, **kwargs} # kwargs as attributes with higher priority - print(f"Updated attributes: {attributes}") - - attachments = attachments or [] - print(f"Attachments list: {attachments}") - - if encoding is not None: - if encoding not in ['plain', 'HTML', 'ELCode']: - raise LogbookMessageRejected('Invalid message encoding. Valid options: plain, HTML, ELCode.') - attributes['Encoding'] = encoding - - if suppress_email_notification: - attributes["suppress"] = 1 - - # Prepare attachments - if attachments: - new_attachment_list, objects_to_close = self._prepare_attachments(attachments) - print(f"New attachments prepared: {new_attachment_list}") - else: - objects_to_close = [] - new_attachment_list = [] - - attributes_to_edit = dict() - - if msg_id: - print(f"Editing message with msg_id: {msg_id}") - if reply: - print(f"Replying to message with msg_id: {msg_id}") - self._check_if_message_on_server(msg_id) - attributes['reply_to'] = str(msg_id) - else: - print("Editing existing message.") - attributes['edit_id'] = str(msg_id) - attributes['skiplock'] = '1' - msg_to_edit, attributes_to_edit, existing_attachments_list = self.read(msg_id) - - # Merge new attributes - for attribute, data in attributes.items(): - if data is not None: - attributes_to_edit[attribute] = data - - print(f"Attributes after merging: {attributes_to_edit}") - - # Process existing attachments - i = 0 - existing_attachments_filename_list = [] - for attachment in existing_attachments_list: - attributes_to_edit[f'attachment{i}'] = os.path.basename(attachment) - existing_attachments_filename_list.append(os.path.basename(attachment)[14:]) - i += 1 - - print(f"Existing attachments: {existing_attachments_filename_list}") - - duplicate_attachment_list = [] - for new_attachment in new_attachment_list: - new_attachment_filename = new_attachment[1][0] - print(f"Checking new attachment: {new_attachment_filename}") - if new_attachment_filename in existing_attachments_filename_list: - # Same attachment exists on the server, compare content - new_attachment_content = new_attachment[1][1].read() - new_attachment[1][1].seek(0) - attachment_index = existing_attachments_filename_list.index(new_attachment_filename) - existing_attachment_content = self.download_attachment( - url=existing_attachments_list[attachment_index], - timeout=timeout - ) - if new_attachment_content == existing_attachment_content: - print(f"Duplicate attachment detected: {new_attachment_filename}") - duplicate_attachment_list.append(new_attachment) - else: - print(f"Attachment content has changed: {new_attachment_filename}") - self.delete_attachment(msg_id, attributes=attributes_to_edit, - attachment_id=attachment_index, - timeout=timeout, text=msg_to_edit) - existing_attachments_filename_list.pop(attachment_index) - existing_attachments_list.pop(attachment_index) - - print(f"Duplicate attachments to remove: {duplicate_attachment_list}") - - # Remove duplicates - for attach in duplicate_attachment_list: - new_attachment_list.remove(attach) - - print(f"Final new attachments list: {new_attachment_list}") - else: - # Creating a new message, add timestamp if not present - if 'When' not in attributes: - attributes['When'] = int(datetime.now().timestamp()) - - # Final check on attributes - if not attributes_to_edit: - attributes_to_edit = attributes - - print(f"Final attributes to send to the server: {attributes_to_edit}") - - # Remove reserved attributes - _remove_reserved_attributes(attributes_to_edit) - - new_attachment_list.append(('Text', ('', message.encode('iso-8859-1')))) - print(f"Final attachment list including message text: {new_attachment_list}") - - # Add base message attributes - self._add_base_msg_attributes(attributes_to_edit) - print(f"Attributes with base message added: {attributes_to_edit}") - - # Sanitize attribute keys - attributes_to_edit = _replace_special_characters_in_attribute_keys(attributes_to_edit) - print(f"Attributes after sanitizing keys: {attributes_to_edit}") - - # Encode all string values in latin1 - attributes_to_edit = _encode_values(attributes_to_edit) - print(f"Attributes after encoding: {attributes_to_edit}") - - try: - print("Sending POST request to the server...") - response = requests.post(self._url, data=attributes_to_edit, files=new_attachment_list, - allow_redirects=False, verify=False, timeout=timeout) - print("Response received:", response) - - resp_message, resp_headers, resp_msg_id = _validate_response(response) - print(f"Response message: {resp_message}") - print(f"Response headers: {resp_headers}") - print(f"Response msg_id: {resp_msg_id}") - - # Close file-like objects if needed - for file_like_object in objects_to_close: - if hasattr(file_like_object, 'close'): - file_like_object.close() - - except requests.Timeout as e: - print("Request timed out:", e) - raise LogbookServerTimeout(f"Timeout while posting: {e}") - - except requests.RequestException as e: - print("Request exception:", e) - self._check_if_message_on_server(msg_id) # Raises exceptions if no message or no response from server - raise LogbookServerProblem(f"Cannot access logbook server: {e}") - - if not resp_msg_id or resp_msg_id < 1: - raise LogbookInvalidMessageID(f"Invalid message ID: {resp_msg_id}") - - print(f"Message posted successfully with msg_id: {resp_msg_id}") - return resp_msg_id - - - def read(self, msg_id, timeout=None): - """ - Reads message from the logbook server and returns tuple of (message, attributes, attachments) where: - message: string with message body - attributes: dictionary of all attributes returned by the logbook - attachments: list of urls to attachments on the logbook server - - :param msg_id: ID of the message to be read - :param timeout: The timeout value to be passed to the get request. - :return: message, attributes, attachments - """ - - request_headers = dict() - if self._user or self._password: - request_headers['Cookie'] = self._make_user_and_pswd_cookie() - - try: - self._check_if_message_on_server(msg_id) # raises exceptions if no message or no response from server - response = requests.get(self._url + str(msg_id) + '?cmd=download', headers=request_headers, - allow_redirects=False, verify=False, timeout=timeout) - - # Validate response. If problems Exception will be thrown. - resp_message, resp_headers, resp_msg_id = _validate_response(response) - - - except requests.Timeout as e: - - # Catch here a timeout o the post request. - - # Raise the logbook excetion and let the user handle it - - raise LogbookServerTimeout('{0} method cannot be completed because of a network timeout:\n' + - '{1}'.format(sys._getframe().f_code.co_name, e)) - - except requests.RequestException as e: - # If here: message is on server but cannot be downloaded (should never happen) - raise LogbookServerProblem('Cannot access logbook server to read the message with ID: ' + str(msg_id) + - 'because of:\n' + '{0}'.format(e)) - - # Parse message to separate message body, attributes and attachments - attributes = dict() - attachments = list() - - returned_msg = resp_message.decode('iso-8859-1', 'ignore').splitlines() - delimiter_idx = returned_msg.index('========================================') - - message = '\n'.join(returned_msg[delimiter_idx + 1:]) - for line in returned_msg[0:delimiter_idx]: - line = line.split(': ') - data = ''.join(line[1:]) - if line[0] == 'Attachment': - if not data: - # Treat the empty string as special case, - # otherwise the split below returns [""] and attachments is [self._url] - attachments = [] - else: - attachments = data.split(',') - # Here are only attachment names, make a full url out of it, so they could be - # recognisable by others, and downloaded if needed - attachments = [self._url + '{0}'.format(i) for i in attachments] - else: - attributes[line[0]] = data - - return message, attributes, attachments - - def delete_attachment(self, msg_id, text, attributes, attachment_id, timeout=None): - - attributes[f'delatt{attachment_id}'] = 'Delete' - attributes['cmd'] = 'Update' - attributes['exp'] = self.logbook - if self._user: - attributes['unm'] = self._user - if self._password: - attributes['upwd'] = self._password - - just_text = list() - just_text.append(('Text', ('', text.encode('iso-8859-1')))) - try: - response = requests.post(self._url, data=attributes, verify=False, allow_redirects=False, - files=just_text) - except requests.Timeout as e: - # Catch here a timeout o the post request. - # Raise the logbook excetion and let the user handle it - raise LogbookServerTimeout('{0} method cannot be completed because of a network timeout:\n' + - '{1}'.format(sys._getframe().f_code.co_name, e)) - except requests.RequestException as e: - # Check if message on server. - self._check_if_message_on_server(msg_id) # raises exceptions if no message or no response from server - - # If here: message is on server but cannot be downloaded (should never happen) - raise LogbookServerProblem('Cannot access logbook server to post a message, ' + 'because of:\n' + - '{0}'.format(e)) - finally: - del attributes[f'delatt{attachment_id}'] - - def delete_all_attachments(self, msg_id, timeout=None): - - message, attributes, attachments = self.read(msg_id, timeout) - n_attach = len(attachments) - for attachment_id in range(n_attach): - self.delete_attachment(msg_id, message, attributes, attachment_id, timeout) - - - def delete(self, msg_id, timeout=None): - """ - Deletes message thread (!!!message + all replies!!!) from logbook. - It also deletes all attachments of corresponding messages from the server. - - :param msg_id: message to be deleted - :param timeout: timeout value to be passed to the get request - :return: - """ - - request_headers = dict() - if self._user or self._password: - request_headers['Cookie'] = self._make_user_and_pswd_cookie() - - try: - self._check_if_message_on_server(msg_id) # check if something to delete - - response = requests.get(self._url + str(msg_id) + '?cmd=Delete&confirm=Yes', headers=request_headers, - allow_redirects=False, verify=False, timeout=timeout) - - _validate_response(response) # raises exception if any other error identified - - except requests.Timeout as e: - # Catch here a timeout o the post request. - # Raise the logbook excetion and let the user handle it - raise LogbookServerTimeout('{0} method cannot be completed because of a network timeout:\n' + - '{1}'.format(sys._getframe().f_code.co_name, e)) - - except requests.RequestException as e: - # If here: message is on server but cannot be downloaded (should never happen) - raise LogbookServerProblem('Cannot access logbook server to delete the message with ID: ' + str(msg_id) + - 'because of:\n' + '{0}'.format(e)) - - # Additional validation: If successfully deleted then status_code = 302. In case command was not executed at - # all (not English language --> no download command supported) status_code = 200 and the content is just a - # html page of this whole message. - if response.status_code == 200: - raise LogbookServerProblem('Cannot process delete command (only logbooks in English supported).') - - def search(self, search_term, n_results=20, scope="subtext", timeout=None): - """ - Searches the logbook and returns the message ids. - - :param timeout: timeout value to be passed to the get request - - """ - request_headers = dict() - if self._user or self._password: - request_headers['Cookie'] = self._make_user_and_pswd_cookie() - - # Putting n_results = 0 crashes the elog. also in the web-gui. - n_results = 1 if n_results < 1 else n_results - - params = { - "mode": "full", - "reverse": "1", - "npp": n_results - } - if type(search_term) is dict: - params.update(search_term) - else: - params.update({scope: search_term}) - - # Remove empty entries from params, since ELog will redirect such requests - # and remove them anyway, but the redirect leads to unexpected results - keys = list(params.keys()) - for key in keys: - if params[key] == "": - params.pop(key) - - try: - response = requests.get(self._url, params=params, headers=request_headers, - allow_redirects=False, verify=False, timeout=timeout) - - # Validate response. If problems Exception will be thrown. - _validate_response(response) - resp_message = response - - except requests.Timeout as e: - # Catch here a timeout o the post request. - # Raise the logbook excetion and let the user handle it - raise LogbookServerTimeout('{0} method cannot be completed because of a network timeout:\n' + - '{1}'.format(sys._getframe().f_code.co_name, e)) - - except requests.RequestException as e: - # If here: message is on server but cannot be downloaded (should never happen) - raise LogbookServerProblem('Cannot access logbook server to read message ids ' - 'because of:\n' + '{0}'.format(e)) - - from lxml import html - tree = html.fromstring(resp_message.content) - message_ids = tree.xpath('(//tr/td[@class="list1" or @class="list2"][1])/a/@href') - message_ids = [int(m.split("/")[-1]) for m in message_ids] - return message_ids - - def get_last_message_id(self, timeout=None): - ids = self.get_message_ids(timeout) - if len(ids) > 0: - return ids[0] - else: - return None - - def get_message_ids(self, timeout=None): - request_headers = dict() - if self._user or self._password: - request_headers['Cookie'] = self._make_user_and_pswd_cookie() - - try: - response = requests.get(self._url + 'page', headers=request_headers, - allow_redirects=False, verify=False, timeout=timeout) - - # Validate response. If problems Exception will be thrown. - _validate_response(response) - resp_message = response - - except requests.Timeout as e: - # Catch here a timeout o the post request. - # Raise the logbook exception and let the user handle it - raise LogbookServerTimeout('{0} method cannot be completed because of a network timeout:\n' + - '{1}'.format(sys._getframe().f_code.co_name, e)) - - except requests.RequestException as e: - # If here: message is on server but cannot be downloaded (should never happen) - raise LogbookServerProblem('Cannot access logbook server to read message ids ' - 'because of:\n' + '{0}'.format(e)) - - from lxml import html - tree = html.fromstring(resp_message.content) - message_ids = tree.xpath('(//tr/td[@class="list1" or @class="list2"][1])/a/@href') - message_ids = [int(m.split("/")[-1]) for m in message_ids] - return message_ids - - def download_attachment(self, url, timeout=None): - """ - Download an attachment from the specified url. - """ - request_headers = dict() - if self._user or self._password: - request_headers['Cookie'] = self._make_user_and_pswd_cookie() - - try: - response = requests.get(url, headers=request_headers, allow_redirects=False, - verify=False, timeout=timeout) - # If there is no message code 200 will be returned (OK) and _validate_response will not recognise it - # but there will be some error in the html code. - resp_message, resp_headers, resp_msg_id = _validate_response(response) - - except requests.Timeout as e: - # Catch here a timeout of the get request. - # Raise the logbook exception and let the user handle it - raise LogbookServerTimeout('{0} method cannot be completed because of a network timeout:\n' + - '{1}'.format(sys._getframe().f_code.co_name, e)) - - return resp_message - - def _check_if_message_on_server(self, msg_id, timeout=None): - """Try to load page for specific message. If there is a html tag like then there is no - such message. - - :param msg_id: ID of message to be checked - :params timeout: The value of timeout to be passed to the get request - :return: - """ - - request_headers = dict() - if self._user or self._password: - request_headers['Cookie'] = self._make_user_and_pswd_cookie() - try: - response = requests.get(self._url + str(msg_id), headers=request_headers, allow_redirects=False, - verify=False, timeout=timeout) - - # If there is no message code 200 will be returned (OK) and _validate_response will not recognise it - # but there will be some error in the html code. - resp_message, resp_headers, resp_msg_id = _validate_response(response) - # If there is no message, code 200 will be returned (OK) but there will be some error indication in - # the html code. - if re.findall('.*?', - resp_message.decode('utf-8', 'ignore'), - flags=re.DOTALL): - raise LogbookInvalidMessageID('Message with ID: ' + str(msg_id) + ' does not exist on logbook.') - - except requests.Timeout as e: - # Catch here a timeout o the post request. - # Raise the logbook exception and let the user handle it - raise LogbookServerTimeout('{0} method cannot be completed because of a network timeout:\n' + - '{1}'.format(sys._getframe().f_code.co_name, e)) - - except requests.RequestException as e: - raise LogbookServerProblem('No response from the logbook server.\nDetails: ' + '{0}'.format(e)) - - def _add_base_msg_attributes(self, data): - """ - Adds base message attributes which are used by all messages. - :param data: dict of current attributes - :return: content string - """ - data['cmd'] = 'Submit' - data['exp'] = self.logbook - if self._user: - data['unm'] = self._user - if self._password: - data['upwd'] = self._password - - def _prepare_attachments(self, files): - """ - Parses attachments to content objects. Attachments can be: - - file like objects: must have method read() which returns bytes. If it has attribute .name it will be used - for attachment name, otherwise generic attribute name will be used. - - path to the file on disk - - Note that if attachment is an url pointing to the existing Logbook server it will be ignored and no - exceptions will be raised. This can happen if attachments returned with read_method are resend. - - :param files: list of file like objects or paths - :return: two lists: - - one list of prepared attachment in the form of - [ ('attfileN', ('filename', file_object)) ] - - - one list of object to be closed. all files that are passed as string or path are opened by the library - and need to be closed by the library. - """ - prepared = list() - i = 0 - objects_to_close = list() # objects that are created (opened) by elog must be later closed - for file_obj in files: - if hasattr(file_obj, 'read'): - attribute_name = f'attfile{i}' - filename = attribute_name # If file like object has no name specified use this one - candidate_filename = os.path.basename(file_obj.name).replace(' ', '_') - - if candidate_filename: # use only if not empty string - filename = candidate_filename - i += 1 - - elif isinstance(file_obj, str): - # Check if it is: - # - a path to the file --> open file and append - # - an url pointing to the existing Logbook server --> ignore - - filename = "" - attribute_name = "" - - if os.path.isfile(file_obj): - - attribute_name = f'attfile{i}' - file_obj = builtins.open(file_obj, 'rb') - filename = os.path.basename(file_obj.name).replace(' ', '_') - - objects_to_close.append(file_obj) - i += 1 - - elif not file_obj.startswith(self._url): - raise LogbookInvalidAttachmentType('Invalid type of attachment: \"' + file_obj + '\".') - else: - raise LogbookInvalidAttachmentType('Invalid type of attachment[' + str(i) + '].') - - # prepared.append((attribute_name, (filename, file_obj))) - prepared.append((attribute_name, (filename, file_obj))) - - return prepared, objects_to_close - - def _make_user_and_pswd_cookie(self): - """ - prepares user name and password cookie. It is sent in header when posting a message. - :return: user name and password value for the Cookie header - """ - cookie = '' - if self._user: - cookie += 'unm=' + self._user + ';' - if self._password: - cookie += 'upwd=' + self._password + ';' - - return cookie - - def get_parent(self, msg_id, timeout=None): - """ - :return: the message id of the message specify by msg_id - """ - message, attributes, attachments = self.read(msg_id, timeout=timeout) - parent_id = attributes.get('In reply to', None) - if parent_id: - return int(parent_id) - else: - return None - - @staticmethod - def from_string_to_list(children_string): - """ - :return: a list of children starting from a comma separated string of numbers - """ - return [int(child) for child in children_string.split(',')] - - def get_children(self, msg_id, timeout=None): - """ - :return: a list of children of a message. The list could be empty if the message has no children. - """ - m, attributes, a = self.read(msg_id, timeout=timeout) - children_str = attributes.get('Reply to', None) - if children_str is None: - return [] - else: - return self.from_string_to_list(children_str) - - def get_descendants(self, msg_id, timeout=None): - """ - :return: a list with all children of a message recursively. - The list could be empty if the message has no descendant. - """ - all_children = [] - children = self.get_children(msg_id, timeout) - for child in children: - all_children.append(child) - self._recursive_loop(all_children, child, timeout) - return all_children - - - def get_siblings(self, msg_id, timeout=None): - """ - :return: the list of siblings of the message specified by msg_id - """ - parent_id = self.get_parent(msg_id, timeout=None) - if parent_id is None: - return None - return self.get_children(parent_id, timeout) - - - def _recursive_loop(self, cumulative_list, current_child, timeout=None): - """ - Helper function to perform recursive loops - """ - children = self.get_children(current_child, timeout) - for child in children: - cumulative_list.append(child) - self._recursive_loop(cumulative_list, child, timeout) - - def get_ancestors(self, msg_id, timeout=None): - """ - :return: the list of all predecessors up to the first element in the series. The list could be empty if the - message correspoonding to msg_id is already the first element in the series. - """ - anchestors = [] - parent_id = self.get_parent(msg_id, timeout) - while parent_id is not None: - anchestors.append(parent_id) - parent_id = self.get_parent(parent_id, timeout) - return anchestors - -def _remove_reserved_attributes(attributes): - """ - Removes elog reserved attributes (from the attributes dict) that can not be sent. - - :param attributes: dictionary of attributes to be cleaned. - :return: - """ - - if attributes: - attributes.get('$@MID@$', None) - attributes.pop('Date', None) - attributes.pop('Attachment', None) - attributes.pop('Text', None) # Remove this one because it will be send attachment like - - -def _encode_values(attributes): - """ - prepares a dictionary of the attributes with latin1 encoded string values. - - :param attributes: dictionary of attributes to ve encoded - :return: dictionary with encoded string attributes - """ - - encoded_attributes = {} - for key, value in attributes.items(): - if isinstance(value, str): - encoded_attributes[key] = value.encode('iso-8859-1') - else: - encoded_attributes[key] = value - return encoded_attributes - - -def _replace_special_characters_in_attribute_keys(attributes): - """ - Replaces special characters in elog attribute keys by underscore, otherwise attribute values will be erased in - the http request. This is using the same replacement elog itself is using to handle these cases - - :param attributes: dictionary of attributes to be cleaned. - :return: attributes with replaced keys - """ - return {re.sub('[^0-9a-zA-Z]', '_', key): value for key, value in attributes.items()} - - -def _validate_response(response): - """ Validate response of the request.""" - - msg_id = None - - if response.status_code not in [200, 302]: - # 200 --> OK; 302 --> Found - # Html page is returned with error description (handling errors same way as on original client. Looks - # like there is no other way. - - err = re.findall('.*?', - response.content.decode('utf-8', 'ignore'), - flags=re.DOTALL) - - if len(err) > 0: - # Remove html tags - # If part of the message has: Please go back... remove this part since it is an instruction for - # the user when using browser. - err = re.sub('(?:<.*?>)', '', err[0]) - if err: - raise LogbookMessageRejected('Rejected because of: ' + err) - else: - raise LogbookMessageRejected('Rejected because of unknown error.') - - # Other unknown errors - raise LogbookMessageRejected('Rejected because of unknown error.') - else: - location = response.headers.get('Location') - if location is not None: - if 'has moved' in location: - raise LogbookServerProblem('Logbook server has moved to another location.') - elif 'fail' in location: - raise LogbookAuthenticationError('Invalid username or password.') - else: - # returned locations is something like: '/// - # with urllib.parse.urlparse returns attribute path=// - try: - msg_id = int(urllib.parse.urlsplit(location).path.split('/')[-1]) - except ValueError as e: - # it was not possible to get the msg_id. - # this may happen when deleting the last entry of a logbook - msg_id = None - - if b'type=password' in response.content or b'type="password"' in response.content: - # Not too smart to check this way, but no other indication of this kind of error. - # C client does it the same way - raise LogbookAuthenticationError('Invalid username or password.') - - return response.content, response.headers, msg_id - - -def _handle_pswd(password, encrypt=True): - """ - Takes password string and returns password as needed by elog. If encrypt=True then password will be - sha256 encrypted (salt='', rounds=5000). Before returning password, any trailing $5$$ will be removed - independent off encrypt flag. - - :param password: password string - :param encrypt: encrypt password? - :return: elog prepared password - """ - if encrypt and password is not None: - from passlib.hash import sha256_crypt - return sha256_crypt.using(salt='', rounds=5000).hash(password)[4:] - elif password and password.startswith('$5$$'): - return password[4:] - else: - return password \ No newline at end of file