Deploy site

This commit is contained in:
Gitea Actions
2025-06-09 03:00:56 +02:00
commit a96b026468
2329 changed files with 367195 additions and 0 deletions

View File

@ -0,0 +1,74 @@
from ._elasticsearch import ElasticsearchWriter
from ._null import NullTableWriter
from ._table_writer import AbstractTableWriter
from .binary import (
ExcelXlsTableWriter,
ExcelXlsxTableWriter,
PandasDataFramePickleWriter,
SqliteTableWriter,
)
from .text import (
AsciiDocTableWriter,
BoldUnicodeTableWriter,
BorderlessTableWriter,
CssTableWriter,
CsvTableWriter,
HtmlTableWriter,
JsonLinesTableWriter,
JsonTableWriter,
LatexMatrixWriter,
LatexTableWriter,
LtsvTableWriter,
MarkdownTableWriter,
MediaWikiTableWriter,
RstCsvTableWriter,
RstGridTableWriter,
RstSimpleTableWriter,
SpaceAlignedTableWriter,
TomlTableWriter,
TsvTableWriter,
UnicodeTableWriter,
YamlTableWriter,
)
from .text.sourcecode import (
JavaScriptTableWriter,
NumpyTableWriter,
PandasDataFrameWriter,
PythonCodeTableWriter,
)
__all__ = (
"AbstractTableWriter",
"AsciiDocTableWriter",
"BoldUnicodeTableWriter",
"BorderlessTableWriter",
"CssTableWriter",
"CsvTableWriter",
"ElasticsearchWriter",
"ExcelXlsTableWriter",
"ExcelXlsxTableWriter",
"HtmlTableWriter",
"JavaScriptTableWriter",
"JsonLinesTableWriter",
"JsonTableWriter",
"LatexMatrixWriter",
"LatexTableWriter",
"LtsvTableWriter",
"MarkdownTableWriter",
"MediaWikiTableWriter",
"NullTableWriter",
"NumpyTableWriter",
"PandasDataFramePickleWriter",
"PandasDataFrameWriter",
"PythonCodeTableWriter",
"RstCsvTableWriter",
"RstGridTableWriter",
"RstSimpleTableWriter",
"SpaceAlignedTableWriter",
"SqliteTableWriter",
"TomlTableWriter",
"TsvTableWriter",
"UnicodeTableWriter",
"YamlTableWriter",
)

View File

@ -0,0 +1,12 @@
from textwrap import dedent
HEADER_ROW = -1
import_error_msg_template = dedent(
"""\
dependency packages for {0} not found.
you can install the dependencies with 'pip install pytablewriter[{0}]'
"""
)

View File

@ -0,0 +1,205 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
import copy
from collections.abc import Generator
from typing import Any
import dataproperty
from dataproperty import ColumnDataProperty
from typepy import Typecode
from ..error import EmptyValueError
from ._msgfy import to_error_message
from ._table_writer import AbstractTableWriter
DataType = dict[str, str]
Properties = dict[str, DataType]
def _get_es_datatype(column_dp: ColumnDataProperty) -> DataType:
if column_dp.typecode in (
Typecode.NONE,
Typecode.NULL_STRING,
Typecode.INFINITY,
Typecode.NAN,
):
return {"type": "keyword"}
if column_dp.typecode == Typecode.STRING:
return {"type": "text"}
if column_dp.typecode == Typecode.DATETIME:
return {"type": "date", "format": "date_optional_time"}
if column_dp.typecode == Typecode.REAL_NUMBER:
return {"type": "double"}
if column_dp.typecode == Typecode.BOOL:
return {"type": "boolean"}
if column_dp.typecode == Typecode.IP_ADDRESS:
return {"type": "ip"}
if column_dp.typecode == Typecode.INTEGER:
assert column_dp.bit_length is not None
if column_dp.bit_length <= 8:
return {"type": "byte"}
elif column_dp.bit_length <= 16:
return {"type": "short"}
elif column_dp.bit_length <= 32:
return {"type": "integer"}
elif column_dp.bit_length <= 64:
return {"type": "long"}
raise ValueError(
f"too large integer bits: expected<=64bits, actual={column_dp.bit_length:d}bits"
)
raise ValueError(f"unknown typecode: {column_dp.typecode}")
class ElasticsearchWriter(AbstractTableWriter):
"""
A table writer class for Elasticsearch.
:Dependency Packages:
- `elasticsearch-py <https://github.com/elastic/elasticsearch-py>`__
.. py:attribute:: index_name
:type: str
Alias attribute for |table_name|.
.. py:attribute:: document_type
:type: str
:value: "table"
Specify document type for indices.
.. py:method:: write_table()
Create an index and put documents for each row to Elasticsearch.
You need to pass an
`elasticsearch.Elasticsearch <https://elasticsearch-py.rtfd.io/en/master/api.html#elasticsearch>`__
instance to |stream| before calling this method.
|table_name|/:py:attr:`~pytablewriter.ElasticsearchWriter.index_name`
used as the creating index name,
invalid characters in the name are replaced with underscores (``'_'``).
Document data types for documents are automatically detected from the data.
:raises ValueError:
If the |stream| has not elasticsearch.Elasticsearch instance.
:Example:
:ref:`example-elasticsearch-table-writer`
"""
FORMAT_NAME = "elasticsearch"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return True
@property
def table_name(self) -> str:
return super().table_name
@table_name.setter
def table_name(self, value: str) -> None:
from pathvalidate import ErrorReason, ValidationError
from ..sanitizer import ElasticsearchIndexNameSanitizer
try:
self._table_name = ElasticsearchIndexNameSanitizer(value).sanitize(replacement_text="_")
except ValidationError as e:
if e.reason is ErrorReason.NULL_NAME:
self._table_name = ""
else:
raise
@property
def index_name(self) -> str:
return self.table_name
@index_name.setter
def index_name(self, value: str) -> None:
self.table_name = value
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.stream = None
self.is_padding = False
self.is_formatting_float = False
self._is_require_table_name = True
self._quoting_flags = copy.deepcopy(dataproperty.NOT_QUOTING_FLAGS)
self._dp_extractor.type_value_map = copy.deepcopy(dataproperty.DefaultValue.TYPE_VALUE_MAP)
self.document_type = "table"
def write_null_line(self) -> None:
pass
def _get_mappings(self) -> dict[str, dict[str, dict[str, Properties]]]:
properties: Properties = {}
for header, column_dp in zip(self.headers, self._column_dp_list):
properties[header] = _get_es_datatype(column_dp)
return {"mappings": {self.document_type: {"properties": properties}}}
def _get_body(self) -> Generator:
str_datatype = (Typecode.DATETIME, Typecode.IP_ADDRESS, Typecode.INFINITY, Typecode.NAN)
for value_dp_list in self._table_value_dp_matrix:
values = [
value_dp.data if value_dp.typecode not in str_datatype else value_dp.to_str()
for value_dp in value_dp_list
]
yield dict(zip(self.headers, values))
def _write_table(self, **kwargs: Any) -> None:
import elasticsearch as es
if not isinstance(self.stream, es.Elasticsearch):
raise ValueError("stream must be an elasticsearch.Elasticsearch instance")
try:
self._verify_value_matrix()
except EmptyValueError:
self._logger.logger.debug("no tabular data found")
return
self._preprocess()
mappings = self._get_mappings()
try:
result = self.stream.indices.create(index=self.index_name, body=mappings)
self._logger.logger.debug(result)
except es.TransportError as e:
for error in e.errors:
if error == "index_already_exists_exception":
# ignore already existing index
self._logger.logger.debug(to_error_message(e))
else:
raise
for body in self._get_body():
try:
self.stream.index(index=self.index_name, body=body)
except es.exceptions.RequestError as e:
self._logger.logger.error(f"{to_error_message(e)}, body={body}")
def _write_value_row_separator(self) -> None:
pass

View File

@ -0,0 +1,86 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
import abc
from typing import IO, Any, Union
class TableWriterInterface(metaclass=abc.ABCMeta):
"""
Interface class for writing a table.
"""
@property
@abc.abstractmethod
def format_name(self) -> str: # pragma: no cover
"""Format name for the writer.
Returns:
|str|
"""
@property
@abc.abstractmethod
def support_split_write(self) -> bool: # pragma: no cover
"""Indicates whether the writer class supports iterative table writing (``write_table_iter``) method.
Returns:
bool: |True| if the writer supported iterative table writing.
"""
@abc.abstractmethod
def write_table(self, **kwargs: Any) -> None: # pragma: no cover
"""
|write_table|.
"""
def dump(
self, output: Union[str, IO], close_after_write: bool, **kwargs: Any
) -> None: # pragma: no cover
raise NotImplementedError(f"{self.format_name} writer did not support dump method")
def dumps(self) -> str: # pragma: no cover
raise NotImplementedError(f"{self.format_name} writer did not support dumps method")
def write_table_iter(self, **kwargs: Any) -> None: # pragma: no cover
"""
Write a table with iteration.
"Iteration" means that divide the table writing into multiple writes.
This method is helpful, especially for extensive data.
The following are the premises to execute this method:
- set iterator to the |value_matrix|
- set the number of iterations to the |iteration_length| attribute
Call back function (Optional):
A callback function is called when each iteration of writing a table is completed.
You can set a callback function via the |write_callback| attribute.
Raises:
pytablewriter.NotSupportedError: If the writer class does not support this method.
.. note::
The following classes do not support this method:
- |HtmlTableWriter|
- |RstGridTableWriter|
- |RstSimpleTableWriter|
``support_split_write`` attribute return |True| if the class
is supporting this method.
"""
self._write_table_iter(**kwargs)
@abc.abstractmethod
def _write_table_iter(self, **kwargs: Any) -> None: # pragma: no cover
pass
@abc.abstractmethod
def close(self) -> None: # pragma: no cover
pass
@abc.abstractmethod
def _write_value_row_separator(self) -> None: # pragma: no cover
pass

View File

@ -0,0 +1,56 @@
"""
Import from https://github.com/thombashi/msgfy
"""
import inspect
import os.path
from types import FrameType
from typing import Optional
DEFAULT_ERROR_MESSAGE_FORMAT = "{exception}: {error_msg}"
DEFAULT_DEBUG_MESSAGE_FORMAT = "{exception} {file_name}({line_no}) {func_name}: {error_msg}"
error_message_format = DEFAULT_ERROR_MESSAGE_FORMAT
debug_message_format = DEFAULT_DEBUG_MESSAGE_FORMAT
def _to_message(exception_obj: Exception, format_str: str, frame: Optional[FrameType]) -> str:
if not isinstance(exception_obj, Exception):
raise ValueError("exception_obj must be an instance of a subclass of the Exception class")
if frame is None:
return str(exception_obj)
try:
return (
format_str.replace("{exception}", exception_obj.__class__.__name__)
.replace("{file_name}", os.path.basename(frame.f_code.co_filename))
.replace("{line_no}", str(frame.f_lineno))
.replace("{func_name}", frame.f_code.co_name)
.replace("{error_msg}", str(exception_obj))
)
except AttributeError:
raise ValueError("format_str must be a string")
def to_error_message(exception_obj: Exception, format_str: Optional[str] = None) -> str:
if not format_str:
format_str = error_message_format
frame = inspect.currentframe()
if frame is None:
return str(exception_obj)
return _to_message(exception_obj, format_str, frame.f_back)
def to_debug_message(exception_obj: Exception, format_str: Optional[str] = None) -> str:
if not format_str:
format_str = debug_message_format
frame = inspect.currentframe()
if frame is None:
return str(exception_obj)
return _to_message(exception_obj, format_str, frame.f_back)

View File

@ -0,0 +1,61 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
from typing import IO, Any, Union
from ._interface import TableWriterInterface
from .text._interface import IndentationInterface, TextWriterInterface
class NullTableWriter(IndentationInterface, TextWriterInterface, TableWriterInterface):
FORMAT_NAME = "null"
def __init__(self, **kwargs: Any) -> None:
self.table_name = kwargs.get("table_name", "")
self.value_matrix = kwargs.get("value_matrix", [])
self.is_formatting_float = kwargs.get("is_formatting_float", True)
self.headers = kwargs.get("headers", [])
self.type_hints = kwargs.get("type_hints", [])
self.max_workers = kwargs.get("max_workers", 1)
def __repr__(self) -> str:
return self.dumps()
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return True
def set_indent_level(self, indent_level: int) -> None:
pass
def inc_indent_level(self) -> None:
pass
def dec_indent_level(self) -> None:
pass
def write_null_line(self) -> None:
pass
def write_table(self, **kwargs: Any) -> None:
pass
def dump(self, output: Union[str, IO], close_after_write: bool = True, **kwargs: Any) -> None:
pass
def dumps(self) -> str:
return ""
def _write_table_iter(self, **kwargs: Any) -> None:
pass
def close(self) -> None:
pass
def _write_value_row_separator(self) -> None:
pass

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,11 @@
from ._excel import ExcelXlsTableWriter, ExcelXlsxTableWriter
from ._pandas import PandasDataFramePickleWriter
from ._sqlite import SqliteTableWriter
__all__ = (
"ExcelXlsTableWriter",
"ExcelXlsxTableWriter",
"PandasDataFramePickleWriter",
"SqliteTableWriter",
)

View File

