単純テキスト分類Simple Text Classification
プロジェクトのアドレス:https://github.com/Daya-Jin/DL_for_learner元ブログ:https://daya-jin.github.io/2019/03/12/TextClassification/
概要
概要
元のテキストファイルは、プロジェクトディレクトリの./dataset/news_CN/
の下にあり、各行のフォーマットは{label}\t{text}
です.たとえば、次のようになります. \t 15 65
プリプロセッシング
分詞
テキストタスクの場合、最も基本的な前処理は分詞であり、ここではjieba
オープンソース分詞ライブラリを使用して完了する.def gen_seg_file(file_in, file_out):
'''
:param file_in:
:param file_out: , ' '
:return:
'''
with open(file_in, 'r', encoding='utf-8') as fd:
text = fd.readlines()
with open(file_out, 'w', encoding='utf-8') as fd:
for line in text:
label, data = line.strip().split('\t')
words = jieba.cut(data)
words_trans = ''
#
for word in words:
word = word.strip()
if word != '':
words_trans += word + ' '
out_line = '{}\t{}
'.format(label, words_trans.strip())
fd.write(out_line)
辞書
分詞の後、データをフォーマットする必要があります.では、最も簡単なフォーマットは、単語ごとに整形符号化を行い、単語ごとに唯一の数字に対応しています.labelの場合もフォーマットが必要です.
整形符号化を実現するためには,単語と数字のマッピングテーブル,カテゴリと数字のマッピングテーブルという辞書を構築する必要がある.同時に、すべての可能な単語を含む辞書は巨大であり、実際にはこのような大きな記憶オーバーヘッドを受け入れることは不可能であるため、実際の辞書は一部の単語しか記録されず、ここでは周波数でどの単語を記録するかを選択する.このほか,辞書では未知語を符号化できる必要があり,ここでは未知語を0 0 0に統一的に符号化する.def gen_vocab(file_in, file_out):
'''
, 'idx word word_cnt'
:param file_in:
:param file_out:
'''
with open(file_in, 'r', encoding='utf-8') as fd:
text = fd.readlines()
word_dict = dict()
for line in text:
_, data = line.strip().split('\t')
for word in data.split():
word_dict.setdefault(word, 0)
word_dict[word] += 1
word_dict = sorted(word_dict.items(), key=lambda x: x[1], #
reverse=True)
with open(file_out, 'w', encoding='utf-8') as fd:
fd.write('0\t\t99999
' )
for idx, item in enumerate(word_dict):
fd.write('{}\t{}\t{}
'.format(idx + 1, item[0], item[1]))
カテゴリ辞書の構築は簡単で、直接1つ1つマッピングすればいいです.def gen_cat(file_in, file_out):
'''
:param file_in:
:param file_out:
:return:
'''
with open(file_in, 'r', encoding='utf-8') as fd:
text = fd.readlines()
label_dict = dict()
for line in text:
label, _ = line.strip().split('\t')
label_dict.setdefault(label, 0)
label_dict[label] += 1
label_dict = sorted(label_dict.items(), key=lambda x: x[1],
reverse=True)
with open(file_out, 'w', encoding='utf-8') as fd:
for idx, item in enumerate(label_dict):
fd.write('{}\t{}\t{}
'.format(idx, item[0], item[1]))
これで、元のファイルに対する前処理は終了します.
エンコーディング
本文とlabelに対して,それぞれ2つの符号化器をカプセル化する.
Text Encoder
テキストエンコーダでは,符号化と復号化を実現するとともに,単語と文レベルの機能を満たす必要がある.符号化と復号化は、それぞれ2つの辞書によって実現される.self._word2id = dict()
self._id2word = dict()
次に、外部に露出するコアAPIは4つあります.def word2id(self, word: str):
'''
:param word:
:return:
'''
return self._word2id.get(word, self._unk)
def id2word(self, idx: int):
'''
:param idx:
:return:
'''
return self._id2word.get(idx, '' )
def s2id(self, s: str):
'''
:param s:
:return:
'''
return [self.word2id(word) for word in s.split(' ')]
def id2s(self, idxs) -> str:
'''
:param idxs:
:return:
'''
return ' '.join([self.id2word(idx) for idx in idxs])
Label Encoder
同様に、カテゴリエンコーダの実装も辞書に依存する.self._cat2id = dict()
露出されたコアAPIはエンコーダです.def cat2id(self, cat):
if cat not in self._cat2id:
raise Exception('{} is not in cat'.format(cat))
else:
return self._cat2id[cat]
データクラス
以前に実装されたいくつかのCNNインスタンスと同様に、データの管理を容易にするために、Data
クラスが作成され、データがクラスに読み込まれ、コアAPIがnext_batch()
であるbatchの生成も担当する.
なお、シーケンスデータを処理する場合、feedからネットワーク内の各データ次元(時間次元と特徴次元)は同じであるべきである.長さを超えたデータについては、遮断します.長さが足りないデータには、記入します.label, content = line.strip().split('\t')
x = self._vocal.s2id(content)
y = self._cat_dict.cat2id(label)
x = x[:self._t_size]
n_pad = self._t_size - len(x) #
x = x + [self._vocal.unk for _ in range(n_pad)]
上記のコードでは、n_pad<=0
の場合、最後の行のリスト生成式は有効になりません.
モデル設計
テキスト分類問題は,実際にRNNにおけるmany to one問題に属する.すなわちRNN部の入力r n_i n p u t s rnn\_inputs rnn_inputsは複数の時間状態を有し、RNN部の出力r n n_o u t p u t s rnn\_outputs rnn_outputsは最後の時間状態の出力のみを取得します.
同時にテキストの処理に対してembeddingは迂回できない操作である.では、簡単なLSTMネットワークを設計し、まず入力X X Xに対してembeddingを行い、X e m b X_を得る.Emb Xe mb、そしてX e m b X_Emb Xe mbはLSTMネットワークに搬送され,FC層に接続され,分類結果が得られる.モデル構造は、次の図のようになります.
ネットワーク構造を決定した後、各層のデータストリームの次元に注意するだけでよい.
モデル構築
まずplaceholder
であり,テキスト入力としてのX X Xは時間次元を持ち,予測される目標変数はスカラーである.X = tf.placeholder(tf.int32, [None, params.t_size])
Y = tf.placeholder(tf.int64, [None])
埋め込みレイヤの入力次元はonehotベクトルの次元であり、出力次元は埋め込み次元である.テキストデータの場合、onehotベクトルの次元は辞書のサイズに等しい.emb_lookup = tf.get_variable('embedding', [vocal_size, params.emb_size],
dtype=tf.float32)
emb = tf.nn.embedding_lookup(emb_lookup, X) # (batch_size,t_size,emb_size)
次にLSTMレイヤ:lstm_layers = list()
for i in range(params.lstm_layers):
layer = tf.nn.rnn_cell.LSTMCell(params.lstm_size[i])
lstm_layers.append(layer)
lstm_layers = tf.nn.rnn_cell.MultiRNNCell(lstm_layers)
RNNの多対一の問題は、RNNネットワークの最後の層の最後の時間状態での出力のみを取り出す.lstm_outputs, _ = tf.nn.dynamic_rnn(lstm_layers,
inputs=emb, dtype=tf.float32)
lstm_outputs = lstm_outputs[:, -1, :]
後接FC層:fc = tf.layers.dense(lstm_outputs, params.fc_size, activation=tf.nn.relu)
最終出力:logits = tf.layers.dense(fc, unit_O, activation=None) # ,
多分類タスク、softmax損失関数を使用する:loss = tf.losses.sparse_softmax_cross_entropy(labels=Y, logits=logits)
以上がコアコードであり、完全なコードはこれを参照してください.
def gen_seg_file(file_in, file_out):
'''
:param file_in:
:param file_out: , ' '
:return:
'''
with open(file_in, 'r', encoding='utf-8') as fd:
text = fd.readlines()
with open(file_out, 'w', encoding='utf-8') as fd:
for line in text:
label, data = line.strip().split('\t')
words = jieba.cut(data)
words_trans = ''
#
for word in words:
word = word.strip()
if word != '':
words_trans += word + ' '
out_line = '{}\t{}
'.format(label, words_trans.strip())
fd.write(out_line)
def gen_vocab(file_in, file_out):
'''
, 'idx word word_cnt'
:param file_in:
:param file_out:
'''
with open(file_in, 'r', encoding='utf-8') as fd:
text = fd.readlines()
word_dict = dict()
for line in text:
_, data = line.strip().split('\t')
for word in data.split():
word_dict.setdefault(word, 0)
word_dict[word] += 1
word_dict = sorted(word_dict.items(), key=lambda x: x[1], #
reverse=True)
with open(file_out, 'w', encoding='utf-8') as fd:
fd.write('0\t\t99999
' )
for idx, item in enumerate(word_dict):
fd.write('{}\t{}\t{}
'.format(idx + 1, item[0], item[1]))
def gen_cat(file_in, file_out):
'''
:param file_in:
:param file_out:
:return:
'''
with open(file_in, 'r', encoding='utf-8') as fd:
text = fd.readlines()
label_dict = dict()
for line in text:
label, _ = line.strip().split('\t')
label_dict.setdefault(label, 0)
label_dict[label] += 1
label_dict = sorted(label_dict.items(), key=lambda x: x[1],
reverse=True)
with open(file_out, 'w', encoding='utf-8') as fd:
for idx, item in enumerate(label_dict):
fd.write('{}\t{}\t{}
'.format(idx, item[0], item[1]))
self._word2id = dict()
self._id2word = dict()
def word2id(self, word: str):
'''
:param word:
:return:
'''
return self._word2id.get(word, self._unk)
def id2word(self, idx: int):
'''
:param idx:
:return:
'''
return self._id2word.get(idx, '' )
def s2id(self, s: str):
'''
:param s:
:return:
'''
return [self.word2id(word) for word in s.split(' ')]
def id2s(self, idxs) -> str:
'''
:param idxs:
:return:
'''
return ' '.join([self.id2word(idx) for idx in idxs])
self._cat2id = dict()
def cat2id(self, cat):
if cat not in self._cat2id:
raise Exception('{} is not in cat'.format(cat))
else:
return self._cat2id[cat]
label, content = line.strip().split('\t')
x = self._vocal.s2id(content)
y = self._cat_dict.cat2id(label)
x = x[:self._t_size]
n_pad = self._t_size - len(x) #
x = x + [self._vocal.unk for _ in range(n_pad)]
X = tf.placeholder(tf.int32, [None, params.t_size])
Y = tf.placeholder(tf.int64, [None])
emb_lookup = tf.get_variable('embedding', [vocal_size, params.emb_size],
dtype=tf.float32)
emb = tf.nn.embedding_lookup(emb_lookup, X) # (batch_size,t_size,emb_size)
lstm_layers = list()
for i in range(params.lstm_layers):
layer = tf.nn.rnn_cell.LSTMCell(params.lstm_size[i])
lstm_layers.append(layer)
lstm_layers = tf.nn.rnn_cell.MultiRNNCell(lstm_layers)
lstm_outputs, _ = tf.nn.dynamic_rnn(lstm_layers,
inputs=emb, dtype=tf.float32)
lstm_outputs = lstm_outputs[:, -1, :]
fc = tf.layers.dense(lstm_outputs, params.fc_size, activation=tf.nn.relu)
logits = tf.layers.dense(fc, unit_O, activation=None) # ,
loss = tf.losses.sparse_softmax_cross_entropy(labels=Y, logits=logits)