Deploy site

This commit is contained in:
Gitea Actions
2025-06-10 03:00:57 +02:00
commit 70bff17031
2329 changed files with 367195 additions and 0 deletions

View File

@ -0,0 +1,45 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
from .__version__ import __author__, __copyright__, __email__, __license__, __version__
from ._align import Align
from ._align_getter import align_getter
from ._column import ColumnDataProperty
from ._common import MAX_STRICT_LEVEL_MAP, MIN_STRICT_LEVEL_MAP, NOT_QUOTING_FLAGS, DefaultValue
from ._container import MinMaxContainer
from ._dataproperty import DataProperty
from ._extractor import DataPropertyExtractor, DataPropertyMatrix, MatrixFormatting
from ._formatter import Format
from ._function import calc_ascii_char_width, get_integer_digit, get_number_of_digit
from ._line_break import LineBreakHandling
from ._preprocessor import Preprocessor
from .logger import set_logger
# Names exposed by ``from <package> import *``; every entry is imported at the
# top of this module.
__all__ = (
    # alignment
    "Align",
    "align_getter",
    # data-property classes, extractor and formatting helpers
    "ColumnDataProperty",
    "DataProperty",
    "DataPropertyExtractor",
    "DataPropertyMatrix",
    "Format",
    "LineBreakHandling",
    "MatrixFormatting",
    "MinMaxContainer",
    "Preprocessor",
    # utility functions
    "calc_ascii_char_width",
    "get_integer_digit",
    "get_number_of_digit",
    # shared constants / defaults
    "MAX_STRICT_LEVEL_MAP",
    "MIN_STRICT_LEVEL_MAP",
    "NOT_QUOTING_FLAGS",
    "DefaultValue",
    # logging
    "set_logger",
    # package metadata
    "__author__",
    "__copyright__",
    "__email__",
    "__license__",
    "__version__",
)

View File

@ -0,0 +1,9 @@
from typing import Final
# Release identifier of the package (a plain, un-annotated assignment).
__version__ = "1.1.0"

# Authorship and contact metadata; the maintainer is the author.
__author__: Final = "Tsuyoshi Hombashi"
__maintainer__: Final = __author__
__email__: Final = "tsuyoshi.hombashi@gmail.com"

# Licensing metadata; the copyright line embeds the author name.
__license__: Final = "MIT License"
__copyright__: Final = f"Copyright 2016-2024, {__author__}"

View File

@ -0,0 +1,25 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
import enum
@enum.unique
class Align(enum.Enum):
    """Horizontal alignment choices.

    Each member's value is a ``(code, label)`` pair: ``code`` is a distinct
    single-bit integer and ``label`` is the lower-case name of the alignment.
    """

    AUTO = (0b0001, "auto")
    LEFT = (0b0010, "left")
    RIGHT = (0b0100, "right")
    CENTER = (0b1000, "center")

    def __init__(self, code: int, string: str) -> None:
        # Enum unpacks the member's tuple value into these two arguments.
        self.__align_code = code
        self.__align_string = string

    @property
    def align_string(self) -> str:
        """Label half of the member's value (e.g. ``"left"``)."""
        return self.__align_string

    @property
    def align_code(self) -> int:
        """Integer half of the member's value."""
        return self.__align_code

View File

@ -0,0 +1,33 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
from typing import Dict
from typepy import Typecode
from ._align import Align
class AlignGetter:
    """Look up the :class:`Align` to use for a ``typepy.Typecode``."""

    def __init__(self) -> None:
        # Used for every typecode that has no entry in the table.
        self.default_align = Align.LEFT
        # Stored through the ``typecode_align_table`` setter below.
        self.typecode_align_table = {
            Typecode.STRING: Align.LEFT,
            Typecode.INTEGER: Align.RIGHT,
            Typecode.REAL_NUMBER: Align.RIGHT,
        }

    @property
    def typecode_align_table(self):
        """Write-only attribute: reading it always raises ``NotImplementedError``."""
        raise NotImplementedError()

    @typecode_align_table.setter
    def typecode_align_table(self, x: Dict[Typecode, Align]) -> None:
        # The mapping is kept by reference (no copy is made).
        self.__typecode_align_table = x

    def get_align_from_typecode(self, typecode: Typecode) -> Align:
        """Return the alignment mapped to ``typecode``.

        Falls back to ``default_align`` when the table has no entry for it.
        """
        table = self.__typecode_align_table
        return table.get(typecode, self.default_align)


# Module-level shared instance.
align_getter = AlignGetter()

View File

@ -0,0 +1,98 @@
from typing import Final, Optional
from typepy import (
Bool,
DateTime,
Dictionary,
Infinity,
Integer,
IpAddress,
List,
Nan,
NoneType,
NullString,
RealNumber,
String,
Typecode,
)
from typepy.type import AbstractType
from ._formatter import Formatter
from ._interface import DataPeropertyInterface
class DataPeropertyBase(DataPeropertyInterface):
    """Common state shared by the data-property classes.

    Holds the resolved typecode, the decimal places, the datetime format
    string, the East Asian ambiguous width and a ``Formatter``. The format
    string is produced on first access of :attr:`format_str` and then reused.
    """

    __slots__ = (
        "_datetime_format_str",
        "_decimal_places",
        "_east_asian_ambiguous_width",
        "_formatter",
        "_typecode",
        "__format_str",
    )

    # Typecode -> typepy type class, used by ``type_class``.
    __TYPE_CLASS_TABLE: Final[dict[Typecode, type[AbstractType]]] = {
        Typecode.BOOL: Bool,
        Typecode.DATETIME: DateTime,
        Typecode.DICTIONARY: Dictionary,
        Typecode.INTEGER: Integer,
        Typecode.INFINITY: Infinity,
        Typecode.IP_ADDRESS: IpAddress,
        Typecode.LIST: List,
        Typecode.NAN: Nan,
        Typecode.NONE: NoneType,
        Typecode.NULL_STRING: NullString,
        Typecode.REAL_NUMBER: RealNumber,
        Typecode.STRING: String,
    }

    def __init__(
        self,
        format_flags: Optional[int],
        is_formatting_float: bool,
        datetime_format_str: str,
        east_asian_ambiguous_width: int,
    ) -> None:
        # Not yet determined; the typecode and decimal places start as None.
        self._typecode: Optional[Typecode] = None
        self._decimal_places: Optional[int] = None

        self._datetime_format_str = datetime_format_str
        self._east_asian_ambiguous_width = east_asian_ambiguous_width

        # Empty string means "format string not built yet" (see format_str).
        self.__format_str = ""

        self._formatter = Formatter(
            format_flags=format_flags,
            datetime_format_str=datetime_format_str,
            is_formatting_float=is_formatting_float,
        )

    @property
    def typecode(self) -> Typecode:
        """
        ``typepy.Typecode`` that corresponds to the type of the ``data``.

        :return:
            One of the Enum values defined in ``typepy.Typecode``.
        :rtype: typepy.Typecode
        """

        resolved = self._typecode
        assert resolved

        return resolved

    @property
    def typename(self) -> str:
        """Name of the current typecode member."""
        current = self.typecode
        return current.name

    @property
    def type_class(self) -> type[AbstractType]:
        """typepy type class registered for the current typecode."""
        table = self.__TYPE_CLASS_TABLE
        return table[self.typecode]

    @property
    def format_str(self) -> str:
        """Format string for this property, built once and then cached."""
        if not self.__format_str:
            self.__format_str = self._formatter.make_format_str(
                self.typecode, self.decimal_places
            )

        return self.__format_str

View File

