import json
import os
from dataclasses import dataclass
from typing import List, Optional

import torch
import torch.nn.functional as F
from baize.torch import load_net
from torch import nn
from torch.nn.utils.rnn import pad_packed_sequence, pack_padded_sequence
from transformers import PretrainedConfig
from transformers.modeling_outputs import ModelOutput

from ..base_model import BaseModel
from ..utils import torch_utils as mytorch
from .harnn import HAM
# Names exported by ``from <this module> import *``.
__all__ = ["LM", "ElmoLM", "ElmoLMForPreTraining", "ElmoLMForPropertyPrediction", "ElmoLMForKnowledgePrediction"]
class LM(nn.Module):
    """
    Embedding layer followed by a recurrent network (RNN, LSTM or GRU).

    Parameters
    ----------
    rnn_type: str
        Case-insensitive. Legal types are RNN, LSTM, GRU and BiLSTM
        (BiLSTM is an LSTM with ``bidirectional`` forced to True).
    vocab_size: int
        Number of entries of the embedding table built when ``embedding`` is None.
    embedding_dim: int
        Size of an embedded token, i.e. the input size of the recurrent network.
    hidden_size: int
        Hidden size of the recurrent network (per direction).
    num_layers: int
        Number of stacked recurrent layers.
    bidirectional: bool
        Whether the recurrent network is bidirectional.
    embedding: nn.Module, optional
        Embedding module used instead of a freshly created ``nn.Embedding``.
    model_params: optional
        When truthy, handed to ``load_net(model_params, self, allow_missing=True)``.
    use_pack_pad: bool
        Whether ``forward`` packs the padded batch before running the recurrent network.
    kwargs
        Extra keyword arguments for the torch recurrent layer. ``batch_first``
        defaults to True because ``forward`` always works on batch-major tensors.

    Examples
    --------
    >>> import torch
    >>> seq_idx = torch.LongTensor([[1, 2, 3], [1, 2, 0], [3, 0, 0]])
    >>> seq_len = torch.LongTensor([3, 2, 1])
    >>> lm = LM("RNN", 4, 3, 2)
    >>> output, hn = lm(seq_idx, seq_len)
    >>> output.shape
    torch.Size([3, 3, 2])
    >>> hn.shape
    torch.Size([1, 3, 2])
    >>> lm = LM("RNN", 4, 3, 2, num_layers=2)
    >>> output, hn = lm(seq_idx, seq_len)
    >>> output.shape
    torch.Size([3, 3, 2])
    >>> hn.shape
    torch.Size([2, 3, 2])
    """

    # Upper-cased rnn_type -> torch layer class.
    _RNN_CLASSES = {"RNN": nn.RNN, "LSTM": nn.LSTM, "GRU": nn.GRU, "BILSTM": nn.LSTM}

    def __init__(self, rnn_type: str, vocab_size: int, embedding_dim: int, hidden_size: int, num_layers=1,
                 bidirectional=False, embedding=None, model_params=None, use_pack_pad=True, **kwargs):
        super(LM, self).__init__()
        rnn_type = rnn_type.upper()
        self.embedding = torch.nn.Embedding(vocab_size, embedding_dim) if embedding is None else embedding
        self.use_pack_pad = use_pack_pad

        if rnn_type not in self._RNN_CLASSES:
            raise TypeError("Unknown rnn_type %s" % rnn_type)
        rnn_cls = self._RNN_CLASSES[rnn_type]
        if rnn_type == "BILSTM":
            bidirectional = True

        # forward() packs/pads with batch_first=True and sizes the initial state from
        # dim 0 of the input, so the un-packed path only works with a batch-major RNN.
        # (batch_first has no effect on packed input or on the hidden-state layout.)
        kwargs.setdefault("batch_first", True)

        self.rnn = rnn_cls(embedding_dim, hidden_size, num_layers, bidirectional=bidirectional, **kwargs)
        # True when the recurrent layer also carries a cell state (LSTM variants).
        self.c = rnn_cls is nn.LSTM

        # After this point num_layers means layers * directions: the first
        # dimension of the initial/final hidden state.
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        if bidirectional:
            self.num_layers *= 2
        self.hidden_size = hidden_size

        if model_params:
            load_net(model_params, self, allow_missing=True)

    def forward(self, seq_idx, seq_len):
        """
        Parameters
        ----------
        seq_idx: Tensor, of shape (batch_size, sequence_length)
            token indices, batch-major
        seq_len: Tensor, of shape (batch_size)
            lengths of the sequences; only used when ``use_pack_pad`` is set

        Returns
        --------
        output: Tensor
            outputs of the last recurrent layer; re-padded (batch-major) when
            ``use_pack_pad`` is set, otherwise exactly what the recurrent layer returned
        hn: Tensor
            final hidden state, first dimension is layers * directions
        """
        seq = self.embedding(seq_idx)
        if self.use_pack_pad:
            # pack_padded_sequence needs the lengths on CPU.
            lengths = torch.as_tensor(seq_len).cpu()
            rnn_input = pack_padded_sequence(seq, lengths, batch_first=True, enforce_sorted=False)
        else:
            rnn_input = seq

        # Allocate the zero state directly with the dtype/device of the embedded input.
        state_shape = (self.num_layers, seq.shape[0], self.hidden_size)
        h0 = torch.zeros(state_shape, dtype=seq.dtype, device=seq.device)
        if self.c:
            c0 = torch.zeros_like(h0)
            output, (hn, _) = self.rnn(rnn_input, (h0, c0))
        else:
            output, hn = self.rnn(rnn_input, h0)

        if self.use_pack_pad:
            output, _ = pad_packed_sequence(output, batch_first=True)
        return output, hn
