, msg_id = None
- timeout = None
-
- def _check_if_message_on_server(self, msg_id, timeout=None):
- """Try to load page for specific message. If there is a html tag like then there is no
- such message.
-
- :param msg_id: ID of message to be checked
- :params timeout: The value of timeout to be passed to the get request
- :return:
- """
-
- request_headers = dict()
- if self._user or self._password:
- request_headers['Cookie'] = self._make_user_and_pswd_cookie()
- try:
- > response = requests.get(self._url + str(msg_id), headers=request_headers, allow_redirects=False,
- verify=False, timeout=timeout)
-
- .pixi/envs/default/lib/python3.8/site-packages/elog/logbook.py:581:
- _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
- .pixi/envs/default/lib/python3.8/site-packages/requests/api.py:73: in get
- return request("get", url, params=params, **kwargs)
- .pixi/envs/default/lib/python3.8/site-packages/requests/api.py:59: in request
- return session.request(method=method, url=url, **kwargs)
- .pixi/envs/default/lib/python3.8/site-packages/requests/sessions.py:589: in request
- resp = self.send(prep, **send_kwargs)
- .pixi/envs/default/lib/python3.8/site-packages/requests/sessions.py:703: in send
- r = adapter.send(request, **kwargs)
- _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
-
- self =
- request = , stream = False
- timeout = Timeout(connect=None, read=None, total=None), verify = False
- cert = None, proxies = OrderedDict()
-
- def send(
- self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None
- ):
- """Sends PreparedRequest object. Returns Response object.
-
- :param request: The :class:`PreparedRequest ` being sent.
- :param stream: (optional) Whether to stream the request content.
- :param timeout: (optional) How long to wait for the server to send
- data before giving up, as a float, or a :ref:`(connect timeout,
- read timeout) ` tuple.
- :type timeout: float or tuple or urllib3 Timeout object
- :param verify: (optional) Either a boolean, in which case it controls whether
- we verify the server's TLS certificate, or a string, in which case it
- must be a path to a CA bundle to use
- :param cert: (optional) Any user-provided SSL certificate to be trusted.
- :param proxies: (optional) The proxies dictionary to apply to the request.
- :rtype: requests.Response
- """
-
- try:
- conn = self.get_connection_with_tls_context(
- request, verify, proxies=proxies, cert=cert
- )
- except LocationValueError as e:
- raise InvalidURL(e, request=request)
-
- self.cert_verify(conn, request.url, verify, cert)
- url = self.request_url(request, proxies)
- self.add_headers(
- request,
- stream=stream,
- timeout=timeout,
- verify=verify,
- cert=cert,
- proxies=proxies,
- )
-
- chunked = not (request.body is None or "Content-Length" in request.headers)
-
- if isinstance(timeout, tuple):
- try:
- connect, read = timeout
- timeout = TimeoutSauce(connect=connect, read=read)
- except ValueError:
- raise ValueError(
- f"Invalid timeout {timeout}. Pass a (connect, read) timeout tuple, "
- f"or a single float to set both timeouts to the same value."
- )
- elif isinstance(timeout, TimeoutSauce):
- pass
- else:
- timeout = TimeoutSauce(connect=timeout, read=timeout)
-
- try:
- resp = conn.urlopen(
- method=request.method,
- url=url,
- body=request.body,
- headers=request.headers,
- redirect=False,
- assert_same_host=False,
- preload_content=False,
- decode_content=False,
- retries=self.max_retries,
- timeout=timeout,
- chunked=chunked,
- )
-
- except (ProtocolError, OSError) as err:
- raise ConnectionError(err, request=request)
-
- except MaxRetryError as e:
- if isinstance(e.reason, ConnectTimeoutError):
- # TODO: Remove this in 3.0.0: see #2811
- if not isinstance(e.reason, NewConnectionError):
- raise ConnectTimeout(e, request=request)
-
- if isinstance(e.reason, ResponseError):
- raise RetryError(e, request=request)
-
- if isinstance(e.reason, _ProxyError):
- raise ProxyError(e, request=request)
-
- if isinstance(e.reason, _SSLError):
- # This branch is for urllib3 v1.22 and later.
- raise SSLError(e, request=request)
-
- > raise ConnectionError(e, request=request)
- E requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=8080): Max retries exceeded with url: /demo/None (Caused by NewConnectionError(': Failed to establish a new connection: [Errno 111] Connection refused'))
-
- .pixi/envs/default/lib/python3.8/site-packages/requests/adapters.py:700: ConnectionError
-
- During handling of the above exception, another exception occurred:
-
- mock_screenshot_class =
+ mock_screenshot_class =
@patch("slic.utils.elog.Screenshot")
def test_screenshot(mock_screenshot_class):
@@ -7077,51 +718,227 @@
self.post(message, **kwargs)
slic/utils/elog.py:16: in post
return self._log.post(*args, **kwargs)
- .pixi/envs/default/lib/python3.8/site-packages/elog/logbook.py:307: in post
- self._check_if_message_on_server(msg_id) # raises exceptions if no message or no response from server
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
- self = , msg_id = None
- timeout = None
+ self =
+ message = 'SCREENSHOT_INTEGRATION_TEST_MSG_456', msg_id = None, reply = False
+ attributes = {'Author': 'robot', 'When': 1754992241, 'cmd': 'Submit', 'exp': 'demo', ...}
+ attachments = ['/tmp/tmpb3lom6st.png'], suppress_email_notification = False
+ encoding = None, timeout = None, kwargs = {'Author': 'robot'}
+ new_attachment_list = [('attfile0', ('tmpb3lom6st.png', <_io.BufferedReader name='/tmp/tmpb3lom6st.png'>)), ('Text', ('', b'SCREENSHOT_INTEGRATION_TEST_MSG_456'))]
+ objects_to_close = [<_io.BufferedReader name='/tmp/tmpb3lom6st.png'>]
+ attributes_to_edit = {'Author': b'robot', 'When': 1754992241, 'cmd': b'Submit', 'exp': b'demo', ...}
- def _check_if_message_on_server(self, msg_id, timeout=None):
- """Try to load page for specific message. If there is a html tag like | then there is no
- such message.
+ def post(self, message, msg_id=None, reply=False, attributes=None, attachments=None,
+ suppress_email_notification=False, encoding=None, timeout=None, **kwargs):
+ """
+ Posts message to the logbook. If msg_id is not specified new message will be created, otherwise existing
+ message will be edited, or a reply (if reply=True) to it will be created. This method returns the msg_id
+ of the newly created message.
- :param msg_id: ID of message to be checked
- :params timeout: The value of timeout to be passed to the get request
- :return:
+ :param message: string with message text
+ :param msg_id: ID number of message to edit or reply. If not specified new message is created.
+ :param reply: If 'True' reply to existing message is created instead of editing it
+ :param attributes: Dictionary of attributes. Following attributes are used internally by the elog and will be
+ ignored: Text, Date, Encoding, Reply to, In reply to, Locked by, Attachment
+ :param attachments: list of:
+ - file like objects which read() will return bytes (if file_like_object.name is not
+ defined, default name "attachment" will be used.
+ - paths to the files
+ All items will be appended as attachment to the elog entry. In case of unknown
+ attachment an exception LogbookInvalidAttachment will be raised.
+ :param suppress_email_notification: If set to True or 1, E-Mail notification will be suppressed, defaults to False.
+ :param encoding: Defines encoding of the message. Can be: 'plain' -> plain text, 'html'->html-text,
+ 'ELCode' --> elog formatting syntax
+ :param timeout: Define the timeout to be used by the post request. Its value is directly passed to the requests
+ post. Use None to disable the request timeout.
+ :param kwargs: Anything in the kwargs will be interpreted as attribute. e.g.: logbook.post('Test text',
+ Author='Rok Vintar), "Author" will be sent as an attribute. If named same as one of the
+ attributes defined in "attributes", kwargs will have priority.
+
+ :return: msg_id
"""
- request_headers = dict()
- if self._user or self._password:
- request_headers['Cookie'] = self._make_user_and_pswd_cookie()
- try:
- response = requests.get(self._url + str(msg_id), headers=request_headers, allow_redirects=False,
- verify=False, timeout=timeout)
+ attributes = attributes or {}
+ attributes = {**attributes, **kwargs} # kwargs as attributes with higher priority
- # If there is no message code 200 will be returned (OK) and _validate_response will not recognise it
- # but there will be some error in the html code.
+ attachments = attachments or []
+
+ if encoding is not None:
+ if encoding not in ['plain', 'HTML', 'ELCode']:
+ raise LogbookMessageRejected('Invalid message encoding. Valid options: plain, HTML, ELCode.')
+ attributes['Encoding'] = encoding
+
+ if suppress_email_notification:
+ attributes["suppress"] = 1
+
+ # THE ATTACHMENT STRATEGY WHEN DEALING WITH POST MODIFICATION
+ #
+ # 1. Does the message on the server have already attachments?
+ # 1.1 - We read the message getting the existing attachment list.
+ # 1.2 - Add to the attributes dictionary one line for each attachment like this:
+ # attributes['attachmentN'] = timestamped_filename_name
+ #
+ # 2. Do we have new attachments?
+ # 2.1 - Those are in the new_attachment_list. This is a list of this type:
+ # [ ('attfileN', ('filename', fileobject)) ]
+ # 2.2 - We need to loop over all the new attachments:
+ # 2.2.1 - Does a file already on the server with the same name exist?
+ # 2.2.1.1 - No: OK. Then we go ahead with the next attachment.
+ # 2.2.1.2 - Yes:
+ # 2.2.1.2.1 - Are the two files identical?
+ # 2.2.1.2.1.1 - Yes: then we remove this current entry from the new_attachment_list and we leave the one
+ # already on server.
+ # 2.2.1.2.1.2 - No:
+ # 2.2.1.2.1.2.1 - Then the file has been update.
+ # 2.2.1.2.1.2.2 - We need to remove the file on server first (using special post)
+ # 2.2.1.2.1.2.3 - We have to remove the old attachment from the attributes dictionary.
+ #
+
+ if attachments:
+ # here we accomplish point 2.1.
+ # new_attachment_list is something like [ ('attfileN', ('filename', fileobject)) ]
+ new_attachment_list, objects_to_close = self._prepare_attachments(attachments)
+ else:
+ objects_to_close = list()
+ new_attachment_list = list()
+
+ attributes_to_edit = dict()
+ if msg_id:
+ # Message exists, we can continue
+ if reply:
+ # Verify that there is a message on the server, otherwise do not reply to it!
+ self._check_if_message_on_server(msg_id) # raises exception in case of none existing message
+ attributes['reply_to'] = str(msg_id)
+ else: # Edit existing
+ attributes['edit_id'] = str(msg_id)
+ attributes['skiplock'] = '1'
+
+ # here we accomplish point 1.1.
+ # existing_attachments_list is something like:
+ # [ 'https://elog.url.com/logbook/timestamped_filename' ]
+ msg_to_edit, attributes_to_edit, existing_attachments_list = self.read(msg_id)
+
+ for attribute, data in attributes.items():
+ new_data = attributes.get(attribute)
+ if new_data is not None:
+ attributes_to_edit[attribute] = new_data
+
+ i = 0
+ existing_attachments_filename_list = list()
+ for attachment in existing_attachments_list:
+ # here we accomplish point 1.2. We strip the timestamped_filename from the whole URL.
+ attributes_to_edit[f'attachment{i}'] = os.path.basename(attachment)
+ existing_attachments_filename_list.append(os.path.basename(attachment)[14:])
+ i += 1
+
+ # let's accomplish 2.2. Loop over all new attachment
+ duplicate_attachment_list = list()
+ for new_attachment in new_attachment_list:
+ # the new_attachment_list is something like:
+ # [ ('attfileN', ('filename', fileobject)) ]
+ new_attachment_filename = new_attachment[1][0]
+ if new_attachment_filename in existing_attachments_filename_list:
+ # a file with the same name existing already on the server.
+ # we need to check if the two files are the same.
+ # read the content of the new file
+ new_attachment_content = new_attachment[1][1].read()
+ # don't forget to reset the fileobj to the beginning of the file
+ new_attachment[1][1].seek(0)
+ # get the existing attachment content
+ attachment_index = existing_attachments_filename_list.index(new_attachment_filename)
+ existing_attachment_content = self.download_attachment(
+ url=existing_attachments_list[attachment_index],
+ timeout=timeout
+ )
+ # check if the two contents are the same
+ if new_attachment_content == existing_attachment_content:
+ # yes. then we don't upload a second copy. we remove the current entry from the list
+ duplicate_attachment_list.append(new_attachment)
+ else:
+ # no. they are not the same file. we will replace the existing file with the new one
+ # first: we need to remove the attachment from the server using the dedicated method
+ self.delete_attachment(msg_id, attributes=attributes_to_edit,
+ attachment_id=attachment_index,
+ timeout=timeout, text=msg_to_edit)
+ # now we can remove this attachment from the auxiliary lists.
+ existing_attachments_filename_list.pop(attachment_index)
+ existing_attachments_list.pop(attachment_index)
+ # now we need to rebuild the attributes dictionary for the part concerning the attachments.
+ # we remove all of them first
+ keys_to_be_removed = list()
+ for key in attributes_to_edit.keys():
+ if key.startswith('attachment'):
+ keys_to_be_removed.append(key)
+ if key.startswith('delatt'):
+ keys_to_be_removed.append(key)
+ for key in keys_to_be_removed:
+ del attributes_to_edit[key]
+
+ # now we rebuild it
+ for i, attachment in enumerate(existing_attachments_list):
+ attributes_to_edit[f'attachment{i}'] = os.path.basename(attachment)
+
+ # remove all duplicate attachments from the new_attachment_list
+ for attach in duplicate_attachment_list:
+ new_attachment_list.remove(attach)
+
+ else:
+ # As we create a new message, specify creation time if not already specified in attributes
+ if 'When' not in attributes:
+ attributes['When'] = int(datetime.now().timestamp())
+
+ if not attributes_to_edit:
+ attributes_to_edit = attributes
+
+ # Remove any attributes that should not be sent
+ _remove_reserved_attributes(attributes_to_edit)
+
+ # Make requests module think that Text is a "file". This is the only way to force requests to send data as
+ # multipart/form-data even if there are no attachments. Elog understands only multipart/form-data
+ new_attachment_list.append(('Text', ('', message.encode('iso-8859-1'))))
+
+ # Base attributes are common to all messages
+ self._add_base_msg_attributes(attributes_to_edit)
+
+ # Keys in attributes cannot have certain characters like whitespaces or dashes for the http request
+ attributes_to_edit = _replace_special_characters_in_attribute_keys(attributes_to_edit)
+
+ # All string values in the attributes must be encoded in latin1
+ attributes_to_edit = _encode_values(attributes_to_edit)
+
+ try:
+ response = requests.post(self._url, data=attributes_to_edit, files=new_attachment_list,
+ allow_redirects=False, verify=False, timeout=timeout)
+
+ # Validate response. Any problems will raise an Exception.
resp_message, resp_headers, resp_msg_id = _validate_response(response)
- # If there is no message, code 200 will be returned (OK) but there will be some error indication in
- # the html code.
- if re.findall(' | .*? | ',
- resp_message.decode('utf-8', 'ignore'),
- flags=re.DOTALL):
- raise LogbookInvalidMessageID('Message with ID: ' + str(msg_id) + ' does not exist on logbook.')
+
+ # Close file like objects that were opened by the elog (if path
+ for file_like_object in objects_to_close:
+ if hasattr(file_like_object, 'close'):
+ file_like_object.close()
except requests.Timeout as e:
# Catch here a timeout o the post request.
- # Raise the logbook exception and let the user handle it
+ # Raise the logbook excetion and let the user handle it
raise LogbookServerTimeout('{0} method cannot be completed because of a network timeout:\n' +
'{1}'.format(sys._getframe().f_code.co_name, e))
except requests.RequestException as e:
- > raise LogbookServerProblem('No response from the logbook server.\nDetails: ' + '{0}'.format(e))
- E elog.logbook_exceptions.LogbookServerProblem: No response from the logbook server.
- E Details: HTTPConnectionPool(host='localhost', port=8080): Max retries exceeded with url: /demo/None (Caused by NewConnectionError(': Failed to establish a new connection: [Errno 111] Connection refused'))
+ # Check if message on server.
+ self._check_if_message_on_server(msg_id) # raises exceptions if no message or no response from server
+
+ # If here: message is on server but cannot be downloaded (should never happen)
+ raise LogbookServerProblem('Cannot access logbook server to post a message, ' + 'because of:\n' +
+ '{0}'.format(e))
+
+ # Any error before here should raise an exception, but check again for nay case.
+ if not resp_msg_id or resp_msg_id < 1:
+ > raise LogbookInvalidMessageID('Invalid message ID: ' + str(resp_msg_id) + ' returned')
+ E elog.logbook_exceptions.LogbookInvalidMessageID: Invalid message ID: None returned
- .pixi/envs/default/lib/python3.8/site-packages/elog/logbook.py:601: LogbookServerProblem
+ .pixi/envs/default/lib/python3.8/site-packages/elog/logbook.py:315: LogbookInvalidMessageID
```
**_*๐ Teardown phase*_**
@@ -7129,7 +946,155 @@
**duration:**
```python
- 0.000262250192463398
+ 0.0001628049649298191
+ ```
+
+ **outcome:**
+
+ ```python
+ passed
+ ```
+