@ -0,0 +1,352 @@
from typing import Any, Optional
from mbstrdecoder import MultiByteStrDecoder
from typepy import Integer, StrictLevel, Typecode, TypeConversionError
from ._align import Align
from ._align_getter import align_getter
from ._base import DataPeropertyBase
from ._common import DefaultValue
from ._container import ListContainer, MinMaxContainer
from ._dataproperty import DataProperty
from ._function import calc_ascii_char_width
from .typing import FloatType
class ColumnDataProperty(DataPeropertyBase):
    """Aggregated property of one column.

    Collects the ``DataProperty`` of each body value passed to
    :meth:`update_body` and keeps, for the whole column, a bitmap of the
    typecodes seen, the resolved column typecode, the header/body widths and
    the ranges of integer digits, decimal places and additional format length.
    """

    __slots__ = (
        "__header_ascii_char_width",
        "__body_ascii_char_width",
        "__column_index",
        "__dp_list",
        "__float_type",
        "__format_map",
        "__is_calculate",
        "__max_precision",
        "__minmax_integer_digits",
        "__minmax_decimal_places",
        "__minmax_additional_format_len",
        "__typecode_bitmap",
    )

    @property
    def align(self) -> Align:
        """Alignment that ``align_getter`` maps to the column typecode."""
        return align_getter.get_align_from_typecode(self.typecode)

    @property
    def bit_length(self) -> Optional[int]:
        """Largest ``int.bit_length`` among the stored values.

        Returns ``None`` unless the column typecode is ``Typecode.INTEGER``.
        """
        if self.typecode != Typecode.INTEGER:
            return None

        bit_length = 0
        for value_dp in self.__dp_list:
            try:
                bit_length = max(bit_length, int.bit_length(value_dp.data))
            except TypeError:
                # data that is not an int is skipped
                pass

        return bit_length

    @property
    def column_index(self) -> int:
        """Column index given to the constructor."""
        return self.__column_index

    @property
    def decimal_places(self) -> Optional[int]:
        """Decimal places currently used for the column (``None`` if not set)."""
        return self._decimal_places

    @property
    def ascii_char_width(self) -> int:
        """The larger of the header width and the body width."""
        return max(self.__header_ascii_char_width, self.__body_ascii_char_width)

    @property
    def minmax_integer_digits(self) -> MinMaxContainer:
        """Container of the integer-digit counts fed by ``update_body``/``merge``."""
        return self.__minmax_integer_digits

    @property
    def minmax_decimal_places(self) -> ListContainer:
        """Container of the decimal-place counts fed by ``update_body``/``merge``."""
        return self.__minmax_decimal_places

    @property
    def minmax_additional_format_len(self) -> MinMaxContainer:
        """Container of the additional format lengths fed by ``update_body``/``merge``."""
        return self.__minmax_additional_format_len

    def __init__(
        self,
        column_index: int,
        float_type: Optional[FloatType],
        min_width: int = 0,
        format_flags: Optional[int] = None,
        is_formatting_float: bool = True,
        datetime_format_str: str = DefaultValue.DATETIME_FORMAT,
        east_asian_ambiguous_width: int = 1,
        max_precision: int = DefaultValue.MAX_PRECISION,
    ) -> None:
        """
        :param column_index: Index reported by ``column_index``.
        :param float_type:
            Passed as ``float_type`` to the typepy type class when a value is
            converted to the column type.
        :param min_width: Initial body width.
        :param format_flags: Forwarded to the base class.
        :param is_formatting_float: Forwarded to the base class.
        :param datetime_format_str: Forwarded to the base class.
        :param east_asian_ambiguous_width:
            Forwarded to the base class; also passed to
            ``calc_ascii_char_width`` when body widths are calculated.
        :param max_precision: Upper bound applied to the column decimal places.
        """
        super().__init__(
            format_flags=format_flags,
            is_formatting_float=is_formatting_float,
            datetime_format_str=datetime_format_str,
            east_asian_ambiguous_width=east_asian_ambiguous_width,
        )

        self.__header_ascii_char_width = 0
        self.__body_ascii_char_width = min_width
        self.__column_index = column_index

        self.__float_type = float_type

        # While False, the recalculation helpers return without doing anything
        # (see begin_update/end_update).
        self.__is_calculate = True
        self.__dp_list: list[DataProperty] = []
        self.__minmax_integer_digits = MinMaxContainer()
        self.__minmax_decimal_places = ListContainer()
        self.__minmax_additional_format_len = MinMaxContainer()
        self.__max_precision = max_precision

        # Start from the NONE bit and resolve the initial column typecode.
        self.__typecode_bitmap = Typecode.NONE.value
        self.__calc_typecode_from_bitmap()

        self.__format_map: dict[Typecode, str] = self._formatter.make_format_map(
            decimal_places=self._decimal_places
        )

    def __repr__(self) -> str:
        """Return a comma-separated ``key=value`` summary of the column."""
        element_list = []

        if self.column_index is not None:
            element_list.append(f"column={self.column_index}")

        element_list.extend(
            [
                f"type={self.typename}",
                f"align={self.align.align_string}",
                f"ascii_width={self.ascii_char_width}",
            ]
        )

        if Integer(self.bit_length).is_type():
            element_list.append(f"bit_len={self.bit_length}")

        if self.minmax_integer_digits.has_value():
            if self.minmax_integer_digits.is_same_value():
                value = f"int_digits={self.minmax_integer_digits.min_value}"
            else:
                value = f"int_digits=({self.minmax_integer_digits})"

            element_list.append(value)

        if self.minmax_decimal_places.has_value():
            if self.minmax_decimal_places.is_same_value():
                value = f"decimal_places={self.minmax_decimal_places.min_value}"
            else:
                value = f"decimal_places=({self.minmax_decimal_places})"

            element_list.append(value)

        if not self.minmax_additional_format_len.is_zero():
            if self.minmax_additional_format_len.is_same_value():
                value = f"extra_len={self.minmax_additional_format_len.min_value}"
            else:
                value = f"extra_len=({self.minmax_additional_format_len})"

            element_list.append(value)

        return ", ".join(element_list)

    def dp_to_str(self, value_dp: DataProperty) -> str:
        """Render ``value_dp`` as a string using the column's formatting.

        STRING values are returned via ``str()``. Other values are first
        converted towards the column type; if that conversion fails the
        value's own typecode format is applied to the raw data. If formatting
        the converted value fails, ``MultiByteStrDecoder`` and finally
        ``str()`` are used as fallbacks.
        """
        if value_dp.typecode == Typecode.STRING:
            return str(value_dp.data)

        try:
            value = self.__preprocess_value_before_tostring(value_dp)
        except TypeConversionError:
            return self.__format_map.get(value_dp.typecode, "{:s}").format(value_dp.data)

        to_string_format_str = self.__get_tostring_format(value_dp)

        try:
            return to_string_format_str.format(value)
        except (ValueError, TypeError):
            pass

        try:
            return MultiByteStrDecoder(value).unicode_str
        except ValueError:
            pass

        return str(value)

    def extend_width(self, ascii_char_width: int) -> None:
        """Add ``ascii_char_width`` to both the header width and the body width."""
        self.extend_header_width(ascii_char_width)
        self.extend_body_width(ascii_char_width)

    def extend_header_width(self, ascii_char_width: int) -> None:
        """Add ``ascii_char_width`` to the header width."""
        self.__header_ascii_char_width += ascii_char_width

    def extend_body_width(self, ascii_char_width: int) -> None:
        """Add ``ascii_char_width`` to the body width."""
        self.__body_ascii_char_width += ascii_char_width

    def update_header(self, header_db: DataProperty) -> None:
        """Set the header width to the width of ``header_db``."""
        self.__header_ascii_char_width = header_db.ascii_char_width

    def update_body(self, value_dp: DataProperty) -> None:
        """Fold one body value into the column statistics.

        When the value includes ANSI escapes, its escape-free counterpart is
        used instead.
        """
        if value_dp.is_include_ansi_escape:
            assert value_dp.no_ansi_escape_dp
            value_dp = value_dp.no_ansi_escape_dp

        self.__typecode_bitmap |= value_dp.typecode.value
        self.__calc_typecode_from_bitmap()

        # Digit statistics are only collected for numeric values.
        if value_dp.typecode in (Typecode.REAL_NUMBER, Typecode.INTEGER):
            self.__minmax_integer_digits.update(value_dp.integer_digits)
            self.__minmax_decimal_places.update(value_dp.decimal_places)
            self.__update_decimal_places()

        self.__minmax_additional_format_len.update(value_dp.additional_format_len)

        self.__dp_list.append(value_dp)
        self.__update_ascii_char_width()

    def merge(self, column_dp: "ColumnDataProperty") -> None:
        """Fold the statistics of another column property into this one.

        The other column's stored values are not copied; only its typecode,
        containers and width are merged.
        """
        self.__typecode_bitmap |= column_dp.typecode.value
        self.__calc_typecode_from_bitmap()

        self.__minmax_integer_digits.merge(column_dp.minmax_integer_digits)
        self.__minmax_decimal_places.merge(column_dp.minmax_decimal_places)
        self.__update_decimal_places()

        self.__minmax_additional_format_len.merge(column_dp.minmax_additional_format_len)

        self.__body_ascii_char_width = max(self.__body_ascii_char_width, column_dp.ascii_char_width)
        self.__update_ascii_char_width()

    def begin_update(self) -> None:
        """Suspend recalculation of typecode, decimal places and body width."""
        self.__is_calculate = False

    def end_update(self) -> None:
        """Resume recalculation and recompute typecode, decimal places and body width."""
        self.__is_calculate = True

        self.__calc_typecode_from_bitmap()
        self.__update_decimal_places()
        self.__update_ascii_char_width()

    def __is_not_single_typecode(self, typecode_bitmap: int) -> bool:
        """Return True when the column bitmap has bits both inside and outside ``typecode_bitmap``."""
        return bool(
            self.__typecode_bitmap & typecode_bitmap and self.__typecode_bitmap & ~typecode_bitmap
        )

    def __is_float_typecode(self) -> bool:
        """Decide whether the typecodes seen so far should resolve to REAL_NUMBER."""
        FLOAT_TYPECODE_BMP = (
            Typecode.REAL_NUMBER.value | Typecode.INFINITY.value | Typecode.NAN.value
        )
        NUMBER_TYPECODE_BMP = FLOAT_TYPECODE_BMP | Typecode.INTEGER.value

        # Numeric/null-string bits mixed with any other typecode: not a float column.
        if self.__is_not_single_typecode(NUMBER_TYPECODE_BMP | Typecode.NULL_STRING.value):
            return False

        # Two or more distinct bits among the float typecodes and NULL_STRING.
        if (
            bin(self.__typecode_bitmap & (FLOAT_TYPECODE_BMP | Typecode.NULL_STRING.value)).count(
                "1"
            )
            >= 2
        ):
            return True

        # Two or more distinct bits among the numeric typecodes (INTEGER included).
        if bin(self.__typecode_bitmap & NUMBER_TYPECODE_BMP).count("1") >= 2:
            return True

        return False

    def __calc_body_ascii_char_width(self) -> int:
        """Return the max of the current body width and the rendered width of each stored value."""
        width_list = [self.__body_ascii_char_width]

        for value_dp in self.__dp_list:
            if value_dp.is_include_ansi_escape:
                assert value_dp.no_ansi_escape_dp
                value_dp = value_dp.no_ansi_escape_dp

            width_list.append(
                calc_ascii_char_width(self.dp_to_str(value_dp), self._east_asian_ambiguous_width)
            )

        return max(width_list)

    def __calc_decimal_places(self) -> Optional[int]:
        """Return the largest recorded decimal places, capped at the max precision."""
        if self.minmax_decimal_places.max_value is None:
            return None

        return min(self.__max_precision, int(self.minmax_decimal_places.max_value))

    def __get_tostring_format(self, value_dp: DataProperty) -> str:
        """Pick the format string for ``value_dp``.

        A STRING column uses the format of the value's own typecode; any other
        column uses the format of the column typecode. ``"{:s}"`` is the
        fallback when the format map has no entry.
        """
        if self.typecode == Typecode.STRING:
            return self.__format_map.get(value_dp.typecode, "{:s}")

        return self.__format_map.get(self.typecode, "{:s}")

    def __get_typecode_from_bitmap(self) -> Typecode:
        """Resolve the column typecode from the bitmap of typecodes seen."""
        if self.__is_float_typecode():
            return Typecode.REAL_NUMBER

        # BOOL or DATETIME mixed with any other typecode resolves to STRING.
        if any(
            [
                self.__is_not_single_typecode(Typecode.BOOL.value),
                self.__is_not_single_typecode(Typecode.DATETIME.value),
            ]
        ):
            return Typecode.STRING

        # Priority order: the first typecode whose bit is set wins.
        typecode_list = [
            Typecode.STRING,
            Typecode.REAL_NUMBER,
            Typecode.INTEGER,
            Typecode.DATETIME,
            Typecode.DICTIONARY,
            Typecode.IP_ADDRESS,
            Typecode.LIST,
            Typecode.BOOL,
            Typecode.INFINITY,
            Typecode.NAN,
            Typecode.NULL_STRING,
        ]

        for typecode in typecode_list:
            if self.__typecode_bitmap & typecode.value:
                return typecode

        if self.__typecode_bitmap == Typecode.NONE.value:
            return Typecode.NONE

        return Typecode.STRING

    def __update_ascii_char_width(self) -> None:
        """Recompute the body width unless recalculation is suspended."""
        if not self.__is_calculate:
            return

        self.__body_ascii_char_width = self.__calc_body_ascii_char_width()

    def __update_decimal_places(self) -> None:
        """Recompute decimal places and rebuild the format map unless suspended."""
        if not self.__is_calculate:
            return

        self._decimal_places = self.__calc_decimal_places()
        self.__format_map = self._formatter.make_format_map(decimal_places=self._decimal_places)

    def __calc_typecode_from_bitmap(self) -> None:
        """Re-resolve the column typecode unless recalculation is suspended."""
        if not self.__is_calculate:
            return

        self._typecode = self.__get_typecode_from_bitmap()

    def __preprocess_value_before_tostring(self, value_dp: DataProperty) -> Any:
        """Return the value to format for ``value_dp``.

        The raw data is returned when the value already has the column
        typecode or the column is STRING/BOOL/DATETIME; otherwise the data is
        converted with the column's type class at ``StrictLevel.MIN``.
        """
        if self.typecode == value_dp.typecode or self.typecode in [
            Typecode.STRING,
            Typecode.BOOL,
            Typecode.DATETIME,
        ]:
            return value_dp.data

        return self.type_class(
            value_dp.data,
            strict_level=StrictLevel.MIN,
            float_type=self.__float_type,
            strip_ansi_escape=False,
        ).convert()

View File

@ -0,0 +1,74 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
import copy
import itertools
from datetime import datetime
from decimal import Decimal
from typing import Final
from typepy import StrictLevel, Typecode
from .typing import StrictLevelMap, TypeValueMap
# Quoting flag per typecode, with ``False`` for every listed typecode.
NOT_QUOTING_FLAGS: Final = dict.fromkeys(
    (
        Typecode.BOOL,
        Typecode.DATETIME,
        Typecode.DICTIONARY,
        Typecode.INFINITY,
        Typecode.INTEGER,
        Typecode.IP_ADDRESS,
        Typecode.LIST,
        Typecode.NAN,
        Typecode.NULL_STRING,
        Typecode.NONE,
        Typecode.REAL_NUMBER,
        Typecode.STRING,
    ),
    False,
)

# Every typecode mapped to the maximum strict level.
MAX_STRICT_LEVEL_MAP: Final[StrictLevelMap] = {
    typecode: StrictLevel.MAX for typecode in Typecode
}