# transformers requires every ModelOutput subclass to be a dataclass; recent
# releases raise TypeError on instantiation otherwise.
@dataclass
class ElmoLMOutput(ModelOutput):
    """
    Output type of [`ElmoLM`]

    Parameters
    ----------
    pred_forward: of shape (batch_size, sequence_length, vocab_size)
        vocabulary scores computed from the forward-direction states
    pred_backward: of shape (batch_size, sequence_length, vocab_size)
        vocabulary scores computed from the backward-direction states
    forward_output: of shape (batch_size, sequence_length, hidden_size)
    backward_output: of shape (batch_size, sequence_length, hidden_size)
    """
    pred_forward: torch.FloatTensor = None
    pred_backward: torch.FloatTensor = None
    forward_output: torch.FloatTensor = None
    backward_output: torch.FloatTensor = None
class ElmoLM(BaseModel):
    """
    Bidirectional LSTM language model with one vocabulary projection shared by both directions.

    Parameters
    ----------
    vocab_size: int
    embedding_dim: int
    hidden_size: int
        hidden size of each LSTM direction
    num_layers: int
    dropout_rate: float
        dropout applied to the LSTM outputs before the projection
    use_pack_pad: bool
        forwarded to the underlying ``LM``
    kwargs
        forwarded to the underlying ``LM`` and recorded in ``config``
    """
    base_model_prefix = 'elmo'

    def __init__(self, vocab_size: int, embedding_dim: int, hidden_size: int, num_layers: int = 2,
                 dropout_rate: float = 0.5, use_pack_pad=False, **kwargs):
        super(ElmoLM, self).__init__()
        self.LM_layer = LM("BiLSTM", vocab_size, embedding_dim, hidden_size, num_layers=num_layers,
                           use_pack_pad=use_pack_pad, **kwargs)
        self.pred_layer = nn.Linear(hidden_size, vocab_size)
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_size = hidden_size
        self.dropout = nn.Dropout(dropout_rate)

        # Listed explicitly rather than harvested from locals(): iterating locals()
        # in a comprehension can fail under tracing on Python 3.12+.
        config = {
            "vocab_size": vocab_size,
            "embedding_dim": embedding_dim,
            "hidden_size": hidden_size,
            "num_layers": num_layers,
            "dropout_rate": dropout_rate,
            "use_pack_pad": use_pack_pad,
        }
        config.update(kwargs)
        config['architecture'] = 'ElmoLM'
        self.config = PretrainedConfig.from_dict(config)

    def forward(self, seq_idx=None, seq_len=None) -> ModelOutput:
        """
        Parameters
        ----------
        seq_idx:Tensor, of shape (batch_size, sequence_length)
            a list of indices
        seq_len:Tensor, of shape (batch_size)
            length

        Returns
        ----------
        ElmoLMOutput
            pred_forward: of shape (batch_size, sequence_length, vocab_size)
            pred_backward: of shape (batch_size, sequence_length, vocab_size)
            forward_output: of shape (batch_size, sequence_length, hidden_size)
            backward_output: of shape (batch_size, sequence_length, hidden_size)
        """
        lm_output, _ = self.LM_layer(seq_idx, seq_len)
        # The LSTM concatenates both directions on the last axis: forward half first.
        forward_output = self.dropout(lm_output[:, :, :self.hidden_size])
        backward_output = self.dropout(lm_output[:, :, self.hidden_size:])
        return ElmoLMOutput(
            pred_forward=self.pred_layer(forward_output),
            pred_backward=self.pred_layer(backward_output),
            forward_output=forward_output,
            backward_output=backward_output
        )

    @classmethod
    def from_config(cls, config_path, **kwargs):
        """
        Build a model from a JSON config file.

        Parameters
        ----------
        config_path: str
            path of the JSON file
        kwargs
            entries overriding those read from the file
        """
        with open(config_path, "r", encoding="utf-8") as rf:
            model_config = json.load(rf)
        model_config.update(kwargs)
        return cls(
            vocab_size=model_config['vocab_size'],
            embedding_dim=model_config['embedding_dim'],
            hidden_size=model_config['hidden_size'],
            # num_layers / use_pack_pad are written to the config by __init__ and must be
            # restored, otherwise the rebuilt network can differ from the saved one.
            num_layers=model_config.get('num_layers', 2),
            dropout_rate=model_config.get('dropout_rate', 0.5),
            use_pack_pad=model_config.get('use_pack_pad', False),
            # Absent when the model was created without an explicit batch_first.
            batch_first=model_config.get('batch_first', True)
        )
