# Copyright (C) 2021 The InstanceLib Authors. All Rights Reserved.
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import functools
import itertools
from os import PathLike
from typing import (
Any,
Callable,
Dict,
FrozenSet,
Iterable,
Iterator,
List,
Mapping,
Optional,
Sequence,
Tuple,
TypeVar,
Union,
)
from uuid import UUID
import numpy.typing as npt
import pandas as pd
from instancelib.environment.memory import MemoryEnvironment
from instancelib.labels.memory import MemoryLabelProvider
from ..environment import AbstractEnvironment
from ..environment.text import TextEnvironment
from ..instances.base import Instance, InstanceProvider
from ..instances.text import MemoryTextInstance, TextInstanceProvider
from ..labels.base import LabelProvider
from ..typehints import DT, KT, LT, RT, VT
from ..utils.func import list_unzip3, single_or_collection
def identity_mapper(value: Any) -> Optional[str]:
    """Return the string form of a value.

    Parameters
    ----------
    value : Any
        The value to convert. Strings (including the empty string)
        are handed back unchanged.

    Returns
    -------
    Optional[str]
        ``value`` itself when it already is a string; otherwise
        ``str(value)``, or None when that conversion yields an
        empty string.
    """
    if isinstance(value, str):
        return value
    # An empty conversion result is reported as None instead of "".
    return str(value) or None
def build_environment(
    df: pd.DataFrame,
    label_mapper: Callable[[Any], Optional[str]],
    labels: Optional[Iterable[str]],
    data_cols: Sequence[str],
    label_cols: Sequence[str],
) -> AbstractEnvironment[
    MemoryTextInstance[int, npt.NDArray[Any]],
    Union[int, UUID],
    str,
    npt.NDArray[Any],
    str,
    str,
]:
    """Build a text environment from a single data frame.

    Parameters
    ----------
    df : pd.DataFrame
        The data frame that holds the texts and the labels
    label_mapper : Callable[[Any], Optional[str]]
        Function handed to ``inv_transform_mapping`` as its
        ``label_mapper`` argument
    labels : Optional[Iterable[str]]
        The set of possible labels. If None, the set is the union
        of all label sets extracted from the data frame
    data_cols : Sequence[str]
        The columns that contain the text data
    label_cols : Sequence[str]
        The columns that contain the label data

    Returns
    -------
    AbstractEnvironment[MemoryTextInstance[int, npt.NDArray[Any]], Union[int, UUID], str, npt.NDArray[Any], str, str]
        The environment produced by ``TextEnvironment.from_data``
    """
    label_extractor = functools.partial(
        inv_transform_mapping, label_cols, label_mapper=label_mapper
    )
    keys, documents, gold_labels = extract_data(df, data_cols, label_extractor)
    labelset = (
        labels
        if labels is not None
        else frozenset(itertools.chain.from_iterable(gold_labels))
    )
    # The trailing empty list is from_data's fifth positional argument
    # (presumably the vectors -- TODO confirm against TextEnvironment).
    return TextEnvironment[int, npt.NDArray[Any], str].from_data(
        labelset, keys, documents, gold_labels, []
    )
def build_environment_with_id(
    df: pd.DataFrame,
    label_mapper: Callable[[Any], Optional[str]],
    labels: Optional[Iterable[str]],
    id_col: str,
    data_cols: Sequence[str],
    label_cols: Sequence[str],
) -> AbstractEnvironment[
    MemoryTextInstance[Any, npt.NDArray[Any]],
    Union[Any, UUID],
    str,
    npt.NDArray[Any],
    str,
    str,
]:
    """Build a text environment from a data frame with an identifier column.

    Parameters
    ----------
    df : pd.DataFrame
        The data frame that holds the identifiers, texts and labels
    label_mapper : Callable[[Any], Optional[str]]
        Function handed to ``inv_transform_mapping`` as its
        ``label_mapper`` argument
    labels : Optional[Iterable[str]]
        The set of possible labels. If None, the set is the union
        of all label sets extracted from the data frame
    id_col : str
        The column passed to ``extract_data_with_id`` as the
        identifier column
    data_cols : Sequence[str]
        The columns that contain the text data
    label_cols : Sequence[str]
        The columns that contain the label data

    Returns
    -------
    AbstractEnvironment[MemoryTextInstance[Any, npt.NDArray[Any]], Union[Any, UUID], str, npt.NDArray[Any], str, str]
        The environment produced by ``TextEnvironment.from_data``
    """
    label_extractor = functools.partial(
        inv_transform_mapping, label_cols, label_mapper=label_mapper
    )
    keys, documents, gold_labels = extract_data_with_id(
        df, id_col, data_cols, label_extractor
    )
    labelset = (
        labels
        if labels is not None
        else frozenset(itertools.chain.from_iterable(gold_labels))
    )
    # The trailing empty list is from_data's fifth positional argument
    # (presumably the vectors -- TODO confirm against TextEnvironment).
    return TextEnvironment[int, npt.NDArray[Any], str].from_data(
        labelset, keys, documents, gold_labels, []
    )