# Every typecode mapped to the minimum strict level.
MIN_STRICT_LEVEL_MAP: Final[StrictLevelMap] = {
    typecode: StrictLevel.MIN for typecode in Typecode
}
class DefaultValue:
    """Namespace of default settings used across the package."""

    # ``strftime`` pattern used by ``default_datetime_formatter``.
    DATETIME_FORMAT: Final = "%Y-%m-%dT%H:%M:%S%z"

    MAX_WORKERS: Final = 1
    MAX_PRECISION: Final = 100

    # Float type and the infinity / NaN values built with it.
    FLOAT_TYPE: Final = Decimal
    INF_VALUE: Final = FLOAT_TYPE("inf")
    NAN_VALUE: Final = FLOAT_TYPE("nan")

    # Replacement values keyed by typecode.
    TYPE_VALUE_MAP: Final[TypeValueMap] = {
        Typecode.NONE: None,
        Typecode.INFINITY: INF_VALUE,
        Typecode.NAN: NAN_VALUE,
    }

    # Independent copy, so changes here do not touch NOT_QUOTING_FLAGS.
    QUOTING_FLAGS: Final = dict(NOT_QUOTING_FLAGS)

    # Strict level per typecode; "default" covers typecodes without an entry.
    STRICT_LEVEL_MAP: Final[StrictLevelMap] = {
        "default": StrictLevel.MAX,
        Typecode.BOOL: StrictLevel.MAX,
        Typecode.DATETIME: StrictLevel.MAX,
        Typecode.DICTIONARY: StrictLevel.MAX,
        Typecode.REAL_NUMBER: 1,
        Typecode.INFINITY: StrictLevel.MIN,
        Typecode.INTEGER: 1,
        Typecode.IP_ADDRESS: StrictLevel.MAX,
        Typecode.LIST: StrictLevel.MAX,
        Typecode.NAN: StrictLevel.MIN,
        Typecode.NONE: StrictLevel.MAX,
        Typecode.NULL_STRING: StrictLevel.MIN,
        Typecode.STRING: StrictLevel.MIN,
    }
def default_datetime_formatter(value: datetime) -> str:
    """Format ``value`` with ``DefaultValue.DATETIME_FORMAT`` via ``strftime``."""
    pattern = DefaultValue.DATETIME_FORMAT
    return value.strftime(pattern)

View File

@ -0,0 +1,196 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
import abc
from collections.abc import Sequence
from decimal import Decimal
from typing import Any, Final, Optional, Union
from typepy import RealNumber
# Numeric type alias used in the container method annotations in this module.
T = Union[int, float, Decimal]
# Decimal NaN returned by the containers when a statistic cannot be computed.
NAN: Final = Decimal("nan")
class AbstractContainer(metaclass=abc.ABCMeta):
    """Interface of containers that expose a minimum and a maximum value.

    Subclasses provide ``min_value``, ``max_value``, ``mean``, ``update`` and
    ``merge``; the helpers below are built on the two bounds only.
    """

    @property
    @abc.abstractmethod
    def min_value(self) -> Optional[Decimal]:  # pragma: no cover
        """Lower bound, or ``None``."""

    @property
    @abc.abstractmethod
    def max_value(self) -> Optional[Decimal]:  # pragma: no cover
        """Upper bound, or ``None``."""

    @abc.abstractmethod
    def mean(self) -> Decimal:  # pragma: no cover
        """Mean of the container."""

    @abc.abstractmethod
    def update(self, value: Optional[T]) -> None:  # pragma: no cover
        """Take one value into the container."""

    @abc.abstractmethod
    def merge(self, value: "AbstractContainer") -> None:  # pragma: no cover
        """Take the content of another container into this one."""

    def has_value(self) -> bool:
        """Return True when both bounds are set (neither is ``None``)."""
        return not (self.min_value is None or self.max_value is None)

    def is_same_value(self) -> bool:
        """Return True when both bounds are set and equal."""
        if not self.has_value():
            return False

        return self.min_value == self.max_value

    def is_zero(self) -> bool:
        """Return True when both bounds are set and both equal zero."""
        if not self.has_value():
            return False

        return self.min_value == 0 and self.max_value == 0

    def __repr__(self) -> str:
        if self.has_value():
            parts = (f"min={self.min_value}", f"max={self.max_value}")
            return ", ".join(parts)

        return "None"
class ListContainer(AbstractContainer):
    """Container that keeps every accepted value in a list.

    Values are converted with ``typepy.RealNumber`` before they are stored;
    ``None`` and values that cannot be converted are ignored.
    """

    __slots__ = ("__value_list",)

    @property
    def min_value(self) -> Optional[Decimal]:
        """Smallest stored value, or ``None`` when nothing is stored."""
        try:
            return min(self.__value_list)
        except ValueError:
            # min() of an empty list
            return None

    @property
    def max_value(self) -> Optional[Decimal]:
        """Largest stored value, or ``None`` when nothing is stored."""
        try:
            return max(self.__value_list)
        except ValueError:
            # max() of an empty list
            return None

    @property
    def value_list(self) -> list[Decimal]:
        """The internal list of stored values (not a copy)."""
        return self.__value_list

    def __init__(self, value_list: Optional[list[Decimal]] = None) -> None:
        """
        :param value_list:
            Optional initial values; each one is passed through
            :meth:`update`.
        """
        # Create the backing list unconditionally: update() appends to it, so
        # it must exist before any initial value is processed. Previously it
        # was only created when ``value_list`` was None, which made
        # ``ListContainer([...])`` fail with AttributeError.
        self.__value_list: list[Decimal] = []

        if value_list is None:
            return

        for value in value_list:
            self.update(value)

    def mean(self) -> Decimal:
        """Arithmetic mean of the stored values, or NaN when nothing is stored."""
        try:
            return Decimal(sum(self.__value_list) / len(self.__value_list))
        except ZeroDivisionError:
            return NAN

    def update(self, value: Union[int, float, Decimal, None]) -> None:
        """Store ``value`` after converting it with ``RealNumber``.

        ``None`` and values for which ``try_convert`` returns ``None`` are
        dropped silently.
        """
        if value is None:
            return

        store_value = RealNumber(value).try_convert()
        if store_value is None:
            return

        self.__value_list.append(store_value)

    def merge(self, value: "AbstractContainer") -> None:
        """Add every value of another ``ListContainer``; other container types are ignored."""
        if not isinstance(value, ListContainer):
            return

        for v in value.value_list:
            self.update(v)
class MinMaxContainer(AbstractContainer):
    """Container that remembers only the smallest and largest value it was given."""

    __slots__ = ("__min_value", "__max_value")

    def __init__(self, value_list: Optional[Sequence[Decimal]] = None) -> None:
        """
        :param value_list: Optional initial values, each passed to :meth:`update`.
        """
        self.__min_value: Optional[Decimal] = None
        self.__max_value: Optional[Decimal] = None

        if value_list is not None:
            for value in value_list:
                self.update(value)

    @property
    def min_value(self) -> Optional[Decimal]:
        """Smallest value seen so far, or ``None`` before the first update."""
        return self.__min_value

    @property
    def max_value(self) -> Optional[Decimal]:
        """Largest value seen so far, or ``None`` before the first update."""
        return self.__max_value

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, MinMaxContainer):
            same_min = self.min_value == other.min_value
            same_max = self.max_value == other.max_value
            return same_min and same_max

        return False

    def __ne__(self, other: Any) -> bool:
        if isinstance(other, MinMaxContainer):
            min_differs = self.min_value != other.min_value
            max_differs = self.max_value != other.max_value
            return min_differs or max_differs

        return True

    def __contains__(self, x: T) -> bool:
        """Return True when ``x`` lies within the closed range [min, max]."""
        lower = self.min_value
        if lower is None:
            return False

        upper = self.max_value
        if upper is None:
            return False

        return lower <= x <= upper

    def diff(self) -> Decimal:
        """Return ``max - min``, or NaN when a bound is missing or the subtraction fails."""
        if self.min_value is None or self.max_value is None:
            return NAN

        try:
            return self.max_value - self.min_value
        except TypeError:
            return NAN

    def mean(self) -> Decimal:
        """Return the midpoint of the two bounds, or NaN when it cannot be computed."""
        if self.min_value is None or self.max_value is None:
            return NAN

        try:
            return (self.max_value + self.min_value) * Decimal("0.5")
        except TypeError:
            return NAN

    def update(self, value: Optional[T]) -> None:
        """Widen the range so that it includes ``value``; ``None`` is ignored."""
        if value is None:
            return

        decimal_value = Decimal(value)

        current_min = self.__min_value
        self.__min_value = (
            decimal_value if current_min is None else min(current_min, decimal_value)
        )

        current_max = self.__max_value
        self.__max_value = (
            decimal_value if current_max is None else max(current_max, decimal_value)
        )

    def merge(self, value: "AbstractContainer") -> None:
        """Widen the range with another ``MinMaxContainer``'s bounds; other types are ignored."""
        if isinstance(value, MinMaxContainer):
            for bound in (value.min_value, value.max_value):
                self.update(bound)

View File

@ -0,0 +1,90 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
import re
from typing import Any, Final, Optional
from typepy import Typecode, TypeConversionError
from ._common import MAX_STRICT_LEVEL_MAP, DefaultValue
from ._dataproperty import DataProperty
from ._preprocessor import Preprocessor
from .typing import DateTimeFormatter, FloatType, StrictLevelMap, TypeValueMap
class DataPropertyConverter:
    """Convert a ``DataProperty`` for output.

    Applies, in this order of precedence: replacement through the type-value
    map, the optional datetime formatter, and double-quoting according to the
    quoting flags.
    """

    # A line that already starts and ends with a quote character.
    __RE_QUOTE_LINE: Final = re.compile(r"^\s*[\"'].*[\"']\s*$")  # noqa: w605
    # Any single or double quote character.
    __RE_QUOTE_CHAR: Final = re.compile("[\"']")

    def __init__(
        self,
        preprocessor: Preprocessor,
        datetime_format_str: str,
        datetime_formatter: Optional[DateTimeFormatter] = None,
        type_value_map: Optional[TypeValueMap] = None,
        quoting_flags: Optional[dict[Typecode, bool]] = None,
        float_type: Optional[FloatType] = None,
        strict_level_map: Optional[StrictLevelMap] = None,
    ) -> None:
        self.__preprocessor = preprocessor
        self.__datetime_format_str = datetime_format_str
        self.__datetime_formatter = datetime_formatter
        self.__float_type = float_type
        self.__strict_level_map = strict_level_map

        # A falsy argument (None or an empty mapping) selects the defaults.
        self.__type_value_map: TypeValueMap = type_value_map or DefaultValue.TYPE_VALUE_MAP
        self.__quoting_flags: dict[Typecode, bool] = (
            quoting_flags or DefaultValue.QUOTING_FLAGS
        )

    def convert(self, dp_value: DataProperty) -> DataProperty:
        """Return the converted counterpart of ``dp_value``.

        When no value conversion applies, the input is returned unchanged
        unless quoting is enabled for its typecode or the preprocessor
        escapes HTML tags; in those cases a new ``DataProperty`` is built from
        its string form.
        """
        try:
            converted = self.__convert_value(dp_value)
            return self.__create_dataproperty(converted)
        except TypeConversionError:
            pass

        if self.__quoting_flags.get(dp_value.typecode):
            quoted = self.__apply_quote(dp_value.typecode, dp_value.to_str())
            return self.__create_dataproperty(quoted)

        if self.__preprocessor.is_escape_html_tag:
            return self.__create_dataproperty(dp_value.to_str())

        return dp_value

    def __create_dataproperty(self, value: Any) -> DataProperty:
        """Wrap ``value`` in a ``DataProperty`` using ``MAX_STRICT_LEVEL_MAP``."""
        return DataProperty(
            value,
            preprocessor=self.__preprocessor,
            float_type=self.__float_type,
            datetime_format_str=self.__datetime_format_str,
            strict_level_map=MAX_STRICT_LEVEL_MAP,
        )

    def __apply_quote(self, typecode: Typecode, data: Any) -> Any:
        """Double-quote ``data`` when quoting is enabled for ``typecode``.

        Data that already looks quoted, or that the regex cannot search
        (``TypeError``), is returned as is.
        """
        if not self.__quoting_flags.get(typecode):
            return data

        try:
            already_quoted = self.__RE_QUOTE_LINE.search(data)
        except TypeError:
            return data

        if already_quoted:
            return data

        # Escape backslashes first, then the quote characters.
        escaped = data.replace("\\", "\\\\")

        return '"{}"'.format(self.__RE_QUOTE_CHAR.sub('\\"', escaped))

    def __convert_value(self, dp_value: DataProperty) -> Any:
        """Return the replacement value for ``dp_value``.

        :raises TypeConversionError: When no conversion applies.
        """
        typecode = dp_value.typecode

        if typecode in self.__type_value_map:
            return self.__apply_quote(typecode, self.__type_value_map[typecode])

        if typecode == Typecode.DATETIME and self.__datetime_formatter:
            try:
                return self.__apply_quote(typecode, self.__datetime_formatter(dp_value.data))
            except TypeError:
                raise TypeConversionError

        raise TypeConversionError("no need to convert")