# transformers requires every ModelOutput subclass to be a dataclass; recent
# releases raise TypeError on instantiation otherwise.
@dataclass
class ElmoLMForPreTrainingOutput(ModelOutput):
    """
    Output type of [`ElmoLMForPreTraining`].

    Parameters
    ----------
    loss:
        sum of the forward and backward language-model losses
    pred_forward: of shape (batch_size, sequence_length, vocab_size)
    pred_backward: of shape (batch_size, sequence_length, vocab_size)
    forward_output: of shape (batch_size, sequence_length, hidden_size)
    backward_output: of shape (batch_size, sequence_length, hidden_size)
    """
    loss: torch.FloatTensor = None
    pred_forward: torch.FloatTensor = None
    pred_backward: torch.FloatTensor = None
    forward_output: torch.FloatTensor = None
    backward_output: torch.FloatTensor = None
class ElmoLMForPreTraining(BaseModel):
    """
    ``ElmoLM`` with a next-token (forward) and previous-token (backward) cross-entropy loss.

    Parameters
    ----------
    vocab_size: int
    embedding_dim: int
    hidden_size: int
    dropout_rate: float
    batch_first: bool
    use_pack_pad: bool
    kwargs
        forwarded to ``ElmoLM`` and recorded in ``config``
    """
    base_model_prefix = 'elmo'

    def __init__(self, vocab_size: int, embedding_dim: int, hidden_size: int, dropout_rate: float = 0.5,
                 batch_first=True, use_pack_pad=False, **kwargs):
        super(ElmoLMForPreTraining, self).__init__()
        self.elmo = ElmoLM(
            vocab_size=vocab_size,
            embedding_dim=embedding_dim,
            hidden_size=hidden_size,
            dropout_rate=dropout_rate,
            batch_first=batch_first,
            use_pack_pad=use_pack_pad,
            **kwargs
        )
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_size = hidden_size
        self.criterion = nn.CrossEntropyLoss()

        # Listed explicitly rather than harvested from locals(): iterating locals()
        # in a comprehension can fail under tracing on Python 3.12+.
        config = {
            "vocab_size": vocab_size,
            "embedding_dim": embedding_dim,
            "hidden_size": hidden_size,
            "dropout_rate": dropout_rate,
            "batch_first": batch_first,
            "use_pack_pad": use_pack_pad,
        }
        config.update(kwargs)
        config['architecture'] = 'ElmoLMForPreTraining'
        self.config = PretrainedConfig.from_dict(config)

    def forward(self, seq_idx=None, seq_len=None) -> ModelOutput:
        """
        Parameters
        ----------
        seq_idx:Tensor, of shape (batch_size, sequence_length)
            a list of indices
        seq_len:Tensor, of shape (batch_size)
            length

        Returns
        -------
        ElmoLMForPreTrainingOutput
            loss
            pred_forward: of shape (batch_size, sequence_length, vocab_size)
            pred_backward: of shape (batch_size, sequence_length, vocab_size)
            forward_output: of shape (batch_size, sequence_length, hidden_size)
            backward_output: of shape (batch_size, sequence_length, hidden_size)
        """
        device = seq_idx.device
        batch_size, idx_len = seq_idx.shape
        # Keep the lengths next to the indices so the mask comparisons and the
        # fancy indexing below never mix devices.
        seq_len = torch.as_tensor(seq_len, device=device).long()

        outputs = self.elmo(seq_idx, seq_len)
        pred_forward, pred_backward = outputs.pred_forward, outputs.pred_backward

        # Length of the prediction axis: max(seq_len) when the LM packs its input,
        # the padded input length otherwise. Reading it from the tensor avoids a
        # device sync and cannot disagree with what the LM actually did.
        # Note:
        # pred_mask matters when LM use pack_pad,
        # but it will break down for parallel GPU because of different seq_len between gpus
        max_len = pred_forward.shape[1]
        pred_mask = torch.arange(max_len, device=device)[None, :] < seq_len[:, None]
        idx_mask = torch.arange(idx_len, device=device)[None, :] < seq_len[:, None]

        rows = torch.arange(batch_size, device=device)
        last = seq_len - 1

        # Forward LM: the state at position t predicts token t + 1, so the last valid
        # position has no target and the first token is never a target.
        pred_forward_mask = pred_mask.clone()
        pred_forward_mask[rows, last] = False
        idx_forward_mask = idx_mask.clone()
        idx_forward_mask[:, 0] = False

        # Backward LM: the state at position t predicts token t - 1, so position 0 has
        # no target and the last valid token is never a target.
        pred_backward_mask = pred_mask.clone()
        pred_backward_mask[:, 0] = False
        idx_backward_mask = idx_mask.clone()
        idx_backward_mask[rows, last] = False

        forward_loss = self.criterion(pred_forward[pred_forward_mask], seq_idx[idx_forward_mask])
        backward_loss = self.criterion(pred_backward[pred_backward_mask], seq_idx[idx_backward_mask])
        loss = forward_loss + backward_loss
        return ElmoLMForPreTrainingOutput(
            loss=loss,
            pred_forward=pred_forward,
            pred_backward=pred_backward,
            forward_output=outputs.forward_output,
            backward_output=outputs.backward_output
        )

    @classmethod
    def from_config(cls, config_path, **kwargs):
        """
        Build a model from a JSON config file.

        Parameters
        ----------
        config_path: str
            path of the JSON file
        kwargs
            entries overriding those read from the file
        """
        with open(config_path, "r", encoding="utf-8") as rf:
            model_config = json.load(rf)
        model_config.update(kwargs)
        # Only present when the model was created with an explicit num_layers.
        optional = {key: model_config[key] for key in ("num_layers",) if key in model_config}
        return cls(
            vocab_size=model_config['vocab_size'],
            embedding_dim=model_config['embedding_dim'],
            hidden_size=model_config['hidden_size'],
            dropout_rate=model_config.get('dropout_rate', 0.5),
            batch_first=model_config.get('batch_first', True),
            # Recorded in the config by __init__; must be restored with the model.
            use_pack_pad=model_config.get('use_pack_pad', False),
            **optional
        )