@ -0,0 +1,501 @@
import abc
import copy
import warnings
from typing import IO, TYPE_CHECKING, Any, Final, Optional, Union, cast
import dataproperty
import typepy
from dataproperty import DataProperty
from tabledata import TableData
from typepy import Integer
from .._common import import_error_msg_template
from ._excel_workbook import ExcelWorkbookInterface, ExcelWorkbookXls, ExcelWorkbookXlsx
from ._interface import AbstractBinaryTableWriter
if TYPE_CHECKING:
from xlwt import XFStyle
class ExcelTableWriter(AbstractBinaryTableWriter, metaclass=abc.ABCMeta):
"""
An abstract class of a table writer for Excel file format.
"""
FORMAT_NAME = "excel"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def workbook(self) -> Optional[ExcelWorkbookInterface]:
return self._workbook
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._workbook: Optional[ExcelWorkbookInterface] = None
self._dp_extractor.type_value_map = {
typepy.Typecode.INFINITY: "Inf",
typepy.Typecode.NAN: "NaN",
}
self._first_header_row = 0
self._last_header_row = self.first_header_row
self._first_data_row = self.last_header_row + 1
self._first_data_col = 0
self._last_data_row: Optional[int] = None
self._last_data_col: Optional[int] = None
self._current_data_row = self._first_data_row
self._quoting_flags = copy.deepcopy(dataproperty.NOT_QUOTING_FLAGS)
self._quoting_flags[typepy.Typecode.DATETIME] = True
@property
def first_header_row(self) -> int:
"""int: Index of the first row of the header.
.. note:: |excel_attr|
"""
return self._first_header_row
@property
def last_header_row(self) -> int:
"""int: Index of the last row of the header.
.. note:: |excel_attr|
"""
return self._last_header_row
@property
def first_data_row(self) -> int:
"""int: Index of the first row of the data (table body).
.. note:: |excel_attr|
"""
return self._first_data_row
@property
def last_data_row(self) -> Optional[int]:
"""int: Index of the last row of the data (table body).
.. note:: |excel_attr|
"""
return self._last_data_row
@property
def first_data_col(self) -> int:
"""int: Index of the first column of the table.
.. note:: |excel_attr|
"""
return self._first_data_col
@property
def last_data_col(self) -> Optional[int]:
"""int: Index of the last column of the table.
.. note:: |excel_attr|
"""
return self._last_data_col
def is_opened(self) -> bool:
return self.workbook is not None
def open(self, file_path: str) -> None:
"""
Open an Excel workbook file.
:param str file_path: Excel workbook file path to open.
"""
if self.workbook and self.workbook.file_path == file_path:
self._logger.logger.debug(f"workbook already opened: {self.workbook.file_path}")
return
self.close()
self._open(file_path)
@abc.abstractmethod
def _open(self, workbook_path: str) -> None: # pragma: no cover
pass
def close(self) -> None:
"""
Close the current workbook.
"""
if self.is_opened():
self.workbook.close() # type: ignore
self._workbook = None
def from_tabledata(self, value: TableData, is_overwrite_table_name: bool = True) -> None:
"""
Set following attributes from |TableData|
- :py:attr:`~.table_name`.
- :py:attr:`~.headers`.
- :py:attr:`~.value_matrix`.
And create worksheet named from :py:attr:`~.table_name` ABC
if not existed yet.
:param tabledata.TableData value: Input table data.
"""
super().from_tabledata(value)
if self.is_opened():
self.make_worksheet(self.table_name)
def make_worksheet(self, sheet_name: Optional[str] = None) -> None:
"""Make a worksheet to the current workbook.
Args:
sheet_name (str):
Name of the worksheet to create. The name will be automatically generated
(like ``"Sheet1"``) if the ``sheet_name`` is empty.
"""
if sheet_name is None:
sheet_name = self.table_name
if not sheet_name:
sheet_name = ""
self._stream = self.workbook.add_worksheet(sheet_name) # type: ignore
self._current_data_row = self._first_data_row
def dump(self, output: Union[str, IO], close_after_write: bool = True, **kwargs: Any) -> None:
"""Write a worksheet to the current workbook.
Args:
output (str):
Path to the workbook file to write.
close_after_write (bool, optional):
Close the workbook after write.
Defaults to |True|.
"""
if not isinstance(output, str):
raise TypeError(f"output must be a str: actual={type(output)}")
self.open(output)
try:
self.make_worksheet(self.table_name)
self.write_table(**kwargs)
finally:
if close_after_write:
self.close()
@abc.abstractmethod
def _write_header(self) -> None:
pass
@abc.abstractmethod
def _write_cell(self, row: int, col: int, value_dp: DataProperty) -> None:
pass
def _write_table(self, **kwargs: Any) -> None:
self._preprocess_table_dp()
self._preprocess_table_property()
self._write_header()
self._write_value_matrix()
self._postprocess()
def _write_value_matrix(self) -> None:
for value_dp_list in self._table_value_dp_matrix:
for col_idx, value_dp in enumerate(value_dp_list):
self._write_cell(self._current_data_row, col_idx, value_dp)
self._current_data_row += 1
def _get_last_column(self) -> int:
if typepy.is_not_empty_sequence(self.headers):
return len(self.headers) - 1
if typepy.is_not_empty_sequence(self.value_matrix):
return len(self.value_matrix[0]) - 1
raise ValueError("data not found")
def _postprocess(self) -> None:
self._last_data_row = self._current_data_row
self._last_data_col = self._get_last_column()
class ExcelXlsTableWriter(ExcelTableWriter):
"""
A table writer class for Excel file format: ``.xls`` (older or equal to Office 2003).
``xlwt`` package required to use this class.
.. py:method:: write_table()
Write a table to the current opened worksheet.
:raises IOError: If failed to write data to the worksheet.
.. note::
Specific values in the tabular data are converted when writing:
- |None|: written as an empty string
- |inf|: written as ``Inf``
- |nan|: written as ``NaN``
"""
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.__col_style_table: dict[int, Any] = {}
def _open(self, workbook_path: str) -> None:
self._workbook = ExcelWorkbookXls(workbook_path)
def _write_header(self) -> None:
if not self.is_write_header or typepy.is_empty_sequence(self.headers):
return
for col, value in enumerate(self.headers):
self.stream.write(self.first_header_row, col, value)
def _write_cell(self, row: int, col: int, value_dp: DataProperty) -> None:
if value_dp.typecode in [typepy.Typecode.REAL_NUMBER]:
try:
cell_style = self.__get_cell_style(col)
except ValueError:
pass
else:
self.stream.write(row, col, value_dp.data, cell_style)
return
self.stream.write(row, col, value_dp.data)
def _postprocess(self) -> None:
super()._postprocess()
self.__col_style_table = {}
def __get_cell_style(self, col: int) -> "XFStyle":
try:
import xlwt
except ImportError:
warnings.warn(import_error_msg_template.format("excel"))
raise
if col in self.__col_style_table:
return self.__col_style_table.get(col) # type: ignore
try:
col_dp = self._column_dp_list[col]
except KeyError:
return {} # type: ignore
if col_dp.typecode not in [typepy.Typecode.REAL_NUMBER]:
raise ValueError()
if not Integer(col_dp.minmax_decimal_places.max_value).is_type():
raise ValueError()
float_digit = col_dp.minmax_decimal_places.max_value
if float_digit is None or float_digit <= 0:
raise ValueError()
num_format_str = "#,{:s}0.{:s}".format("#" * int(float_digit), "0" * int(float_digit))
cell_style = xlwt.easyxf(num_format_str=num_format_str)
self.__col_style_table[col] = cell_style
return cell_style
class ExcelXlsxTableWriter(ExcelTableWriter):
"""
A table writer class for Excel file format: ``.xlsx`` (newer or equal to Office 2007).
.. py:method:: write_table()
Write a table to the current opened worksheet.
:raises IOError: If failed to write data to the worksheet.
Examples:
:ref:`example-excel-table-writer`
.. note::
Specific values in the tabular data are converted when writing:
- |None|: written as an empty string
- |inf|: written as ``Inf``
- |nan|: written as ``NaN``
"""
MAX_CELL_WIDTH: Final[int] = 60
class TableFormat:
HEADER: Final = "header"
CELL: Final = "cell"
NAN: Final = "nan"
class Default:
FONT_NAME: Final[str] = "MS Gothic"
FONT_SIZE: Final[int] = 9
CELL_FORMAT: Final[dict[str, Union[int, str, bool]]] = {
"font_name": FONT_NAME,
"font_size": FONT_SIZE,
"align": "top",
"text_wrap": True,
"top": 1,
"left": 1,
"bottom": 1,
"right": 1,
}
HEADER_FORMAT: Final[dict[str, Union[int, str, bool]]] = {
"font_name": FONT_NAME,
"font_size": FONT_SIZE,
"bg_color": "#DFDFFF",
"bold": True,
"left": 1,
"right": 1,
}
NAN_FORMAT: Final[dict[str, Union[int, str, bool]]] = {
"font_name": FONT_NAME,
"font_size": FONT_SIZE,
"font_color": "silver",
"top": 1,
"left": 1,
"bottom": 1,
"right": 1,
}
@property
def __nan_format_property(self) -> dict[str, Union[int, str, bool]]:
return self.format_table.get(self.TableFormat.NAN, self.default_format)
@property
def __cell_format_property(self) -> dict[str, Union[int, str, bool]]:
return self.format_table.get(self.TableFormat.CELL, self.default_format)
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.default_format = self.Default.CELL_FORMAT
self.format_table = {
self.TableFormat.CELL: self.Default.CELL_FORMAT,
self.TableFormat.HEADER: self.Default.HEADER_FORMAT,
self.TableFormat.NAN: self.Default.NAN_FORMAT,
}
self.__col_cell_format_cache: dict[int, Any] = {}
self.__col_numprops_table: dict[int, dict[str, str]] = {}
def _open(self, workbook_path: str) -> None:
self._workbook = ExcelWorkbookXlsx(workbook_path)
def _write_header(self) -> None:
if not self.is_write_header or typepy.is_empty_sequence(self.headers):
return
header_format_props = self.format_table.get(self.TableFormat.HEADER, self.default_format)
header_format = self.__add_format(header_format_props)
self.stream.write_row(
row=self.first_header_row, col=0, data=self.headers, cell_format=header_format
)
for row in range(self.first_header_row, self.last_header_row):
self.stream.write_row(
row=row, col=0, data=[""] * len(self.headers), cell_format=header_format
)
def _write_cell(self, row: int, col: int, value_dp: DataProperty) -> None:
base_props = dict(self.__cell_format_property)
format_key = f"{col:d}_{value_dp.typecode.name:s}"
if value_dp.typecode in [typepy.Typecode.INTEGER, typepy.Typecode.REAL_NUMBER]:
num_props = self.__get_number_property(col)
base_props.update(num_props)
cell_format = self.__get_cell_format(format_key, base_props)
try:
self.stream.write_number(row, col, float(value_dp.data), cell_format)
return
except TypeError:
pass
if value_dp.typecode is typepy.Typecode.NAN:
base_props = dict(self.__nan_format_property)
cell_format = self.__get_cell_format(format_key, base_props)
self.stream.write(row, col, value_dp.data, cell_format)
def __get_number_property(self, col: int) -> dict[str, str]:
if col in self.__col_numprops_table:
return self.__col_numprops_table[col]
try:
col_dp = self._column_dp_list[col]
except KeyError:
return {}
if col_dp.typecode not in [typepy.Typecode.INTEGER, typepy.Typecode.REAL_NUMBER]:
return {}
num_props = {}
if Integer(col_dp.minmax_decimal_places.max_value).is_type():
float_digit = col_dp.minmax_decimal_places.max_value
if float_digit is not None and float_digit > 0:
num_props = {"num_format": "0.{:s}".format("0" * int(float_digit))}
self.__col_numprops_table[col] = num_props
return num_props
def __get_cell_format(self, format_key, cell_props) -> dict: # type: ignore
cell_format = self.__col_cell_format_cache.get(format_key)
if cell_format is not None:
return cell_format
# cache miss
cell_format = self.__add_format(cell_props)
self.__col_cell_format_cache[format_key] = cell_format
return cell_format
def __add_format(self, dict_property): # type: ignore
assert self.workbook
return self.workbook.workbook.add_format(dict_property)
def __set_cell_width(self) -> None:
font_size = cast(int, self.__cell_format_property.get("font_size"))
if not Integer(font_size).is_type():
return
for col_idx, col_dp in enumerate(self._column_dp_list):
width = min(col_dp.ascii_char_width, self.MAX_CELL_WIDTH) * (font_size / 10.0) + 2
self.stream.set_column(col_idx, col_idx, width=width)
def _preprocess_table_property(self) -> None:
super()._preprocess_table_property()
self.__set_cell_width()
def _postprocess(self) -> None:
super()._postprocess()
self.stream.autofilter(
self.last_header_row, self.first_data_col, self.last_data_row, self.last_data_col
)
self.stream.freeze_panes(self.first_data_row, self.first_data_col)
self.__col_cell_format_cache = {}
self.__col_numprops_table = {}

View File