View File

@ -0,0 +1,381 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
from decimal import Decimal
from typing import Any, Final, Optional, cast
import typepy
from mbstrdecoder import MultiByteStrDecoder
from typepy import (
Bool,
DateTime,
Dictionary,
Infinity,
Integer,
IpAddress,
Nan,
NoneType,
NullString,
RealNumber,
StrictLevel,
String,
Typecode,
TypeConversionError,
)
from typepy.type import AbstractType
from ._align import Align
from ._align_getter import align_getter
from ._base import DataPeropertyBase
from ._common import DefaultValue
from ._function import calc_ascii_char_width, get_number_of_digit
from ._preprocessor import Preprocessor
from .typing import FloatType, StrictLevelMap, TypeHint
class DataProperty(DataPeropertyBase):
    """Property of a single value.

    Detects the typepy type of the value on construction and derives the
    alignment, length, display width and digit counts on first access.
    """

    __slots__ = (
        "__data",
        "__no_ansi_escape_data",
        "__align",
        "__integer_digits",
        "__additional_format_len",
        "__length",
        "__ascii_char_width",
    )

    # Candidate type classes, tried in this order when detecting the type;
    # the first one that converts the data wins.
    __type_class_list: Final[list[type[AbstractType]]] = [
        NoneType,
        Integer,
        Infinity,
        Nan,
        IpAddress,
        RealNumber,
        Bool,
        typepy.List,
        Dictionary,
        DateTime,
        NullString,
        String,
    ]

    def __init__(
        self,
        data: Any,
        preprocessor: Optional[Preprocessor] = None,
        type_hint: TypeHint = None,
        float_type: Optional[FloatType] = None,
        format_flags: Optional[int] = None,
        datetime_format_str: str = DefaultValue.DATETIME_FORMAT,
        strict_level_map: Optional[StrictLevelMap] = None,
        east_asian_ambiguous_width: int = 1,
    ) -> None:
        """
        :param data: Value to wrap; it is passed through the preprocessor first.
        :param preprocessor: Defaults to a new ``Preprocessor()``.
        :param type_hint: If given, tried before the automatic type detection.
        :param float_type: Defaults to ``DefaultValue.FLOAT_TYPE``.
        :param format_flags: Forwarded to the base class.
        :param datetime_format_str: Forwarded to the base class.
        :param strict_level_map: Defaults to ``DefaultValue.STRICT_LEVEL_MAP``.
        :param east_asian_ambiguous_width: Forwarded to the base class.
        :raises typepy.TypeConversionError:
            If none of the candidate type classes can convert the data.
        """
        super().__init__(
            format_flags=format_flags,
            is_formatting_float=True,
            datetime_format_str=datetime_format_str,
            east_asian_ambiguous_width=east_asian_ambiguous_width,
        )

        # Lazily computed values; None means "not computed yet".
        self.__additional_format_len: Optional[int] = None
        self.__align: Optional[Align] = None
        self.__ascii_char_width: Optional[int] = None
        self.__integer_digits: Optional[int] = None
        self.__length: Optional[int] = None

        if preprocessor is None:
            preprocessor = Preprocessor()

        data, no_ansi_escape_data = preprocessor.preprocess(data)

        self.__set_data(data, type_hint, float_type, strict_level_map)

        # Keep a second property for the escape-free text only when its length
        # differs from the preprocessed data.
        if no_ansi_escape_data is None or len(data) == len(no_ansi_escape_data):
            self.__no_ansi_escape_data: Optional[DataProperty] = None
        else:
            self.__no_ansi_escape_data = DataProperty(no_ansi_escape_data, float_type=float_type)

    def __eq__(self, other: Any) -> bool:
        """Equal when typecode and data match; two NAN properties are always equal."""
        if not isinstance(other, DataProperty):
            return False

        if self.typecode != other.typecode:
            return False

        if self.typecode == Typecode.NAN:
            return True

        return self.data == other.data

    def __ne__(self, other: Any) -> bool:
        """Inverse of ``__eq__`` (two NAN properties are never unequal)."""
        if not isinstance(other, DataProperty):
            return True

        if self.typecode != other.typecode:
            return True

        if self.typecode == Typecode.NAN:
            return False

        return self.data != other.data

    def __repr__(self) -> str:
        """Return a comma-separated ``key=value`` summary of the property."""
        element_list = []

        if self.typecode == Typecode.DATETIME:
            element_list.append(f"data={str(self.data):s}")
        else:
            try:
                element_list.append("data=" + self.to_str())
            except UnicodeEncodeError:
                element_list.append(f"data={MultiByteStrDecoder(self.data).unicode_str}")

        element_list.extend(
            [
                f"type={self.typename:s}",
                f"align={self.align.align_string}",
                f"ascii_width={self.ascii_char_width:d}",
            ]
        )

        # Optional fields are listed only when they hold an integer.
        if Integer(self.length).is_type():
            element_list.append(f"length={self.length}")

        if Integer(self.integer_digits).is_type():
            element_list.append(f"int_digits={self.integer_digits}")

        if Integer(self.decimal_places).is_type():
            element_list.append(f"decimal_places={self.decimal_places}")

        if Integer(self.additional_format_len).is_type():
            element_list.append(f"extra_len={self.additional_format_len}")

        return ", ".join(element_list)

    @property
    def align(self) -> Align:
        """Alignment of the value (computed on first access and cached).

        Taken from the escape-free property when the data includes ANSI
        escapes, otherwise looked up from the typecode via ``align_getter``.
        """
        if not self.__align:
            if self.is_include_ansi_escape:
                assert self.no_ansi_escape_dp
                self.__align = self.no_ansi_escape_dp.align
            else:
                self.__align = align_getter.get_align_from_typecode(self.typecode)

        assert self.__align

        return self.__align

    @property
    def decimal_places(self) -> Optional[int]:
        """
        :return:
            Decimal places of the ``data`` as reported by
            ``get_number_of_digit`` (computed on first access).
            NOTE(review): the previous docstring stated that non-numeric data
            yields ``float("nan")``, which does not match the
            ``Optional[int]`` annotation -- confirm against
            ``get_number_of_digit``.
        :rtype: Optional[int]
        """

        if self._decimal_places is None:
            self.__set_digit()

        return self._decimal_places

    @property
    def data(self) -> Any:
        """
        :return: The data after preprocessing and type conversion.
        :rtype: Depends on the detected type.
        """

        return self.__data

    @property
    def is_include_ansi_escape(self) -> bool:
        """True when an escape-free property exists and its length differs from this one's."""
        if self.no_ansi_escape_dp is None:
            return False

        return self.length != self.no_ansi_escape_dp.length

    @property
    def no_ansi_escape_dp(self) -> Optional["DataProperty"]:
        """Property built from the escape-free data, or ``None`` if none was created."""
        return self.__no_ansi_escape_data

    @property
    def length(self) -> Optional[int]:
        """
        :return:
            ``len(data)`` for DICTIONARY, LIST and STRING typecodes,
            otherwise ``None``.
        :rtype: Optional[int]
        """

        if self.__length is None:
            self.__length = self.__get_length()

        return self.__length

    @property
    def ascii_char_width(self) -> int:
        """Display width of the value (computed on first access and cached)."""
        if self.__ascii_char_width is None:
            self.__ascii_char_width = self.__calc_ascii_char_width()

        return self.__ascii_char_width

    @property
    def integer_digits(self) -> Optional[int]:
        """
        :return:
            Integer digits of the ``data`` as reported by
            ``get_number_of_digit`` (computed on first access).
            Presumably ``None`` for non-numeric data -- verify against
            ``get_number_of_digit``.
        :rtype: Optional[int]
        """

        if self.__integer_digits is None:
            self.__set_digit()

        return self.__integer_digits

    @property
    def additional_format_len(self) -> int:
        """Extra characters needed when formatting: 1 for a negative number, else 0."""
        if self.__additional_format_len is None:
            self.__additional_format_len = self.__get_additional_format_len()

        return self.__additional_format_len

    def get_padding_len(self, ascii_char_width: int) -> int:
        """Return the padding length to use for the target ``ascii_char_width``.

        The result is the target width reduced by the difference between this
        value's display width and its length, and never negative. When the
        length is unavailable (``TypeError``), the target width is returned.
        """
        if self.typecode in (Typecode.LIST, Typecode.DICTIONARY):
            # Lists and dictionaries are measured through their string form.
            unicode_str_len = DataProperty(MultiByteStrDecoder(str(self.data)).unicode_str).length
            assert unicode_str_len
            return max(
                ascii_char_width - (self.ascii_char_width - unicode_str_len),
                0,
            )

        try:
            return max(ascii_char_width - (self.ascii_char_width - cast(int, self.length)), 0)
        except TypeError:
            return ascii_char_width

    def to_str(self) -> str:
        """Format the data with this property's format string."""
        return self.format_str.format(self.data)

    def __get_additional_format_len(self) -> int:
        """Return 1 when the data is a negative real number, otherwise 0."""
        if not RealNumber(self.data, strip_ansi_escape=False).is_type():
            return 0

        format_len = 0

        if Decimal(self.data) < 0:
            # for minus character
            format_len += 1

        return format_len

    def __get_base_float_len(self) -> int:
        """Return integer digits + decimal places, plus one for the dot when there are decimals.

        :raises ValueError: If either digit count is negative.
        """
        assert self.integer_digits is not None
        assert self.decimal_places is not None

        if any([self.integer_digits < 0, self.decimal_places < 0]):
            raise ValueError("integer digits and decimal places must be greater or equals to zero")

        float_len = self.integer_digits + self.decimal_places
        if self.decimal_places > 0:
            # for dot
            float_len += 1

        return float_len

    def __get_length(self) -> Optional[int]:
        """Return ``len(data)`` for DICTIONARY, LIST and STRING typecodes, else ``None``."""
        if self.typecode in (Typecode.DICTIONARY, Typecode.LIST, Typecode.STRING):
            return len(self.data)

        return None

    def __calc_ascii_char_width(self) -> int:
        """Compute the display width according to the typecode."""
        if self.typecode == Typecode.INTEGER:
            return cast(int, self.integer_digits) + self.additional_format_len

        if self.typecode == Typecode.REAL_NUMBER:
            return self.__get_base_float_len() + self.additional_format_len

        if self.typecode == Typecode.DATETIME:
            try:
                return len(self.to_str())
            except ValueError:
                # Reached when the year is before 1900:
                # the datetime strftime() methods require year >= 1900.
                return len(str(self.data))

        if self.is_include_ansi_escape:
            # Escape sequences are not counted: use the escape-free width.
            assert self.no_ansi_escape_dp
            return self.no_ansi_escape_dp.ascii_char_width

        try:
            unicode_str = MultiByteStrDecoder(self.data).unicode_str
        except ValueError:
            unicode_str = self.to_str()

        return calc_ascii_char_width(unicode_str, self._east_asian_ambiguous_width)

    def __set_data(
        self,
        data: Any,
        type_hint: TypeHint,
        float_type: Optional[FloatType],
        strict_level_map: Optional[StrictLevelMap],
    ) -> None:
        """Detect the type of ``data`` and store the converted value and typecode.

        The type hint, if any, is tried first and accepted when the converted
        value passes the hint's strictest check; otherwise each candidate type
        class is tried in order with its strict level from ``strict_level_map``.

        :raises typepy.TypeConversionError: If no candidate converts the data.
        """
        if float_type is None:
            float_type = DefaultValue.FLOAT_TYPE

        if strict_level_map is None:
            strict_level_map = DefaultValue.STRICT_LEVEL_MAP

        if type_hint:
            type_obj = type_hint(
                data, strict_level=StrictLevel.MIN, float_type=float_type, strip_ansi_escape=False
            )
            self._typecode = type_obj.typecode
            self.__data = type_obj.try_convert()

            if type_hint(
                self.__data,
                strict_level=StrictLevel.MAX,
                float_type=float_type,
                strip_ansi_escape=False,
            ).is_type():
                return

        for type_class in self.__type_class_list:
            # A throwaway instance is created only to read the class's typecode.
            strict_level = strict_level_map.get(
                type_class(None, 0).typecode, strict_level_map.get("default", StrictLevel.MAX)
            )

            if self.__try_convert_type(data, type_class, strict_level, float_type):
                return

        raise TypeConversionError(
            f"failed to convert: data={data}, strict_level={strict_level_map}"
        )

    def __set_digit(self) -> None:
        """Fill the integer digits and decimal places from ``get_number_of_digit``."""
        integer_digits, decimal_places = get_number_of_digit(self.__data)
        self.__integer_digits = integer_digits
        self._decimal_places = decimal_places

    def __try_convert_type(
        self,
        data: Any,
        type_class: type[AbstractType],
        strict_level: int,
        float_type: Optional[FloatType],
    ) -> bool:
        """Try to convert ``data`` with ``type_class``.

        On success the converted value and the typecode are stored and True is
        returned; on ``TypeConversionError`` nothing is stored and False is
        returned.
        """
        type_obj = type_class(data, strict_level, float_type=float_type, strip_ansi_escape=False)

        try:
            self.__data = type_obj.convert()
        except TypeConversionError:
            return False

        self._typecode = type_obj.typecode

        return True

