Deploy site

This commit is contained in:
Gitea Actions
2025-06-11 03:00:30 +02:00
commit b4a252bc51
2329 changed files with 367195 additions and 0 deletions

View File

@ -0,0 +1,29 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
from .__version__ import __author__, __copyright__, __email__, __license__, __version__
from ._common import convert_idx_to_alphabet
from ._constant import PatternMatch
from ._converter import to_value_matrix
from ._core import TableData
from ._logger import set_logger
from .error import DataError, InvalidHeaderNameError, InvalidTableNameError, NameValidationError
# Names re-exported by this package: version metadata, helper functions,
# the PatternMatch enum, the TableData class and the exception types.
__all__ = (
    "__author__",
    "__copyright__",
    "__email__",
    "__license__",
    "__version__",
    "convert_idx_to_alphabet",
    "set_logger",
    "to_value_matrix",
    "PatternMatch",
    "TableData",
    "DataError",
    "InvalidHeaderNameError",
    "InvalidTableNameError",
    "NameValidationError",
)

View File

@ -0,0 +1,9 @@
from typing import Final
# Package metadata. ``Final`` marks each annotated value as a constant for type checkers.
__author__: Final = "Tsuyoshi Hombashi"
__copyright__: Final = f"Copyright 2017-2024, {__author__}"
__license__: Final = "MIT License"
# NOTE(review): unlike its neighbours this assignment is not annotated ``Final`` --
# presumably so release tooling can locate the plain assignment; confirm before changing.
__version__ = "1.3.4"
# The maintainer is the same person as the author.
__maintainer__: Final = __author__
__email__: Final = "tsuyoshi.hombashi@gmail.com"

View File

@ -0,0 +1,12 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
def convert_idx_to_alphabet(idx: int) -> str:
    """
    Convert a zero-based index to a spreadsheet-style alphabetic label.

    ``0`` -> ``"A"``, ``25`` -> ``"Z"``, ``26`` -> ``"AA"``, ``27`` -> ``"AB"``,
    ``701`` -> ``"ZZ"``, ``702`` -> ``"AAA"``, and so on.

    :param int idx: Zero-based index to convert.
    :return: Upper-case alphabetic label that corresponds to ``idx``.
    :rtype: str
    :raises ValueError: If ``idx`` is negative.
    """
    if idx < 0:
        # A negative index would otherwise be mapped to non-alphabetic characters.
        raise ValueError(f"idx must be greater than or equal to zero: actual={idx}")

    letters = []
    while True:
        idx, offset = divmod(idx, 26)
        letters.append(chr(65 + offset))  # 65 == ord("A")
        if idx == 0:
            break
        # The label system has no "zero" digit ("Z" is followed by "AA"),
        # so one is borrowed from the remaining quotient on each carry.
        idx -= 1

    # Letters were produced least-significant first.
    return "".join(reversed(letters))

View File

@ -0,0 +1,11 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
import enum
@enum.unique
class PatternMatch(enum.Enum):
    """How the results of several patterns are combined when matching."""

    # Match when at least one pattern matches (TableData.filter_column uses ``any``).
    OR = 0
    # Match only when every pattern matches (TableData.filter_column uses ``all``).
    AND = 1

View File

@ -0,0 +1,36 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
from collections.abc import Sequence
from typing import Any
from .error import DataError
# (row index, row values) pair produced by ``_to_row``.
Row = tuple[int, Any]


def to_value_matrix(headers: Sequence[str], value_matrix: Sequence[Any]) -> list[Sequence[Any]]:
    """
    Convert rows of assorted shapes into a list of plain value sequences.

    Each row may be a list/tuple, or -- when ``headers`` is not empty -- an
    object with a ``get`` method (e.g. ``dict``) or an ``_asdict`` method
    (e.g. a named tuple). Rows of the latter kinds are re-ordered to follow
    ``headers``; a header missing from the row yields ``None``.

    :param headers: Header names used to pick values out of mapping-like rows.
    :param value_matrix: Rows to convert.
    :return:
        One sequence of values per input row (the row index computed by
        ``_to_row`` is dropped). An empty list if ``value_matrix`` is empty.
    :raises DataError: If a row cannot be interpreted as a list or tuple.
    """
    if not value_matrix:
        return []

    return [_to_row(headers, values, row_idx)[1] for row_idx, values in enumerate(value_matrix)]