@ -0,0 +1,141 @@
import abc
import warnings
from typing import Any, Optional
import typepy
from ..._logger import logger
from ...sanitizer import sanitize_excel_sheet_name
from .._common import import_error_msg_template
from .._msgfy import to_error_message
class ExcelWorkbookInterface(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def workbook(self) -> Any: # pragma: no cover
pass
@property
@abc.abstractmethod
def file_path(self) -> Optional[str]: # pragma: no cover
pass
@abc.abstractmethod
def open(self, file_path: str) -> None: # pragma: no cover
pass
@abc.abstractmethod
def close(self) -> None: # pragma: no cover
pass
@abc.abstractmethod
def add_worksheet(self, worksheet_name: Optional[str]) -> Any: # pragma: no cover
pass
class ExcelWorkbook(ExcelWorkbookInterface):
@property
def workbook(self) -> Any:
return self._workbook
@property
def file_path(self) -> Optional[str]:
return self._file_path
def _clear(self) -> None:
self._workbook = None
self._file_path: Optional[str] = None
self._worksheet_table: dict[str, Any] = {}
def __init__(self, file_path: str) -> None:
self._clear()
self._file_path = file_path
def __del__(self) -> None:
self.close()
class ExcelWorkbookXls(ExcelWorkbook):
def __init__(self, file_path: str) -> None:
super().__init__(file_path)
self.open(file_path)
def open(self, file_path: str) -> None:
try:
import xlwt
except ImportError:
warnings.warn(import_error_msg_template.format("excel"))
raise
self._workbook = xlwt.Workbook()
def close(self) -> None:
if self.workbook is None:
return
try:
self.workbook.save(self._file_path)
except IndexError as e:
logger.debug(to_error_message(e))
self._clear()
def add_worksheet(self, worksheet_name: Optional[str]) -> Any:
if typepy.is_not_null_string(worksheet_name):
assert worksheet_name
worksheet_name = sanitize_excel_sheet_name(worksheet_name)
if worksheet_name in self._worksheet_table:
# the work sheet is already exists
return self._worksheet_table.get(worksheet_name)
else:
sheet_id = 1
while True:
worksheet_name = f"Sheet{sheet_id:d}"
if worksheet_name not in self._worksheet_table:
break
sheet_id += 1
worksheet = self.workbook.add_sheet(worksheet_name)
self._worksheet_table[worksheet.get_name()] = worksheet
return worksheet
class ExcelWorkbookXlsx(ExcelWorkbook):
def __init__(self, file_path: str) -> None:
super().__init__(file_path)
self.open(file_path)
def open(self, file_path: str) -> None:
try:
import xlsxwriter
except ImportError:
warnings.warn(import_error_msg_template.format("excel"))
raise
self._workbook = xlsxwriter.Workbook(file_path)
def close(self) -> None:
if self.workbook is None:
return
self._workbook.close() # type: ignore
self._clear()
def add_worksheet(self, worksheet_name: Optional[str]) -> Any:
if typepy.is_not_null_string(worksheet_name):
assert worksheet_name
worksheet_name = sanitize_excel_sheet_name(worksheet_name)
if worksheet_name in self._worksheet_table:
# the work sheet is already exists
return self._worksheet_table.get(worksheet_name)
else:
worksheet_name = None
worksheet = self.workbook.add_worksheet(worksheet_name)
self._worksheet_table[worksheet.get_name()] = worksheet
return worksheet

View File

@ -0,0 +1,58 @@
import abc
from typing import Any
from .._table_writer import AbstractTableWriter
class BinaryWriterInterface(metaclass=abc.ABCMeta):
@abc.abstractmethod
def is_opened(self) -> bool: # pragma: no cover
pass
@abc.abstractmethod
def open(self, file_path: str) -> None: # pragma: no cover
"""
Open a file for output stream.
Args:
file_path (str): path to the file.
"""
class AbstractBinaryTableWriter(AbstractTableWriter, BinaryWriterInterface):
@property
def stream(self) -> Any:
return self._stream
@stream.setter
def stream(self, value: Any) -> None:
raise RuntimeError(
"cannot assign a stream to binary format writers. use open method instead."
)
@property
def support_split_write(self) -> bool:
return True
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.table_name = kwargs.get("table_name", "")
self._stream = None
def __del__(self) -> None:
self.close()
def is_opened(self) -> bool:
return self.stream is not None
def dumps(self) -> str:
raise NotImplementedError("binary format writers did not support dumps method")
def _verify_stream(self) -> None:
if self.stream is None:
raise OSError("null output stream. required to open(file_path) first.")
def _write_value_row_separator(self) -> None:
pass

View File

@ -0,0 +1,99 @@
from typing import IO, Any, Optional, Union
import tabledata
from ...error import EmptyValueError
from ._interface import AbstractBinaryTableWriter
class PandasDataFramePickleWriter(AbstractBinaryTableWriter):
"""
A table writer class for pandas DataFrame pickle.
.. py:method:: write_table()
Write a table to a pandas DataFrame pickle file.
"""
FORMAT_NAME = "pandas_pickle"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return False
def __init__(self, **kwargs: Any) -> None:
import copy
import dataproperty
super().__init__(**kwargs)
self.is_padding = False
self.is_formatting_float = False
self._use_default_header = True
self._quoting_flags = copy.deepcopy(dataproperty.NOT_QUOTING_FLAGS)
self.__filepath: Optional[str] = None
def is_opened(self) -> bool:
return self.__filepath is not None
def open(self, file_path: str) -> None:
self.__filepath = file_path
def close(self) -> None:
super().close()
self.__filepath = None
def dump(self, output: Union[str, IO], close_after_write: bool = True, **kwargs: Any) -> None:
"""Write data to a DataFrame pickle file.
Args:
output (str): Path to an output DataFrame pickle file.
"""
if not isinstance(output, str):
raise TypeError(f"output must be a str: actual={type(output)}")
self.open(output)
try:
self.write_table(**kwargs)
finally:
if close_after_write:
self.close()
def _verify_stream(self) -> None:
pass
def _write_table(self, **kwargs: Any) -> None:
if self.__filepath is None or not self.is_opened():
self._logger.logger.error("required to open(file_path) first.")
return
try:
self._verify_value_matrix()
except EmptyValueError:
self._logger.logger.debug("no tabular data found")
return
self._preprocess()
table_data = tabledata.TableData(
self.table_name,
self.headers,
[
[value_dp.data for value_dp in value_dp_list]
for value_dp_list in self._table_value_dp_matrix
],
type_hints=self.type_hints,
max_workers=self.max_workers,
)
table_data.as_dataframe().to_pickle(self.__filepath)
def _write_table_iter(self, **kwargs: Any) -> None:
self._write_table(**kwargs)

View File

@ -0,0 +1,104 @@
from os.path import abspath
from typing import IO, Any, Union
import tabledata
from ...error import EmptyValueError
from ._interface import AbstractBinaryTableWriter
class SqliteTableWriter(AbstractBinaryTableWriter):
"""
A table writer class for `SQLite <https://www.sqlite.org/index.html>`__ database.
.. py:method:: write_table()
Write a table to a `SQLite <https://www.sqlite.org/index.html>`__ database.
:raises pytablewriter.EmptyTableNameError:
If the |table_name| is empty.
:Example:
:ref:`example-sqlite-table-writer`
"""
FORMAT_NAME = "sqlite"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
def __init__(self, **kwargs: Any) -> None:
import copy
import dataproperty
super().__init__(**kwargs)
self.is_padding = False
self.is_formatting_float = False
self._use_default_header = True
self._is_require_table_name = True
self._is_require_header = True
self._quoting_flags = copy.deepcopy(dataproperty.NOT_QUOTING_FLAGS)
def open(self, file_path: str) -> None:
"""
Open a SQLite database file.
:param str file_path: SQLite database file path to open.
"""
from simplesqlite import SimpleSQLite
if self.is_opened():
if self.stream.database_path == abspath(file_path):
self._logger.logger.debug(f"database already opened: {self.stream.database_path}")
return
self.close()
self._stream = SimpleSQLite(file_path, "w", max_workers=self.max_workers)
def dump(self, output: Union[str, IO], close_after_write: bool = True, **kwargs: Any) -> None:
"""Write data to the SQLite database file.
Args:
output (str):
path to the output SQLite database file.
close_after_write (bool, optional):
Close the output after write.
Defaults to |True|.
"""
if not isinstance(output, str):
raise TypeError(f"output must be a str: actual={type(output)}")
self.open(output)
try:
self.write_table(**kwargs)
finally:
if close_after_write:
self.close()
def _write_table(self, **kwargs: Any) -> None:
try:
self._verify_value_matrix()
except EmptyValueError:
self._logger.logger.debug("no tabular data found")
return
self._preprocess()
table_data = tabledata.TableData(
self.table_name,
self.headers,
[
[value_dp.data for value_dp in value_dp_list]
for value_dp_list in self._table_value_dp_matrix
],
type_hints=self.type_hints,
max_workers=self.max_workers,
)
self.stream.create_table_from_tabledata(table_data)

View File

@ -0,0 +1,44 @@
from ._asciidoc import AsciiDocTableWriter
from ._borderless import BorderlessTableWriter
from ._css import CssTableWriter
from ._csv import CsvTableWriter
from ._html import HtmlTableWriter
from ._json import JsonTableWriter
from ._jsonlines import JsonLinesTableWriter
from ._latex import LatexMatrixWriter, LatexTableWriter
from ._ltsv import LtsvTableWriter
from ._markdown import MarkdownFlavor, MarkdownTableWriter, normalize_md_flavor
from ._mediawiki import MediaWikiTableWriter
from ._rst import RstCsvTableWriter, RstGridTableWriter, RstSimpleTableWriter
from ._spacealigned import SpaceAlignedTableWriter
from ._toml import TomlTableWriter
from ._tsv import TsvTableWriter
from ._unicode import BoldUnicodeTableWriter, UnicodeTableWriter
from ._yaml import YamlTableWriter
__all__ = (
"AsciiDocTableWriter",
"BoldUnicodeTableWriter",
"BorderlessTableWriter",
"CssTableWriter",
"CsvTableWriter",
"HtmlTableWriter",
"JsonTableWriter",
"JsonLinesTableWriter",
"LatexMatrixWriter",
"LatexTableWriter",
"LtsvTableWriter",
"MarkdownFlavor",
"MarkdownTableWriter",
"normalize_md_flavor",
"MediaWikiTableWriter",
"RstCsvTableWriter",
"RstGridTableWriter",
"RstSimpleTableWriter",
"SpaceAlignedTableWriter",
"TomlTableWriter",
"TsvTableWriter",
"UnicodeTableWriter",
"YamlTableWriter",
)

View File

@ -0,0 +1,147 @@
import copy
from collections.abc import Sequence
from typing import Any, Final
import dataproperty as dp
import typepy
from dataproperty import ColumnDataProperty, DataProperty, LineBreakHandling
from mbstrdecoder import MultiByteStrDecoder
from ...style import (
Align,
FontStyle,
FontWeight,
Style,
StylerInterface,
TextStyler,
get_align_char,
)
from .._table_writer import AbstractTableWriter
from ._text_writer import TextTableWriter
class AsciiDocStyler(TextStyler):
def apply(self, value: str, style: Style) -> str:
value = super().apply(value, style)
if not value:
return value
try:
fg_color = style.fg_color.name.lower() # type: ignore
except AttributeError:
fg_color = None
try:
bg_color = style.bg_color.name.lower() # type: ignore
except AttributeError:
bg_color = None
if fg_color and bg_color:
value = f"[{fg_color} {bg_color}-background]##{value}##"
elif fg_color:
value = f"[{fg_color}]##{value}##"
elif bg_color:
value = f"[{bg_color}-background]##{value}##"
if style.font_weight == FontWeight.BOLD:
value = f"*{value}*"
if style.font_style == FontStyle.ITALIC:
value = f"_{value}_"
return value
class AsciiDocTableWriter(TextTableWriter):
"""
A table writer class for `AsciiDoc <https://asciidoc.org/>`__ format.
"""
FORMAT_NAME = "asciidoc"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return True
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.column_delimiter = "\n"
self.is_padding = False
self.is_write_header_separator_row = True
self.is_write_value_separator_row = True
self.is_write_opening_row = True
self.is_write_closing_row = True
self.update_preprocessor(line_break_handling=LineBreakHandling.NOP)
self._quoting_flags = copy.deepcopy(dp.NOT_QUOTING_FLAGS)
def _create_styler(self, writer: AbstractTableWriter) -> StylerInterface:
return AsciiDocStyler(writer)
def _write_value_row(
self, row: int, values: Sequence[str], value_dp_list: Sequence[DataProperty]
) -> None:
self._write_row(
row,
[
self.__modify_row_element(row, col_idx, value, value_dp)
for col_idx, (value, value_dp) in enumerate(zip(values, value_dp_list))
],
)
def _get_opening_row_items(self) -> list[str]:
cols: Final = ", ".join(
f"{get_align_char(col_dp.align)}{col_dp.ascii_char_width}"
for col_dp in self._column_dp_list
)
rows = [f'[cols="{cols}", options="header"]']
if typepy.is_not_null_string(self.table_name):
rows.append("." + MultiByteStrDecoder(self.table_name).unicode_str)
rows.append("|===")
return ["\n".join(rows)]
def _get_header_row_separator_items(self) -> list[str]:
return [""]
def _get_value_row_separator_items(self) -> list[str]:
return self._get_header_row_separator_items()
def _get_closing_row_items(self) -> list[str]:
return ["|==="]
def __apply_align(self, value: str, style: Style) -> str:
return f"{get_align_char(Align.CENTER)}|{value}"
def _apply_style_to_header_item(
self, col_dp: ColumnDataProperty, value_dp: DataProperty, style: Style
) -> str:
value = self._styler.apply(col_dp.dp_to_str(value_dp), style=style)
return self.__apply_align(value, style)
def __modify_row_element(
self, row_idx: int, col_idx: int, value: str, value_dp: DataProperty
) -> str:
col_dp: Final = self._column_dp_list[col_idx]
style: Final = self._fetch_style(row_idx, col_dp, value_dp)
align = col_dp.align
if style and style.align and style.align != align:
forma_stirng = "{0:s}|{1:s}"
align = style.align
elif value_dp.align != align:
forma_stirng = "{0:s}|{1:s}"
align = value_dp.align
else:
forma_stirng = "|{1:s}"
return forma_stirng.format(get_align_char(align), value)

View File

@ -0,0 +1,41 @@
import copy
from typing import Any
import dataproperty as dp
from ._text_writer import IndentationTextTableWriter
class BorderlessTableWriter(IndentationTextTableWriter):
"""
A table writer class for borderless table.
"""
FORMAT_NAME = "borderless"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return True
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.table_name = ""
self.column_delimiter = ""
self.char_left_side_row = ""
self.char_right_side_row = ""
self.indent_string = kwargs.get("indent_string", " ")
self.is_write_header_separator_row = False
self.is_write_value_separator_row = False
self.is_write_opening_row = False
self.is_write_closing_row = False
self._quoting_flags = copy.deepcopy(dp.NOT_QUOTING_FLAGS)
self._init_cross_point_maps()

View File

@ -0,0 +1,26 @@
from decimal import Decimal
from typing import Any
from dataproperty import DataProperty
from typepy import Typecode
def bool_to_str(value: Any) -> Any:
if value is True:
return "true"
if value is False:
return "false"
return value
def serialize_dp(dp: DataProperty) -> Any:
if dp.typecode in (Typecode.REAL_NUMBER, Typecode.INFINITY, Typecode.NAN) and isinstance(
dp.data, Decimal
):
return float(dp.data)
if dp.typecode == Typecode.DATETIME:
return dp.to_str()
return dp.data

View File

@ -0,0 +1,157 @@
import copy
from typing import Any, Final, cast
from dataproperty import NOT_QUOTING_FLAGS, DataProperty
from pathvalidate import replace_symbol
from ...error import EmptyTableDataError
from ...style import Align, DecorationLine, FontStyle, FontWeight, Style, VerticalAlign
from .._common import HEADER_ROW
from ._text_writer import IndentationTextTableWriter
class CssTableWriter(IndentationTextTableWriter):
"""
A CSS writer class.
"""
FORMAT_NAME = "css"
MARGIN_PIXEL: Final = 6
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return False
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.is_padding = False
self.indent_string = kwargs.get("indent_string", " ")
self._dp_extractor.preprocessor.is_escape_html_tag = False
self._quoting_flags = copy.deepcopy(NOT_QUOTING_FLAGS)
def write_table(self, **kwargs: Any) -> None:
"""
|write_table| with CSS.
"""
with self._logger:
try:
self._verify_property()
except EmptyTableDataError:
self._logger.logger.debug("no tabular data found")
return
self._preprocess()
self.__write_css(
css_class=replace_symbol(self.table_name, replacement_text="-"),
write_style_tag=kwargs.get("write_style_tag", False),
)
def __extract_css_tags(self, value_dp: DataProperty, style: Style) -> list[str]:
css_tags: list[str] = []
if self._styler.get_font_size(style):
css_tags.append(cast(str, self._styler.get_font_size(style)))
if style.font_weight == FontWeight.BOLD:
css_tags.append("font-weight:bold")
if style.font_style == FontStyle.ITALIC:
css_tags.append("font-style:italic")
if style.color:
css_tags.append(f"color: {style.color.color_code}")
if style.bg_color:
css_tags.append(f"background-color: {style.bg_color.color_code}")
css_tag = self.__extract_align_tag(value_dp, style)
if css_tag:
css_tags.append(css_tag)
if style.vertical_align != VerticalAlign.BASELINE:
css_tags.append(f"vertical-align: {style.vertical_align.align_str}")
if style.decoration_line in (DecorationLine.LINE_THROUGH, DecorationLine.STRIKE):
css_tags.append("text-decoration-line: line-through")
elif style.decoration_line == DecorationLine.UNDERLINE:
css_tags.append("text-decoration-line: underline")
if self.margin > 0:
css_tags.append(f"padding: {self.margin * self.MARGIN_PIXEL}px")
return css_tags
def __extract_align_tag(self, value_dp: DataProperty, style: Style) -> str:
if style.align == Align.AUTO:
value = value_dp.align.align_string
else:
value = style.align.align_string
return f"text-align: {value}"
def __write_css_thead(self, css_class: str, base_indent_level: int) -> None:
for col_dp, header_dp in zip(self._column_dp_list, self._dp_extractor.to_header_dp_list()):
style = self._fetch_style(HEADER_ROW, col_dp, header_dp)
css_tags = self.__extract_css_tags(header_dp, style)
if not css_tags:
continue
self.set_indent_level(base_indent_level)
self._write_line(
".{css_class} thead th:nth-child({col}) {{".format(
css_class=css_class, col=col_dp.column_index + 1
)
)
self.set_indent_level(base_indent_level + 1)
for css_tag in css_tags:
self._write_line(f"{css_tag};")
self.set_indent_level(base_indent_level)
self._write_line("}")
def __write_css_tbody(self, css_class: str, base_indent_level: int) -> None:
for row_idx, (values, value_dp_list) in enumerate(
zip(self._table_value_matrix, self._table_value_dp_matrix)
):
for value, value_dp, col_dp in zip(values, value_dp_list, self._column_dp_list):
style = self._fetch_style(row_idx, col_dp, value_dp)
css_tags = self.__extract_css_tags(value_dp, style)
if not css_tags:
continue
self.set_indent_level(base_indent_level)
self._write_line(
".{css_class} tbody tr:nth-child({row}) td:nth-child({col}) {{".format(
css_class=css_class, row=row_idx + 1, col=col_dp.column_index + 1
)
)
self.set_indent_level(base_indent_level + 1)
for css_tag in css_tags:
self._write_line(f"{css_tag};")
self.set_indent_level(base_indent_level)
self._write_line("}")
def __write_css(self, css_class: str, write_style_tag: bool = False) -> None:
base_indent_level = 0
if write_style_tag:
self._write_line('<style type="text/css">')
base_indent_level = 1
self.__write_css_thead(css_class, base_indent_level)
self.__write_css_tbody(css_class, base_indent_level)
if write_style_tag:
self.set_indent_level(0)
self._write_line("</style>")

View File

@ -0,0 +1,63 @@
from typing import Any
import typepy
from ._text_writer import TextTableWriter
class CsvTableWriter(TextTableWriter):
"""
A table writer class for character separated values format.
The default separated character is a comma (``","``).
:Example:
:ref:`example-csv-table-writer`
"""
FORMAT_NAME = "csv"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return True
@property
def margin(self) -> int:
return self._margin
@margin.setter
def margin(self, value: int) -> None:
# margin setting must be ignored
return
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._set_chars("")
self.indent_string = ""
self.column_delimiter = kwargs.get("column_delimiter", ",")
self._margin = 0
self.is_padding = False
self.is_formatting_float = False
self.is_write_header_separator_row = False
self._quoting_flags[typepy.Typecode.NULL_STRING] = False
def _write_header(self) -> None:
if typepy.is_empty_sequence(self.headers):
return
super()._write_header()
def _get_opening_row_items(self) -> list[str]:
return []
def _get_value_row_separator_items(self) -> list[str]:
return []
def _get_closing_row_items(self) -> list[str]:
return []

View File

@ -0,0 +1,194 @@
import copy
import warnings
from typing import Any, Final, Optional, cast
import dataproperty
import typepy
from mbstrdecoder import MultiByteStrDecoder
from pathvalidate import replace_symbol
from ...error import EmptyTableDataError
from ...sanitizer import sanitize_python_var_name
from ...style import Align, FontStyle, FontWeight, HtmlStyler, Style, StylerInterface, VerticalAlign
from .._common import import_error_msg_template
from .._table_writer import AbstractTableWriter
from ._css import CssTableWriter
from ._text_writer import TextTableWriter
def _get_tags_module() -> tuple:
try:
from dominate import tags
from dominate.util import raw
return tags, raw
except ImportError:
warnings.warn(import_error_msg_template.format("html"))
raise
class HtmlTableWriter(TextTableWriter):
"""
A table writer class for HTML format.
:Example:
:ref:`example-html-table-writer`
"""
FORMAT_NAME = "html"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return False
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.is_padding = False
self.indent_string = kwargs.get("indent_string", " ")
self._dp_extractor.preprocessor.line_break_repl = "<br>"
self._dp_extractor.preprocessor.is_escape_html_tag = False
self._quoting_flags = copy.deepcopy(dataproperty.NOT_QUOTING_FLAGS)
self._table_tag: Any = None
self.enable_ansi_escape = False
def write_table(self, **kwargs: Any) -> None:
"""
|write_table| with HTML table format.
Args:
write_css (bool):
If |True|, write CSS corresponding to the specified styles,
instead of attributes of HTML tags.
Example:
:ref:`example-html-table-writer`
.. note::
- |None| values will be replaced with an empty value
"""
tags, raw = _get_tags_module()
write_css: Final[bool] = kwargs.get("write_css", False)
with self._logger:
try:
self._verify_property()
except EmptyTableDataError:
self._logger.logger.debug("no tabular data found")
return
self._preprocess()
css_class: Optional[str] = None
if write_css:
default_css_class_name = replace_symbol(self.table_name, replacement_text="-")
if default_css_class_name:
default_css_class_name += "-css"
else:
default_css_class_name = "ptw-table-css"
css_class = kwargs.get("css_class", default_css_class_name)
css_writer = CssTableWriter(
table_name=css_class,
margin=self.margin,
stream=self.stream,
)
css_writer.from_writer(self, is_overwrite_table_name=False)
css_writer.write_table(write_style_tag=True)
if typepy.is_not_null_string(self.table_name):
if css_class:
self._table_tag = tags.table(
id=sanitize_python_var_name(self.table_name), class_name=css_class
)
else:
self._table_tag = tags.table(id=sanitize_python_var_name(self.table_name))
self._table_tag += tags.caption(MultiByteStrDecoder(self.table_name).unicode_str)
else:
if css_class:
self._table_tag = tags.table(class_name=css_class)
else:
self._table_tag = tags.table()
try:
self._write_header()
except ValueError:
pass
self._write_body(not write_css)
def _write_header(self) -> None:
tags, raw = _get_tags_module()
if not self.is_write_header:
return
if typepy.is_empty_sequence(self._table_headers):
raise ValueError("headers is empty")
tr_tag = tags.tr()
for header in self._table_headers:
tr_tag += tags.th(raw(MultiByteStrDecoder(header).unicode_str))
thead_tag = tags.thead()
thead_tag += tr_tag
self._table_tag += thead_tag
def _write_body(self, write_attr: bool) -> None:
tags, raw = _get_tags_module()
tbody_tag = tags.tbody()
for row_idx, (values, value_dp_list) in enumerate(
zip(self._table_value_matrix, self._table_value_dp_matrix)
):
tr_tag = tags.tr()
for value, value_dp, column_dp in zip(values, value_dp_list, self._column_dp_list):
td_tag = tags.td(raw(MultiByteStrDecoder(value).unicode_str))
style = self._fetch_style(row_idx, column_dp, value_dp)
if write_attr:
if style.align == Align.AUTO:
td_tag["align"] = value_dp.align.align_string
else:
td_tag["align"] = style.align.align_string
if style.vertical_align != VerticalAlign.BASELINE:
td_tag["valign"] = style.vertical_align.align_str
style_tag = self.__make_style_tag(style=style)
if style_tag:
td_tag["style"] = style_tag
tr_tag += td_tag
tbody_tag += tr_tag
self._table_tag += tbody_tag
self._write_line(self._table_tag.render(indent=self.indent_string))
def __make_style_tag(self, style: Style) -> Optional[str]:
styles: list[str] = []
if self._styler.get_font_size(style):
styles.append(cast(str, self._styler.get_font_size(style)))
if style.font_weight == FontWeight.BOLD:
styles.append("font-weight:bold")
if style.font_style == FontStyle.ITALIC:
styles.append("font-style:italic")
if not styles:
return None
return "; ".join(styles)
def _create_styler(self, writer: AbstractTableWriter) -> StylerInterface:
return HtmlStyler(writer)

View File

@ -0,0 +1,29 @@
import abc
class TextWriterInterface(metaclass=abc.ABCMeta):
"""
Interface class for writing texts.
"""
@abc.abstractmethod
def write_null_line(self) -> None: # pragma: no cover
pass
class IndentationInterface(metaclass=abc.ABCMeta):
"""
Interface class for indentation methods.
"""
@abc.abstractmethod
def set_indent_level(self, indent_level: int) -> None: # pragma: no cover
pass
@abc.abstractmethod
def inc_indent_level(self) -> None: # pragma: no cover
pass
@abc.abstractmethod
def dec_indent_level(self) -> None: # pragma: no cover
pass

View File

@ -0,0 +1,167 @@
import copy
from textwrap import indent
from typing import Any, Final
import dataproperty
import typepy
from dataproperty import ColumnDataProperty, DataProperty
from mbstrdecoder import MultiByteStrDecoder
from tabledata import to_value_matrix
from typepy import Typecode
from .._msgfy import to_error_message
from ._common import serialize_dp
from ._text_writer import IndentationTextTableWriter
try:
import simplejson as json
except ImportError:
import json # type: ignore
class JsonTableWriter(IndentationTextTableWriter):
"""
A table writer class for JSON format.
Examples:
:ref:`example-json-table-writer`
.. py:method:: write_table
|write_table| with JSON format.
Args:
indent (Optional[int]):
Indent level of an output.
Interpretation of indent level value differ format to format.
Some writer classes may ignore this value.
Defaults to 4.
sort_keys (Optional[bool]):
If |True|, the output of dictionaries will be sorted by key.
Defaults to |False|.
Examples:
:ref:`example-json-table-writer`
.. note::
Specific values in the tabular data are converted when writing:
- |None|: written as ``null``
- |inf|: written as ``Infinity``
- |nan|: written as ``NaN``
"""
FORMAT_NAME = "json"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return True
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.set_indent_level(4)
self.is_formatting_float = False
self.is_write_opening_row = True
self.is_write_closing_row = True
self.char_right_side_row = ","
self.char_opening_row_cross_point = ""
self.char_closing_row_cross_point = ""
self._is_require_header = True
self._dp_extractor.type_value_map = {
Typecode.INFINITY: "Infinity",
Typecode.NAN: "NaN",
}
self._dp_extractor.update_strict_level_map({Typecode.BOOL: typepy.StrictLevel.MAX})
self._quoting_flags = copy.deepcopy(dataproperty.NOT_QUOTING_FLAGS)
self._init_cross_point_maps()
def write_null_line(self) -> None:
self._verify_stream()
self.stream.write("\n")
def _write_table(self, **kwargs: Any) -> None:
sort_keys: Final = kwargs.get("sort_keys", False)
self._preprocess()
with self._logger:
self._write_opening_row()
json_text_list = []
for json_data in self._table_value_matrix:
json_text = json.dumps(
json_data, indent=self._indent_level, ensure_ascii=False, sort_keys=sort_keys
)
json_text_list.append(json_text)
joint_text = self.char_right_side_row + "\n"
json_text = joint_text.join(json_text_list)
if all([not self.is_write_closing_row, typepy.is_not_null_string(json_text)]):
json_text += joint_text
self.stream.write(indent(json_text, " " * self._indent_level))
self._write_closing_row()
def _to_row_item(self, row_idx: int, col_dp: ColumnDataProperty, value_dp: DataProperty) -> str:
return value_dp.to_str()
def _preprocess_table_dp(self) -> None:
if self._is_complete_table_dp_preprocess:
return
self._logger.logger.debug("_preprocess_table_dp")
try:
self._table_value_dp_matrix = self._dp_extractor.to_dp_matrix(
to_value_matrix(self.headers, self.value_matrix)
)
except TypeError as e:
self._logger.logger.debug(to_error_message(e))
self._table_value_dp_matrix = []
self._is_complete_table_dp_preprocess = True
def _preprocess_header(self) -> None:
if self._is_complete_header_preprocess:
return
self._logger.logger.debug("_preprocess_header")
self._table_headers = [
header_dp.to_str() for header_dp in self._dp_extractor.to_header_dp_list()
]
self._is_complete_header_preprocess = True
def _preprocess_value_matrix(self) -> None:
if self._is_complete_value_matrix_preprocess:
return
self._table_value_matrix = [
dict(zip(self._table_headers, [serialize_dp(dp) for dp in dp_list]))
for dp_list in self._table_value_dp_matrix
]
self._is_complete_value_matrix_preprocess = True
def _get_opening_row_items(self) -> list[str]:
if typepy.is_not_null_string(self.table_name):
return [f'{{ "{MultiByteStrDecoder(self.table_name).unicode_str:s}" : [']
return ["["]
def _get_closing_row_items(self) -> list[str]:
if typepy.is_not_null_string(self.table_name):
return ["\n]}"]
return ["\n]"]

View File

@ -0,0 +1,53 @@
from typing import Any, Final
from ._json import JsonTableWriter
try:
import simplejson as json
except ImportError:
import json # type: ignore
class JsonLinesTableWriter(JsonTableWriter):
"""
A table writer class for JSON lines format.
:Example:
:ref:`example-jsonl-writer`
"""
FORMAT_NAME = "json_lines"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return True
def write_table(self, **kwargs: Any) -> None:
"""
|write_table| with
`Line-delimited JSON(LDJSON)
<https://en.wikipedia.org/wiki/JSON_streaming#Line-delimited_JSON>`__
/NDJSON/JSON Lines format.
Args:
sort_keys (Optional[bool]):
If |True|, the output of dictionaries will be sorted by key.
Defaults to |False|.
Example:
:ref:`example-jsonl-writer`
"""
sort_keys: Final = kwargs.get("sort_keys", False)
with self._logger:
self._verify_property()
self._preprocess()
for values in self._table_value_matrix:
self._write_line(json.dumps(values, ensure_ascii=False, sort_keys=sort_keys))

View File

@ -0,0 +1,226 @@
import copy
import re
from typing import Any, Final
import dataproperty as dp
import typepy
from dataproperty import ColumnDataProperty, DataProperty
from typepy import Typecode
from ...style import Align, LatexStyler, StylerInterface
from .._table_writer import AbstractTableWriter
from ._text_writer import IndentationTextTableWriter
class LatexWriter(IndentationTextTableWriter):
"""
A base writer class for LaTeX format.
"""
_RE_MATH_PARTS: Final = re.compile(r"^\\?[a-zA-Z]+$")
@property
def support_split_write(self) -> bool:
return True
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._set_chars("")
self.is_write_opening_row = True
self.is_write_closing_row = True
self.indent_string = kwargs.get("indent_string", " ")
self.column_delimiter = " & "
self.char_right_side_row = r" \\"
self._quoting_flags = copy.deepcopy(dp.NOT_QUOTING_FLAGS)
self._init_cross_point_maps()
def _is_math_parts(self, value_dp: DataProperty) -> bool:
if value_dp.typecode in [Typecode.INTEGER, Typecode.REAL_NUMBER]:
return False
try:
if self._RE_MATH_PARTS.search(value_dp.data):
return True
except TypeError:
pass
return False
def _get_col_align_char_list(self) -> list[str]:
col_align_list = []
for col_dp in self._column_dp_list:
align = self._get_col_style(col_dp.column_index).align
if align is None or align == Align.AUTO:
align = col_dp.align
if align == Align.RIGHT:
col_align = "r"
elif align == Align.CENTER:
col_align = "c"
else:
col_align = "l"
col_align_list.append(col_align)
return col_align_list
def _write_opening_row(self) -> None:
super()._write_opening_row()
self.inc_indent_level()
def _write_closing_row(self) -> None:
self.dec_indent_level()
super()._write_closing_row()
def _to_math_parts(self, value: str) -> str:
# dollar characters for both sides of math parts are not required in
# Jupyter latex.
# return r"${:s}$".format(value)
return value
def _create_styler(self, writer: AbstractTableWriter) -> StylerInterface:
return LatexStyler(writer)
class LatexMatrixWriter(LatexWriter):
"""
A matrix writer class for LaTeX environment.
:Example:
:ref:`example-latex-matrix-writer`
.. py:method:: write_table
|write_table| with LaTeX ``array`` environment.
:Example:
:ref:`example-latex-matrix-writer`
"""
FORMAT_NAME = "latex_matrix"
_RE_VAR: Final = re.compile(r"^[a-zA-Z]+_\{[a-zA-Z0-9]+\}$")
@property
def format_name(self) -> str:
return self.FORMAT_NAME
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.is_write_header = False
self.is_write_header_separator_row = False
def _to_row_item(self, row_idx: int, col_dp: ColumnDataProperty, value_dp: DataProperty) -> str:
row_item = super()._to_row_item(row_idx, col_dp, value_dp)
if self._RE_VAR.search(row_item):
return row_item
if self._is_math_parts(value_dp):
return self._to_math_parts(row_item)
return row_item
def _get_header_row_separator_items(self) -> list[str]:
return []
def _get_opening_row_items(self) -> list[str]:
row_item_list = []
if typepy.is_not_null_string(self.table_name):
row_item_list.append(self.table_name + r" = \left( ")
else:
row_item_list.append(r"\left( ")
row_item_list.extend(
[r"\begin{array}{", "{:s}".format("".join(self._get_col_align_char_list())), "}"]
)
return ["".join(row_item_list)]
def _get_closing_row_items(self) -> list[str]:
return [r"\end{array} \right)"]
def _write_opening_row(self) -> None:
self._write_line(r"\begin{equation}")
self.inc_indent_level()
super()._write_opening_row()
def _write_closing_row(self) -> None:
super()._write_closing_row()
self.dec_indent_level()
self._write_line(r"\end{equation}")
class LatexTableWriter(LatexWriter):
"""
A matrix writer class for LaTeX environment.
:Example:
:ref:`example-latex-table-writer`
.. py:method:: write_table
|write_table| with LaTeX ``array`` environment.
:Example:
:ref:`example-latex-table-writer`
"""
FORMAT_NAME = "latex_table"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.char_right_side_row = r" \\ \hline"
self._dp_extractor.set_type_value(Typecode.INFINITY, r"\infty")
def _get_opening_row_items(self) -> list[str]:
return [
"".join(
[
r"\begin{array}{",
"{:s}".format(" | ".join(self._get_col_align_char_list())),
r"} \hline",
]
)
]
def __is_requre_verbatim(self, value_dp: DataProperty) -> bool:
if value_dp.typecode != typepy.Typecode.STRING:
return False
return True
def __verbatim(self, value: str) -> str:
return r"\verb" + f"|{value:s}|"
def _to_header_item(self, col_dp: ColumnDataProperty, value_dp: DataProperty) -> str:
return self.__verbatim(super()._to_header_item(col_dp, value_dp))
def _to_row_item(self, row_idx: int, col_dp: ColumnDataProperty, value_dp: DataProperty) -> str:
row_item = super()._to_row_item(row_idx, col_dp, value_dp)
if self._is_math_parts(value_dp):
return self._to_math_parts(row_item)
if self.__is_requre_verbatim(value_dp):
return self.__verbatim(row_item)
return row_item
def _get_header_row_separator_items(self) -> list[str]:
return [r"\hline"]
def _get_closing_row_items(self) -> list[str]:
return [r"\end{array}"]

View File

@ -0,0 +1,59 @@
from typing import Any
import pathvalidate
import typepy
from ._csv import CsvTableWriter
class LtsvTableWriter(CsvTableWriter):
"""
A table writer class for
`Labeled Tab-separated Values (LTSV) <http://ltsv.org/>`__ format.
:Example:
:ref:`example-ltsv-table-writer`
"""
FORMAT_NAME = "ltsv"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return True
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.is_write_header = False
self._is_require_header = True
def write_table(self, **kwargs: Any) -> None:
"""
|write_table| with
`Labeled Tab-separated Values (LTSV) <http://ltsv.org/>`__ format.
Invalid characters in labels/data are removed.
:Example:
:ref:`example-ltsv-table-writer`
"""
with self._logger:
self._verify_property()
self._preprocess()
for values in self._table_value_matrix:
ltsv_item_list = [
f"{pathvalidate.sanitize_ltsv_label(header_name):s}:{value}"
for header_name, value in zip(self.headers, values)
if typepy.is_not_null_string(value)
]
if typepy.is_empty_sequence(ltsv_item_list):
continue
self._write_line("\t".join(ltsv_item_list))

View File

@ -0,0 +1,198 @@
import copy
from enum import Enum, unique
from typing import Any, Final, Union
import dataproperty as dp
import typepy
from dataproperty import ColumnDataProperty, DataProperty
from mbstrdecoder import MultiByteStrDecoder
from pathvalidate import replace_symbol
from ..._function import normalize_enum
from ...error import EmptyTableDataError
from ...style import Align, GFMarkdownStyler, MarkdownStyler, StylerInterface
from .._table_writer import AbstractTableWriter
from ._text_writer import IndentationTextTableWriter
@unique
class MarkdownFlavor(Enum):
COMMON_MARK = "common_mark"
GITHUB = "github"
GFM = "gfm"
JEKYLL = "jekyll"
KRAMDOWN = "kramdown"
def normalize_md_flavor(flavor: Union[str, MarkdownFlavor, None]) -> MarkdownFlavor:
if isinstance(flavor, str):
flavor_str = replace_symbol(flavor.strip(), "_").upper()
flavor_str = flavor_str.replace("COMMONMARK", "COMMON_MARK")
flavor = flavor_str
norm_flavor = normalize_enum(flavor, MarkdownFlavor, default=MarkdownFlavor.COMMON_MARK)
if norm_flavor == MarkdownFlavor.GITHUB:
return MarkdownFlavor.GFM
if norm_flavor == MarkdownFlavor.JEKYLL:
return MarkdownFlavor.KRAMDOWN
return norm_flavor
class MarkdownTableWriter(IndentationTextTableWriter):
"""
A table writer class for Markdown format.
Example:
:ref:`example-markdown-table-writer`
"""
FORMAT_NAME = "markdown"
DEFAULT_FLAVOR: Final = MarkdownFlavor.COMMON_MARK
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return True
def __init__(self, **kwargs: Any) -> None:
self.__flavor = normalize_md_flavor(kwargs.pop("flavor", self.DEFAULT_FLAVOR))
super().__init__(**kwargs)
self.indent_string = ""
self.column_delimiter = "|"
self.char_left_side_row = "|"
self.char_right_side_row = "|"
self.char_cross_point = "|"
self.char_header_row_cross_point = "|"
self.char_header_row_left_cross_point = "|"
self.char_header_row_right_cross_point = "|"
self.is_write_opening_row = True
self._use_default_header = True
self._is_require_header = True
self._quoting_flags = copy.deepcopy(dp.NOT_QUOTING_FLAGS)
self._dp_extractor.min_column_width = 3
self._init_cross_point_maps()
def _to_header_item(self, col_dp: ColumnDataProperty, value_dp: DataProperty) -> str:
return self.__escape_vertical_bar_char(super()._to_header_item(col_dp, value_dp))
def _to_row_item(self, row_idx: int, col_dp: ColumnDataProperty, value_dp: DataProperty) -> str:
return self.__escape_vertical_bar_char(super()._to_row_item(row_idx, col_dp, value_dp))
def _get_opening_row_items(self) -> list[str]:
return []
def _get_header_row_separator_items(self) -> list[str]:
header_separator_list = []
margin = " " * self.margin
for col_dp in self._column_dp_list:
padding_len = self._get_padding_len(col_dp)
align = self._get_align(col_dp.column_index, col_dp.align)
if align == Align.RIGHT:
separator_item = "-" * (padding_len - 1) + ":"
elif align == Align.CENTER:
separator_item = ":" + "-" * (padding_len - 2) + ":"
else:
separator_item = "-" * padding_len
header_separator_list.append(
"{margin}{item}{margin}".format(margin=margin, item=separator_item)
)
return header_separator_list
def _get_value_row_separator_items(self) -> list[str]:
return []
def _get_closing_row_items(self) -> list[str]:
return []
def _write_header(self) -> None:
super()._write_header()
if self.__flavor == MarkdownFlavor.KRAMDOWN:
self._write_line()
def write_table(self, **kwargs: Any) -> None:
"""
|write_table| with Markdown table format.
Args:
flavor (Optional[str]):
possible flavors are as follows (case insensitive):
- ``"CommonMark"``
- ``"gfm"``
- ``"github"`` (alias of ``"gfm"``)
- ``kramdown``
- ``Jekyll`` (alias of ``"kramdown"``)
Defaults to ``"CommonMark"``.
Example:
:ref:`example-markdown-table-writer`
.. note::
- |None| values are written as an empty string
- Vertical bar characters (``'|'``) in table items are escaped
"""
if "flavor" in kwargs:
new_flavor = normalize_md_flavor(kwargs["flavor"])
if new_flavor != self.__flavor:
self._clear_preprocess()
self.__flavor = new_flavor
if self.__flavor:
self._styler = self._create_styler(self)
with self._logger:
try:
self._verify_property()
except EmptyTableDataError:
self._logger.logger.debug("no tabular data found")
return
self.__write_chapter()
self._write_table(**kwargs)
if self.is_write_null_line_after_table:
self.write_null_line()
def _write_table_iter(self, **kwargs: Any) -> None:
self.__write_chapter()
super()._write_table_iter()
def __write_chapter(self) -> None:
if typepy.is_null_string(self.table_name):
return
self._write_line(
"{:s} {:s}".format(
"#" * (self._indent_level + 1), MultiByteStrDecoder(self.table_name).unicode_str
)
)
if self.__flavor == MarkdownFlavor.KRAMDOWN:
self._write_line()
def _create_styler(self, writer: AbstractTableWriter) -> StylerInterface:
if self.__flavor == MarkdownFlavor.GFM:
return GFMarkdownStyler(writer)
return MarkdownStyler(writer)
@staticmethod
def __escape_vertical_bar_char(value: str) -> str:
return value.replace("|", r"\|")

View File

@ -0,0 +1,106 @@
import copy
import re
from collections.abc import Sequence
from typing import Any, Final
import dataproperty as dp
import typepy
from dataproperty import ColumnDataProperty, DataProperty, LineBreakHandling
from mbstrdecoder import MultiByteStrDecoder
from ...style import Align, Style, get_align_char
from ._text_writer import TextTableWriter
class MediaWikiTableWriter(TextTableWriter):
"""
A table writer class for `MediaWiki <https://www.mediawiki.org/wiki/MediaWiki>`__ format.
:Example:
:ref:`example-mediawiki-table-writer`
"""
FORMAT_NAME = "mediawiki"
__RE_TABLE_SEQUENCE: Final = re.compile(r"^[\s]+[*|#]+")
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return True
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.column_delimiter = "\n"
self.is_padding = False
self.is_write_header_separator_row = True
self.is_write_value_separator_row = True
self.is_write_opening_row = True
self.is_write_closing_row = True
self.update_preprocessor(line_break_handling=LineBreakHandling.NOP)
self._quoting_flags = copy.deepcopy(dp.NOT_QUOTING_FLAGS)
# experimental: This attribute may change in the future release
self.table_style = kwargs.get("table_style", "")
def _write_header(self) -> None:
if not self.is_write_header:
return
if typepy.is_not_null_string(self.table_name):
self._write_line("|+" + MultiByteStrDecoder(self.table_name).unicode_str)
super()._write_header()
def _write_value_row(
self, row: int, values: Sequence[str], value_dp_list: Sequence[DataProperty]
) -> None:
self._write_row(
row,
[
self.__modify_table_element(value, value_dp)
for value, value_dp in zip(values, value_dp_list)
],
)
def _get_opening_row_items(self) -> list[str]:
item = '{| class="wikitable"'
if self.table_style:
item += f' style="{self.table_style}"'
return [item]
def _get_header_row_separator_items(self) -> list[str]:
return ["|-"]
def _get_value_row_separator_items(self) -> list[str]:
return self._get_header_row_separator_items()
def _get_closing_row_items(self) -> list[str]:
return ["|}"]
def _apply_style_to_header_item(
self, col_dp: ColumnDataProperty, value_dp: DataProperty, style: Style
) -> str:
value: Final = self._styler.apply(col_dp.dp_to_str(value_dp), style=style)
str_format = "! {{:{:s}{:s}}}".format(
get_align_char(Align.CENTER), str(self._get_padding_len(col_dp, value_dp))
)
return str_format.format(value)
def __modify_table_element(self, value: str, value_dp: DataProperty) -> str:
if value_dp.align is Align.LEFT:
forma_stirng = "| {1:s}"
else:
forma_stirng = '| style="text-align:{0:s}"| {1:s}'
if self.__RE_TABLE_SEQUENCE.search(value) is not None:
value = "\n" + value.lstrip()
return forma_stirng.format(value_dp.align.align_string, value)

View File

@ -0,0 +1,241 @@
import copy
from typing import Any
import dataproperty
import typepy
from mbstrdecoder import MultiByteStrDecoder
from ...error import EmptyTableDataError
from ...style import ReStructuredTextStyler, StylerInterface
from .._table_writer import AbstractTableWriter
from ._text_writer import IndentationTextTableWriter
class RstTableWriter(IndentationTextTableWriter):
"""
A base class of reStructuredText table writer.
"""
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.char_header_row_separator = "="
self.char_cross_point = "+"
self.char_left_cross_point = "+"
self.char_right_cross_point = "+"
self.char_top_left_cross_point = "+"
self.char_top_right_cross_point = "+"
self.char_bottom_left_cross_point = "+"
self.char_bottom_right_cross_point = "+"
self.char_header_row_cross_point = "+"
self.char_header_row_left_cross_point = "+"
self.char_header_row_right_cross_point = "+"
self.char_opening_row_cross_point = "+"
self.char_closing_row_cross_point = "+"
self.indent_string = kwargs.get("indent_string", " ")
self.is_write_header_separator_row = True
self.is_write_value_separator_row = True
self.is_write_opening_row = True
self.is_write_closing_row = True
self._quoting_flags = copy.deepcopy(dataproperty.NOT_QUOTING_FLAGS)
self._init_cross_point_maps()
def write_table(self, **kwargs: Any) -> None:
with self._logger:
self._write_line(self._get_table_directive())
try:
self._verify_property()
except EmptyTableDataError:
self._logger.logger.debug("no tabular data found")
return
self._write_table(**kwargs)
if self.is_write_null_line_after_table:
self.write_null_line()
def _get_table_directive(self) -> str:
if typepy.is_null_string(self.table_name):
return ".. table::\n"
return f".. table:: {MultiByteStrDecoder(self.table_name).unicode_str}\n"
def _write_table(self, **kwargs: Any) -> None:
self.inc_indent_level()
super()._write_table(**kwargs)
self.dec_indent_level()
def _create_styler(self, writer: AbstractTableWriter) -> StylerInterface:
return ReStructuredTextStyler(writer)
class RstCsvTableWriter(RstTableWriter):
"""
A table class writer for reStructuredText
`CSV table <http://docutils.sourceforge.net/docs/ref/rst/directives.html#id4>`__
format.
:Example:
:ref:`example-rst-csv-table-writer`
"""
FORMAT_NAME = "rst_csv_table"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return True
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.column_delimiter = ", "
self.char_cross_point = ""
self.is_padding = False
self.is_write_header_separator_row = False
self.is_write_value_separator_row = False
self.is_write_closing_row = False
self._quoting_flags[typepy.Typecode.STRING] = True
def write_table(self, **kwargs: Any) -> None:
"""
|write_table| with reStructuredText CSV table format.
:Example:
:ref:`example-rst-csv-table-writer`
.. note::
- |None| values are written as an empty string
"""
IndentationTextTableWriter.write_table(self, **kwargs)
def _get_opening_row_items(self) -> list[str]:
directive = ".. csv-table::"
if typepy.is_null_string(self.table_name):
return [directive]
return [f"{directive} {MultiByteStrDecoder(self.table_name).unicode_str}"]
def _write_opening_row(self) -> None:
self.dec_indent_level()
super()._write_opening_row()
self.inc_indent_level()
def _write_header(self) -> None:
if not self.is_write_header:
return
if typepy.is_not_empty_sequence(self.headers):
self._write_line(
':header: "{:s}"'.format(
'", "'.join(MultiByteStrDecoder(header).unicode_str for header in self.headers)
)
)
self._write_line(
":widths: " + ", ".join(str(col_dp.ascii_char_width) for col_dp in self._column_dp_list)
)
self._write_line()
def _get_value_row_separator_items(self) -> list[str]:
return []
def _get_closing_row_items(self) -> list[str]:
return []
class RstGridTableWriter(RstTableWriter):
"""
A table writer class for reStructuredText
`Grid Tables <http://docutils.sourceforge.net/docs/ref/rst/restructuredtext.html#grid-tables>`__
format.
:Example:
:ref:`example-rst-grid-table-writer`
.. py:method:: write_table
|write_table| with reStructuredText grid tables format.
:Example:
:ref:`example-rst-grid-table-writer`
.. note::
- |None| values are written as an empty string
"""
FORMAT_NAME = "rst_grid_table"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return False
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.char_left_side_row = "|"
self.char_right_side_row = "|"
class RstSimpleTableWriter(RstTableWriter):
"""
A table writer class for reStructuredText
`Simple Tables
<http://docutils.sourceforge.net/docs/ref/rst/restructuredtext.html#simple-tables>`__
format.
:Example:
:ref:`example-rst-simple-table-writer`
.. py:method:: write_table
|write_table| with reStructuredText simple tables format.
:Example:
:ref:`example-rst-simple-table-writer`
.. note::
- |None| values are written as an empty string
"""
FORMAT_NAME = "rst_simple_table"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return False
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.column_delimiter = " "
self.char_cross_point = " "
self.char_opening_row_cross_point = " "
self.char_closing_row_cross_point = " "
self.char_header_row_cross_point = " "
self.char_header_row_left_cross_point = " "
self.char_header_row_right_cross_point = " "
self.char_opening_row = "="
self.char_closing_row = "="
self.is_write_value_separator_row = False
self._init_cross_point_maps()

View File

@ -0,0 +1,39 @@
import copy
from typing import Any
import dataproperty
from ._csv import CsvTableWriter
class SpaceAlignedTableWriter(CsvTableWriter):
"""
A table writer class for space-separated values (SSV) format.
:Example:
:ref:`example-space-aligned-table-writer`
.. py:method:: write_table
|write_table| with SSV format.
:Example:
:ref:`example-space-aligned-table-writer`
"""
FORMAT_NAME = "space_aligned"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.column_delimiter = " "
self.char_cross_point = " "
self.is_padding = True
self.is_formatting_float = kwargs.get("is_formatting_float", True)
self._quoting_flags = copy.deepcopy(dataproperty.NOT_QUOTING_FLAGS)

View File

@ -0,0 +1,648 @@
import enum
import io
import sys
from collections.abc import Sequence
from itertools import chain
from typing import IO, Any, Optional, Union, cast
import typepy
from dataproperty import ColumnDataProperty, DataProperty, LineBreakHandling
from ...error import EmptyTableDataError
from ...style import Cell, ColSeparatorStyleFilterFunc, Style, StylerInterface, TextStyler
from .._common import HEADER_ROW
from .._table_writer import AbstractTableWriter
from ._interface import IndentationInterface, TextWriterInterface
@enum.unique
class RowType(enum.Enum):
OPENING = "opening"
HEADER_SEPARATOR = "header separator"
MIDDLE = "middle"
CLOSING = "closing"
class TextTableWriter(AbstractTableWriter, TextWriterInterface):
"""
A base class for table writer with text formats.
.. figure:: ss/table_char.png
:scale: 60%
:alt: table_char
Character attributes that compose a table
.. py:attribute:: column_delimiter
:type: str
A column delimiter of a table.
.. py:attribute:: char_left_side_row
:type: str
A character of a left side of a row.
.. py:attribute:: char_right_side_row
:type: str
A character of a right side of a row.
.. py:attribute:: char_cross_point
:type: str
A character of the crossing point of column delimiter and row
delimiter.
.. py:attribute:: char_opening_row
:type: str
A character of the first line of a table.
.. py:attribute:: char_header_row_separator
:type: str
A character of a separator line of the header and
the body of the table.
.. py:attribute:: char_value_row_separator
:type: str
A character of a row separator line of the table.
.. py:attribute:: char_closing_row
:type: str
A character of the last line of a table.
.. py:attribute:: is_write_header_separator_row
:type: bool
Write a header separator line of the table if the value is |True|.
.. py:attribute:: is_write_value_separator_row
:type: bool
Write row separator line(s) of the table if the value is |True|.
.. py:attribute:: is_write_opening_row
:type: bool
Write an opening line of the table if the value is |True|.
.. py:attribute:: is_write_closing_row
:type: bool
Write a closing line of the table if the value is |True|.
.. py:attribute:: is_write_null_line_after_table
:type: bool
Write a blank line of after writing a table if the value is |True|.
.. py:attribute:: margin
:type: int
Margin size for each cells
"""
def __update_template(self) -> None:
self.__value_cell_margin_format = self.__make_margin_format(" ")
self.__opening_row_cell_format = self.__make_margin_format(self.char_opening_row)
self._header_row_separator_cell_format = self.__make_margin_format(
self.char_header_row_separator
)
self.__value_row_separator_cell_format = self.__make_margin_format(
self.char_value_row_separator
)
self.__closing_row_cell_format = self.__make_margin_format(self.char_closing_row)
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.stream = sys.stdout
self._set_chars("")
self.column_delimiter = "|"
self.char_opening_row = "-"
self.char_opening_row_cross_point = "-"
self.char_header_row_separator = "-"
self.char_header_row_cross_point = "-"
self.char_value_row_separator = "-"
self.char_closing_row = "-"
self.char_closing_row_cross_point = "-"
self._margin = kwargs.get("margin", 0)
self._dp_extractor.preprocessor.line_break_handling = LineBreakHandling.REPLACE
self.is_write_null_line_after_table = kwargs.get("is_write_null_line_after_table", False)
self._init_cross_point_maps()
self._col_separator_style_filters: list[ColSeparatorStyleFilterFunc] = []
if "theme" in kwargs:
self.set_theme(kwargs["theme"])
def __repr__(self) -> str:
return self.dumps()
@property
def margin(self) -> int:
return self._margin
@margin.setter
def margin(self, value: int) -> None:
if value < 0:
raise ValueError("margin value must be zero or greater")
if self._margin == value:
return
self._margin = value
self._clear_preprocess()
def _init_cross_point_maps(self) -> None:
self.__cross_point_maps = {
RowType.OPENING: self.char_opening_row_cross_point,
RowType.HEADER_SEPARATOR: self.char_header_row_cross_point,
RowType.MIDDLE: self.char_cross_point,
RowType.CLOSING: self.char_closing_row_cross_point,
}
self.__left_cross_point_maps = {
RowType.OPENING: self.char_top_left_cross_point,
RowType.HEADER_SEPARATOR: self.char_header_row_left_cross_point,
RowType.MIDDLE: self.char_left_cross_point,
RowType.CLOSING: self.char_bottom_left_cross_point,
}
self.__right_cross_point_maps = {
RowType.OPENING: self.char_top_right_cross_point,
RowType.HEADER_SEPARATOR: self.char_header_row_right_cross_point,
RowType.MIDDLE: self.char_right_cross_point,
RowType.CLOSING: self.char_bottom_right_cross_point,
}
def add_col_separator_style_filter(self, style_filter: ColSeparatorStyleFilterFunc) -> None:
"""Add a style filter function for columns to the writer.
Args:
style_filter:
A function that called for each cell in the table to apply a style
to table cells.
The function will be required to implement the following Protocol:
.. code-block:: python
class ColSeparatorStyleFilterFunc(Protocol):
def __call__(
self, left_cell: Optional[Cell], right_cell: Optional[Cell], **kwargs: Any
) -> Optional[Style]:
...
If more than one style filter function is added to the writer,
it will be called from the last one added.
These style functions should return |None| when not needed to apply styles.
If all of the style functions returned |None|,
:py:attr:`~.default_style` will be applied.
You can pass keyword arguments to style filter functions via
:py:attr:`~.style_filter_kwargs`. In default, the attribute includes:
- ``writer``: the writer instance that the caller of a ``style_filter function``
"""
self._col_separator_style_filters.insert(0, style_filter)
self._clear_preprocess()
def clear_theme(self) -> None:
"""Remove all of the style filters."""
super().clear_theme()
if not self._col_separator_style_filters:
return
self._col_separator_style_filters = []
self._clear_preprocess()
def write_null_line(self) -> None:
"""
Write a null line to the |stream|.
"""
self._write_line()
def write_table(self, **kwargs: Any) -> None:
"""
|write_table|.
.. note::
- |None| values are written as an empty string.
"""
try:
super().write_table(**kwargs)
except EmptyTableDataError:
raise
if self.is_write_null_line_after_table:
self.write_null_line()
def dump(self, output: Union[str, IO], close_after_write: bool = True, **kwargs: Any) -> None:
"""Write data to the output with tabular format.
During the executing this method,
:py:attr:`~pytablewriter.writer._table_writer.AbstractTableWriter.enable_ansi_escape`
attribute will be temporarily set to |False|.
Args:
output:
The value must either an output stream or a path to an output file.
close_after_write:
Close the output after write.
Defaults to |True|.
"""
try:
output.write # type: ignore
self.stream = output
except AttributeError:
self.stream = open(output, "w", encoding="utf-8") # type: ignore
stash = self.enable_ansi_escape
self.enable_ansi_escape = False
try:
self.write_table(**kwargs)
finally:
if close_after_write:
self.stream.close() # type: ignore
self.stream = sys.stdout
self.enable_ansi_escape = stash
def dumps(self, **kwargs: Any) -> str:
"""Get rendered tabular text from the table data.
Only available for text format table writers.
Args:
**kwargs:
Optional arguments that the writer takes.
Returns:
str: Rendered tabular text.
"""
old_stream = self.stream
try:
self.stream = io.StringIO()
self.write_table(**kwargs)
tabular_text = self.stream.getvalue()
finally:
self.stream = old_stream
return tabular_text
def _set_chars(self, c: str) -> None:
self.char_left_side_row = c
self.char_right_side_row = c
self.char_cross_point = c
self.char_left_cross_point = c
self.char_right_cross_point = c
self.char_top_left_cross_point = c
self.char_top_right_cross_point = c
self.char_bottom_left_cross_point = c
self.char_bottom_right_cross_point = c
self.char_opening_row = c
self.char_opening_row_cross_point = c
self.char_header_row_separator = c
self.char_header_row_cross_point = c
self.char_header_row_left_cross_point = c
self.char_header_row_right_cross_point = c
self.char_value_row_separator = c
self.char_closing_row = c
self.char_closing_row_cross_point = c
self._init_cross_point_maps()
def _create_styler(self, writer: AbstractTableWriter) -> StylerInterface:
return TextStyler(writer)
def _write_table_iter(self, **kwargs: Any) -> None:
super()._write_table_iter()
if self.is_write_null_line_after_table:
self.write_null_line()
def _write_table(self, **kwargs: Any) -> None:
self._preprocess()
self._write_opening_row()
try:
self._write_header()
self.__write_header_row_separator()
except ValueError:
pass
is_first_value_row = True
for row, (values, value_dp_list) in enumerate(
zip(self._table_value_matrix, self._table_value_dp_matrix)
):
try:
if is_first_value_row:
is_first_value_row = False
else:
if self.is_write_value_separator_row:
self._write_value_row_separator()
self._write_value_row(row, cast(list[str], values), value_dp_list)
except TypeError:
continue
self._write_closing_row()
def _get_opening_row_items(self) -> list[str]:
return self.__get_row_separator_items(self.__opening_row_cell_format, self.char_opening_row)
def _get_header_row_separator_items(self) -> list[str]:
return self.__get_row_separator_items(
self._header_row_separator_cell_format, self.char_header_row_separator
)
def _get_value_row_separator_items(self) -> list[str]:
return self.__get_row_separator_items(
self.__value_row_separator_cell_format, self.char_value_row_separator
)
def _get_closing_row_items(self) -> list[str]:
return self.__get_row_separator_items(self.__closing_row_cell_format, self.char_closing_row)
def __get_row_separator_items(self, margin_format: str, separator_char: str) -> list[str]:
return [
margin_format.format(separator_char * self._get_padding_len(col_dp))
for col_dp in self._column_dp_list
]
def _to_header_item(self, col_dp: ColumnDataProperty, value_dp: DataProperty) -> str:
return self.__value_cell_margin_format.format(super()._to_header_item(col_dp, value_dp))
def _apply_style_to_row_item(
self, row_idx: int, col_dp: ColumnDataProperty, value_dp: DataProperty, style: Style
) -> str:
return self.__value_cell_margin_format.format(
super()._apply_style_to_row_item(row_idx, col_dp, value_dp, style)
)
def _write_raw_string(self, unicode_text: str) -> None:
self.stream.write(unicode_text)
def _write_raw_line(self, unicode_text: str = "") -> None:
self._write_raw_string(unicode_text + "\n")
def _write(self, text: str) -> None:
self._write_raw_string(text)
def _write_line(self, text: str = "") -> None:
self._write_raw_line(text)
def _fetch_col_separator_style(
self, left_cell: Optional[Cell], right_cell: Optional[Cell], default_style: Style
) -> Style:
for style_filter in self._col_separator_style_filters:
style = style_filter(left_cell, right_cell, **self.style_filter_kwargs)
if style:
return style
return default_style
def __to_column_delimiter(
self,
row: int,
left_col_dp: Optional[ColumnDataProperty],
right_col_dp: Optional[ColumnDataProperty],
col_delimiter: str,
) -> str:
left_cell = None
if left_col_dp:
left_cell = Cell(
row=row,
col=left_col_dp.column_index,
value=col_delimiter,
default_style=self._get_col_style(left_col_dp.column_index),
)
right_cell = None
if right_col_dp:
right_cell = Cell(
row=row,
col=right_col_dp.column_index,
value=col_delimiter,
default_style=self._get_col_style(right_col_dp.column_index),
)
style = self._fetch_col_separator_style(
left_cell=left_cell,
right_cell=right_cell,
default_style=self.default_style,
)
return self._styler.apply_terminal_style(col_delimiter, style=style)
def _write_row(self, row: int, values: Sequence[str]) -> None:
if typepy.is_empty_sequence(values):
return
col_delimiters = (
[
self.__to_column_delimiter(
row,
None,
self._column_dp_list[0],
self.char_left_side_row,
)
]
+ [
self.__to_column_delimiter(
row,
self._column_dp_list[col_idx],
self._column_dp_list[col_idx + 1],
self.column_delimiter,
)
for col_idx in range(len(self._column_dp_list) - 1)
]
+ [
self.__to_column_delimiter(
row,
self._column_dp_list[-1],
None,
self.char_right_side_row,
)
]
)
row_items = [""] * (len(col_delimiters) + len(values))
row_items[::2] = col_delimiters
row_items[1::2] = list(values)
self._write_line("".join(chain.from_iterable(row_items)))
def _write_header(self) -> None:
if not self.is_write_header:
return
if typepy.is_empty_sequence(self._table_headers):
raise ValueError("header is empty")
self._write_row(HEADER_ROW, self._table_headers)
def _write_value_row(
self, row: int, values: Sequence[str], value_dp_list: Sequence[DataProperty]
) -> None:
self._write_row(row, values)
def __write_separator_row(
self, values: Sequence[str], row_type: RowType = RowType.MIDDLE
) -> None:
if typepy.is_empty_sequence(values):
return
cross_point = self.__cross_point_maps[row_type]
left_cross_point = self.__left_cross_point_maps[row_type]
right_cross_point = self.__right_cross_point_maps[row_type]
left_cross_point = left_cross_point if left_cross_point else cross_point
right_cross_point = right_cross_point if right_cross_point else cross_point
if typepy.is_null_string(self.char_left_side_row):
left_cross_point = ""
if typepy.is_null_string(self.char_right_side_row):
right_cross_point = ""
self._write_line(left_cross_point + cross_point.join(values) + right_cross_point)
def _write_opening_row(self) -> None:
if not self.is_write_opening_row:
return
self.__write_separator_row(self._get_opening_row_items(), row_type=RowType.OPENING)
def __write_header_row_separator(self) -> None:
if any([not self.is_write_header, not self.is_write_header_separator_row]):
return
self.__write_separator_row(
self._get_header_row_separator_items(), row_type=RowType.HEADER_SEPARATOR
)
def _write_value_row_separator(self) -> None:
"""
Write row separator of the table which matched to the table type
regardless of the value of the
:py:attr:`.is_write_value_separator_row`.
"""
self.__write_separator_row(self._get_value_row_separator_items())
def _write_closing_row(self) -> None:
if not self.is_write_closing_row:
return
self.__write_separator_row(self._get_closing_row_items(), row_type=RowType.CLOSING)
def __make_margin_format(self, margin_char: str) -> str:
margin_str = margin_char * self._margin
return margin_str + "{:s}" + margin_str
def _preprocess_table_property(self) -> None:
super()._preprocess_table_property()
self.__update_template()
self._init_cross_point_maps()
class IndentationTextTableWriter(TextTableWriter, IndentationInterface):
"""A base class for table writer with indentation text formats.
Args:
indent_level (int): Indentation level. Defaults to ``0``.
.. py:attribute:: indent_string
Indentation string for each level.
"""
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.set_indent_level(kwargs.get("indent_level", 0))
self.indent_string = kwargs.get("indent_string", "")
def set_indent_level(self, indent_level: int) -> None:
"""Set the indentation level.
Args:
indent_level (int): New indentation level.
"""
self._indent_level = indent_level
def inc_indent_level(self) -> None:
"""Increment the indentation level."""
self._indent_level += 1
def dec_indent_level(self) -> None:
"""Decrement the indentation level."""
self._indent_level -= 1
def write_table(self, **kwargs: Any) -> None:
"""
|write_table|.
Args:
indent (Optional[int]):
Indent level of an output.
Interpretation of indent level value differ format to format.
Some writer classes may ignore this value.
.. note::
- |None| values are written as an empty string.
"""
indent = kwargs.pop("indent", None)
if indent is not None:
self._logger.logger.debug(f"indent: {indent}")
self.set_indent_level(int(indent))
try:
super().write_table(**kwargs)
except EmptyTableDataError:
self._logger.logger.debug("no tabular data found")
return
def _get_indent_string(self) -> str:
return self.indent_string * self._indent_level
def _write(self, text: str) -> None:
self._write_raw_string(self._get_indent_string() + text)
def _write_line(self, text: str = "") -> None:
if typepy.is_not_null_string(text):
self._write_raw_line(self._get_indent_string() + text)
else:
self._write_raw_line("")

View File

@ -0,0 +1,83 @@
import warnings
from decimal import Decimal
from typing import Any
import typepy
from .._common import import_error_msg_template
from ._common import serialize_dp
from ._text_writer import TextTableWriter
class TomlTableWriter(TextTableWriter):
"""
A table writer class for
`TOML <https://github.com/toml-lang/toml>`__ data format.
:Example:
:ref:`example-toml-table-writer`
"""
FORMAT_NAME = "toml"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return True
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.is_formatting_float = False
self._is_require_table_name = True
self._is_require_header = True
def write_table(self, **kwargs: Any) -> None:
"""
|write_table| with
`TOML <https://github.com/toml-lang/toml>`__ format.
:raises pytablewriter.EmptyTableNameError:
If the |headers| is empty.
:Example:
:ref:`example-toml-table-writer`
"""
try:
import toml
class TomlTableEncoder(toml.encoder.TomlEncoder):
def __init__(self, _dict=dict, preserve=False): # type: ignore
super().__init__(_dict=_dict, preserve=preserve)
self.dump_funcs[str] = str
self.dump_funcs[Decimal] = toml.encoder._dump_float # type: ignore
except ImportError:
warnings.warn(import_error_msg_template.format("toml"))
raise
with self._logger:
self._verify_property()
self._preprocess()
body = []
for value_dp_list in self._table_value_dp_matrix:
row = {}
for header, value in zip(
self.headers,
[serialize_dp(value_dp) for value_dp in value_dp_list],
):
if typepy.is_null_string(value):
continue
row[header] = value
body.append(row)
self.stream.write(toml.dumps({self.table_name: body}, encoder=TomlTableEncoder()))

View File

@ -0,0 +1,23 @@
from typing import Any
from ._csv import CsvTableWriter
class TsvTableWriter(CsvTableWriter):
"""
A table writer class for tab separated values (TSV) format.
:Example:
:ref:`example-tsv-table-writer`
"""
FORMAT_NAME = "tsv"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.column_delimiter = "\t"

View File

@ -0,0 +1,120 @@
import copy
from typing import Any
import dataproperty as dp
from ._text_writer import IndentationTextTableWriter
class UnicodeTableWriter(IndentationTextTableWriter):
"""
A table writer class using Unicode characters.
:Example:
:ref:`example-unicode-table-writer`
"""
FORMAT_NAME = "unicode"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return True
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.column_delimiter = ""
self.char_left_side_row = ""
self.char_right_side_row = ""
self.char_cross_point = ""
self.char_left_cross_point = ""
self.char_right_cross_point = ""
self.char_header_row_cross_point = ""
self.char_header_row_left_cross_point = ""
self.char_header_row_right_cross_point = ""
self.char_top_left_cross_point = ""
self.char_top_right_cross_point = ""
self.char_bottom_left_cross_point = ""
self.char_bottom_right_cross_point = ""
self.char_opening_row = ""
self.char_opening_row_cross_point = ""
self.char_header_row_separator = ""
self.char_value_row_separator = ""
self.char_closing_row = ""
self.char_closing_row_cross_point = ""
self.indent_string = kwargs.get("indent_string", " ")
self.is_write_header_separator_row = True
self.is_write_value_separator_row = True
self.is_write_opening_row = True
self.is_write_closing_row = True
self._quoting_flags = copy.deepcopy(dp.NOT_QUOTING_FLAGS)
self._init_cross_point_maps()
class BoldUnicodeTableWriter(IndentationTextTableWriter):
"""
A table writer class using bold Unicode characters.
:Example:
:ref:`example-unicode-table-writer`
"""
FORMAT_NAME = "bold_unicode"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return True
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.table_name = ""
self.column_delimiter = ""
self.char_left_side_row = ""
self.char_right_side_row = ""
self.char_cross_point = ""
self.char_left_cross_point = ""
self.char_right_cross_point = ""
self.char_header_row_cross_point = ""
self.char_header_row_left_cross_point = ""
self.char_header_row_right_cross_point = ""
self.char_top_left_cross_point = ""
self.char_top_right_cross_point = ""
self.char_bottom_left_cross_point = ""
self.char_bottom_right_cross_point = ""
self.char_opening_row = ""
self.char_opening_row_cross_point = ""
self.char_header_row_separator = ""
self.char_value_row_separator = ""
self.char_closing_row = ""
self.char_closing_row_cross_point = ""
self.indent_string = kwargs.get("indent_string", " ")
self.is_write_header_separator_row = True
self.is_write_value_separator_row = True
self.is_write_opening_row = True
self.is_write_closing_row = True
self._quoting_flags = copy.deepcopy(dp.NOT_QUOTING_FLAGS)
self._init_cross_point_maps()

View File

@ -0,0 +1,71 @@
import copy
import warnings
from typing import Any, Union
import dataproperty
from .._common import import_error_msg_template
from ._common import serialize_dp
from ._text_writer import TextTableWriter
class YamlTableWriter(TextTableWriter):
"""
A table writer class for `YAML <https://yaml.org/>`__ format.
:Example:
:ref:`example-yaml-table-writer`
"""
FORMAT_NAME = "yaml"
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.is_padding = False
self._dp_extractor.float_type = float
self._quoting_flags = copy.deepcopy(dataproperty.NOT_QUOTING_FLAGS)
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return False
def write_table(self, **kwargs: Any) -> None:
"""
|write_table| with
YAML format.
:Example:
:ref:`example-yaml-table-writer`
"""
try:
import yaml
except ImportError:
warnings.warn(import_error_msg_template.format("yaml"))
raise
with self._logger:
self._verify_property()
self._preprocess()
if self.headers:
matrix: list[Union[dict[str, Any], list[Any]]] = [
dict(zip(self.headers, [serialize_dp(value_dp) for value_dp in value_dp_list]))
for value_dp_list in self._table_value_dp_matrix
]
else:
matrix = [
[serialize_dp(value_dp) for value_dp in value_dp_list]
for value_dp_list in self._table_value_dp_matrix
]
if self.table_name:
self._write(yaml.safe_dump({self.table_name: matrix}, default_flow_style=False))
else:
self._write(yaml.safe_dump(matrix, default_flow_style=False))

View File

@ -0,0 +1,12 @@
from ._javascript import JavaScriptTableWriter
from ._numpy import NumpyTableWriter
from ._pandas import PandasDataFrameWriter
from ._python import PythonCodeTableWriter
__all__ = (
"JavaScriptTableWriter",
"NumpyTableWriter",
"PandasDataFrameWriter",
"PythonCodeTableWriter",
)

View File

@ -0,0 +1,137 @@
import io
from datetime import datetime
from typing import Any, Final
from dataproperty import ColumnDataProperty, DataProperty, DefaultValue
from typepy import StrictLevel, Typecode
from ...._converter import strip_quote
from ...._function import quote_datetime_formatter
from ....sanitizer import sanitize_js_var_name
from .._common import bool_to_str
from ._sourcecode import SourceCodeTableWriter
def js_datetime_formatter(value: datetime) -> str:
try:
return f'new Date("{value.strftime(DefaultValue.DATETIME_FORMAT):s}")'
except ValueError:
# the datetime strftime() methods require year >= 1900
return f'new Date("{value}")'
class JavaScriptTableWriter(SourceCodeTableWriter):
"""
A table writer for class JavaScript format.
:Example:
:ref:`example-js-table-writer`
.. py:attribute:: variable_declaration
:type: str
:value: "const"
JavaScript variable declarations type.
The value must be either ``"var"``, ``"let"`` or ``"const"``.
.. py:method:: write_table
|write_table| with JavaScript format.
The tabular data are written as a nested list variable definition.
:raises pytablewriter.EmptyTableNameError:
If the |table_name| is empty.
:Example:
:ref:`example-js-table-writer`
.. note::
Specific values in the tabular data are converted when writing:
- |None|: written as ``null``
- |inf|: written as ``Infinity``
- |nan|: written as ``NaN``
- |datetime| instances determined by |is_datetime_instance_formatting| attribute:
- |True|: written as `dateutil.parser <https://dateutil.readthedocs.io/en/stable/parser.html>`__
- |False|: written as |str|
.. seealso::
:ref:`example-type-hint-js`
"""
FORMAT_NAME = "javascript"
__VALID_VAR_DECLARATION: Final = ("var", "let", "const")
__NONE_VALUE_DP: Final = DataProperty("null")
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return True
@property
def variable_declaration(self) -> str:
return self.__variable_declaration
@variable_declaration.setter
def variable_declaration(self, value: str) -> None:
value = value.strip().casefold()
if value not in self.__VALID_VAR_DECLARATION:
raise ValueError("declaration must be either var, let or const")
self.__variable_declaration = value
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.variable_declaration = "const"
self._dp_extractor.type_value_map = {
# Typecode.NONE: "null",
Typecode.INFINITY: "Infinity",
Typecode.NAN: "NaN",
}
self._dp_extractor.update_strict_level_map({Typecode.BOOL: StrictLevel.MAX})
self.register_trans_func(bool_to_str)
def get_variable_name(self, value: str) -> str:
return sanitize_js_var_name(value, "_").casefold()
def _write_table(self, **kwargs: Any) -> None:
if self.is_datetime_instance_formatting:
self._dp_extractor.datetime_formatter = js_datetime_formatter
else:
self._dp_extractor.datetime_formatter = quote_datetime_formatter
org_stream = self.stream
self.stream = io.StringIO()
self.inc_indent_level()
super()._write_table(**kwargs)
self.dec_indent_level()
js_matrix_var_def_text = self.stream.getvalue().rstrip("\n")
js_matrix_var_def_text = strip_quote(js_matrix_var_def_text, "true")
js_matrix_var_def_text = strip_quote(js_matrix_var_def_text, "false")
if self.is_write_closing_row:
js_matrix_var_def_line_list = js_matrix_var_def_text.splitlines()
js_matrix_var_def_line_list[-2] = js_matrix_var_def_line_list[-2].rstrip(",")
js_matrix_var_def_text = "\n".join(js_matrix_var_def_line_list)
self.stream.close()
self.stream = org_stream
self.dec_indent_level()
self._write_line(js_matrix_var_def_text)
self.inc_indent_level()
def _get_opening_row_items(self) -> list[str]:
return [f"{self.variable_declaration:s} {self.variable_name:s} = ["]
def _get_closing_row_items(self) -> list[str]:
return ["];"]
def _to_row_item(self, row_idx: int, col_dp: ColumnDataProperty, value_dp: DataProperty) -> str:
if value_dp.data is None:
value_dp = self.__NONE_VALUE_DP
return super()._to_row_item(row_idx, col_dp, value_dp)

View File

@ -0,0 +1,62 @@
from typing import Any
import typepy
from ._python import PythonCodeTableWriter
class NumpyTableWriter(PythonCodeTableWriter):
"""
A table writer class for ``NumPy`` source code format.
:Example:
:ref:`example-numpy-table-writer`
.. py:method:: write_table
|write_table| with ``NumPy`` format.
The tabular data are written as a variable definition of
``numpy.array``.
:raises pytablewriter.EmptyTableNameError:
If the |table_name| is empty.
:Example:
:ref:`example-numpy-table-writer`
.. note::
Specific values in the tabular data are converted when writing:
- |None|: written as ``None``
- |inf|: written as ``numpy.inf``
- |nan|: written as ``numpy.nan``
- |datetime| instances determined by |is_datetime_instance_formatting| attribute:
- |True|: written as `dateutil.parser <https://dateutil.readthedocs.io/en/stable/parser.html>`__
- |False|: written as |str|
.. seealso::
:ref:`example-type-hint-python`
"""
FORMAT_NAME = "numpy"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.import_numpy_as = "np"
self._dp_extractor.set_type_value(typepy.Typecode.INFINITY, f"{self.import_numpy_as:s}.inf")
self._dp_extractor.set_type_value(typepy.Typecode.NAN, f"{self.import_numpy_as:s}.nan")
def _get_opening_row_items(self) -> list[str]:
array_def = f"{self.import_numpy_as:s}.array(["
if typepy.is_not_null_string(self.table_name):
return [f"{self.variable_name} = {array_def}"]
return [array_def]
def _get_closing_row_items(self) -> list[str]:
return ["])"]

View File

@ -0,0 +1,86 @@
from typing import Any
import typepy
from mbstrdecoder import MultiByteStrDecoder
from ....error import EmptyTableNameError
from ._numpy import NumpyTableWriter
class PandasDataFrameWriter(NumpyTableWriter):
"""
A writer class for Pandas DataFrame format.
:Example:
:ref:`example-pandas-dataframe-writer`
.. py:attribute:: import_pandas_as
:type: str
:value: "pd"
Specify ``pandas`` module import name of an output source code.
.. py:attribute:: import_numpy_as
:type: str
:value: "np"
Specify ``numpy`` module import name of an output source code.
.. py:method:: write_table
|write_table| with Pandas DataFrame format.
The tabular data are written as a ``pandas.DataFrame`` class
instance definition.
:raises pytablewriter.EmptyTableNameError:
If the |table_name| is empty.
:Example:
:ref:`example-pandas-dataframe-writer`
.. note::
Specific values in the tabular data are converted when writing:
- |None|: written as ``None``
- |inf|: written as ``numpy.inf``
- |nan|: written as ``numpy.nan``
- |datetime| instances determined by |is_datetime_instance_formatting| attribute:
- |True|: written as `dateutil.parser <https://dateutil.readthedocs.io/en/stable/parser.html>`__
- |False|: written as |str|
.. seealso::
:ref:`example-type-hint-python`
"""
FORMAT_NAME = "pandas"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.import_pandas_as = "pd"
self.is_write_header = False
def _get_opening_row_items(self) -> list[str]:
return [f"{self.variable_name} = {self.import_pandas_as}.DataFrame(["]
def _get_closing_row_items(self) -> list[str]:
if typepy.is_not_empty_sequence(self.headers):
return [
"], columns=[{}])".format(
", ".join(
f'"{MultiByteStrDecoder(header).unicode_str}"' for header in self.headers
)
)
]
return ["])"]
def _verify_property(self) -> None:
super()._verify_property()
if typepy.is_null_string(self.table_name):
raise EmptyTableNameError("table_name must be a string of one or more characters")

View File

@ -0,0 +1,81 @@
from typing import Any
import typepy
from ...._function import dateutil_datetime_formatter, quote_datetime_formatter
from ....sanitizer import sanitize_python_var_name
from ._sourcecode import SourceCodeTableWriter
class PythonCodeTableWriter(SourceCodeTableWriter):
"""
A table writer class for Python source code format.
:Example:
:ref:`example-python-code-table-writer`
.. py:method:: write_table
|write_table| with Python format.
The tabular data are written as a nested list variable definition
for Python format.
:raises pytablewriter.EmptyTableNameError:
If the |table_name| is empty.
:Example:
:ref:`example-python-code-table-writer`
.. note::
Specific values in the tabular data are converted when writing:
- |None|: written as ``None``
- |inf|: written as ``float("inf")``
- |nan|: written as ``float("nan")``
- |datetime| instances determined by |is_datetime_instance_formatting| attribute:
- |True|: written as `dateutil.parser <https://dateutil.readthedocs.io/en/stable/parser.html>`__
- |False|: written as |str|
.. seealso::
:ref:`example-type-hint-python`
"""
FORMAT_NAME = "python"
@property
def format_name(self) -> str:
return self.FORMAT_NAME
@property
def support_split_write(self) -> bool:
return True
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._dp_extractor.type_value_map = {
typepy.Typecode.NONE: None,
typepy.Typecode.INFINITY: 'float("inf")',
typepy.Typecode.NAN: 'float("nan")',
}
def get_variable_name(self, value: str) -> str:
return sanitize_python_var_name(self.table_name, "_").lower()
def _write_table(self, **kwargs: Any) -> None:
if self.is_datetime_instance_formatting:
self._dp_extractor.datetime_formatter = dateutil_datetime_formatter
else:
self._dp_extractor.datetime_formatter = quote_datetime_formatter
self.inc_indent_level()
super()._write_table(**kwargs)
self.dec_indent_level()
def _get_opening_row_items(self) -> list[str]:
if typepy.is_not_null_string(self.table_name):
return [self.variable_name + " = ["]
return ["["]
def _get_closing_row_items(self) -> list[str]:
return ["]"]

View File

@ -0,0 +1,79 @@
import abc
from typing import Any
import typepy
from .._text_writer import IndentationTextTableWriter
class SourceCodeTableWriter(IndentationTextTableWriter):
"""
Base class of table writer with a source code (variable definition) format.
.. py:attribute:: is_datetime_instance_formatting
:type: bool
Write |datetime| values in the table as definition of |datetime| class
instances coincide with specific language if this value is |True|.
Write as |str| if this value is |False|.
"""
@abc.abstractmethod
def get_variable_name(self, value: str) -> str: # pragma: no cover
pass
@property
def variable_name(self) -> str:
"""
str: Return a valid variable name that converted from the |table_name|.
"""
return self.get_variable_name(self.table_name)
@property
def margin(self) -> int:
return self._margin
@margin.setter
def margin(self, value: int) -> None:
# margin setting must be ignored
return
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.indent_string = kwargs.get("indent_string", " ")
self.column_delimiter = ", "
self._margin = 0
self.char_left_side_row = "["
self.char_right_side_row = "],"
self.char_cross_point = ""
self.char_opening_row_cross_point = ""
self.char_closing_row_cross_point = ""
self.is_padding = False
self.is_write_header_separator_row = False
self.is_write_opening_row = True
self.is_write_closing_row = True
self.is_formatting_float = False
self.is_datetime_instance_formatting = True
self._quoting_flags[typepy.Typecode.DATETIME] = False
self._is_require_table_name = True
self._init_cross_point_maps()
def _get_value_row_separator_items(self) -> list[str]:
return []
def _write_opening_row(self) -> None:
self.dec_indent_level()
super()._write_opening_row()
self.inc_indent_level()
def _write_closing_row(self) -> None:
self.dec_indent_level()
super()._write_closing_row()
self.inc_indent_level()