View File

@ -0,0 +1,817 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
import copy
import enum
import sys
import typing
from collections import Counter
from collections.abc import Sequence
from decimal import Decimal
from typing import Any, Optional, Union, cast
import typepy
from typepy import (
Bool,
DateTime,
Dictionary,
Infinity,
Integer,
IpAddress,
Nan,
NoneType,
NullString,
RealNumber,
StrictLevel,
String,
Typecode,
is_empty_sequence,
)
from typepy.type import AbstractType
from ._column import ColumnDataProperty
from ._common import MIN_STRICT_LEVEL_MAP, DefaultValue
from ._converter import DataPropertyConverter
from ._dataproperty import DataProperty
from ._formatter import Format
from ._preprocessor import Preprocessor
from .logger import logger # type: ignore
from .typing import (
DateTimeFormatter,
StrictLevelMap,
TransFunc,
TypeHint,
TypeValueMap,
normalize_type_hint,
)
DataPropertyMatrix = list[list[DataProperty]]
@enum.unique
class MatrixFormatting(enum.Enum):
    """Policies for handling a matrix whose rows differ in length."""

    # Raise an exception if the matrix is not properly formatted.
    EXCEPTION = 0b00010

    # Trim every row to the minimum column size.
    TRIM = 0b00100

    # Append None values to rows so that each one matches the maximum
    # column size.
    FILL_NONE = 0b01000

    # Size every row to the number of headers; without headers the maximum
    # column size is used.
    HEADER_ALIGNED = 0b10000
class DataPropertyExtractor:
"""
.. py:attribute:: quoting_flags
Configuration for adding double quotes to each item in a matrix
whose |Typecode| is mapped to |True| in the ``quote_flag_table``
mapping table. ``quote_flag_table`` should be a dictionary
of the form ``{ Typecode : bool }``. Defaults to:
.. code-block:: json
:caption: The default values
{
Typecode.BOOL: False,
Typecode.DATETIME: False,
Typecode.DICTIONARY: False,
Typecode.INFINITY: False,
Typecode.INTEGER: False,
Typecode.IP_ADDRESS: False,
Typecode.LIST: False,
Typecode.NAN: False,
Typecode.NULL_STRING: False,
Typecode.NONE: False,
Typecode.REAL_NUMBER: False,
Typecode.STRING: False,
}
"""
def __init__(self, max_precision: Optional[int] = None) -> None:
self.max_workers = DefaultValue.MAX_WORKERS
if max_precision is None:
self.__max_precision = DefaultValue.MAX_PRECISION
else:
self.__max_precision = max_precision
self.__headers: Sequence[str] = []
self.__default_type_hint: TypeHint = None
self.__col_type_hints: list[TypeHint] = []
self.__strip_str_header: Optional[str] = None
self.__is_formatting_float = True
self.__min_col_ascii_char_width = 0
self.__default_format_flags = Format.NONE
self.__format_flags_list: Sequence[int] = []
self.__float_type: Union[type[float], type[Decimal], None] = None
self.__datetime_format_str = DefaultValue.DATETIME_FORMAT
self.__strict_level_map = copy.deepcopy(
cast(dict[Union[Typecode, str], int], DefaultValue.STRICT_LEVEL_MAP)
)
self.__east_asian_ambiguous_width = 1
self.__preprocessor = Preprocessor()
self.__type_value_map: TypeValueMap = copy.deepcopy(DefaultValue.TYPE_VALUE_MAP)
self.__trans_func_list: list[TransFunc] = []
self.__quoting_flags = copy.deepcopy(DefaultValue.QUOTING_FLAGS)
self.__datetime_formatter: Optional[DateTimeFormatter] = None
self.__matrix_formatting = MatrixFormatting.TRIM
self.__dp_converter: DataPropertyConverter
self.__clear_cache()
def __clear_cache(self) -> None:
    """Rebuild the converter, then re-create the pre-computed cache entries."""
    self.__update_dp_converter()

    build = self.__to_dp_raw
    self.__dp_cache_zero = build(0)
    self.__dp_cache_one = build(1)
    self.__dp_cache_true = build(True)
    self.__dp_cache_false = build(False)
    self.__dp_cache_map = {None: build(None), "": build("")}
@property
def headers(self) -> Sequence[str]:
return self.__headers
@headers.setter
def headers(self, value: Sequence[str]) -> None:
if self.__headers == value:
return
self.__headers = value
self.__clear_cache()
@property
def default_type_hint(self) -> TypeHint:
return self.__default_type_hint
@default_type_hint.setter
def default_type_hint(self, value: TypeHint) -> None:
if self.__default_type_hint == value:
return
self.__default_type_hint = value
self.__clear_cache()
@property
def column_type_hints(self) -> list[TypeHint]:
    """Normalized per-column type hints (``None`` entries mean "no hint")."""
    return self.__col_type_hints

@column_type_hints.setter
def column_type_hints(self, value: Sequence[Union[str, TypeHint]]) -> None:
    """Normalize, validate and store the per-column type hints.

    Each entry is passed through ``normalize_type_hint`` and must then be one
    of the supported typepy type classes or ``None``. The cache is cleared
    only when the normalized list differs from the stored one.

    Raises:
        ValueError: If an entry is not a supported type hint.
    """
    supported_type_hints = (
        Bool,
        DateTime,
        Dictionary,
        Infinity,
        Integer,
        IpAddress,
        typepy.List,
        Nan,
        NoneType,
        RealNumber,
        String,
        NullString,
        None,
    )

    normalized_type_hints: list[TypeHint] = []
    for raw_hint in value:
        type_hint = normalize_type_hint(raw_hint)
        if type_hint not in supported_type_hints:
            # Report the rejected hint itself: type(type_hint) would only
            # name its metaclass, which never identifies the offender.
            raise ValueError(f"invalid type hint: {type_hint!r}")
        normalized_type_hints.append(type_hint)

    if self.__col_type_hints == normalized_type_hints:
        return

    self.__col_type_hints = normalized_type_hints
    self.__clear_cache()
@property
def is_formatting_float(self) -> bool:
return self.__is_formatting_float
@is_formatting_float.setter
def is_formatting_float(self, value: bool) -> None:
self.__is_formatting_float = value
@property
def max_precision(self) -> int:
return self.__max_precision
@max_precision.setter
def max_precision(self, value: int) -> None:
if self.__max_precision == value:
return
self.__max_precision = value
self.__clear_cache()
@property
def preprocessor(self) -> Preprocessor:
return self.__preprocessor
@preprocessor.setter
def preprocessor(self, value: Preprocessor) -> None:
if self.preprocessor == value:
return
self.__preprocessor = value
self.__update_dp_converter()
@property
def strip_str_header(self) -> Optional[str]:
return self.__strip_str_header
@strip_str_header.setter
def strip_str_header(self, value: str) -> None:
if self.__strip_str_header == value:
return
self.__strip_str_header = value
self.__clear_cache()
@property
def min_column_width(self) -> int:
return self.__min_col_ascii_char_width
@min_column_width.setter
def min_column_width(self, value: int) -> None:
if self.__min_col_ascii_char_width == value:
return
self.__min_col_ascii_char_width = value
self.__clear_cache()
@property
def default_format_flags(self) -> int:
return self.__default_format_flags
@default_format_flags.setter
def default_format_flags(self, value: int) -> None:
if self.__default_format_flags == value:
return
self.__default_format_flags = value
self.__clear_cache()
@property
def format_flags_list(self) -> Sequence[int]:
return self.__format_flags_list
@format_flags_list.setter
def format_flags_list(self, value: Sequence[int]) -> None:
if self.__format_flags_list == value:
return
self.__format_flags_list = value
self.__clear_cache()
@property
def float_type(self) -> Union[type[float], type[Decimal], None]:
return self.__float_type
@float_type.setter
def float_type(self, value: Union[type[float], type[Decimal]]) -> None:
if self.__float_type == value:
return
self.__float_type = value
self.__clear_cache()
@property
def datetime_format_str(self) -> str:
return self.__datetime_format_str
@datetime_format_str.setter
def datetime_format_str(self, value: str) -> None:
if self.__datetime_format_str == value:
return
self.__datetime_format_str = value
self.__clear_cache()
@property
def strict_level_map(self) -> StrictLevelMap:
return self.__strict_level_map
@strict_level_map.setter
def strict_level_map(self, value: StrictLevelMap) -> None:
if self.__strict_level_map == value:
return
self.__strict_level_map = cast(dict[Union[Typecode, str], int], value)
self.__clear_cache()
@property
def east_asian_ambiguous_width(self) -> int:
return self.__east_asian_ambiguous_width
@east_asian_ambiguous_width.setter
def east_asian_ambiguous_width(self, value: int) -> None:
if self.__east_asian_ambiguous_width == value:
return
self.__east_asian_ambiguous_width = value
self.__clear_cache()
@property
def type_value_map(self) -> TypeValueMap:
return self.__type_value_map
@type_value_map.setter
def type_value_map(self, value: TypeValueMap) -> None:
if self.__type_value_map == value:
return
self.__type_value_map = value
self.__clear_cache()
def set_type_value(self, key: Typecode, value: Union[float, str, Decimal, None]) -> None:
self.__type_value_map[key] = value
self.__clear_cache()
def register_trans_func(self, trans_func: TransFunc) -> None:
self.__trans_func_list.insert(0, trans_func)
self.__clear_cache()
@property
def quoting_flags(self) -> dict[Typecode, bool]:
return self.__quoting_flags
@quoting_flags.setter
def quoting_flags(self, value: dict[Typecode, bool]) -> None:
if self.__quoting_flags == value:
return
self.__quoting_flags = value
self.__clear_cache()
@property
def datetime_formatter(self) -> Optional[DateTimeFormatter]:
return self.__datetime_formatter
@datetime_formatter.setter
def datetime_formatter(self, value: Optional[DateTimeFormatter]) -> None:
if self.__datetime_formatter == value:
return
self.__datetime_formatter = value
self.__clear_cache()
@property
def matrix_formatting(self) -> MatrixFormatting:
return self.__matrix_formatting
@matrix_formatting.setter
def matrix_formatting(self, value: MatrixFormatting) -> None:
if self.__matrix_formatting == value:
return
self.__matrix_formatting = value
self.__clear_cache()
@property
def max_workers(self) -> int:
    """Number of worker processes used when converting a matrix."""
    assert self.__max_workers
    return self.__max_workers

@max_workers.setter
def max_workers(self, value: Optional[int]) -> None:
    """Set the worker count, forcing 1 where multiprocessing is not usable.

    The value is forced to 1 when ``_multiprocessing`` cannot provide
    ``SemLock``/``sem_unlink`` or when ``pytest`` is loaded. A falsy value
    (``None`` or ``0``) is replaced by ``DefaultValue.MAX_WORKERS``.
    """
    try:
        from _multiprocessing import SemLock, sem_unlink  # noqa
    except ImportError:
        logger.debug("This platform lacks a functioning sem_open implementation")
        value = 1

    running_under_pytest = "pytest" in sys.modules
    if running_under_pytest and value != 1:
        logger.debug("set max_workers to 1 to avoid deadlock when executed from pytest")
        value = 1

    self.__max_workers = value if value else DefaultValue.MAX_WORKERS