def read_excel_dataset(
    path: "Union[str, PathLike[str]]",
    data_cols: Sequence[str],
    label_cols: Sequence[str],
    labels: Optional[Iterable[str]] = None,
    label_mapper: Callable[[Any], Optional[str]] = identity_mapper,
) -> AbstractEnvironment[
    MemoryTextInstance[int, npt.NDArray[Any]],
    Union[int, UUID],
    str,
    npt.NDArray[Any],
    str,
    str,
]:
    """Read an Excel file that contains text data

    Parameters
    ----------
    path : Union[str, PathLike[str]]
        The path to the Excel file
    data_cols : Sequence[str]
        The columns that contain the text data
    label_cols : Sequence[str]
        The columns that contain the labels
    labels : Optional[Iterable[str]], optional
        The set of labels that are possible.
        If None, the set will be inferred from the data.
        This parameter is by default None
    label_mapper : Callable[[Any], Optional[str]], optional
        A function that transforms labels into another representation.
        This parameter is by default :func:`identity_mapper`

    Returns
    -------
    AbstractEnvironment[MemoryTextInstance[int, npt.NDArray[Any]], Union[int, UUID], str, npt.NDArray[Any], str, str]
        An environment built by :func:`build_environment` from the
        contents of the Excel file
    """
    frame: pd.DataFrame = pd.read_excel(path)  # type: ignore
    return build_environment(
        df=frame,
        label_mapper=label_mapper,
        labels=labels,
        data_cols=data_cols,
        label_cols=label_cols,
    )
def read_csv_dataset(
    path: "Union[str, PathLike[str]]",
    data_cols: Sequence[str],
    label_cols: Sequence[str],
    labels: Optional[Iterable[str]] = None,
    label_mapper: Callable[[Any], Optional[str]] = identity_mapper,
) -> AbstractEnvironment[
    MemoryTextInstance[int, npt.NDArray[Any]],
    Union[int, UUID],
    str,
    npt.NDArray[Any],
    str,
    str,
]:
    """Read a CSV file that contains text data

    Parameters
    ----------
    path : Union[str, PathLike[str]]
        The path to the CSV file
    data_cols : Sequence[str]
        The columns that contain the text data
    label_cols : Sequence[str]
        The columns that contain the labels
    labels : Optional[Iterable[str]], optional
        The set of labels that are possible.
        If None, the set will be inferred from the data.
        This parameter is by default None
    label_mapper : Callable[[Any], Optional[str]], optional
        A function that transforms labels into another representation.
        This parameter is by default :func:`identity_mapper`

    Returns
    -------
    AbstractEnvironment[MemoryTextInstance[int, npt.NDArray[Any]], Union[int, UUID], str, npt.NDArray[Any], str, str]
        An environment built by :func:`build_environment` from the
        contents of the CSV file
    """
    frame: pd.DataFrame = pd.read_csv(path)  # type: ignore
    return build_environment(
        df=frame,
        label_mapper=label_mapper,
        labels=labels,
        data_cols=data_cols,
        label_cols=label_cols,
    )
