tf-idf
Introduction
n: the number of times a given word appears in the document
N: the total number of word occurrences in the document
D: the total number of documents in the corpus
d: the number of documents in the corpus that contain the word
tf: term frequency; the more often a word appears in a document, the more important it is.
\[tf=\frac{n}{N}
\]
idf: the fewer documents a word appears in across the corpus, the more important the word.
\[idf=\log(\frac{D}{d+1})
\]
tf-idf: the larger the value, the more important the word.
\[tfidf=tf\cdot idf
\]
Usually we train the idf weights offline on a large corpus; at serving time we only need to compute tf.
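To make the formulas concrete, here is a minimal numeric sketch (the toy corpus and all numbers are made up for illustration and are not part of the class below):
import math

docs = [['cat', 'sat', 'mat'],   # toy segmented corpus: 3 documents
        ['cat', 'ate', 'fish'],
        ['dog', 'ate', 'bone']]
D = len(docs)                    # D: total number of documents
doc = ['cat', 'cat', 'fish']     # the document we score online
N = len(doc)                     # N: total word occurrences in this document

for word in set(doc):
    n = doc.count(word)                   # n: occurrences of the word in this document
    d = sum(word in d_ for d_ in docs)    # d: number of documents containing the word
    print(word, round(n / N * math.log(D / (d + 1)), 4))
# 'cat':  tf = 2/3, idf = log(3/3) = 0      -> tfidf = 0.0
# 'fish': tf = 1/3, idf = log(3/2) ≈ 0.405  -> tfidf ≈ 0.1352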
Code
import math
import os
from jieba import posseg
class TFIDF(object):
    """
    Usage:
    1. Train idf offline on a large, pre-segmented corpus;
    2. Load the idf file;
    3. Compute tf-idf online.
    """
    def __init__(self):
        self.idf = {}  # maps word -> idf weight
        self.idf_median = 0  # median idf, a fallback for new words unseen during training
        # load stop words
        self._stop_words = set()
        with open('stop_words.txt', 'r', encoding='utf-8') as f:
            for line in f:
                word = line.strip()
                if not word:
                    continue
                self._stop_words.add(word)
        # POS tags to keep
        self._ALLOW_POS = ('ns', 'n', 'vn', 'v', 'i', 'a', 'ad', 'an')

    def clean(self, text, min_word_len=1):
        """
        Clean the text: segment it, then drop stop words, short words and unwanted POS tags.
        :param text: str, the text
        :param min_word_len: minimum word length to keep
        :return: list of words
        """
        words = []
        for word, pos in posseg.cut(text):
            if not word.strip():
                continue
            if word in self._stop_words:
                continue
            if len(word) < min_word_len:
                continue
            if pos not in self._ALLOW_POS:
                continue
            words.append(word)
        return words

    def compute_tfidf(self, words):
        """
        Compute tf-idf online.
        Args:
            words: a segmented piece of text, as a list of words
        Returns:
            list [('word1', weight1), ('word2', weight2), ...]
        """
        # make sure the idf has been loaded: the idf dict is non-empty and the median is non-zero
        assert self.idf and self.idf_median, "Please load the idf first!"
        # count term frequencies
        tf = {}
        for word in words:
            tf[word] = tf.get(word, 0) + 1
        # look up each word's idf in the loaded dict and compute tf-idf;
        # words unseen during training fall back to the median idf
        tfidf = {}
        for word in set(words):
            tfidf[word] = tf[word] / len(words) * self.idf.get(word, self.idf_median)
        # sort all words by tf-idf weight, highest first, and return
        tfidf = sorted(tfidf.items(), key=lambda item: item[1], reverse=True)
        return tfidf

    def load_idf(self, idf_path, splitter=' '):
        """
        Load a pre-trained idf file.
        """
        with open(idf_path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line or len(line.split(splitter)) != 2:  # skip empty or malformed lines
                    continue
                term, idf = line.split(splitter)
                self.idf[term] = float(idf)
        self.idf_median = sorted(self.idf.values())[len(self.idf) // 2]  # median of all idf values

    def train_idf(self, seg_files, output_file_name, splitter=' '):
        """
        Train idf offline.
        Args:
            seg_files: list of pre-segmented training files (txt), one document per line
            output_file_name: output idf file (txt)
            splitter: separator between terms, and between word and idf in the output
        """
        # total number of documents
        doc_count = 0
        # count document frequencies (df)
        for seg_file in seg_files:  # iterate over all files
            with open(seg_file, encoding='utf-8') as f:
                for line in f:  # iterate over each line (one document)
                    line = line.strip()
                    if not line:
                        continue
                    doc_count += 1  # update the total document count
                    words = set(line.split(splitter))
                    for word in words:
                        self.idf[word] = self.idf.get(word, 0) + 1  # update the word's document frequency
        # compute idf and save it to file
        with open(output_file_name, 'w', encoding='utf-8') as f:
            for word, df in self.idf.items():
                self.idf[word] = math.log(doc_count / (df + 1))  # compute idf
                f.write('{}{}{}\n'.format(word, splitter, self.idf[word]))

if __name__ == '__main__':
    seg_files_dir = 'test_data/seg_data'
    seg_files = ['{}/{}'.format(seg_files_dir, f) for f in os.listdir(seg_files_dir)]
    tfidf = TFIDF()
    # tfidf.train_idf(seg_files, 'idf.txt', ' ')
    tfidf.load_idf('idf.txt', ' ')
    sentence = "今天他给我说,我的放假旅游计划泡汤了,因为要封校"
    # sl = jieba.lcut(sentence)
    sl = tfidf.clean(sentence)
    print(sl)
    result = tfidf.compute_tfidf(sl)
    print(result)
text-rank
PR(A) is the PageRank value of page A.
d is the damping factor: the probability that, at any moment, a user who has reached a page keeps clicking onward; usually d = 0.85, and 1 - d is the probability of stopping and jumping to a random new page.
T_i is one of the pages that link to A.
PR(T_i) is the PR value of page T_i.
C(T_i) is the out-degree of T_i, i.e. the number of edges from T_i to other pages.
The PageRank formula:
\[PR(A)=(1-d)+d(\frac{PR(T_1)}{C(T_1)}+...+\frac{PR(T_n)}{C(T_n)})
\]
After a few iterations, the PR values converge.
TextRank builds an undirected graph from word co-occurrence, so each node's in-degree equals its out-degree.
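For reference, the weighted update that TextRank applies on this undirected graph (and that the implementation below follows) replaces the uniform 1/C(T_i) with edge-weight ratios:
\[WS(V_i)=(1-d)+d\sum_{V_j\in adj(V_i)}\frac{w_{ji}}{\sum_{V_k\in adj(V_j)}w_{jk}}WS(V_j)
\]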
A classic worked example
Assume every page starts with PR = 1 and d = 0.5; the links are A→B, A→C, B→C and C→A (so C(A) = 2, C(B) = C(C) = 1).
PR(A)=PR(B)=PR(C)=1
\(PR(A)=0.5+0.5*PR(C)=0.5+0.5*1=1\)
\(PR(B)=0.5+0.5*(PR(A)/2)=0.75\)
\(PR(C)=0.5+0.5*(PR(A)/2+PR(B))=1.125\)
The next iteration then repeats the calculation with the new values of PR(A), PR(B) and PR(C).
Results of repeated iterations (PR(A), PR(B), PR(C)):
1.000 0.750 1.125
1.062 0.766 1.148
1.074 0.769 1.153
1.076 0.769 1.154
1.077 0.769 1.154
1.077 0.769 1.154
pra = prb = prc = 1
for i in range(10):
    pra = 0.5 + 0.5 * prc
    prb = 0.5 + 0.5 * 0.5 * pra
    prc = 0.5 + 0.5 * (0.5 * pra + prb)
    print(f"{pra:.3f}\t{prb:.3f}\t{prc:.3f}")
Code
import jieba
from jieba import posseg
class UndirectedWeightedGraph(object):  # undirected weighted graph
    def __init__(self, iter_num=15):
        self.iter_num = iter_num  # number of iterations, 15 by default
        self.graph = {}  # the graph, as {node1: [edge1, edge2, ...], ...}
        self.d = 0.85  # damping factor, set to 0.85

    def add_edge(self, start_node, end_node, weight):
        """
        An edge is a tuple (start_node, end_node, weight).
        The graph is undirected, so the edge is inserted for both start_node and end_node.
        """
        if start_node not in self.graph:
            self.graph[start_node] = []
        if end_node not in self.graph:
            self.graph[end_node] = []
        self.graph[start_node].append((start_node, end_node, weight))
        self.graph[end_node].append((end_node, start_node, weight))

    def rank(self):
        node_ranks = {}  # PR value of each node
        node_edge_sum_weight = {}  # total edge weight of each node
        init_rank = 1.0 / (len(self.graph) or 1.0)  # initial rank value
        for node, edges in self.graph.items():
            node_ranks[node] = init_rank  # initialize every node's PR value
            node_edge_sum_weight[node] = sum((edge[2] for edge in edges))  # total weight of the node's edges
        nodes = sorted(self.graph.keys())  # all nodes, sorted for a deterministic update order
        for iter_i in range(self.iter_num):  # iterate (15 times by default)
            for node in nodes:  # update the PR value of each node
                # apply the formula (1 - d) + d * _sum
                _sum = 0
                for (_, neighbor_node, weight) in self.graph[node]:  # iterate over the node's neighbors (all nodes connected by an edge)
                    # accumulate the PR contribution of the current neighbor
                    _sum += weight / node_edge_sum_weight[neighbor_node] * node_ranks[neighbor_node]
                node_ranks[node] = (1 - self.d) + self.d * _sum
        # normalize the ranks with min-max normalization: (x - min) / (max - min)
        min_rank, max_rank = min(node_ranks.values()), max(node_ranks.values())
        for node, rank_val in node_ranks.items():
            node_ranks[node] = (rank_val - min_rank) / (max_rank - min_rank)
        return node_ranks

class TextRank(object):
    def __init__(self):
        # load stop words
        self._stop_words = set()
        with open('stop_words.txt', 'r', encoding='utf-8') as f:
            for line in f:
                word = line.strip()
                if not word:
                    continue
                self._stop_words.add(word)
        # POS tags to keep
        self._ALLOW_POS = ('ns', 'n', 'vn', 'v', 'i', 'a', 'ad', 'an')

    def clean(self, text, min_word_len=1):
        """
        Clean the text: segment it, then drop stop words, short words and unwanted POS tags.
        :param text: str, the text
        :param min_word_len: minimum word length to keep
        :return: list of words
        """
        words = []
        for word, pos in posseg.cut(text):
            if not word.strip():
                continue
            if word in self._stop_words:
                continue
            if len(word) < min_word_len:
                continue
            if pos not in self._ALLOW_POS:
                continue
            words.append(word)
        return words

    @staticmethod
    def _calculate_graph(elements_relation):
        """
        Build the page-rank graph from word co-occurrences and compute the PR values.
        """
        # create an empty undirected graph
        graph = UndirectedWeightedGraph()
        # add the co-occurrence edges to the graph one by one
        for (word1, word2), freq in elements_relation.items():
            graph.add_edge(word1, word2, freq)
        # run the page-rank iterations
        nodes_rank = graph.rank()
        # sort the results by weight, highest first
        results = sorted(nodes_rank.items(), key=lambda item: item[1], reverse=True)
        return results

    def textrank(self, elements, window_size=3):
        """
        Args:
            elements: the elements (words) to run text rank over
            window_size: co-occurrence window size
        Returns:
            list [('院长', 1.0), ('渎职', 0.9980043667706294)]
        """
        # store the co-occurrence relations as a dict:
        # {
        #     (word1, word2): freq,
        #     ...
        # }
        elements_relation = {}
        # count the frequency of each word pair
        for i, ele in enumerate(elements):  # iterate over every position, sliding a window from each one
            for j in range(i + 1, min(i + window_size, len(elements))):  # within the window, record pairwise relations
                term = (ele, elements[j])  # the current word pair
                elements_relation[term] = elements_relation.get(term, 0) + 1  # count the co-occurrence
        # build the page-rank graph and compute each node's weight
        results = self._calculate_graph(elements_relation)
        return results

if __name__ == '__main__':
    content = '报道称,拜登政府对中美经济关系的全面审查已进行了7个月的时间,现在审查应当回答一个核心问题,即如何处理前总统特朗普在2020' \
              '年初签署的协议。在这份协议中,为了保护美国的关键产业,如汽车和飞机制造,特朗普政府威胁对价值3600' \
              '亿美元的中国进口商品开征关税,经过双方协商,中国承诺购买更多美国产品并改变其贸易做法。但拜登政府至今没有提及这笔交易将何去何从,导致价值3600 亿美元的中国进口商品的关税悬而未决。 '
    textrank = TextRank()
    # words = [word for word in jieba.cut(content) if word not in textrank._stop_words]  # remove stop words
    words = textrank.clean(content)
    keywords = textrank.textrank(words, window_size=3)
    print(keywords)
Supplementary: n-gram code
The source code behind from nltk import ngrams is quite interesting:
from itertools import tee

n = 2
sequence = [1, 2, 3]
iterables = tee(sequence, n)
for i, sub_iterable in enumerate(iterables):  # For each window,
    for _ in range(i):                        # iterate through every order of ngram
        next(sub_iterable, None)              # generate the ngrams within the window.
print(list(zip(*iterables)))
# output: [(1, 2), (2, 3)]
The principle is:
list(zip([1,2,3],[2,3]))
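The same idea without tee, as a plain sliding-window sketch (my_ngrams is a hypothetical helper for illustration, not nltk's API):
def my_ngrams(sequence, n):
    # collect every length-n window over the sequence
    return [tuple(sequence[i:i + n]) for i in range(len(sequence) - n + 1)]

print(my_ngrams([1, 2, 3], 2))  # [(1, 2), (2, 3)]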