def to_dp(self, value: Any) -> DataProperty:
self.__update_dp_converter()
return self.__to_dp(value)
def to_dp_list(self, values: Sequence[Any]) -> list[DataProperty]:
if is_empty_sequence(values):
return []
self.__update_dp_converter()
return self._to_dp_list(values)
def to_column_dp_list(
    self,
    value_dp_matrix: Any,
    previous_column_dp_list: Optional[Sequence[ColumnDataProperty]] = None,
) -> list[ColumnDataProperty]:
    """Build one ``ColumnDataProperty`` per column of ``value_dp_matrix``.

    Columns start from the header-derived list; extra columns are created on
    demand. Each column is merged with the entry at the same index of
    ``previous_column_dp_list`` when there is one, then updated with every
    value of that column.

    Args:
        value_dp_matrix: Row-major matrix of value properties.
        previous_column_dp_list: Earlier column properties to merge in.

    Returns:
        The column properties, ordered by column index.
    """
    col_dp_list = self.__get_col_dp_list_base()

    logger.debug("converting to column dataproperty:")

    prev_col_count = len(previous_column_dp_list) if previous_column_dp_list else None
    if self.column_type_hints:
        hint_names = ", ".join(
            [
                type_hint.__name__ if type_hint else "none"
                for type_hint in self.column_type_hints
            ]
        )
        type_hints_log = " column_type_hints=({})".format(hint_names)
    else:
        type_hints_log = " column_type_hints=()"

    logs = [" params:"]
    if self.headers:
        logs.append(f" headers={len(self.headers)}")
    logs.append(" prev_col_count={}".format(prev_col_count))
    logs.append(f" matrix_formatting={self.matrix_formatting}")
    logs.append(type_hints_log)

    for log in logs:
        logger.debug(log)

    logger.debug(" results:")
    for col_idx, value_dp_list in enumerate(zip(*value_dp_matrix)):
        if col_idx >= len(col_dp_list):
            # More data columns than headers: create the column on demand.
            col_dp_list.append(
                ColumnDataProperty(
                    column_index=col_idx,
                    float_type=self.float_type,
                    min_width=self.min_column_width,
                    format_flags=self.__get_format_flags(col_idx),
                    is_formatting_float=self.is_formatting_float,
                    datetime_format_str=self.datetime_format_str,
                    east_asian_ambiguous_width=self.east_asian_ambiguous_width,
                    max_precision=self.__max_precision,
                )
            )

        col_dp = col_dp_list[col_idx]
        col_dp.begin_update()

        try:
            col_dp.merge(previous_column_dp_list[col_idx])  # type: ignore
        except (TypeError, IndexError):
            # No previous list, or no previous entry for this column.
            pass

        for value_dp in value_dp_list:
            col_dp.update_body(value_dp)

        col_dp.end_update()
        logger.debug(f" {str(col_dp):s}")

    return col_dp_list
def to_dp_matrix(self, value_matrix: Sequence[Sequence[Any]]) -> DataPropertyMatrix:
    """Convert a matrix of raw values into a matrix of ``DataProperty``.

    Row lengths are first adjusted according to ``matrix_formatting``. A
    matrix whose first cell is already a ``DataProperty`` is returned as is.
    The conversion runs in-process when ``max_workers`` is 1 or less, and
    through a process pool otherwise.
    """
    self.__update_dp_converter()
    logger.debug(f"max_workers={self.max_workers}, preprocessor={self.__preprocessor}")

    stripped = self.__strip_data_matrix(value_matrix)

    if self.__is_dp_matrix(stripped):
        logger.debug("already a dataproperty matrix")
        return stripped  # type: ignore

    convert = self.__to_dp_matrix_st if self.max_workers <= 1 else self.__to_dp_matrix_mt
    return convert(stripped)
def to_header_dp_list(self) -> list[DataProperty]:
    """Convert the headers to ``DataProperty`` objects typed as ``String``.

    A deep copy of the preprocessor is used, with ``strip_str`` replaced by
    ``strip_str_header``, so the shared preprocessor is left untouched.
    """
    self.__update_dp_converter()

    header_preprocessor = copy.deepcopy(self.__preprocessor)
    header_preprocessor.strip_str = self.strip_str_header

    header_options = {
        "type_hint": String,
        "preprocessor": header_preprocessor,
        "strict_level_map": MIN_STRICT_LEVEL_MAP,
    }
    return self._to_dp_list(self.headers, **header_options)
def update_preprocessor(self, **kwargs: Any) -> bool:
is_updated = self.__preprocessor.update(**kwargs)
self.__update_dp_converter()
return is_updated
def update_strict_level_map(self, value: StrictLevelMap) -> bool:
    """Merge ``value`` into the strict level map.

    Returns:
        ``True`` if the map changed (the cache is then cleared), otherwise
        ``False``.
    """
    snapshot = copy.deepcopy(self.__strict_level_map)
    self.__strict_level_map.update(value)

    if snapshot != self.__strict_level_map:
        self.__clear_cache()
        return True

    return False
"""
def update_dict(self, lhs: Mapping, rhs: Mapping) -> bool:
is_updated = False
for key, value in rhs.items():
if key not in lhs:
lhs[]
continue
if getattr(lhs, key) == value:
continue
setattr(lhs, key, value)
is_updated = True
return is_updated
"""
@staticmethod
def __is_dp_matrix(value: Any) -> bool:
try:
return isinstance(value[0][0], DataProperty)
except (TypeError, IndexError):
return False
def __get_col_type_hint(self, col_idx: int) -> TypeHint:
try:
return self.column_type_hints[col_idx]
except (TypeError, IndexError):
return self.default_type_hint
def __get_format_flags(self, col_idx: int) -> int:
try:
return self.format_flags_list[col_idx]
except (TypeError, IndexError):
return self.__default_format_flags
def __to_dp(
    self,
    data: Any,
    type_hint: TypeHint = None,
    preprocessor: Optional[Preprocessor] = None,
    strict_level_map: Optional[StrictLevelMap] = None,
) -> DataProperty:
    """Convert one value to a ``DataProperty``, using cached objects when possible.

    Registered trans functions are applied first, in list order. A truthy
    ``type_hint`` bypasses the cache. Otherwise pre-built objects are
    returned for keys of the cache map and for values equal to 0 or 1
    (``False``/``True`` have their own entries); everything else is
    converted from scratch.
    """
    for trans_func in self.__trans_func_list:
        data = trans_func(data)

    if type_hint:
        return self.__to_dp_raw(
            data,
            type_hint=type_hint,
            preprocessor=preprocessor,
            strict_level_map=strict_level_map,
        )

    try:
        if data in self.__dp_cache_map:
            return self.__dp_cache_map[data]
    except TypeError:
        # unhashable type
        pass

    if data == 0:
        # ``False == 0`` holds, so bool is told apart by identity.
        return self.__dp_cache_false if data is False else self.__dp_cache_zero

    if data == 1:
        return self.__dp_cache_true if data is True else self.__dp_cache_one

    return self.__to_dp_raw(
        data, type_hint=type_hint, preprocessor=preprocessor, strict_level_map=strict_level_map
    )
def __to_dp_raw(
    self,
    data: Any,
    type_hint: TypeHint = None,
    preprocessor: Optional[Preprocessor] = None,
    strict_level_map: Optional[StrictLevelMap] = None,
) -> DataProperty:
    """Build a ``DataProperty`` for ``data`` without consulting the cache.

    A new ``Preprocessor`` is created from the given one (or from the
    extractor's own), copying only the dequote, line-break, strip and
    formula-injection settings. The result is passed through the
    extractor's converter.
    """
    source = preprocessor if preprocessor else self.preprocessor
    value_preprocessor = Preprocessor(
        dequote=source.dequote,
        line_break_handling=source.line_break_handling,
        line_break_repl=source.line_break_repl,
        strip_str=source.strip_str,
        is_escape_formula_injection=source.is_escape_formula_injection,
    )

    if type_hint is not None:
        # NOTE(review): with an explicit hint the caller's strict_level_map is
        # forwarded even when it is None - confirm that is intended.
        effective_hint, effective_levels = type_hint, strict_level_map
    else:
        effective_hint, effective_levels = self.default_type_hint, self.strict_level_map

    value_dp = DataProperty(
        data,
        preprocessor=value_preprocessor,
        type_hint=effective_hint,
        float_type=self.float_type,
        datetime_format_str=self.datetime_format_str,
        strict_level_map=effective_levels,
        east_asian_ambiguous_width=self.east_asian_ambiguous_width,
    )

    return self.__dp_converter.convert(value_dp)
def __to_dp_matrix_st(self, value_matrix: Sequence[Sequence[Any]]) -> DataPropertyMatrix:
    """Convert the matrix column by column in the current process.

    The matrix is transposed, each column is converted with its own type
    hint, and the converted columns are transposed back into rows.
    """
    converted_columns = [
        _to_dp_list_helper(
            self,
            col_idx,
            values,
            self.__get_col_type_hint(col_idx),
            self.__preprocessor,
        )[1]
        for col_idx, values in enumerate(zip(*value_matrix))
    ]

    return list(zip(*converted_columns))  # type: ignore
def __to_dp_matrix_mt(self, value_matrix: Sequence[Sequence[Any]]) -> DataPropertyMatrix:
    """Convert the matrix column by column using a process pool.

    One task per column is submitted to a ``ProcessPoolExecutor`` sized by
    ``max_workers``. Results are collected as they complete, keyed by column
    index, then re-assembled into rows in column order.
    """
    from concurrent import futures

    with futures.ProcessPoolExecutor(self.max_workers) as executor:
        pending = [
            executor.submit(
                _to_dp_list_helper,
                self,
                col_idx,
                values,
                self.__get_col_type_hint(col_idx),
                self.__preprocessor,
            )
            for col_idx, values in enumerate(zip(*value_matrix))
        ]

        # Each result is a (col_idx, value_dp_list) pair.
        col_data_map = dict(done.result() for done in futures.as_completed(pending))

    ordered_columns = [col_data_map[col_idx] for col_idx in sorted(col_data_map)]
    return list(zip(*ordered_columns))  # type: ignore
def _to_dp_list(
    self,
    data_list: Sequence[Any],
    type_hint: TypeHint = None,
    preprocessor: Optional[Preprocessor] = None,
    strict_level_map: Optional[StrictLevelMap] = None,
) -> list[DataProperty]:
    """Convert a sequence of values to ``DataProperty`` objects.

    Without an explicit ``type_hint``, the most common type class seen so far
    in this sequence is used as the hint for the next value, provided the
    value is of that type at ``StrictLevel.MAX``.

    Returns:
        The converted values in input order; an empty list for an empty
        sequence.
    """
    if is_empty_sequence(data_list):
        return []

    active_preprocessor = preprocessor if preprocessor else self.__preprocessor
    type_counter: typing.Counter[type[AbstractType]] = Counter()
    converted = []

    for data in data_list:
        expect_type_hint: TypeHint = type_hint

        if type_hint is None:
            try:
                expect_type_hint, _count = type_counter.most_common(1)[0]
                if not expect_type_hint(
                    data, float_type=self.float_type, strict_level=StrictLevel.MAX
                ).is_type():
                    expect_type_hint = None
            except IndexError:
                # Nothing counted yet (first element): no hint to offer.
                pass

        dataprop = self.__to_dp(
            data=data,
            type_hint=expect_type_hint,
            preprocessor=active_preprocessor,
            strict_level_map=strict_level_map,
        )
        type_counter[dataprop.type_class] += 1
        converted.append(dataprop)

    return converted