def pandas_to_env(
    df: Union[pd.DataFrame, Dict[str, pd.DataFrame]],
    data_cols: Union[str, Sequence[str]],
    label_cols: Union[str, Sequence[str]],
    labels: Optional[Iterable[str]] = None,
) -> AbstractEnvironment[
    MemoryTextInstance[Any, npt.NDArray[Any]],
    Union[Any, UUID],
    str,
    npt.NDArray[Any],
    str,
    str,
]:
    """Convert one data frame, or a dictionary of data frames, to an environment

    Parameters
    ----------
    df : Union[pd.DataFrame, Dict[str, pd.DataFrame]]
        A single data frame, or a dictionary of data frames. A
        dictionary is handled by :func:`build_from_multiple_dfs`,
        anything else by :func:`build_environment`
    data_cols : Union[str, Sequence[str]]
        The column(s) that contain the text data
    label_cols : Union[str, Sequence[str]]
        The column(s) that contain the label data
    labels : Optional[Iterable[str]], optional
        The set of labels that are possible.
        This parameter is by default None

    Returns
    -------
    AbstractEnvironment[MemoryTextInstance[Any, npt.NDArray[Any]], Union[Any, UUID], str, npt.NDArray[Any], str, str]
        The environment returned by the selected builder
    """
    # single_or_collection presumably wraps a lone column name in a
    # collection -- TODO confirm against utils.func.
    text_columns = single_or_collection(data_cols)
    label_columns = single_or_collection(label_cols)
    # Both builders take the same positional arguments.
    builder = build_from_multiple_dfs if isinstance(df, dict) else build_environment
    return builder(df, identity_mapper, labels, text_columns, label_columns)
def pandas_to_env_with_id(
    df: Union[pd.DataFrame, Dict[str, pd.DataFrame]],
    id_col: str,
    data_cols: Union[str, Sequence[str]],
    label_cols: Union[str, Sequence[str]],
    labels: Optional[Iterable[str]] = None,
) -> AbstractEnvironment[
    MemoryTextInstance[Any, npt.NDArray[Any]],
    Union[Any, UUID],
    str,
    npt.NDArray[Any],
    str,
    str,
]:
    """Convert data frame(s) with an identifier column to an environment

    Parameters
    ----------
    df : Union[pd.DataFrame, Dict[str, pd.DataFrame]]
        A single data frame, or a dictionary of data frames. A
        dictionary is handled by
        :func:`build_from_multiple_dfs_with_ids`, anything else by
        :func:`build_environment_with_id`
    id_col : str
        The column that is forwarded to the builder as identifier column
    data_cols : Union[str, Sequence[str]]
        The column(s) that contain the text data
    label_cols : Union[str, Sequence[str]]
        The column(s) that contain the label data
    labels : Optional[Iterable[str]], optional
        The set of labels that are possible.
        This parameter is by default None

    Returns
    -------
    AbstractEnvironment[MemoryTextInstance[Any, npt.NDArray[Any]], Union[Any, UUID], str, npt.NDArray[Any], str, str]
        The environment returned by the selected builder
    """
    # single_or_collection presumably wraps a lone column name in a
    # collection -- TODO confirm against utils.func.
    text_columns = single_or_collection(data_cols)
    label_columns = single_or_collection(label_cols)
    # Both builders take the same positional arguments.
    builder = (
        build_from_multiple_dfs_with_ids
        if isinstance(df, dict)
        else build_environment_with_id
    )
    return builder(df, identity_mapper, labels, id_col, text_columns, label_columns)
# Type variable for instance types, bound to ``Instance``; it lets helpers
# such as ``to_dicts`` and ``to_environment`` carry the concrete instance
# type through their signatures.
IT = TypeVar("IT", bound="Instance[Any, Any, Any, Any]")
def to_dicts(
    triples: Iterator[Tuple[KT, IT, FrozenSet[LT]]]
) -> Tuple[Mapping[KT, IT], Mapping[KT, FrozenSet[LT]]]:
    """Split (key, instance, labels) triples into two dictionaries

    Parameters
    ----------
    triples : Iterator[Tuple[KT, IT, FrozenSet[LT]]]
        Triples of a key, an instance and the instance's label set

    Returns
    -------
    Tuple[Mapping[KT, IT], Mapping[KT, FrozenSet[LT]]]
        A dictionary from key to instance and a dictionary from
        key to label set
    """
    keys, instances, labels = list_unzip3(triples)
    key_to_instance = {key: instance for key, instance in zip(keys, instances)}
    key_to_labels = {key: labelset for key, labelset in zip(keys, labels)}
    return key_to_instance, key_to_labels
