Source code for easytransfer.preprocessors.classification_regression_preprocessor

# coding=utf-8
# Copyright (c) 2019 Alibaba PAI team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from collections import OrderedDict
import tensorflow as tf
from .preprocessor import Preprocessor, PreprocessorConfig, truncate_seq_pair
from .tokenization import convert_to_unicode


[docs]class ClassificationRegressionPreprocessorConfig(PreprocessorConfig): def __init__(self, **kwargs): super(ClassificationRegressionPreprocessorConfig, self).__init__(**kwargs) self.input_schema = kwargs.get("input_schema") self.sequence_length = kwargs.get("sequence_length") self.first_sequence = kwargs.get("first_sequence") self.second_sequence = kwargs.get("second_sequence") self.label_name = kwargs.get("label_name") self.label_enumerate_values = kwargs.get("label_enumerate_values")
[docs]class ClassificationRegressionPreprocessor(Preprocessor): """ Preprocessor for classification/regression task """ config_class = ClassificationRegressionPreprocessorConfig def __init__(self, config, **kwargs): Preprocessor.__init__(self, config, **kwargs) self.config = config self.input_tensor_names = [] for schema in config.input_schema.split(","): name = schema.split(":")[0] self.input_tensor_names.append(name) self.label_idx_map = OrderedDict() if self.config.label_enumerate_values is not None: for (i, label) in enumerate(self.config.label_enumerate_values.split(",")): self.label_idx_map[convert_to_unicode(label)] = i if hasattr(self.config, "multi_label") and self.config.multi_label is True: self.multi_label = True self.max_num_labels = self.config.max_num_labels if hasattr(self.config, "max_num_labels") else 5 else: self.multi_label = False self.max_num_labels = None
[docs] def set_feature_schema(self): if self.mode.startswith("predict") or self.mode == "preprocess": self.output_schema = self.config.output_schema self.output_tensor_names = ["input_ids", "input_mask", "segment_ids", "label_id"] if self.multi_label: self.seq_lens = [self.config.sequence_length] * 3 + [self.max_num_labels] self.feature_value_types = [tf.int64] * 3 + [tf.int64] else: self.seq_lens = [self.config.sequence_length] * 3 + [1] if len(self.label_idx_map) >= 2: self.feature_value_types = [tf.int64] * 4 else: self.feature_value_types = [tf.int64] * 3 + [tf.float32]
[docs] def convert_example_to_features(self, items): """ Convert single example to classifcation/regression features Args: items (`dict`): inputs from the reader Returns: features (`tuple`): (input_ids, input_mask, segment_ids, label_id) """ text_a = items[self.input_tensor_names.index(self.config.first_sequence)] tokens_a = self.config.tokenizer.tokenize(convert_to_unicode(text_a)) if self.config.second_sequence in self.input_tensor_names: text_b = items[self.input_tensor_names.index(self.config.second_sequence)] tokens_b = self.config.tokenizer.tokenize(convert_to_unicode(text_b)) truncate_seq_pair(tokens_a, tokens_b, self.config.sequence_length - 3) else: if len(tokens_a) > self.config.sequence_length - 2: tokens_a = tokens_a[0:(self.config.sequence_length - 2)] tokens_b = None tokens = [] segment_ids = [] tokens.append("[CLS]") segment_ids.append(0) for token in tokens_a: tokens.append(token) segment_ids.append(0) tokens.append("[SEP]") segment_ids.append(0) if tokens_b: for token in tokens_b: tokens.append(token) segment_ids.append(1) tokens.append("[SEP]") segment_ids.append(1) input_ids = self.config.tokenizer.convert_tokens_to_ids(tokens) # The mask has 1 for real tokens and 0 for padding tokens. Only real # tokens are attended to. input_mask = [1] * len(input_ids) # Zero-pad up to the sequence length. while len(input_ids) < self.config.sequence_length: input_ids.append(0) input_mask.append(0) segment_ids.append(0) assert len(input_ids) == self.config.sequence_length assert len(input_mask) == self.config.sequence_length assert len(segment_ids) == self.config.sequence_length if self.config.label_name is not None: label_value = items[self.input_tensor_names.index(self.config.label_name)] if isinstance(label_value, str) or isinstance(label_value, bytes): label = convert_to_unicode(label_value) else: label = str(label_value) if self.multi_label: label_ids = [self.label_idx_map[convert_to_unicode(x)] for x in label.split(",") if x] label_ids = label_ids[:self.max_num_labels] label_ids = label_ids + [-1 for _ in range(self.max_num_labels - len(label_ids))] label_ids = [str(t) for t in label_ids] label_id = ' '.join(label_ids) elif len(self.label_idx_map) >= 2: label_id = str(self.label_idx_map[convert_to_unicode(label)]) else: label_id = label else: label_id = '0' return ' '.join([str(t) for t in input_ids]), \ ' '.join([str(t) for t in input_mask]), \ ' '.join([str(t) for t in segment_ids]), label_id
[docs]class PairedClassificationRegressionPreprocessor(ClassificationRegressionPreprocessor): """ Preprocessor for paired classification/regression task """ config_class = ClassificationRegressionPreprocessorConfig def __init__(self, config, **kwargs): super(PairedClassificationRegressionPreprocessor, self).__init__(config, **kwargs)
[docs] def set_feature_schema(self): if self.mode.startswith("predict") or self.mode == "preprocess": self.output_schema = self.config.output_schema self.output_tensor_names = ["input_ids", "input_mask", "segment_ids", "label_id"] self.seq_lens = [self.config.sequence_length] * 6 + [1] if len(self.label_idx_map) >= 2: self.feature_value_types = [tf.int64] * 6 + [tf.int64] else: self.feature_value_types = [tf.int64] * 6 + [tf.float32]
[docs] def convert_example_to_features(self, items): """ Convert single example to classifcation/regression features Args: items (`dict`): inputs from the reader Returns: features (`tuple`): (input_ids_a, input_mask_a, segment_ids_a, input_ids_b, input_mask_b, segment_ids_b, label_id) """ assert self.config.first_sequence in self.input_tensor_names \ and self.config.second_sequence in self.input_tensor_names text_a = items[self.input_tensor_names.index(self.config.first_sequence)] tokens_a = self.config.tokenizer.tokenize(convert_to_unicode(text_a)) text_b = items[self.input_tensor_names.index(self.config.second_sequence)] tokens_b = self.config.tokenizer.tokenize(convert_to_unicode(text_b)) # Account for [CLS] and [SEP] with "- 2" if len(tokens_a) > self.config.sequence_length - 2: tokens_a = tokens_a[0:(self.config.sequence_length - 2)] if len(tokens_b) > self.config.sequence_length - 2: tokens_b = tokens_b[0:(self.config.sequence_length - 2)] tokens = [] segment_ids_a = [] tokens.append("[CLS]") segment_ids_a.append(0) for token in tokens_a: tokens.append(token) segment_ids_a.append(0) tokens.append("[SEP]") segment_ids_a.append(0) input_ids_a = self.config.tokenizer.convert_tokens_to_ids(tokens) tokens = [] segment_ids_b = [] if tokens_b: for token in tokens_b: tokens.append(token) segment_ids_b.append(1) tokens.append("[SEP]") segment_ids_b.append(1) input_ids_b = self.config.tokenizer.convert_tokens_to_ids(tokens) # The mask has 1 for real tokens and 0 for padding tokens. Only real # tokens are attended to. input_mask_a = [1] * len(input_ids_a) input_mask_b = [1] * len(input_ids_b) # Zero-pad up to the sequence length. while len(input_ids_a) < self.config.sequence_length: input_ids_a.append(0) input_mask_a.append(0) segment_ids_a.append(0) # Zero-pad up to the sequence length. while len(input_ids_b) < self.config.sequence_length: input_ids_b.append(0) input_mask_b.append(0) segment_ids_b.append(0) assert len(input_ids_a) == self.config.sequence_length assert len(input_mask_a) == self.config.sequence_length assert len(segment_ids_a) == self.config.sequence_length assert len(input_ids_b) == self.config.sequence_length assert len(input_mask_b) == self.config.sequence_length assert len(segment_ids_b) == self.config.sequence_length # support single/multi classification and regression if self.config.label_name is not None: label_value = items[self.input_tensor_names.index(self.config.label_name)] if isinstance(label_value, str) or isinstance(label_value, bytes): label = convert_to_unicode(label_value) else: label = str(label_value) if len(self.label_idx_map) >= 2: label_id = str(self.label_idx_map[convert_to_unicode(label)]) else: label_id = label else: label_id = '0' return ' '.join([str(t) for t in input_ids_a]), \ ' '.join([str(t) for t in input_mask_a]), \ ' '.join([str(t) for t in segment_ids_a]), \ ' '.join([str(t) for t in input_ids_b]), \ ' '.join([str(t) for t in input_mask_b]), \ ' '.join([str(t) for t in segment_ids_b]), label_id