def __strip_data_matrix(self, data_matrix: Sequence[Sequence[Any]]) -> Sequence[Sequence[Any]]:
    """Make every row the same length according to ``matrix_formatting``.

    Returns:
        The adjusted rows as new lists; the input itself under
        ``MatrixFormatting.EXCEPTION``; an empty list when a row has no
        length.

    Raises:
        ValueError: Under ``EXCEPTION`` when the sizes (including the header
            count, if any) are not uniform, or when the formatting value is
            unknown.
    """
    header_col_size = len(self.headers) if self.headers else 0

    try:
        col_size_list = [len(data_list) for data_list in data_matrix]
    except TypeError:
        return []

    # The header count takes part in the min/max only when headers exist.
    sizes = [header_col_size, *col_size_list] if self.headers else col_size_list
    min_col_size = min(sizes, default=0)
    max_col_size = max(sizes, default=0)

    formatting = self.matrix_formatting

    if formatting == MatrixFormatting.EXCEPTION:
        if min_col_size != max_col_size:
            raise ValueError(
                f"nonuniform column size found: min={min_col_size}, max={max_col_size}"
            )
        return data_matrix

    if formatting == MatrixFormatting.HEADER_ALIGNED:
        format_col_size = header_col_size if header_col_size > 0 else max_col_size
    elif formatting == MatrixFormatting.TRIM:
        format_col_size = min_col_size
    elif formatting == MatrixFormatting.FILL_NONE:
        format_col_size = max_col_size
    else:
        raise ValueError(f"unknown matrix formatting: {formatting}")

    stripped = []
    for row_idx, col_size in enumerate(col_size_list):
        row = list(data_matrix[row_idx][:format_col_size])
        # A negative multiplier yields an empty padding list.
        row += [None] * (format_col_size - col_size)
        stripped.append(row)

    return stripped
def __get_col_dp_list_base(self) -> list[ColumnDataProperty]:
    """Create one ``ColumnDataProperty`` per header, updated with that header."""
    shared_options = {
        "float_type": self.float_type,
        "min_width": self.min_column_width,
        "is_formatting_float": self.is_formatting_float,
        "datetime_format_str": self.datetime_format_str,
        "east_asian_ambiguous_width": self.east_asian_ambiguous_width,
        "max_precision": self.__max_precision,
    }

    col_dp_list = []
    for col_idx, header_dp in enumerate(self.to_header_dp_list()):
        col_dp = ColumnDataProperty(
            column_index=col_idx,
            format_flags=self.__get_format_flags(col_idx),
            **shared_options,
        )
        col_dp.update_header(header_dp)
        col_dp_list.append(col_dp)

    return col_dp_list
def __update_dp_converter(self) -> None:
    """Re-create the ``DataPropertyConverter`` from the current settings.

    The converter gets its own ``Preprocessor`` that carries only the
    line-break and escaping settings of the extractor's preprocessor.
    """
    current = self.__preprocessor
    converter_preprocessor = Preprocessor(
        line_break_handling=current.line_break_handling,
        line_break_repl=current.line_break_repl,
        is_escape_html_tag=current.is_escape_html_tag,
        is_escape_formula_injection=current.is_escape_formula_injection,
    )

    self.__dp_converter = DataPropertyConverter(
        preprocessor=converter_preprocessor,
        type_value_map=self.type_value_map,
        quoting_flags=self.quoting_flags,
        datetime_formatter=self.datetime_formatter,
        datetime_format_str=self.datetime_format_str,
        float_type=self.float_type,
        strict_level_map=self.strict_level_map,
    )
def _to_dp_list_helper(
    extractor: DataPropertyExtractor,
    col_idx: int,
    data_list: Sequence[Any],
    type_hint: TypeHint,
    preprocessor: Preprocessor,
) -> tuple[int, list[DataProperty]]:
    """Convert one column and return it together with its column index.

    This is a module-level function, so it can be handed to
    ``executor.submit``; the index lets the caller put completed columns
    back in order.
    """
    value_dp_list = extractor._to_dp_list(
        data_list, type_hint=type_hint, preprocessor=preprocessor
    )
    return (col_idx, value_dp_list)

View File

@ -0,0 +1,98 @@
import copy
from decimal import Decimal
from typing import Final, Optional, Union
from typepy import Nan, Typecode
DecimalPlaces = Union[float, Decimal]
class Format:
    """Bit flags that adjust how numbers are rendered."""

    # No formatting flag set.
    NONE: Final = 0
    # Use "," as the grouping option in integer and real-number formats.
    THOUSAND_SEPARATOR: Final = 1 << 0
class Formatter:
    """Build ``str.format`` templates for each typecode.

    Args:
        datetime_format_str: Format spec placed inside the datetime template.
        is_formatting_float: When falsy, real numbers use the bare ``"{}"``
            template.
        format_flags: ``Format`` flags; ``Format.NONE`` when ``None``.
    """

    __slots__ = ("__is_formatting_float", "__format_flags", "__datetime_format_str")

    # Typecodes that are always rendered with the plain "{}" template.
    _BLANK_CURLY_BRACES_FORMAT_MAP: Final[dict[Typecode, str]] = {
        Typecode.NONE: "{}",
        Typecode.IP_ADDRESS: "{}",
        Typecode.BOOL: "{}",
        Typecode.DICTIONARY: "{}",
        Typecode.LIST: "{}",
    }

    def __init__(
        self,
        datetime_format_str: str,
        is_formatting_float: Optional[bool] = True,
        format_flags: Optional[int] = None,
    ) -> None:
        self.__format_flags = Format.NONE if format_flags is None else format_flags
        self.__datetime_format_str = datetime_format_str
        self.__is_formatting_float = is_formatting_float

    def make_format_map(
        self, decimal_places: Optional[DecimalPlaces] = None
    ) -> dict[Typecode, str]:
        """Return a new typecode-to-template mapping.

        ``decimal_places`` is applied to the ``REAL_NUMBER`` entry only.
        """
        return {
            **self._BLANK_CURLY_BRACES_FORMAT_MAP,
            Typecode.INTEGER: self.make_format_str(Typecode.INTEGER),
            Typecode.REAL_NUMBER: self.make_format_str(Typecode.REAL_NUMBER, decimal_places),
            Typecode.INFINITY: self.make_format_str(Typecode.INFINITY),
            Typecode.NAN: self.make_format_str(Typecode.NAN),
            Typecode.DATETIME: self.make_format_str(Typecode.DATETIME),
        }

    def make_format_str(
        self, typecode: Typecode, decimal_places: Optional[DecimalPlaces] = None
    ) -> str:
        """Return the template for ``typecode``; ``"{:s}"`` for any other typecode."""
        blank_format = self._BLANK_CURLY_BRACES_FORMAT_MAP.get(typecode)
        if blank_format is not None:
            return blank_format

        if typecode == Typecode.INTEGER:
            return self.__get_integer_format()

        if typecode in (Typecode.REAL_NUMBER, Typecode.INFINITY, Typecode.NAN):
            return self.__get_realnumber_format(decimal_places)

        if typecode == Typecode.DATETIME:
            return "{:" + self.__datetime_format_str + "}"

        return "{:s}"

    def __get_base_format_str(self) -> str:
        """Return ``","`` when the thousand-separator flag is set, else ``""``."""
        return "," if self.__format_flags & Format.THOUSAND_SEPARATOR else ""

    def __get_integer_format(self) -> str:
        """Return the integer template, with grouping if enabled."""
        return "{:" + self.__get_base_format_str() + "d}"

    def __get_realnumber_format(self, decimal_places: Optional[DecimalPlaces]) -> str:
        """Return the real-number template.

        The precision is included only when ``decimal_places`` can be
        formatted with the ``d`` presentation type; otherwise the template
        without precision is used.
        """
        if not self.__is_formatting_float:
            return "{}"

        base_format = self.__get_base_format_str()
        without_precision = "{:" + base_format + "f}"

        if decimal_places is None or Nan(decimal_places).is_type():
            return without_precision

        try:
            return "{:" + f"{base_format:s}.{decimal_places:d}f" + "}"
        except ValueError:
            return without_precision

View File

@ -0,0 +1,116 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
import decimal
import re
from decimal import Decimal
from typing import Any, Final, Optional, Union
from typepy import Integer, RealNumber, TypeConversionError
_ansi_escape: Final = re.compile(r"(\x9b|\x1b\[)[0-?]*[ -\/]*[@-~]", re.IGNORECASE)
def get_integer_digit(value: Any) -> int:
# Return the number of digits in the integer part of ``value``.
# Raises ValueError when ``value`` converts neither as RealNumber nor as Integer.
float_type: Final = RealNumber(value)
with decimal.localcontext() as ctx:
# Local decimal context: 60 digits of precision, ROUND_HALF_DOWN rounding.
ctx.prec = 60
ctx.rounding = decimal.ROUND_HALF_DOWN
try:
abs_value = abs(float_type.convert())
except TypeConversionError:
# RealNumber conversion failed; fall back to Integer conversion.
try:
abs_value = abs(Integer(value).convert())
except TypeConversionError:
raise ValueError(
f"the value must be a number: value='{value}' type='{type(value)}'"
)
# Integer fallback succeeded: the digit count is the length of its string form.
return len(str(abs_value))
# Zero is reported as having one digit.
if abs_value.is_zero():
return 1
try:
# Drop the fractional part (ROUND_DOWN) and count the remaining characters.
return len(str(abs_value.quantize(Decimal("1."), rounding=decimal.ROUND_DOWN)))
except decimal.InvalidOperation:
# quantize() was rejected; count the characters of the unquantized value instead.
return len(str(abs_value))
class DigitCalculator:
    """Count decimal places from the string form of a number."""

    # Text such as "1e-07": digits/dots followed by a negative exponent.
    REGEXP_COMMON_LOG: Final = re.compile(r"[\d\.]+[eE]\-\d+")
    # Separator between the mantissa and the negative exponent.
    REGEXP_SPLIT: Final = re.compile(r"[eE]\-")

    def get_decimal_places(self, value: Union[str, float, int, Decimal]) -> int:
        """Return the number of decimal places of ``value``.

        Integers yield 0. Otherwise the absolute value is converted to
        ``float`` and its string form is inspected: with a ``"."`` the count
        is the length of the text after it; with only a negative exponent the
        exponent itself is the count.

        NOTE(review): text containing both a ``"."`` and an exponent (e.g.
        ``"1.5e-07"``) takes the first branch, so the exponent characters
        are counted as digits - confirm this is intended.
        """
        if Integer(value).is_type():
            return 0

        text_value = str(abs(float(value)))

        if "." in text_value:
            return len(text_value.split(".")[1])

        if self.REGEXP_COMMON_LOG.search(text_value):
            return int(self.REGEXP_SPLIT.split(text_value)[1])

        return 0
_digit_calculator = DigitCalculator()
def get_number_of_digit(
    value: Any, max_decimal_places: int = 99
) -> tuple[Optional[int], Optional[int]]:
    """Return ``(integer_digits, decimal_places)`` for ``value``.

    Args:
        value: Value to inspect.
        max_decimal_places: Upper bound applied to the decimal places.

    Returns:
        ``(None, None)`` when the integer digits cannot be determined;
        ``(integer_digits, None)`` when only the decimal places cannot.
    """
    try:
        integer_digits = get_integer_digit(value)
    except (ValueError, TypeError, OverflowError):
        return (None, None)

    try:
        places = _digit_calculator.get_decimal_places(value)
        return (integer_digits, min(places, max_decimal_places))
    except (ValueError, TypeError):
        return (integer_digits, None)
def _validate_eaaw(east_asian_ambiguous_width: int) -> None:
    """Check that the East Asian ambiguous width is 1 or 2.

    Raises:
        ValueError: For any other value.
    """
    if east_asian_ambiguous_width not in (1, 2):
        raise ValueError(
            "invalid east_asian_ambiguous_width: expected=1 or 2, "
            f"actual={east_asian_ambiguous_width}"
        )
def strip_ansi_escape(unicode_str: str) -> str:
    """Return ``unicode_str`` with every match of the ANSI escape pattern removed."""
    stripped = _ansi_escape.sub("", unicode_str)
    return stripped