def to_environment(
    prov_builder: Callable[[Mapping[KT, IT]], InstanceProvider[IT, KT, DT, VT, RT]],
    labelprov_builder: Callable[[Mapping[KT, FrozenSet[LT]]], LabelProvider[KT, LT]],
    dictionaries: Tuple[Mapping[KT, IT], Mapping[KT, FrozenSet[LT]]],
) -> AbstractEnvironment[IT, KT, DT, VT, RT, LT]:
    """Create a MemoryEnvironment from an instance and a label dictionary

    Parameters
    ----------
    prov_builder : Callable[[Mapping[KT, IT]], InstanceProvider[IT, KT, DT, VT, RT]]
        Function that builds an instance provider from the
        key-to-instance dictionary
    labelprov_builder : Callable[[Mapping[KT, FrozenSet[LT]]], LabelProvider[KT, LT]]
        Function that builds a label provider from the
        key-to-labels dictionary
    dictionaries : Tuple[Mapping[KT, IT], Mapping[KT, FrozenSet[LT]]]
        The pair (instances, labels), e.g., as returned by :func:`to_dicts`

    Returns
    -------
    AbstractEnvironment[IT, KT, DT, VT, RT, LT]
        A MemoryEnvironment constructed from both providers
    """
    instance_dict, label_dict = dictionaries
    # The instance provider is built before the label provider.
    return MemoryEnvironment(
        prov_builder(instance_dict),
        labelprov_builder(label_dict),
    )
def build_from_multiple_dfs(
    df_dict: Dict[str, pd.DataFrame],
    label_mapper: Callable[[Any], Optional[str]],
    labels: Optional[Iterable[str]],
    data_cols: Sequence[str],
    label_cols: Sequence[str],
) -> AbstractEnvironment[
    MemoryTextInstance[str, npt.NDArray[Any]],
    Union[str, UUID],
    str,
    npt.NDArray[Any],
    str,
    str,
]:
    """Build one environment from several named data frames

    The data of every data frame is extracted with ``extract_data``.
    Each extracted index is turned into the string
    ``"{df_key}_{index}"`` so that identifiers carry the name of
    the data frame they came from. After the environment has been
    created, a bucket with the identifiers of each data frame is
    stored in the environment under that data frame's key.

    Parameters
    ----------
    df_dict : Dict[str, pd.DataFrame]
        A mapping from a name to the data frame with that name
    label_mapper : Callable[[Any], Optional[str]]
        Function handed to ``inv_transform_mapping`` as its
        ``label_mapper`` argument
    labels : Optional[Iterable[str]]
        The set of possible labels. If None, the set is the union
        of all label sets extracted from the data frames
    data_cols : Sequence[str]
        The columns that contain the text data
    label_cols : Sequence[str]
        The columns that contain the label data

    Returns
    -------
    AbstractEnvironment[MemoryTextInstance[str, npt.NDArray[Any]], Union[str, UUID], str, npt.NDArray[Any], str, str]
        The environment produced by ``TextEnvironment.from_data``,
        with one extra entry per data frame
    """
    labelfunc = functools.partial(
        inv_transform_mapping, label_cols, label_mapper=label_mapper
    )
    indices_table: Dict[str, List[str]] = {}
    indices: List[str] = []
    texts: List[str] = []
    true_labels: List[FrozenSet[str]] = []
    for df_key, df in df_dict.items():
        idxs, df_texts, df_true_labels = extract_data(df, data_cols, labelfunc)
        prefixed = [f"{df_key}_{idx}" for idx in idxs]
        indices_table[df_key] = prefixed
        # extend() appends in place; rebuilding the lists with `+` would
        # re-copy everything accumulated so far for every data frame.
        indices.extend(prefixed)
        texts.extend(df_texts)
        true_labels.extend(df_true_labels)
    if labels is None:
        labels = frozenset(itertools.chain.from_iterable(true_labels))
    # The trailing empty list is from_data's fifth positional argument
    # (presumably the vectors -- TODO confirm against TextEnvironment).
    environment = TextEnvironment[str, npt.NDArray[Any], str].from_data(
        labels, indices, texts, true_labels, []
    )
    # Register one bucket per data frame under the data frame's key.
    for key, split_indices in indices_table.items():
        environment[key] = environment.create_bucket(split_indices)
    return environment