# transformers requires every ModelOutput subclass to be a dataclass; recent
# releases raise TypeError on instantiation otherwise.
@dataclass
class PropertyPredictionOutput(ModelOutput):
    """
    Output type of [`ElmoLMForPropertyPrediction`].

    Parameters
    ----------
    loss:
        regression loss, None when no labels were given
    logits: of shape (batch_size, 1)
        sigmoid-activated predictions
    """
    loss: torch.FloatTensor = None
    logits: torch.FloatTensor = None
class ElmoLMForPropertyPrediction(BaseModel):
    """
    ``ElmoLM`` encoder with a sigmoid regression head trained with MSE.

    Parameters
    ----------
    vocab_size: int
    embedding_dim: int
    hidden_size: int
    dropout_rate: float
        dropout inside the encoder
    batch_first: bool
    head_dropout: float
        dropout applied to the item embedding before the classifier
    kwargs
        only recorded in ``config``
    """
    base_model_prefix = 'elmo'

    def __init__(self, vocab_size: int, embedding_dim: int, hidden_size: int, dropout_rate: float = 0.5,
                 batch_first=True, head_dropout=0.5, **kwargs):
        super(ElmoLMForPropertyPrediction, self).__init__()
        self.elmo = ElmoLM(
            vocab_size=vocab_size,
            embedding_dim=embedding_dim,
            hidden_size=hidden_size,
            dropout_rate=dropout_rate,
            batch_first=batch_first
        )
        self.head_dropout = head_dropout
        self.dropout = nn.Dropout(head_dropout)
        self.classifier = nn.Linear(2 * hidden_size, 1)
        self.sigmoid = nn.Sigmoid()
        self.criterion = nn.MSELoss()

        # Listed explicitly rather than harvested from locals(): iterating locals()
        # in a comprehension can fail under tracing on Python 3.12+.
        config = {
            "vocab_size": vocab_size,
            "embedding_dim": embedding_dim,
            "hidden_size": hidden_size,
            "dropout_rate": dropout_rate,
            "batch_first": batch_first,
            "head_dropout": head_dropout,
        }
        config.update(kwargs)
        # Was 'ElmoLMForPreTraining' (copy-paste); the tag now names this class.
        config['architecture'] = 'ElmoLMForPropertyPrediction'
        self.config = PretrainedConfig.from_dict(config)

    def forward(self, seq_idx=None, seq_len=None, labels=None) -> ModelOutput:
        """
        Parameters
        ----------
        seq_idx:Tensor, of shape (batch_size, sequence_length)
            a list of indices
        seq_len:Tensor, of shape (batch_size)
            length
        labels:Tensor, optional
            regression targets, one per item

        Returns
        -------
        PropertyPredictionOutput
            loss: None when ``labels`` is None
            logits: of shape (batch_size, 1)
        """
        outputs = self.elmo(seq_idx, seq_len)
        device = outputs.forward_output.device
        # as_tensor avoids the copy-construct warning of torch.tensor(tensor) and
        # puts the indices on the device of the tensors they index.
        last = torch.as_tensor(seq_len, device=device).long() - 1
        rows = torch.arange(last.shape[0], device=device)
        # Item embedding: forward state at the last valid token + backward state at the first token.
        item_embeds = torch.cat(
            (outputs.forward_output[rows, last], outputs.backward_output[rows, 0]),
            dim=-1)
        item_embeds = self.dropout(item_embeds)
        logits = self.sigmoid(self.classifier(item_embeds))
        loss = None
        if labels is not None:
            labels = torch.as_tensor(labels, dtype=logits.dtype, device=logits.device)
            # logits are (batch, 1); a (batch,) target would silently broadcast the
            # MSE to a (batch, batch) matrix, so align the target first.
            if labels.shape != logits.shape and labels.numel() == logits.numel():
                labels = labels.reshape(logits.shape)
            loss = self.criterion(logits, labels)
        return PropertyPredictionOutput(
            loss=loss,
            logits=logits
        )

    @classmethod
    def from_config(cls, config_path, **kwargs):
        """
        Build a model from a JSON config file.

        Parameters
        ----------
        config_path: str
            path of the JSON file
        kwargs
            entries overriding those read from the file
        """
        with open(config_path, "r", encoding="utf-8") as rf:
            model_config = json.load(rf)
        model_config.update(kwargs)
        return cls(
            vocab_size=model_config.get('vocab_size'),
            embedding_dim=model_config.get('embedding_dim'),
            hidden_size=model_config.get('hidden_size'),
            # Fall back to the constructor defaults instead of passing None downstream.
            dropout_rate=model_config.get('dropout_rate', 0.5),
            batch_first=model_config.get('batch_first', True),
            head_dropout=model_config.get('head_dropout', 0.5),
        )
