from typing import Optional, Union, List, Dict, Any, Iterable, Tuple
import traceback
import torch
import os
import json
import pandas as pd
from datasets import Dataset as HFDataset, load_from_disk
from torch.utils.data import Dataset
from ..Tokenizer import get_tokenizer
from ..ModelZoo.utils import pad_sequence
from ..SIF import EDU_SPYMBOLS
__all__ = ["EduVocab", "EduDataset", "PretrainedEduTokenizer"]
[docs]class EduVocab(object):
"""The vocabulary container for a corpus.
Parameters
----------
vocab_path : str, optional
vocabulary path to initialize this container, by default None
corpus_items : List[str], optional
corpus items to update this vocabulary, by default None
bos_token : str, optional
token representing for the start of a sentence, by default "[BOS]"
eos_token : str, optional
token representing for the end of a sentence, by default "[EOS]"
pad_token : str, optional
token representing for padding, by default "[PAD]"
unk_token : str, optional
token representing for unknown word, by default "[UNK]"
specials : List[str], optional
spacials tokens in vocabulary, by default None
lower : bool, optional
wheather to lower the corpus items, by default False
trim_min_count : int, optional
the lower bound number for adding a word into vocabulary, by default 1
"""
def __init__(self, vocab_path: str = None, corpus_items: List[str] = None, bos_token: str = "[BOS]",
eos_token: str = "[EOS]", pad_token: str = "[PAD]", unk_token: str = "[UNK]",
specials: List[str] = None, lower: bool = False, trim_min_count: int = 1, **kwargs):
super(EduVocab, self).__init__()
self._tokens = []
self.idx_to_token = dict()
self.token_to_idx = dict()
self.frequencies = dict()
# 定义特殊词
self.bos_token = bos_token
self.eos_token = eos_token
self.pad_token = pad_token
self.unk_token = unk_token
self._special_tokens = [self.pad_token, self.unk_token, self.bos_token, self.eos_token]
if specials:
self._special_tokens += specials
for st in self._special_tokens:
self._add(st)
# 加载词典
if vocab_path is not None:
self.load_vocab(vocab_path)
elif corpus_items is not None:
self.set_vocab(corpus_items, lower, trim_min_count)
self.bos_idx = self.token_to_idx[self.bos_token]
self.eos_idx = self.token_to_idx[self.eos_token]
self.pad_idx = self.token_to_idx[self.pad_token]
self.unk_idx = self.token_to_idx[self.unk_token]
def __len__(self):
return len(self._tokens)
@property
def vocab_size(self):
return len(self._tokens)
@property
def special_tokens(self):
return self._special_tokens
@property
def tokens(self):
return self._tokens
[docs] def to_idx(self, token):
"""convert token to index"""
return self.token_to_idx.get(token, self.unk_idx)
[docs] def to_token(self, idx):
"""convert index to index"""
return self.idx_to_token.get(idx, self.unk_token)
[docs] def convert_sequence_to_idx(self, tokens, bos=False, eos=False):
"""convert sentence of tokens to sentence of indexs"""
res = [self.to_idx(t) for t in tokens]
if bos is True:
res = [self.bos_idx] + res
if eos is True:
res = res + [self.eos_idx]
return res
[docs] def convert_sequence_to_token(self, idxs, **kwargs):
"""convert sentence of indexs to sentence of tokens"""
return [self.to_token(i) for i in idxs]
[docs] def set_vocab(self, corpus_items: List[str], lower: bool = False, trim_min_count: int = 1, silent=True):
"""Update the vocabulary with the tokens in corpus items
Parameters
----------
corpus_items : List[str], optional
corpus items to update this vocabulary, by default None
lower : bool, optional
wheather to lower the corpus items, by default False
trim_min_count : int, optional
the lower bound number for adding a word into vocabulary, by default 1
"""
word2cnt = dict()
for item in corpus_items:
for word in item:
word = word.lower() if lower else word
word2cnt[word] = word2cnt.get(word, 0) + 1
words = [w for w, c in word2cnt.items() if c >= trim_min_count and w not in self._special_tokens]
for token in words:
self._add(token)
if not silent:
keep_word_cnts = sum(word2cnt[w] for w in words)
all_word_cnts = sum(word2cnt.values())
print(f"save words(trim_min_count={trim_min_count}): {len(words)}/{len(word2cnt)} = {len(words) / len(word2cnt):.4f}\
with frequency {keep_word_cnts}/{all_word_cnts}={keep_word_cnts / all_word_cnts:.4f}")
[docs] def load_vocab(self, vocab_path: str):
"""Load the vocabulary from vocab_file
Parameters
----------
vocab_path : str
path to save vocabulary file
"""
with open(vocab_path, "r", encoding="utf-8") as file:
self._tokens = file.read().strip().split('\n')
self.token_to_idx = {token: idx for idx, token in enumerate(self._tokens)}
self.idx_to_token = {idx: token for idx, token in enumerate(self._tokens)}
[docs] def save_vocab(self, vocab_path: str):
"""Save the vocabulary into vocab_file
Parameters
----------
vocab_path : str
path to save vocabulary file
"""
with open(vocab_path, 'w', encoding='utf-8') as file:
for i in range(self.vocab_size):
token = self._tokens[i]
file.write(f"{token}\n")
def _add(self, token: str):
if token not in self._tokens:
idx = len(self._tokens)
self._tokens.append(token)
self.idx_to_token[idx] = token
self.token_to_idx[token] = idx
[docs] def add_specials(self, tokens: List[str]):
"""Add special tokens into vocabulary"""
for token in tokens:
if token not in self._special_tokens:
self._special_tokens += [token]
self._add(token)
[docs] def add_tokens(self, tokens: List[str]):
"""Add tokens into vocabulary"""
for token in tokens:
self._add(token)
# to do: how to handle tokenizer with formulas or pictures.
[docs]class PretrainedEduTokenizer(object):
"""This base class is in charge of preparing the inputs for a model
Parameters
----------
vocab_path : str, optional
_description_, by default None
max_length : int, optional
used to clip the sentence out of max_length, by default None
tokenize_method : str, optional
default: "space"
- when text is already seperated by space, use "space"
- when text is raw string format, use Tokenizer defined in get_tokenizer(), such as "pure_text" and "text"
add_specials : Tuple[list, bool], optional
by default None
- For bool, it means whether to add EDU_SPYMBOLS to vocabulary
- For list, it means the added special tokens besides EDU_SPYMBOLS
"""
def __init__(self, vocab_path: str = None, max_length: int = 250, tokenize_method: str = "pure_text",
add_specials: Tuple[list, bool] = False, **kwargs):
self._set_basic_tokenizer(tokenize_method, **kwargs)
if isinstance(add_specials, bool):
add_specials = EDU_SPYMBOLS if add_specials else []
else:
add_specials = EDU_SPYMBOLS + add_specials
self.max_length = max_length
self.vocab = EduVocab(vocab_path=vocab_path, specials=add_specials, **kwargs)
self.config = {k: v for k, v in locals().items() if k not in ["self", "__class__", "vocab_path"]}
def __call__(self, items: Tuple[list, str, dict], key=lambda x: x, padding: Tuple[bool, str] = True,
max_length=None, return_tensors=True, return_text=False, **kwargs) -> Dict[str, Any]:
"""
Parameters
----------
items: list or str or dict
the question items
key: function
determine how to get the text of each item
padding: bool
whether to pad the seq_idx
return_tensors: bool
whether to return data as tensors (would ignore text tokens)
return_text: bool
whether to return text tokens
Returns
-------
ret: dict
{"seq_idx": None, "seq_len": None}
or {"seq_token": None, seq_idx": None, "seq_len": None}.
The shape of element is (batch, seq) or (batch,).
Notes:
-------
Be Make sure Tokenizer output batched tensors by default
"""
batch_max_length = None
max_length = self.max_length if max_length is None else max_length
if isinstance(padding, str):
if padding == "max_length":
batch_max_length = max_length
padding = True
elif padding == "longest":
padding = True
elif padding == "do_not_pad":
padding = False
else:
raise ValueError("'padding' must be `bool` or `string` in ['max_length', 'longest', 'do_not_pad']")
token_items = self.tokenize(items, key)
if isinstance(items, dict) or isinstance(items, str):
token_items = [token_items]
if max_length is not None:
token_items = [seq[:max_length] for seq in token_items]
seqs = [self.vocab.convert_sequence_to_idx(token_item,
bos=kwargs.get("bos", False),
eos=kwargs.get("eos", False)) for token_item in token_items]
lengths = [len(seq) for seq in seqs]
ret = {
"seq_idx": pad_sequence(seqs, pad_val=self.vocab.pad_idx, max_length=batch_max_length) if padding else seqs,
"seq_len": lengths
}
if isinstance(items, dict) or isinstance(items, str):
ret = {k: v[0] for k, v in ret.items()}
token_items = token_items[0]
if return_tensors:
ret = {key: torch.as_tensor(val) for key, val in ret.items()}
if return_text:
ret["seq_token"] = token_items
return ret
def __len__(self):
return len(self.vocab)
def _set_basic_tokenizer(self, tokenize_method: str, **kwargs):
self.tokenize_method = tokenize_method
self.text_tokenizer = get_tokenizer(tokenize_method, **kwargs)
[docs] def tokenize(self, items: Tuple[list, str, dict], key=lambda x: x, **kwargs):
"""
Parameters
----------
items: list or str or dict
the question items
key: function
determine how to get the text of each item
Returns
-------
tokens: list
the token of items
"""
if isinstance(items, str) or isinstance(items, dict):
return self._tokenize(items, key=key)
else:
return [self._tokenize(item, key=key) for item in items]
[docs] def encode(self, items: Tuple[str, dict, List[str], List[dict]], key=lambda x: x, **kwargs):
if isinstance(items, str) or isinstance(items, dict):
return self.vocab.convert_sequence_to_idx(self.tokenize(key(items)), **kwargs)
else:
return [self.vocab.convert_sequence_to_idx(self.tokenize(key(item)), **kwargs) for item in items]
[docs] def decode(self, token_ids: list, key=lambda x: x, **kwargs):
if isinstance(token_ids[0], list):
return [self.vocab.convert_sequence_to_token(key(item), **kwargs) for item in token_ids]
else:
return self.vocab.convert_sequence_to_token(key(token_ids), **kwargs)
def _pad(self):
raise NotImplementedError
def _tokenize(self, item: Tuple[str, dict], key=lambda x: x):
token_item = self.text_tokenizer._tokenize(item, key=key)
if len(token_item) == 0:
token_item = [self.vocab.unk_token]
if len(token_item) > self.max_length:
token_item = token_item[:self.max_length]
return token_item
[docs] @classmethod
def from_pretrained(cls, tokenizer_config_dir: str, **kwargs):
"""Load tokenizer from local files
Parameters:
-----------
tokenizer_config_dir: str
The dir path containing tokenizer_config.json and vocab.list
"""
tokenizer_config_path = os.path.join(tokenizer_config_dir, "tokenizer_config.json")
pretrained_vocab_path = os.path.join(tokenizer_config_dir, "vocab.txt")
with open(tokenizer_config_path, "r", encoding="utf-8") as rf:
tokenizer_config = json.load(rf)
tokenizer_config.update(kwargs)
return cls(
vocab_path=pretrained_vocab_path,
**tokenizer_config)
[docs] def save_pretrained(self, tokenizer_config_dir: str):
"""Save tokenizer into local files
Parameters:
-----------
tokenizer_config_dir: str
save tokenizer params in `/tokenizer_config.json` and save words in `/vocab.list`
"""
if not os.path.exists(tokenizer_config_dir):
os.makedirs(tokenizer_config_dir, exist_ok=True)
tokenizer_config_path = os.path.join(tokenizer_config_dir, "tokenizer_config.json")
vocab_path = os.path.join(tokenizer_config_dir, "vocab.txt")
self.vocab.save_vocab(vocab_path)
with open(tokenizer_config_path, "w", encoding="utf-8") as wf:
json.dump(self.config, wf, ensure_ascii=False, indent=2)
@property
def vocab_size(self):
return len(self.vocab)
[docs] def set_vocab(self, items: list, key=lambda x: x, lower: bool = False,
trim_min_count: int = 1, do_tokenize: bool = True):
"""Update the vocabulary with the tokens in corpus items
Parameters
----------
items: list
can be the list of str, or list of dict
key: function, optional
determine how to get the text of each item
lower : bool, optional
wheather to lower the corpus items, by default False
trim_min_count : int, optional
the lower bound number for adding a word into vocabulary, by default 1
do_tokenize : bool, optional
wheather tokenize items before updating vocab, by default True
Returns
-------
list
token_items
"""
token_items = self.tokenize(items, key) if do_tokenize else [key(item) for item in items]
self.vocab.set_vocab(corpus_items=token_items, trim_min_count=trim_min_count, lower=lower)
return token_items
[docs] def add_specials(self, tokens):
"""Add special tokens into vocabulary"""
self.vocab.add_specials(tokens)
[docs] def add_tokens(self, tokens):
"""Add tokens into vocabulary"""
self.vocab.add_tokens(tokens)
[docs]class EduDataset(Dataset):
"""The base class implements a Dataset, which package the `datasets.Dataset`
and provide more convenience, including parallel preprocessing, offline loadding and so on.
Parameters
----------
tokenizer :
PretrainedEduTokenizer or model-specific Pretrained Tokenizer
ds_disk_path : HFDataset, optional
the dataset_path to save dataset used by `datasets.Dataset`, by default None
items : Union[List[dict], List[str]], optional
input items to process, by default None
stem_key : str, optional
the content of items to process, by default "text"
label_key : Optional[str], optional
the labels of items to process, by default None
feature_keys : Optional[List[str]], optional
the additional features of items to remain, by default None
num_processor : int, optional
specific the number of cpus for parallel speedup, by default None
"""
def __init__(self, tokenizer, ds_disk_path: HFDataset = None,
items: Union[List[dict], List[str]] = None,
stem_key: str = "text", label_key: Optional[str] = None,
feature_keys: Optional[List[str]] = None,
num_processor: int = None, **kwargs):
self.tokenizer = tokenizer
feature_keys = [] if feature_keys is None else feature_keys
if items is not None:
assert ds_disk_path is None
if isinstance(items[0], dict):
assert stem_key is not None
raw_columns = set(items[0].keys())
if isinstance(items[0], str):
assert stem_key is None and label_key is None
stem_key = "text"
raw_columns = set([stem_key])
work_columns = set([stem_key] + feature_keys + ([label_key] if label_key is not None else []))
redundant_columns = raw_columns - work_columns
# 在线预处理特征
items = items if isinstance(items[0], dict) else [{"text": i} for i in items]
df = pd.DataFrame(items)
df.drop(columns=list(redundant_columns), inplace=True)
self.ds = HFDataset.from_pandas(df)
"""Note: map will break down for super large data which is greater than 4GB """
self.ds = self.ds.map(lambda sample: tokenizer(sample[stem_key], return_tensors=False),
num_proc=num_processor,
batched=True, batch_size=1000)
remove_columns = [stem_key]
else:
# 离线加载工作特征
assert ds_disk_path is not None
self.ds = load_from_disk(ds_disk_path)
reserve_columns = list(tokenizer("edunlp", return_tensors=False).keys())\
+ feature_keys + ([label_key] if label_key is not None else [])
remove_columns = list(set(self.ds.column_names) - set(reserve_columns))
# 工作特征
self.work_ds = self.ds.remove_columns(remove_columns) if len(remove_columns) > 0 else self.ds
if label_key is not None:
self.work_ds = self.work_ds.rename_columns({
label_key: "labels",
})
def __getitem__(self, index):
return self.work_ds[index]
def __len__(self):
return self.work_ds.num_rows
[docs] def to_disk(self, ds_disk_path):
"""Save the processed dataset into local files"""
self.ds.save_to_disk(ds_disk_path)
[docs] def collect_fn(self):
raise NotImplementedError