def build_from_multiple_dfs_with_ids(
    df_dict: Dict[str, pd.DataFrame],
    label_mapper: Callable[[Any], Optional[str]],
    labels: Optional[Iterable[str]],
    id_col: str,
    data_cols: Sequence[str],
    label_cols: Sequence[str],
) -> AbstractEnvironment[
    MemoryTextInstance[str, npt.NDArray[Any]],
    Union[str, UUID],
    str,
    npt.NDArray[Any],
    str,
    str,
]:
    """Build one environment from several named data frames with an identifier column

    The data of every data frame is extracted with
    ``extract_data_with_id``; the identifiers it returns are used
    as they are (no data frame prefix is added). After the
    environment has been created, a bucket with the identifiers of
    each data frame is stored in the environment under that data
    frame's key.

    Parameters
    ----------
    df_dict : Dict[str, pd.DataFrame]
        A mapping from a name to the data frame with that name
    label_mapper : Callable[[Any], Optional[str]]
        Function handed to ``inv_transform_mapping`` as its
        ``label_mapper`` argument
    labels : Optional[Iterable[str]]
        The set of possible labels. If None, the set is the union
        of all label sets extracted from the data frames
    id_col : str
        The column passed to ``extract_data_with_id`` as the
        identifier column
    data_cols : Sequence[str]
        The columns that contain the text data
    label_cols : Sequence[str]
        The columns that contain the label data

    Returns
    -------
    AbstractEnvironment[MemoryTextInstance[str, npt.NDArray[Any]], Union[str, UUID], str, npt.NDArray[Any], str, str]
        The environment produced by ``TextEnvironment.from_data``,
        with one extra entry per data frame
    """
    labelfunc = functools.partial(
        inv_transform_mapping, label_cols, label_mapper=label_mapper
    )
    indices_table: Dict[str, List[str]] = {}
    indices: List[str] = []
    texts: List[str] = []
    true_labels: List[FrozenSet[str]] = []
    for df_key, df in df_dict.items():
        df_ids, df_texts, df_true_labels = extract_data_with_id(
            df, id_col, data_cols, labelfunc
        )
        indices_table[df_key] = df_ids
        # extend() appends in place; rebuilding the lists with `+` would
        # re-copy everything accumulated so far for every data frame.
        indices.extend(df_ids)
        texts.extend(df_texts)
        true_labels.extend(df_true_labels)
    if labels is None:
        labels = frozenset(itertools.chain.from_iterable(true_labels))
    # The trailing empty list is from_data's fifth positional argument
    # (presumably the vectors -- TODO confirm against TextEnvironment).
    environment = TextEnvironment[str, npt.NDArray[Any], str].from_data(
        labels, indices, texts, true_labels, []
    )
    # Register one bucket per data frame under the data frame's key.
    for key, split_indices in indices_table.items():
        environment[key] = environment.create_bucket(split_indices)
    return environment
def text_concatenation(*cols: str) -> Callable[[pd.Series], str]:
    """Create a function that concatenates the values of some columns of a row

    Parameters
    ----------
    *cols : str
        The names of the columns whose values should be concatenated,
        in the order in which they should appear in the text

    Returns
    -------
    Callable[[pd.Series], str]
        A function that takes a row and returns the values of the
        given columns, converted to strings and separated by a single
        space. Columns whose value is falsy (e.g., an empty string)
        are left out.
    """

    def concatenate(row: pd.Series) -> str:
        # Join the cell *values*; joining the column names themselves
        # would give every row the same text.
        values = (row[col] for col in cols)
        return " ".join(str(value) for value in values if value)

    return concatenate