def _to_row(headers: Sequence[str], values: Any, row_idx: int) -> Row:
    """
    Convert a single row to a ``(row_idx, values)`` pair.

    :param headers: Header names; mapping-like rows are only handled when not empty.
    :param values: The row to convert.
    :param row_idx: Index of the row, returned unchanged as the first element.
    :raises DataError:
        If the row is neither mapping-like (with headers present) nor a list/tuple.
    """
    if headers:
        try:
            # Named-tuple style rows expose their fields through ``_asdict``.
            values = values._asdict()
        except AttributeError:
            pass

        try:
            return (row_idx, [values.get(header) for header in headers])
        except (TypeError, AttributeError):
            # Not mapping-like (e.g. a list has no ``get``): treat it as a sequence below.
            pass

    if not isinstance(values, (tuple, list)):
        raise DataError(f"row must be a list or tuple: actual={type(values)}")

    return (row_idx, values)

View File

@ -0,0 +1,511 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
import copy
import re
from collections import OrderedDict, namedtuple
from collections.abc import Iterator, Sequence
from typing import TYPE_CHECKING, Any, Optional, Union
import dataproperty as dp
import typepy
from dataproperty import DataPropertyMatrix
from dataproperty.typing import TypeHint
from typepy import Nan
from ._constant import PatternMatch
from ._converter import to_value_matrix
from ._logger import logger # type: ignore
if TYPE_CHECKING:
import pandas
class TableData:
    """
    Class to represent a table data structure.

    :param table_name: Name of the table.
    :param headers: Table header names.
    :param rows: Data of the table.
    :param dp_extractor:
        Extractor used as a template. A deep copy is taken, so the passed
        instance itself is not modified. A new extractor is created if omitted.
    :param type_hints: Column type hints assigned to the extractor when given.
    :param max_workers: Worker count assigned to the extractor when given.
    :param max_precision:
        Passed to the newly created extractor; only used when
        ``dp_extractor`` is not given.
    """

    def __init__(
        self,
        table_name: Optional[str],
        headers: Sequence[str],
        rows: Sequence,
        dp_extractor: Optional[dp.DataPropertyExtractor] = None,
        type_hints: Optional[Sequence[Union[str, TypeHint]]] = None,
        max_workers: Optional[int] = None,
        max_precision: Optional[int] = None,
    ) -> None:
        self.__table_name = table_name

        # Lazily populated caches (see ``value_matrix`` / ``value_dp_matrix``).
        self.__value_matrix: list[list[Any]] = []
        self.__value_dp_matrix: Optional[DataPropertyMatrix] = None

        # Falsy rows (``None`` or an empty sequence) are normalized to an empty list.
        self.__rows: Sequence = rows if rows else []

        if dp_extractor:
            self.__dp_extractor = copy.deepcopy(dp_extractor)
        else:
            self.__dp_extractor = dp.DataPropertyExtractor(max_precision=max_precision)

        if type_hints:
            self.__dp_extractor.column_type_hints = type_hints

        self.__dp_extractor.strip_str_header = '"'

        if max_workers:
            self.__dp_extractor.max_workers = max_workers

        self.__dp_extractor.headers = headers if headers else []

    def __repr__(self) -> str:
        element_list = [f"table_name={self.table_name}"]

        try:
            element_list.append("headers=[{}]".format(", ".join(self.headers)))
        except TypeError:
            # ``str.join`` rejects headers that are not strings.
            element_list.append("headers=None")

        element_list.extend([f"cols={self.num_columns}", f"rows={self.num_rows}"])

        return ", ".join(element_list)

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, TableData):
            return False

        return self.equals(other, cmp_by_dp=False)

    def __ne__(self, other: Any) -> bool:
        if not isinstance(other, TableData):
            return True

        return not self.equals(other, cmp_by_dp=False)

    @property
    def table_name(self) -> Optional[str]:
        """str: Name of the table."""

        return self.__table_name

    @table_name.setter
    def table_name(self, value: Optional[str]) -> None:
        self.__table_name = value

    @property
    def headers(self) -> Sequence[str]:
        """Sequence[str]: Table header names."""

        return self.__dp_extractor.headers

    @property
    def rows(self) -> Sequence:
        """Sequence: Original rows of tabular data."""

        return self.__rows

    @property
    def value_matrix(self) -> list[list[Any]]:
        """list[list[Any]]: Converted rows of tabular data (the ``data`` of each DataProperty)."""

        if self.__value_matrix:
            return self.__value_matrix

        self.__value_matrix = [
            [value_dp.data for value_dp in value_dp_list] for value_dp_list in self.value_dp_matrix
        ]

        return self.__value_matrix

    @property
    def has_value_dp_matrix(self) -> bool:
        """bool: |True| if the DataProperty matrix has already been built."""

        return self.__value_dp_matrix is not None

    @property
    def max_workers(self) -> int:
        """int: ``max_workers`` value of the underlying extractor."""

        return self.__dp_extractor.max_workers

    @max_workers.setter
    def max_workers(self, value: Optional[int]) -> None:
        self.__dp_extractor.max_workers = value

    @property
    def num_rows(self) -> Optional[int]:
        """Optional[int]:
        Number of rows in the tabular data.
        |None| if the ``rows`` is neither list nor tuple.
        """

        try:
            return len(self.rows)
        except TypeError:
            return None

    @property
    def num_columns(self) -> Optional[int]:
        """Optional[int]:
        Number of headers if there are any, otherwise the length of the first row.
        ``0`` if there are no rows; |None| if the length cannot be determined.
        """

        if typepy.is_not_empty_sequence(self.headers):
            return len(self.headers)

        try:
            return len(self.rows[0])
        except TypeError:
            return None
        except IndexError:
            return 0

    @property
    def value_dp_matrix(self) -> DataPropertyMatrix:
        """DataPropertyMatrix: DataProperty for table data."""

        if self.__value_dp_matrix is None:
            self.__value_dp_matrix = self.__dp_extractor.to_dp_matrix(
                to_value_matrix(self.headers, self.rows)
            )

        return self.__value_dp_matrix

    @property
    def header_dp_list(self) -> list[dp.DataProperty]:
        """list: DataProperty of each header, as produced by the extractor."""

        return self.__dp_extractor.to_header_dp_list()

    @property
    def column_dp_list(self) -> list[dp.ColumnDataProperty]:
        """list: Column DataProperty computed by the extractor from ``value_dp_matrix``."""

        return self.__dp_extractor.to_column_dp_list(self.value_dp_matrix)

    @property
    def dp_extractor(self) -> dp.DataPropertyExtractor:
        """DataPropertyExtractor: The extractor instance owned by this table."""

        return self.__dp_extractor

    def is_empty_header(self) -> bool:
        """bool: |True| if the data :py:attr:`.headers` is empty."""

        return typepy.is_empty_sequence(self.headers)

    def is_empty_rows(self) -> bool:
        """
        :return: |True| if the tabular data has no rows.
        :rtype: bool
        """

        return self.num_rows == 0

    def is_empty(self) -> bool:
        """
        :return:
            |True| if the data :py:attr:`.headers` or
            :py:attr:`.value_matrix` is empty.
        :rtype: bool
        """

        return self.is_empty_header() or self.is_empty_rows()

    def equals(self, other: "TableData", cmp_by_dp: bool = True) -> bool:
        """
        Compare this table with ``other``.

        :param other: Table to compare with.
        :param cmp_by_dp:
            If |True|, compare header/value DataProperty instances;
            otherwise compare the headers and the original rows.
        :return: |True| if both tables are considered equal.
        """

        if cmp_by_dp:
            return self.__equals_dp(other)

        return self.__equals_raw(other)

    def __equals_base(self, other: "TableData") -> bool:
        # The row count only takes part in the comparison when it is known.
        if self.table_name != other.table_name:
            return False

        if self.num_rows is not None and self.num_rows != other.num_rows:
            return False

        return True

    def __equals_raw(self, other: "TableData") -> bool:
        if not self.__equals_base(other):
            return False

        if self.headers != other.headers:
            return False

        for lhs_row, rhs_row in zip(self.rows, other.rows):
            if len(lhs_row) != len(rhs_row):
                return False

            # Pairs in which either side is NaN are excluded from the comparison.
            if not all(
                lhs == rhs
                for lhs, rhs in zip(lhs_row, rhs_row)
                if not Nan(lhs).is_type() and not Nan(rhs).is_type()
            ):
                return False

        return True

    def __equals_dp(self, other: "TableData") -> bool:
        if not self.__equals_base(other):
            return False

        if self.header_dp_list != other.header_dp_list:
            return False

        if self.value_dp_matrix is None or other.value_dp_matrix is None:
            return False

        for lhs_list, rhs_list in zip(self.value_dp_matrix, other.value_dp_matrix):
            if len(lhs_list) != len(rhs_list):
                return False

            if any(lhs != rhs for lhs, rhs in zip(lhs_list, rhs_list)):
                return False

        return True

    def in_tabledata_list(self, other: Sequence["TableData"], cmp_by_dp: bool = True) -> bool:
        """
        :param other: Tables to search.
        :param cmp_by_dp: Passed through to :py:meth:`.equals`.
        :return: |True| if any table in ``other`` equals this table.
        """

        return any(self.equals(table_data, cmp_by_dp=cmp_by_dp) for table_data in other)

    def validate_rows(self) -> None:
        """
        Check that every row is consistent with the headers.

        A list/tuple row is invalid if its length differs from the number of
        headers; a ``dict`` row is invalid if it lacks any of the header keys.

        :raises ValueError: If at least one invalid row is found.
        """

        invalid_row_idx_list = []

        for row_idx, row in enumerate(self.rows):
            if isinstance(row, (list, tuple)) and len(self.headers) != len(row):
                invalid_row_idx_list.append(row_idx)

            if isinstance(row, dict):
                if not all(header in row for header in self.headers):
                    invalid_row_idx_list.append(row_idx)

        if not invalid_row_idx_list:
            return

        for invalid_row_idx in invalid_row_idx_list:
            logger.debug(f"invalid row (line={invalid_row_idx}): {self.rows[invalid_row_idx]}")

        raise ValueError(
            "table header length and row length are mismatch:\n"
            + f" header(len={len(self.headers)}): {self.headers}\n"
            + " # of miss match rows: {} out of {}\n".format(
                len(invalid_row_idx_list), self.num_rows
            )
        )

    def as_dict(self, default_key: str = "table") -> dict[str, list["OrderedDict[str, Any]"]]:
        """
        Args:
            default_key:
                Key of a returning dictionary when the ``table_name`` is empty.

        Returns:
            dict: Table data as a |dict| instance.
            Cells whose value is |None| are omitted from each record, and
            records that end up empty are skipped.

        Sample Code:
            .. code:: python

                from tabledata import TableData

                TableData(
                    "sample",
                    ["a", "b"],
                    [[1, 2], [3.3, 4.4]]
                ).as_dict()

        Output:
            .. code:: json

                {'sample': [OrderedDict([('a', 1), ('b', 2)]), OrderedDict([('a', 3.3), ('b', 4.4)])]}
        """  # noqa

        dict_body = []
        for row in self.value_matrix:
            if not row:
                continue

            values = [
                (header, value) for header, value in zip(self.headers, row) if value is not None
            ]

            if not values:
                continue

            dict_body.append(OrderedDict(values))

        table_name = self.table_name
        if not table_name:
            table_name = default_key

        return {table_name: dict_body}

    def as_tuple(self) -> Iterator[tuple]:
        """
        :return: Iterator that yields each non-empty row as a |namedtuple|.
        :rtype: Iterator of |namedtuple|

        :Sample Code:
            .. code:: python

                from tabledata import TableData

                records = TableData(
                    "sample",
                    ["a", "b"],
                    [[1, 2], [3.3, 4.4]]
                ).as_tuple()
                for record in records:
                    print(record)

        :Output:
            .. code-block:: none

                Row(a=1, b=2)
                Row(a=Decimal('3.3'), b=Decimal('4.4'))
        """

        Row = namedtuple("Row", self.headers)  # type: ignore

        for value_dp_list in self.value_dp_matrix:
            if typepy.is_empty_sequence(value_dp_list):
                continue

            row = Row(*(value_dp.data for value_dp in value_dp_list))

            yield row

    def as_dataframe(self) -> "pandas.DataFrame":
        """
        :return: Table data as a ``pandas.DataFrame`` instance.
        :rtype: pandas.DataFrame
        :raises RuntimeError: If the ``pandas`` package cannot be imported.

        :Sample Code:
            .. code-block:: python

                from tabledata import TableData

                TableData(
                    "sample",
                    ["a", "b"],
                    [[1, 2], [3.3, 4.4]]
                ).as_dataframe()

        :Output:
            .. code-block:: none

                     a    b
                0    1    2
                1  3.3  4.4

        :Dependency Packages:
            - `pandas <https://pandas.pydata.org/>`__
        """

        try:
            from pandas import DataFrame
        except ImportError as e:
            # Keep the original ImportError attached as the explicit cause.
            raise RuntimeError("required 'pandas' package to execute as_dataframe method") from e

        dataframe = DataFrame(self.value_matrix)
        if not self.is_empty_header():
            dataframe.columns = self.headers

        return dataframe

    def transpose(self) -> "TableData":
        """
        :return:
            New table whose rows are the columns of the original rows.
            The table name, headers and ``max_workers`` are carried over unchanged.
        """

        return TableData(
            self.table_name,
            self.headers,
            list(zip(*self.rows)),
            max_workers=self.max_workers,
        )

    def filter_column(
        self,
        patterns: Optional[Union[str, Sequence[str]]] = None,
        is_invert_match: bool = False,
        is_re_match: bool = False,
        pattern_match: PatternMatch = PatternMatch.OR,
    ) -> "TableData":
        """
        Create a table that only contains the columns whose header matches.

        :param patterns:
            Patterns to match against the header names. A single string is
            treated as one pattern. If empty or |None|, ``self`` is returned.
        :param is_invert_match: If |True|, select the columns that do NOT match.
        :param is_re_match:
            If |True|, each pattern is used as a regular expression
            (``re.search``); otherwise headers are compared for equality.
        :param pattern_match:
            ``PatternMatch.OR`` selects a column if any pattern matches,
            ``PatternMatch.AND`` only if all patterns match.
        :return: Filtered table (``self`` when no patterns are given).
        :raises ValueError: If ``pattern_match`` is not a known value.
        """

        logger.debug(
            f"filter_column: patterns={patterns}, is_invert_match={is_invert_match}, "
            f"is_re_match={is_re_match}, pattern_match={pattern_match}"
        )

        if not patterns:
            return self

        if isinstance(patterns, str):
            # Iterating a bare string would match it character by character.
            patterns = [patterns]

        match_header_list = []
        match_column_matrix = []

        if pattern_match == PatternMatch.OR:
            match_method = any
        elif pattern_match == PatternMatch.AND:
            match_method = all
        else:
            raise ValueError(f"unknown matching: {pattern_match}")

        invert = bool(is_invert_match)

        for header, column in zip(self.headers, zip(*self.rows)):
            # A pattern counts as a hit when "matched" differs from "inverted".
            is_match_list = [
                self.__is_match(header, pattern, is_re_match) != invert for pattern in patterns
            ]

            if match_method(is_match_list):
                match_header_list.append(header)
                match_column_matrix.append(column)

        logger.debug(
            f"filter_column: table={self.table_name}, match_header_list={match_header_list}"
        )

        return TableData(
            self.table_name,
            match_header_list,
            list(zip(*match_column_matrix)),
            max_workers=self.max_workers,
        )

    @staticmethod
    def from_dataframe(
        dataframe: "pandas.DataFrame",
        table_name: str = "",
        type_hints: Optional[Sequence[TypeHint]] = None,
        max_workers: Optional[int] = None,
    ) -> "TableData":
        """
        Initialize TableData instance from a pandas.DataFrame instance.

        :param pandas.DataFrame dataframe:
        :param str table_name: Table name to create.
        :param type_hints: Column type hints passed to the created instance.
        :param max_workers: Worker count passed to the created instance.
        """

        return TableData(
            table_name,
            list(dataframe.columns.values),
            dataframe.values.tolist(),
            type_hints=type_hints,
            max_workers=max_workers,
        )

    @staticmethod
    def __is_match(header: str, pattern: str, is_re_match: bool) -> bool:
        if is_re_match:
            return re.search(pattern, header) is not None

        return header == pattern