# transformers requires every ModelOutput subclass to be a dataclass; recent
# releases raise TypeError on instantiation otherwise.
@dataclass
class KnowledgePredictionOutput(ModelOutput):
    """
    Output type of [`ElmoLMForKnowledgePrediction`].

    Parameters
    ----------
    loss:
        None when no labels were given
    logits: of shape (batch_size, num_total_classes)
        weighted mix of the flat and hierarchical classifier scores
    """
    loss: torch.FloatTensor = None
    logits: torch.FloatTensor = None
class ElmoLMForKnowledgePrediction(BaseModel):
    """
    ``ElmoLM`` encoder with two multi-label heads whose sigmoid scores are mixed:
    a flat linear classifier on the item embedding and a ``HAM`` classifier on the token states.

    Parameters
    ----------
    vocab_size: int
    embedding_dim: int
    hidden_size: int
    num_classes_list: List[int]
        forwarded to ``HAM``
    num_total_classes: int
        output size of the flat classifier; also forwarded to ``HAM``
    dropout_rate: float
        dropout of the encoder; also forwarded to ``HAM``
    batch_first: bool
    head_dropout: float
        dropout applied to the embeddings before both heads
    flat_cls_weight: float
        weight of the flat scores; the HAM scores get ``1 - flat_cls_weight``
    attention_unit_size: int
        forwarded to ``HAM``
    fc_hidden_size: int
        forwarded to ``HAM``
    beta: float
        forwarded to ``HAM``
    kwargs
        only recorded in ``config``
    """
    base_model_prefix = 'elmo'

    def __init__(self, vocab_size: int,
                 embedding_dim: int,
                 hidden_size: int,
                 num_classes_list: List[int],
                 num_total_classes: int,
                 dropout_rate: float = 0.5,
                 batch_first=True,
                 head_dropout: Optional[float] = 0.5,
                 flat_cls_weight: Optional[float] = 0.5,
                 attention_unit_size: Optional[int] = 256,
                 fc_hidden_size: Optional[int] = 512,
                 beta: Optional[float] = 0.5,
                 **kwargs):
        super(ElmoLMForKnowledgePrediction, self).__init__()
        self.elmo = ElmoLM(
            vocab_size=vocab_size,
            embedding_dim=embedding_dim,
            hidden_size=hidden_size,
            dropout_rate=dropout_rate,
            batch_first=batch_first
        )
        self.head_dropout = head_dropout
        self.dropout = nn.Dropout(head_dropout)
        self.sigmoid = nn.Sigmoid()
        self.criterion = nn.MSELoss()
        self.flat_classifier = nn.Linear(in_features=2 * hidden_size, out_features=num_total_classes)
        self.ham_classifier = HAM(
            num_classes_list=num_classes_list,
            num_total_classes=num_total_classes,
            sequence_model_hidden_size=hidden_size * 2,
            attention_unit_size=attention_unit_size,
            fc_hidden_size=fc_hidden_size,
            beta=beta,
            dropout_rate=dropout_rate
        )
        self.flat_cls_weight = flat_cls_weight
        self.num_classes_list = num_classes_list
        self.num_total_classes = num_total_classes

        # Listed explicitly rather than harvested from locals(): iterating locals()
        # in a comprehension can fail under tracing on Python 3.12+.
        config = {
            "vocab_size": vocab_size,
            "embedding_dim": embedding_dim,
            "hidden_size": hidden_size,
            "num_classes_list": num_classes_list,
            "num_total_classes": num_total_classes,
            "dropout_rate": dropout_rate,
            "batch_first": batch_first,
            "head_dropout": head_dropout,
            "flat_cls_weight": flat_cls_weight,
            "attention_unit_size": attention_unit_size,
            "fc_hidden_size": fc_hidden_size,
            "beta": beta,
        }
        config.update(kwargs)
        # Was 'ElmoLMForPreTraining' (copy-paste); the tag now names this class.
        config['architecture'] = 'ElmoLMForKnowledgePrediction'
        self.config = PretrainedConfig.from_dict(config)

    def forward(self, seq_idx=None, seq_len=None, labels=None) -> ModelOutput:
        """
        Parameters
        ----------
        seq_idx:Tensor, of shape (batch_size, sequence_length)
            a list of indices
        seq_len:Tensor, of shape (batch_size)
            length
        labels:Tensor, optional
            integer class indices; they are one-hot encoded and summed over dim 1
            to build the multi-hot target

        Returns
        -------
        KnowledgePredictionOutput
            loss: None when ``labels`` is None
            logits: of shape (batch_size, num_total_classes)
        """
        outputs = self.elmo(seq_idx, seq_len)
        device = outputs.forward_output.device
        # as_tensor avoids the copy-construct warning of torch.tensor(tensor) and
        # puts the indices on the device of the tensors they index.
        last = torch.as_tensor(seq_len, device=device).long() - 1
        rows = torch.arange(last.shape[0], device=device)
        # Item embedding: forward state at the last valid token + backward state at the first token.
        item_embeds = torch.cat(
            (outputs.forward_output[rows, last], outputs.backward_output[rows, 0]),
            dim=-1)
        tokens_embeds = torch.cat((outputs.forward_output, outputs.backward_output), dim=-1)
        item_embeds = self.dropout(item_embeds)
        tokens_embeds = self.dropout(tokens_embeds)

        flat_logits = self.sigmoid(self.flat_classifier(item_embeds))
        ham_outputs = self.ham_classifier(tokens_embeds)
        ham_logits = self.sigmoid(ham_outputs.scores)
        logits = self.flat_cls_weight * flat_logits + (1 - self.flat_cls_weight) * ham_logits

        loss = None
        if labels is not None:
            labels = torch.sum(torch.nn.functional.one_hot(labels, num_classes=self.num_total_classes), dim=1)
            labels = labels.float()
            loss = self.criterion(logits, labels)
        return KnowledgePredictionOutput(
            loss=loss,
            logits=logits
        )

    @classmethod
    def from_config(cls, config_path, **kwargs):
        """
        Build a model from a JSON config file.

        Parameters
        ----------
        config_path: str
            path of the JSON file
        kwargs
            entries overriding those read from the file
        """
        with open(config_path, "r", encoding="utf-8") as rf:
            model_config = json.load(rf)
        model_config.update(kwargs)
        return cls(
            vocab_size=model_config.get('vocab_size'),
            embedding_dim=model_config.get('embedding_dim'),
            hidden_size=model_config.get('hidden_size'),
            num_total_classes=model_config.get('num_total_classes'),
            num_classes_list=model_config.get('num_classes_list'),
            # Fall back to the constructor defaults instead of passing None downstream.
            dropout_rate=model_config.get('dropout_rate', 0.5),
            batch_first=model_config.get('batch_first', True),
            head_dropout=model_config.get('head_dropout', 0.5),
            flat_cls_weight=model_config.get('flat_cls_weight', 0.5),
            attention_unit_size=model_config.get('attention_unit_size', 256),
            fc_hidden_size=model_config.get('fc_hidden_size', 512),
            beta=model_config.get('beta', 0.5),
        )