def no_vector() -> Callable[[pd.Series], Optional[npt.NDArray[Any]]]:
    """Create a vector extractor that never yields a vector

    Returns
    -------
    Callable[[pd.Series], Optional[npt.NDArray[Any]]]
        A function that ignores the row it is given and returns None
    """

    def extract_nothing(row: pd.Series) -> Optional[npt.NDArray[Any]]:
        return None

    return extract_nothing
def id_col(col: str) -> Callable[[pd.Series, Any], Any]:
    """Create an identifier extractor that reads the identifier from a column

    Parameters
    ----------
    col : str
        The name of the column that holds the identifier

    Returns
    -------
    Callable[[pd.Series, Any], Any]
        A function that takes a row and its index and returns the
        row's value in column ``col``; the index is ignored
    """

    def from_column(row: pd.Series, idx: Any) -> Any:
        return row[col]

    return from_column
def id_index() -> Callable[[pd.Series, Any], Any]:
    """Create an identifier extractor that uses the row index as identifier

    Returns
    -------
    Callable[[pd.Series, Any], Any]
        A function that takes a row and its index and returns the
        index unchanged; the row is ignored
    """

    def from_index(row: pd.Series, idx: Any) -> Any:
        return idx

    return from_index
def id_index_prefix(prefix: str) -> Callable[[pd.Series, Any], str]:
    """Create an identifier extractor that prefixes the row index

    Parameters
    ----------
    prefix : str
        The text that is placed in front of every index

    Returns
    -------
    Callable[[pd.Series, Any], str]
        A function that takes a row and its index and returns the
        string ``"{prefix}_{idx}"``; the row is ignored
    """

    def from_prefixed_index(row: pd.Series, idx: Any) -> str:
        return f"{prefix}_{idx}"

    return from_prefixed_index
def text_builder(
    identifier: KT,
    data: str,
    vector: VT,
    representation: str,
    row: pd.Series,
    idx: Any,
) -> MemoryTextInstance[KT, VT]:
    """Build a MemoryTextInstance from extracted values

    Parameters
    ----------
    identifier : KT
        The identifier of the instance
    data : str
        The data of the instance
    vector : VT
        The vector of the instance
    representation : str
        The representation of the instance
    row : pd.Series
        Not used by this builder
    idx : Any
        Not used by this builder

    Returns
    -------
    MemoryTextInstance[KT, VT]
        The instance constructed from the first four arguments
    """
    # row and idx are accepted but ignored; only the first four
    # arguments are forwarded to the constructor.
    instance = MemoryTextInstance(identifier, data, vector, representation)
    return instance
def text_from_pandas_multilabel(
    df_dict: Dict[str, pd.DataFrame],
    text_cols: Sequence[str],
    label_cols: Sequence[str],
    labelset: FrozenSet[str],
):
    """Extract text instances and label sets from several named data frames

    NOTE(review): this function looks unfinished. As far as visible
    here, the dictionaries produced by :func:`to_dicts` are neither
    used nor returned, so the call returns None, and ``labelset`` is
    never read.

    Parameters
    ----------
    df_dict : Dict[str, pd.DataFrame]
        A mapping from a name to the data frame with that name
    text_cols : Sequence[str]
        The columns handed to :func:`text_concatenation` for both the
        data and the representation of each instance
    label_cols : Sequence[str]
        The columns handed to ``one_hot_encoded_extractor``
    labelset : FrozenSet[str]
        Currently unused
    """

    def generate_triples() -> Iterator[
        Tuple[str, MemoryTextInstance[str, Any], FrozenSet[str]]
    ]:
        # The name of each data frame is used as identifier prefix.
        for name, frame in df_dict.items():
            yield from instance_extractor(
                frame,
                id_index_prefix(name),
                text_concatenation(*text_cols),
                no_vector(),
                text_concatenation(*text_cols),
                one_hot_encoded_extractor(*label_cols),
                text_builder,
            )

    # The resulting dictionaries are currently discarded.
    to_dicts(generate_triples())
    return None