def calc_ascii_char_width(unicode_str: str, east_asian_ambiguous_width: int = 1) -> int:
    """Return the display width of ``unicode_str`` in ASCII-character units.

    Wide ("W") and fullwidth ("F") characters count as 2, ambiguous ("A")
    characters count as ``east_asian_ambiguous_width``, all others as 1.

    Raises:
        ValueError: If an ambiguous character is met and
            ``east_asian_ambiguous_width`` is neither 1 nor 2.
    """
    from unicodedata import east_asian_width

    total = 0
    for char in unicode_str:
        category = east_asian_width(char)

        if category == "A":
            # Validated only when an ambiguous character actually occurs.
            _validate_eaaw(east_asian_ambiguous_width)
            total += east_asian_ambiguous_width
            continue

        total += 2 if category in "WF" else 1

    return total

View File

@ -0,0 +1,34 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
import abc
from typing import Optional
from typepy import Typecode
from ._align import Align
class DataPeropertyInterface(metaclass=abc.ABCMeta):
    """Abstract interface declaring four read-only properties."""

    __slots__ = ()

    @property
    @abc.abstractmethod
    def align(self) -> Align:  # pragma: no cover
        """Return the ``Align`` value."""

    @property
    @abc.abstractmethod
    def decimal_places(self) -> Optional[int]:  # pragma: no cover
        """Return the decimal places, or ``None``."""

    @property
    @abc.abstractmethod
    def typecode(self) -> Typecode:  # pragma: no cover
        """Return the ``Typecode``."""

    @property
    @abc.abstractmethod
    def typename(self) -> str:  # pragma: no cover
        """Return the type name."""

View File

@ -0,0 +1,8 @@
from enum import Enum, unique
@unique
class LineBreakHandling(Enum):
    """How line breaks inside string data are treated."""

    # Leave line breaks as they are.
    NOP = 0
    # Substitute line breaks with a replacement string.
    REPLACE = 1
    # Turn line breaks into backslash escape sequences.
    ESCAPE = 2

View File

@ -0,0 +1,173 @@
import html
import re
from typing import Any, Final, Optional, Union
from mbstrdecoder import MultiByteStrDecoder
from ._function import strip_ansi_escape
from ._line_break import LineBreakHandling
_RE_LINE_BREAK: Final = re.compile(r"\r\n|\n")
_RE_FORMULA_PREFIX: Final = re.compile(r"^[-\+=@]")
def normalize_lbh(value: Optional[LineBreakHandling]) -> LineBreakHandling:
    """Coerce ``value`` to a ``LineBreakHandling`` member.

    ``None`` becomes ``NOP``; a member is returned unchanged; anything else
    is upper-cased and looked up by member name.
    """
    if value is None:
        return LineBreakHandling.NOP

    if isinstance(value, LineBreakHandling):
        return value

    member_name = value.upper()  # type: ignore
    return LineBreakHandling[member_name]
class Preprocessor:
    """Clean up raw data before type detection.

    Args:
        strip_str: Characters stripped from both ends of the data; no
            stripping when ``None``.
        replace_tabs_with_spaces: Replace each tab with ``tab_length`` spaces.
        tab_length: Number of spaces that replace one tab.
        line_break_handling: Line-break policy; normalized on assignment, with
            ``None`` becoming ``LineBreakHandling.NOP``.
        line_break_repl: Replacement used under ``LineBreakHandling.REPLACE``.
        dequote: Remove one pair of matching surrounding quotes.
        is_escape_html_tag: Pass strings through ``html.escape``.
        is_escape_formula_injection: Prefix ``'`` to strings that start with
            ``-``, ``+``, ``=`` or ``@``.
    """

    @property
    def line_break_handling(self) -> Optional[LineBreakHandling]:
        """Line-break policy, stored in normalized form."""
        return self.__line_break_handling

    @line_break_handling.setter
    def line_break_handling(self, value: Optional[LineBreakHandling]) -> None:
        self.__line_break_handling = normalize_lbh(value)

    def __init__(
        self,
        strip_str: Optional[Union[str, bytes]] = None,
        replace_tabs_with_spaces: bool = True,
        tab_length: int = 2,
        line_break_handling: Optional[LineBreakHandling] = None,
        line_break_repl: str = " ",
        dequote: bool = False,
        is_escape_html_tag: bool = False,
        is_escape_formula_injection: bool = False,
    ) -> None:
        self.strip_str = strip_str
        self.replace_tabs_with_spaces = replace_tabs_with_spaces
        self.tab_length = tab_length
        self.line_break_handling = line_break_handling
        self.line_break_repl = line_break_repl
        self.dequote = dequote
        self.is_escape_html_tag = is_escape_html_tag
        self.is_escape_formula_injection = is_escape_formula_injection

    def __repr__(self) -> str:
        return ", ".join(
            [
                f"strip_str={self.strip_str!r}",
                f"replace_tabs_with_spaces={self.replace_tabs_with_spaces}",
                f"tab_length={self.tab_length}",
                f"line_break_handling={self.line_break_handling}",
                f"line_break_repl={self.line_break_repl}",
                f"escape_html_tag={self.is_escape_html_tag}",
                f"escape_formula_injection={self.is_escape_formula_injection}",
            ]
        )

    def preprocess(self, data: Any) -> tuple:
        """Return ``(data, no_ansi_escape_data)`` for ``data``.

        The second item is ``None`` when the stripped data is not a string.
        """
        data, no_ansi_escape_data = self.__preprocess_string(
            self.__preprocess_data(data, self.strip_str),
        )

        return (data, no_ansi_escape_data)

    def update(self, **kwargs: Any) -> bool:
        """Set existing attributes from ``kwargs``.

        Unknown names and unchanged values are ignored.

        Returns:
            ``True`` if at least one attribute was changed.
        """
        is_updated = False

        for key, value in kwargs.items():
            if not hasattr(self, key):
                continue

            if getattr(self, key) == value:
                continue

            setattr(self, key, value)
            is_updated = True

        return is_updated

    def __preprocess_string(self, raw_data: Any) -> tuple[Any, Optional[str]]:
        """Apply the string-only steps; non-string data passes through."""
        data = raw_data

        if not isinstance(data, str):
            return (data, None)

        if self.replace_tabs_with_spaces:
            try:
                data = data.replace("\t", " " * self.tab_length)
            except (TypeError, AttributeError, ValueError):
                pass

        if self.is_escape_html_tag:
            try:
                data = html.escape(data)
            except AttributeError:
                return (data, None)

        data = self.__process_line_break(data)
        data = self.__escape_formula_injection(data)
        data = self.__dequote(data)

        try:
            return (data, strip_ansi_escape(data))
        except TypeError:
            return (data, None)

    @staticmethod
    def __preprocess_data(data: Any, strip_str: Optional[Union[str, bytes]]) -> Any:
        """Strip ``strip_str`` from both ends of ``data`` where possible.

        Data that has no ``strip`` method, or that cannot be stripped with
        the given ``strip_str``, is returned unchanged.
        """
        if strip_str is None:
            return data

        try:
            return data.strip(strip_str)
        except AttributeError:
            return data
        except UnicodeDecodeError:
            return MultiByteStrDecoder(data).unicode_str.strip(str(strip_str))
        except TypeError:
            # reach here when data and strip_str type are different
            if isinstance(data, bytes):
                return MultiByteStrDecoder(data).unicode_str.strip(str(strip_str))
            if isinstance(strip_str, bytes):
                return data.strip(MultiByteStrDecoder(strip_str).unicode_str)

        # Neither side is bytes, so no conversion applies. Return the data
        # untouched rather than falling off the end and returning None,
        # which silently discarded the value.
        return data

    def __dequote(self, s: str) -> str:
        """Remove surrounding quotes when the quote character occurs exactly twice."""
        if not self.dequote or not s:
            return s

        try:
            if (s[0] == s[-1]) and s.startswith(("'", '"')):
                if s.count(s[0]) == 2:
                    return s[1:-1]
        except TypeError:
            pass

        return s

    def __process_line_break(self, data: str) -> str:
        """Apply the configured line-break policy to ``data``.

        Raises:
            ValueError: If the policy is not a known ``LineBreakHandling``.
        """
        lbh = self.line_break_handling

        if lbh == LineBreakHandling.NOP:
            return data

        try:
            if lbh == LineBreakHandling.REPLACE:
                return _RE_LINE_BREAK.sub(self.line_break_repl, data)

            if lbh == LineBreakHandling.ESCAPE:
                return data.replace("\n", "\\n").replace("\r", "\\r")
        except (TypeError, AttributeError):
            return data

        raise ValueError(f"unexpected line_break_handling: {lbh}")

    def __escape_formula_injection(self, data: str) -> str:
        """Prefix ``'`` to data that starts with a formula character, if enabled."""
        if not self.is_escape_formula_injection:
            return data

        try:
            if _RE_FORMULA_PREFIX.search(data):
                return "'" + data
        except (TypeError, AttributeError):
            return data

        return data

View File

@ -0,0 +1,7 @@
from ._logger import logger, set_logger # type: ignore
__all__ = (
"logger",
"set_logger",
)

View File

@ -0,0 +1,22 @@
"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
from ._null_logger import NullLogger
MODULE_NAME = "dataproperty"
try:
from loguru import logger # type: ignore
logger.disable(MODULE_NAME)
except ImportError:
logger = NullLogger()
def set_logger(is_enable: bool, propagation_depth: int = 1) -> None:
    """Enable or disable logging for this module.

    Args:
        is_enable: ``True`` to enable logging, ``False`` to disable it.
        propagation_depth: Accepted but not used by this function.
    """
    toggle = logger.enable if is_enable else logger.disable
    toggle(MODULE_NAME)

View File

@ -0,0 +1,41 @@
class NullLogger:
    """Logger stand-in whose methods all do nothing and return ``None``."""

    level_name = None

    def remove(self, handler_id=None):  # pragma: no cover
        """Do nothing."""

    def add(self, sink, **kwargs):  # pragma: no cover
        """Do nothing."""

    def disable(self, name):  # pragma: no cover
        """Do nothing."""

    def enable(self, name):  # pragma: no cover
        """Do nothing."""

    def critical(self, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""

    def debug(self, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""

    def error(self, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""

    def exception(self, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""

    def info(self, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""

    def log(self, __level, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""

    def success(self, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""

    def trace(self, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""

    def warning(self, __message, *args, **kwargs):  # pragma: no cover
        """Discard the message."""

View File

@ -0,0 +1,63 @@
from collections.abc import Mapping
from datetime import datetime
from decimal import Decimal
from typing import Any, Callable, Final, Optional, Union
from typepy import (
Bool,
DateTime,
Dictionary,
Infinity,
Integer,
IpAddress,
List,
Nan,
NoneType,
NullString,
RealNumber,
String,
Typecode,
)
from typepy.type import AbstractType
TypeHint = Optional[type[AbstractType]]
TransFunc = Callable[[Any], Any]
DateTimeFormatter = Callable[[datetime], str]
FloatType = Union[type[Decimal], type[float]]
StrictLevelMap = Mapping[Union[str, Typecode], int]
TypeValueMap = dict[Typecode, Union[float, str, Decimal, None]]
_type_hint_map: Final = {
# high frequently used types
"int": Integer,
"float": RealNumber,
"realnumber": RealNumber,
"str": String,
# low frequently used types
"bool": Bool,
"datetime": DateTime,
"dict": Dictionary,
"inf": Infinity,
"ip": IpAddress,
"list": List,
"nan": Nan,
"none": NoneType,
"nullstr": NullString,
}
def normalize_type_hint(type_hint: Union[str, TypeHint]) -> TypeHint:
    """Resolve a type hint given as a name or as a type class.

    A falsy hint yields ``None`` and a non-string hint is returned unchanged.
    A string is stripped and case-folded, then matched by prefix against the
    known names in their declared order.

    Raises:
        ValueError: If the string matches no known name.
    """
    if not type_hint:
        return None

    if not isinstance(type_hint, str):
        return type_hint

    normalized = type_hint.strip().casefold()
    matched = next(
        (
            type_class
            for prefix, type_class in _type_hint_map.items()
            if normalized.startswith(prefix)
        ),
        None,
    )
    if matched is None:
        raise ValueError(f"unknown typehint: {normalized}")

    return matched