View File

@ -0,0 +1,4 @@
from ._logger import logger, set_logger # type: ignore
# Names exported by this module.
__all__ = ("logger", "set_logger")

View File

@ -0,0 +1,40 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
import warnings
from typing import Final
import dataproperty
from ._null_logger import NullLogger # type: ignore
# Name passed to ``logger.enable()`` / ``logger.disable()`` for this package.
MODULE_NAME: Final = "tabledata"

try:
    from loguru import logger

    # Logging for this package is switched off at import time;
    # ``set_logger(True)`` switches it on.
    logger.disable(MODULE_NAME)
except ImportError:
    # loguru could not be imported: fall back to NullLogger.
    logger = NullLogger()
def set_logger(is_enable: bool, propagation_depth: int = 1) -> None:
    """
    Enable or disable logging of this package.

    :param bool is_enable: |True| to enable logging, |False| to disable it.
    :param int propagation_depth:
        While greater than zero, the same setting is forwarded to
        ``dataproperty.set_logger`` with the depth decreased by one.
    """
    switch = logger.enable if is_enable else logger.disable
    switch(MODULE_NAME)

    if propagation_depth > 0:
        dataproperty.set_logger(is_enable, propagation_depth - 1)
def set_log_level(log_level):  # type: ignore
    """
    Deprecated; does nothing except emit a ``DeprecationWarning``.

    :param log_level: Ignored.
    """
    warnings.warn(
        "'set_log_level' method is deprecated and will be removed in the future. ",
        DeprecationWarning,
        # Attribute the warning to the caller rather than to this module.
        stacklevel=2,
    )
    return

View File

@ -0,0 +1,44 @@
# type: ignore
class NullLogger:
    """Stand-in logger: every method accepts its arguments and does nothing."""

    level_name = None

    def remove(self, handler_id=None):  # pragma: no cover
        """Do nothing."""
        return None

    def add(self, sink, **kwargs):  # pragma: no cover
        """Do nothing."""
        return None

    def disable(self, name):  # pragma: no cover
        """Do nothing."""
        return None

    def enable(self, name):  # pragma: no cover
        """Do nothing."""
        return None

    def critical(self, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""
        return None

    def debug(self, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""
        return None

    def error(self, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""
        return None

    def exception(self, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""
        return None

    def info(self, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""
        return None

    def log(self, __level, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""
        return None

    def success(self, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""
        return None

    def trace(self, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""
        return None

    def warning(self, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""
        return None

View File

@ -0,0 +1,27 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
class NameValidationError(ValueError):
    """
    Exception raised when a name is invalid.

    Base class of :py:class:`InvalidTableNameError` and
    :py:class:`InvalidHeaderNameError`. It derives from ``ValueError``.
    """
class InvalidTableNameError(NameValidationError):
    """
    Exception raised when a table name is invalid.

    A kind of ``NameValidationError`` specific to table names.
    """
class InvalidHeaderNameError(NameValidationError):
    """
    Exception raised when a table header name is invalid.

    A kind of ``NameValidationError`` specific to header names.
    """
class DataError(ValueError):
    """
    Exception raised when data is invalid as tabular data.

    It derives from ``ValueError``.
    """

View File

@ -0,0 +1,207 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
import abc
import warnings
from collections.abc import Sequence
import typepy
from dataproperty.typing import TypeHint
from ._core import TableData
from ._logger import logger # type: ignore
from .error import InvalidHeaderNameError, InvalidTableNameError
class TableDataNormalizerInterface(metaclass=abc.ABCMeta):
    """
    Abstract contract for classes that validate and normalize |TableData|.
    """

    @abc.abstractmethod
    def validate(self) -> None:  # pragma: no cover
        """Validate the table data. Subclasses provide the implementation."""

    @abc.abstractmethod
    def normalize(self) -> TableData:  # pragma: no cover
        """Return normalized table data. Subclasses provide the implementation."""
class AbstractTableDataNormalizer(TableDataNormalizerInterface):
    """
    Skeleton of a |TableData| normalizer.

    ``validate`` and ``normalize`` are implemented here in terms of the
    ``_preprocess_*`` / ``_validate_*`` / ``_normalize_*`` hooks that
    subclasses must provide.

    :param tabledata: Table data to validate/normalize.
    """

    def __init__(self, tabledata: TableData) -> None:
        self._tabledata = tabledata

    @property
    def _type_hints(self) -> list[TypeHint]:
        """Column type hints of the source table's extractor."""

        return self._tabledata.dp_extractor.column_type_hints

    def validate(self) -> None:
        """
        Validate the table name and every header of the source table.

        :raises ValueError: If the table name is empty.

        Errors raised by ``_validate_table_name`` / ``_validate_header``
        propagate to the caller.
        """

        if not self._tabledata.table_name:
            raise ValueError("table_name must not be empty")

        self._validate_table_name(self._tabledata.table_name)
        self._validate_headers()

    def sanitize(self):  # type: ignore
        """
        Deprecated alias of :py:meth:`.normalize`.

        Emits a ``DeprecationWarning`` and returns the result of ``normalize``.
        """

        warnings.warn(
            "'sanitize' method is deprecated and will be removed in the future."
            " use 'normalize' method instead.",
            DeprecationWarning,
            # Attribute the warning to the caller rather than to this module.
            stacklevel=2,
        )

        return self.normalize()

    def normalize(self) -> TableData:
        """
        :return: Sanitized table data.
        :rtype: tabledata.TableData
        """

        logger.debug(f"normalize: {type(self).__name__}")

        normalize_headers = self._normalize_headers()

        return TableData(
            self.__normalize_table_name(),
            normalize_headers,
            self._normalize_rows(normalize_headers),
            dp_extractor=self._tabledata.dp_extractor,
            type_hints=self._type_hints,
            max_workers=self._tabledata.max_workers,
        )

    @abc.abstractmethod
    def _preprocess_table_name(self) -> str:
        """
        This method is always called before table name validation.
        You must return preprocessed table name.
        """

    @abc.abstractmethod
    def _validate_table_name(self, table_name: str) -> None:
        """
        Must raise :py:class:`~.InvalidTableNameError`
        when you consider the table name invalid.

        :param str table_name: Table name to validate.
        :raises tabledata.InvalidTableNameError:
            If the table name is invalid.
            |raises_validate_table_name|
        """

    @abc.abstractmethod
    def _normalize_table_name(self, table_name: str) -> str:
        """
        Must return a valid table name.
        The table name must be considered to be a valid name by
        :py:meth:`~._validate_table_name` method.

        This method called when :py:meth:`~._validate_table_name` method raise
        :py:class:`~.InvalidTableNameError`.

        :param str table_name: Table name to normalize.
        :return: Sanitized table name.
        :rtype: str
        """

    @abc.abstractmethod
    def _preprocess_header(self, col_idx: int, header: str) -> str:
        """
        This method is always called before a header validation.
        You must return preprocessed header.
        """

    @abc.abstractmethod
    def _validate_header(self, header: str) -> None:
        """
        No operation.

        This method called for each table header. Override this method
        in a subclass if you want to detect invalid table header elements.
        Raise :py:class:`~.InvalidHeaderNameError` if an invalid
        header element found.

        :param str header: Table header name.
        :raises tabledata.InvalidHeaderNameError:
            If the ``header`` is invalid.
        """

    @abc.abstractmethod
    def _normalize_header(self, header: str) -> str:
        """
        Must return a valid header name.
        This method called when :py:meth:`~._validate_header` method raise
        :py:class:`~.InvalidHeaderNameError`.
        Override this method in subclass if you want to rename invalid
        table header element.

        :param str header: Header name to normalize.
        :return: Renamed header name.
        :rtype: str
        """

    def _normalize_rows(self, normalize_headers: Sequence[str]) -> list:
        """
        Return the rows for the normalized table.

        This implementation copies the source rows into a list and ignores
        ``normalize_headers``.
        """

        return list(self._tabledata.rows)

    def _validate_headers(self) -> None:
        for header in self._tabledata.headers:
            self._validate_header(header)

    def __normalize_table_name(self) -> str:
        preprocessed_table_name = self._preprocess_table_name()

        try:
            self._validate_table_name(preprocessed_table_name)
            new_table_name = preprocessed_table_name
        except InvalidTableNameError:
            new_table_name = self._normalize_table_name(preprocessed_table_name)
            # The normalized name must itself pass validation.
            self._validate_table_name(new_table_name)

        return new_table_name

    def _normalize_headers(self) -> list[str]:
        """
        Preprocess, validate and -- where validation fails -- normalize each header.

        :return: Header names for the normalized table, in the original order.
        """

        new_header_list = []

        for col_idx, header in enumerate(self._tabledata.headers):
            header = self._preprocess_header(col_idx, header)

            try:
                self._validate_header(header)
                new_header = header
            except InvalidHeaderNameError:
                new_header = self._normalize_header(header)
                # The normalized header must itself pass validation.
                self._validate_header(new_header)

            new_header_list.append(new_header)

        return new_header_list
class TableDataNormalizer(AbstractTableDataNormalizer):
    """
    Normalizer that uses ``typepy.String`` to validate and convert
    the table name and the header names.
    """

    def _preprocess_table_name(self) -> str:
        # An empty or missing table name is preprocessed to an empty string.
        return self._tabledata.table_name or ""

    def _validate_table_name(self, table_name: str) -> None:
        """
        :raises InvalidTableNameError:
            If ``typepy.String(table_name).validate()`` raises ``TypeError``.
        """

        try:
            typepy.String(table_name).validate()
        except TypeError as e:
            raise InvalidTableNameError(e) from e

    def _normalize_table_name(self, table_name: str) -> str:
        return str(typepy.String(table_name).force_convert())

    def _preprocess_header(self, col_idx: int, header: str) -> str:
        # Headers are passed through unchanged.
        return header

    def _validate_header(self, header: str) -> None:
        """
        :raises InvalidHeaderNameError:
            If ``typepy.String(header).validate()`` raises ``TypeError``.
        """

        try:
            typepy.String(header).validate()
        except TypeError as e:
            raise InvalidHeaderNameError(e) from e

    def _normalize_header(self, header: str) -> str:
        return str(typepy.String(header).force_convert())