当前位置: 首页 > article >正文

Python3网络爬虫开发实战(14)资讯类页面智能解析

文章目录

  • 一、详细页智能解析算法
    • 1.1 提取标题
    • 1.2 提取正文
    • 1.3 提取时间
  • 二、列表页智能解析算法
  • 三、智能分辨列表页和详细页
  • 四、完整的库
    • 4.1 参考文献
    • 4.2 Project

页面智能解析就是利用算法从页面的 HTML 代码中提取想要的内容,算法会自动计算出目标内容在代码中的位置并将他们提取出来;

业界进展:DiffbotEmbedly ;目前 Diffbot 的提取效果算是比较先进的;

对于资讯类网站,除去一些特殊的页面(如登入页面,注册页面等),剩下页面可以分为两大类——列表页和详细页,前者提供多个详细页的索引导航信息,后者则包含具体的内容,针对这两类页面有以下几点需要解决:

  1. 详细页中文章标题,正文,发布事件和提取算法的实现;
  2. 列表页中链接列表的提取算法和实现;
  3. 如何判断一个页面是详细页还是列表页;

一、详细页智能解析算法

详细页是某个内容的展示页面,通常包含醒目的标题,发布事件和占据版面最大的正文部分。另外,详细页的侧栏通常会有一些关联或推荐的内容列表,页面头部的导航链接,评论区,广告区等;

一般来说,详细页包含的信息非常多,例如标题,发布时间,发布来源,作者,正文,封面图,评论数目,评论内容等,不过由于其中一些内容并不常用,而且提取算法大同小异,因此这里主要对三个信息进行提取,标题,正文和发布时间;

由于很多的网页都是由 JS 渲染而成的,因此通过请求获取的页面源代码不一定是在浏览器中看到的页面源代码,因此解析的前提是要求我们提取的必须是渲染完整的 HTML 代码

1.1 提取标题

详细页的标题一般包含在 title 节点或者 h 节点中,可以通过结合 title 节点和 h 节点的内容总结出两步提出思路:

  1. 提取页面中的 h 节点,将内容和 title 节点的文本做比对,和后者相似度最高的内容很可能就是详细页的标题
  2. 如果未在页面中找到 h 节点,则只能使用 title 节点的文本作为结果;

一般来说,有些网站为了使 SEO 效果比较好,会添加一些 meta 标签并将标题信息放入其中,因此总的来说可以综合三方面信息 title,h,meta 来获取信息;

from lxml.html import HtmlElement, fromstring  
  
METAS = [  
    '//meta[start-with](@property, "og:title")/@content',  
    '//meta[start-with](@name, "og:title")/@content',  
    '//meta[start-with](@property, "title")/@content',  
    '//meta[start-with](@name, "title")/@content',  
    '//meta[start-with](@property, "page:title")/@content',  
]  
  
  
def extract_by_meta(element: HtmlElement):  
    for xpath in METAS:  
        title = element.xpath(xpath)  
        if title:  
            return "".join(title)  
  
  
def extract_by_title(element: HtmlElement):  
    return "".join(element.xpath("//title//text()")).strip()  
  
  
def extract_by_h(element: HtmlElement):  
    hs = element.xpath("//h1//text()|//h2//text()|//h3//text()")  
    return hs or []  
  
  
def similarity(s1, s2):  
    if not s1 or not s2:  
        return 0  
    s1_set = set(list(s1))  
    s2_set = set(list(s2))  
    intersection = s1_set.intersection(s2_set)  
    union = s1_set.union(s2_set)  
    return len(intersection) / len(union)  
  
  
def extract_title(element: HtmlElement):  
    title_extracted_by_meta = extract_by_meta(element)  
    title_extracted_by_h = extract_by_h(element)  
    title_extracted_by_title = extract_by_title(element)  
  
    if title_extracted_by_meta:  
        return title_extracted_by_meta  
  
    title_extracted_by_h = sorted(  
        title_extracted_by_h,  
        key=lambda x: similarity(x, title_extracted_by_title),  
        reverse=True,  
    )  
  
    if title_extracted_by_h:  
        return title_extracted_by_h[0]  
  
    return title_extracted_by_title  
  
  
if __name__ == "__main__":  
    # 将html转化为xml格式  
    html = open("detail.html", encoding="utf-8").read()  
    element = fromstring(html=html)  
    title = extract_title(element)

1.2 提取正文

观察资讯类详细页中正文内容的特征,可以发现一些规律:

  1. 正文内容通常被包含在 body 节点的 p 节点中,而且 p 节点一般不会独立存在,而是存在于 div 等节点内;
  2. 正文内容所在的 p 节点也不一定全是正文内容,可能掺杂噪声,如网站的版权信息,发布人,文末广告等,这些都属于噪声;
  3. 正文内容所在的 p 节点中会夹杂 style,script 等节点,这些都不是正文内容;
  4. 正文内容所在的 p 节点内可能包含 code,span 等节点,这些内容大部分属于正文中的特殊样式字符,往往也需要归类到正文内容之中;

作者通过GeneralNewsExtractor和 基于文本及符号密度的网页正文提取方法的启发,得到了两个比较有效的正文文本提取依据指标——文本密度和符号密度;

文本密度不局限于纯文本和节点的大小比例,还考虑到了文本中包含的超链接,论文中定义,如果 i i i 为 HTML DOM 树种的一个节点,那么该节点的文本密度为:

T D i = T i − L T i T G i − L T G i TD_i = \frac{T_i-LT_i}{TG_i-LTG_i} TDi=TGiLTGiTiLTi

如下为其中各个符号的含义: T D i TD_i TDi 表示节点 i i i 的文本密度, T i T_i Ti 表示节点 i i i 中字符串的字数, L T i LT_i LTi 表示 i i i 中带链接的字符串的字数, T G i TG_i TGi 表示节点 i i i 中标签的数量, L T G i LTG_i LTGi 表示节点 i i i 中带链接的标签的数量;

正文中一般会带标点符号,而网页链接,广告信息由于文字较少,通常是不包含标点符号的,因此还可以借助符号密度排除一些内容;节点 i i i 的符号密度如下:

S b D i = T i − L T i S b i + 1 SbD_i=\frac{T_i-LT_i}{Sb_i + 1} SbDi=Sbi+1TiLTi
S b D i SbD_i SbDi 表示节点 i i i 的符号密度, T i T_i Ti 表示节点 i i i 中字符串的字数, L T i LT_i LTi 表示节点 i i i 中带链接的字符串的字数, S b i Sb_i Sbi 表示节点 i i i 中符号的数量(分母另外加 1 是为了确保除数不为 0 );

论文的作者经过多次实验。利用文本密度和符号密度相结合的方式提取正文信息能取得很不错的效果,可以结合两者为每个节点分别计算一个分数,分数最高的节点就为正文内容所在的节点,分数的计算公式如下: S c o r e i = l n S D × T D i × l g ( P N u m i + 2 ) × l n S b D i Score_i = lnSD \times TD_i \times lg(PNum_i + 2) \times lnSbD_i Scorei=lnSD×TDi×lg(PNumi+2)×lnSbDi
其中 S c o r e i Score_i Scorei 表示节点 i i i 的分数, S D SD SD 表示所有节点的文本密度标准差, T D i TD_i TDi 表示节点 i i i 的文本密度, P N u m i PNum_i PNumi 表示节点 i i i 包含的 p p p 节点的数量, S b D i SbD_i SbDi 表示节点 i i i 的符号密度;

如果需要追求更高的正确率,我们还可以结合 css 来利用视觉信息通过计算节点所占区域的大小来排除一些干扰;

from lxml.html import HtmlElement, etree  
  
  
CONTENT_USELESS_TAGS = [  
    "meta",  
    "style",  
    "script",  
    "link",  
    "video",  
    "audio",  
    "iframe",  
    "source",  
    "svg",  
    "path",  
    "symbol",  
    "img",  
    "footer",  
    "header",  
]  
CONTENT_STRIP_TAGS = ["span", "blockquote"]  
CONTENT_NOISE_XPATHS = [  
    '//div[contain(@class, "comment")]',  
    '//div[contain(@class, "advertisement")]',  
    '//div[contain(@class, "advert")]',  
    '//div[contain(@class, "display:none")]',  
]  
  
  
def remove_element(element: HtmlElement):  
    # 如果有父节点那就删除,否则不处理  
    parent = element.getparent()  
    if parent is not None:  
        parent.remove(element)  
  
  
def remove_children(element: HtmlElement, xpaths=None):  
    # 删除掉目标位置的节点  
    if not xpaths:  
        return  
    for xpath in xpaths:  
        nodes = element.xpath(xpath)  
        for node in nodes:  
            remove_element(node)  
    return element  
  
  
def children(element: HtmlElement):  
    # 按html内容依次遍历所有节点  
    yield element  
    for child_element in element:  
        if isinstance(child_element, HtmlElement):  
            yield from children(child_element)  
  
  
def preprocess4content(element: HtmlElement):  
    # 删除标签和内容  
    etree.strip_elements(element, *CONTENT_USELESS_TAGS)  
    # 只删除标签对  
    etree.strip_tags(element, *CONTENT_STRIP_TAGS)  
    # 删除噪声标签  
    remove_children(element, CONTENT_NOISE_XPATHS)  
  
    for child in children(element):  
        # 把 span 和 strong 标签里面的文本呢合并到父级 p 标签里面  
        if child.tag.lower() == "p":  
            etree.strip_tags(child, "span")  
            etree.strip_tags(child, "strong")  
  
            if not (child.text and child.text.strip()):  
                remove_element(child)  
  
        # 如果 div 标签里没有任何子节点,就把它转换为 p 标签  
        if child.tag.lower() == "div" and not child.getchildren():  
            child.tag = "p"

预处理完毕后,整个 element 因为没有了噪声和干扰数据,变得比较规整了,下一步,来实现文本密度,符号密度和最终分数的计算;

为了方便处理,将节点定义成一个类,继承于 HtmlElement,包含很多字段,代表某个节点的信息,例如文本密度,符号密度等,Element 的定义(GerapyAutoExtractor/gerapy_auto_extractor/schemas/element.py at master · Gerapy/GerapyAutoExtractor (github.com))如下:

from lxml.html import HtmlElement, etree
from numpy import mean


class Element(HtmlElement):
    _id: int = None
    _selector: str = None
    _parent_selector: str = None
    _alias: str = None
    _tag_name: str = None
    _path: str = None
    _path_raw: str = None
    _children = None
    _parent = None
    _siblings = None
    _descendants = None
    _text = None
    _number_of_char: int = None
    _number_of_a_char: int = None
    _number_of_punctuation: int = None
    _number_of_a_descendants: int = None
    _number_of_p_descendants: int = None
    _number_of_children: int = None
    _number_of_siblings: int = None
    _number_of_descendants: int = None
    _density_of_punctuation: int = None
    _density_of_text: float = None
    _density_score: float = None
    _similarity_with_siblings: float = None
    _a_descendants: list = None
    _a_descendants_group: dict = None
    _a_descendants_group_text_length: dict = None
    _a_descendants_group_text_min_length: float = None
    _a_descendants_group_text_max_length: float = None
    
    density_score: float = None
    
    @property
    def id(self):
        """
        get id by hashed element
        :return:
        """
        if self._id is not None:
            return self._id
        self._id = hash(self)
        return self._id
    
    @property
    def nth(self):
        """
        get nth index of this element in parent element
        :return:
        """
        return len(list(self.itersiblings(preceding=True))) + 1
    
    
    @property
    def alias(self):
        """
        get alias of element, using all attributes to construct it.
        :return: string
        """
        if self._alias is not None:
            return self._alias
        from gerapy_auto_extractor.utils.element import alias
        self._alias = alias(self)
        return self._alias
    
    @property
    def selector(self):
        """
        get id by hashed element
        :return:
        """
        if self._selector is not None:
            return self._selector
        from gerapy_auto_extractor.utils.element import selector
        self._selector = selector(self)
        return self._selector
    
    @property
    def children(self):
        """
        get children of this element
        :return: 
        """
        if self._children is not None:
            return self._children
        from gerapy_auto_extractor.utils.element import children
        self._children = list(children(self))
        return self._children
    
    @property
    def siblings(self):
        """
        get siblings of this element
        :return: 
        """
        if self._siblings is not None:
            return self._siblings
        from gerapy_auto_extractor.utils.element import siblings
        self._siblings = list(siblings(self))
        return self._siblings
    
    @property
    def descendants(self):
        """
        get descendants of this element
        :return: 
        """
        if self._descendants is not None:
            return self._descendants
        from gerapy_auto_extractor.utils.element import descendants
        self._descendants = list(descendants(self))
        return self._descendants
    
    @property
    def parent_selector(self):
        """
        get id by hashed element
        :return:
        """
        if self._parent_selector is not None:
            return self._parent_selector
        from gerapy_auto_extractor.utils.element import selector, parent
        # TODO: change parent(self) to self.parent
        p = parent(self)
        if p is not None:
            self._parent_selector = selector(p)
        return self._parent_selector
    
    @property
    def tag_name(self):
        """
        return tag name
        :return:
        """
        if self._tag_name:
            return self._tag_name
        self._tag_name = self.tag
        return self._tag_name
    
    @property
    def text(self):
        """
        get text of element
        :return:
        """
        if self._text is not None:
            return self._text
        from gerapy_auto_extractor.utils.element import text
        self._text = text(self)
        return self._text
    
    @property
    def string(self):
        """
        return string of element
        :return:
        """
        return etree.tostring(self, pretty_print=True, encoding="utf-8", method='html').decode('utf-8')
    
    @property
    def path(self):
        """
        get tag path using external path function
        :return:
        """
        if self._path is not None:
            return self._path
        from gerapy_auto_extractor.utils.element import path
        self._path = path(self)
        return self._path
    
    @property
    def path_raw(self):
        """
        get tag raw path using external path raw function
        :return:
        """
        if self._path_raw is not None:
            return self._path_raw
        from gerapy_auto_extractor.utils.element import path_raw
        self._path_raw = path_raw(self)
        return self._path_raw
    
    @property
    def number_of_char(self):
        """
        get text length
        :return:
        """
        if self._number_of_char is not None:
            return self._number_of_char
        from gerapy_auto_extractor.utils.element import number_of_char
        self._number_of_char = number_of_char(self)
        return self._number_of_char
    
    @property
    def number_of_a_descendants(self):
        """
        get number of a descendants
        :return:
        """
        if self._number_of_a_descendants is not None:
            return self._number_of_a_descendants
        from gerapy_auto_extractor.utils.element import number_of_a_descendants
        self._number_of_a_descendants = number_of_a_descendants(self)
        return self._number_of_a_descendants
    
    @property
    def number_of_a_char(self):
        """
        get a text length
        :return:
        """
        if self._number_of_a_char is not None:
            return self._number_of_a_char
        from gerapy_auto_extractor.utils.element import number_of_a_char
        self._number_of_a_char = number_of_a_char(self)
        return self._number_of_a_char
    
    @property
    def number_of_p_descendants(self):
        """
        return number of paragraph
        :return:
        """
        if self._number_of_p_descendants is not None:
            return self._number_of_p_descendants
        from gerapy_auto_extractor.utils.element import number_of_p_descendants
        self._number_of_p_descendants = number_of_p_descendants(self)
        return self._number_of_p_descendants
    
    @property
    def number_of_punctuation(self):
        """
        get number of punctuation
        :return:
        """
        if self._number_of_punctuation is not None:
            return self._number_of_punctuation
        from gerapy_auto_extractor.utils.element import number_of_punctuation
        self._number_of_punctuation = number_of_punctuation(self)
        return self._number_of_punctuation
    
    @property
    def number_of_children(self):
        """
        get children number
        :return:
        """
        if self._number_of_children is not None:
            return self._number_of_children
        self._number_of_children = len(list(self.children))
        return self._number_of_children
    
    @property
    def number_of_siblings(self):
        """
        get number of siblings
        :return:
        """
        if self._number_of_siblings is not None:
            return self._number_of_siblings
        self._number_of_siblings = len(list(self.siblings))
        return self._number_of_siblings
    
    @property
    def number_of_descendants(self):
        """
        get number of descendants
        :return:
        """
        if self._number_of_descendants is not None:
            return self._number_of_descendants
        from gerapy_auto_extractor.utils.element import number_of_descendants
        self._number_of_descendants = len(list(self.descendants))
        return self._number_of_descendants
    
    @property
    def density_of_punctuation(self):
        """
        get density of punctuation
        :return:
        """
        if self._density_of_punctuation is not None:
            return self._density_of_punctuation
        from gerapy_auto_extractor.utils.element import density_of_punctuation
        self._density_of_punctuation = density_of_punctuation(self)
        return self._density_of_punctuation
    
    @property
    def density_of_text(self):
        """
        get density of text
        :return:
        """
        if self._density_of_text is not None:
            return self._density_of_text
        from gerapy_auto_extractor.utils.element import density_of_text
        self._density_of_text = density_of_text(self)
        return self._density_of_text
    
    @property
    def similarity_with_siblings(self):
        """
        get similarity with siblings
        :return:
        """
        if self._similarity_with_siblings is not None:
            return self._similarity_with_siblings
        from gerapy_auto_extractor.utils.element import similarity_with_siblings
        self._similarity_with_siblings = similarity_with_siblings(self)
        return self._similarity_with_siblings
    
    @property
    def a_descendants(self):
        """
        get linked descendants
        :return:
        """
        if self._a_descendants is not None:
            return self._a_descendants
        from gerapy_auto_extractor.utils.element import a_descendants
        self._a_descendants = a_descendants(self)
        return self._a_descendants
    
    @property
    def a_descendants_group(self):
        """
        get linked descendants group
        :return:
        """
        if self._a_descendants_group is not None:
            return self._a_descendants_group
        from gerapy_auto_extractor.utils.element import a_descendants_group
        self._a_descendants_group = a_descendants_group(self)
        return self._a_descendants_group
    
    @property
    def a_descendants_group_text_length(self):
        """
        grouped linked text length
        :return:
        """
        if self._a_descendants_group_text_length is not None:
            return self._a_descendants_group_text_length
        result = {}
        from gerapy_auto_extractor.utils.element import text
        for path, elements in self.a_descendants_group.items():
            lengths = []
            for element in elements:
                # TODO: convert len(text(element)) to element.number_of_char
                lengths.append(len(text(element)))
            mean_length = mean(lengths) if len(lengths) else 0
            result[path] = mean_length
        return result
    
    @property
    def a_descendants_group_text_min_length(self):
        """
        get grouped linked text min length
        :return:
        """
        if self._a_descendants_group_text_min_length is not None:
            return self._a_descendants_group_text_min_length
        values = self.a_descendants_group_text_length.values()
        self._a_descendants_group_text_min_length = min(values) if values else 0
        return self._a_descendants_group_text_min_length
    
    @property
    def a_descendants_group_text_max_length(self):
        """
        get grouped linked text max length
        :return:
        """
        if self._a_descendants_group_text_max_length is not None:
            return self._a_descendants_group_text_max_length
        values = self.a_descendants_group_text_length.values()
        self._a_descendants_group_text_max_length = max(values) if values else 0
        return self._a_descendants_group_text_max_length
    
    @property
    def a_descendants_group_text_avg_length(self):
        """
        get grouped linked text avg length
        :return:
        """
        if self._a_descendants_group_text_max_length is not None:
            return self._a_descendants_group_text_max_length
        values = self.a_descendants_group_text_length.values()
        self._a_descendants_group_text_max_length = max(values) if values else 0
        return self._a_descendants_group_text_max_length
    
    def __str__(self):
        """
        rewrite str
        :return:
        """
        return f'<Element {self.tag} of {self.path}>'
    
    def __repr__(self):
        """
        rewrite repr
        :return:
        """
        return self.__str__()

通过这些方法,可以计算 Element 对象中的各个指标,提取正文的方法定义如下:

def process(element: Element):
	"""
	extract content from html
	:param element:
	:return:
	"""
	# preprocess
	preprocess4content(element)
	
	# start to evaluate every child element
	element_infos = []
	descendants = descendants_of_body(element)
	
	# get std of density_of_text among all elements
	density_of_text = [descendant.density_of_text for descendant in descendants]
	density_of_text_std = np.std(density_of_text, ddof=1)
	
	# get density_score of every element
	for descendant in descendants:
		score = np.log(density_of_text_std) * \
				descendant.density_of_text * \
				np.log10(descendant.number_of_p_descendants + 2) * \
				np.log(descendant.density_of_punctuation)
		descendant.density_score = score
	
	# sort element info by density_score
	descendants = sorted(descendants, key=lambda x: x.density_score, reverse=True)
	descendant_first = descendants[0] if descendants else None
	if descendant_first is None:
		return None
	paragraphs = descendant_first.xpath('.//p//text()')
	paragraphs = [paragraph.strip() if paragraph else '' for paragraph in paragraphs]
	paragraphs = list(filter(lambda x: x, paragraphs))
	text = '\n'.join(paragraphs)
	text = text.strip()
	return text

1.3 提取时间

和标题类似,一些正规的网站为了使 SEO 效果比较好,会把时间信息放到 meta 节点内,然而不是所有的网站都会加上这样的 meta 节点,在这里我们可以使用正则表达式来提取时间信息;

总的来说,发布事件的提取标准如下:

  1. 根据 meta 节点的信息提取时间,提取结果大概率就是真实的发布事件,可信度较高;
  2. 根据正则表达式提取时间,如果匹配到一些置信度比较高的规则,那么可以直接提取,如果匹配到置信度不高的规则或者提取到多个事件信息,则可以进行下一步的提取和筛选;
  3. 通过计算节点和正文的距离,再结合其他相关信息筛选出最有节点作为结果;

首先定义 meta 和 正则表达式如下:

METAS_CONTENT = [
    '//meta[starts-with(@property, "rnews:datePublished")]/@content',
    '//meta[starts-with(@property, "article:published_time")]/@content',
    '//meta[starts-with(@property, "og:published_time")]/@content',
    '//meta[starts-with(@property, "og:release_date")]/@content',
    '//meta[starts-with(@itemprop, "datePublished")]/@content',
    '//meta[starts-with(@itemprop, "dateUpdate")]/@content',
    '//meta[starts-with(@name, "OriginalPublicationDate")]/@content',
    '//meta[starts-with(@name, "article_date_original")]/@content',
    '//meta[starts-with(@name, "og:time")]/@content',
    '//meta[starts-with(@name, "apub:time")]/@content',
    '//meta[starts-with(@name, "publication_date")]/@content',
    '//meta[starts-with(@name, "sailthru.date")]/@content',
    '//meta[starts-with(@name, "PublishDate")]/@content',
    '//meta[starts-with(@name, "publishdate")]/@content',
    '//meta[starts-with(@name, "PubDate")]/@content',
    '//meta[starts-with(@name, "pubtime")]/@content',
    '//meta[starts-with(@name, "_pubtime")]/@content',
    '//meta[starts-with(@name, "weibo: article:create_at")]/@content',
    '//meta[starts-with(@pubdate, "pubdate")]/@content',
]

METAS_MATCH = [
    '//meta[starts-with(@property, "rnews:datePublished")]',
    '//meta[starts-with(@property, "article:published_time")]',
    '//meta[starts-with(@property, "og:published_time")]',
    '//meta[starts-with(@property, "og:release_date")]',
    '//meta[starts-with(@itemprop, "datePublished")]',
    '//meta[starts-with(@itemprop, "dateUpdate")]',
    '//meta[starts-with(@name, "OriginalPublicationDate")]',
    '//meta[starts-with(@name, "article_date_original")]',
    '//meta[starts-with(@name, "og:time")]',
    '//meta[starts-with(@name, "apub:time")]',
    '//meta[starts-with(@name, "publication_date")]',
    '//meta[starts-with(@name, "sailthru.date")]',
    '//meta[starts-with(@name, "PublishDate")]',
    '//meta[starts-with(@name, "publishdate")]',
    '//meta[starts-with(@name, "PubDate")]',
    '//meta[starts-with(@name, "pubtime")]',
    '//meta[starts-with(@name, "_pubtime")]',
    '//meta[starts-with(@name, "weibo: article:create_at")]',
    '//meta[starts-with(@pubdate, "pubdate")]',
]

REGEXES = [
    "(\d{4}[-|/|.]\d{1,2}[-|/|.]\d{1,2}\s*?[0-1]?[0-9]:[0-5]?[0-9]:[0-5]?[0-9])",
    "(\d{4}[-|/|.]\d{1,2}[-|/|.]\d{1,2}\s*?[2][0-3]:[0-5]?[0-9]:[0-5]?[0-9])",
    "(\d{4}[-|/|.]\d{1,2}[-|/|.]\d{1,2}\s*?[0-1]?[0-9]:[0-5]?[0-9])",
    "(\d{4}[-|/|.]\d{1,2}[-|/|.]\d{1,2}\s*?[2][0-3]:[0-5]?[0-9])",
    "(\d{4}[-|/|.]\d{1,2}[-|/|.]\d{1,2}\s*?[1-24]\d时[0-60]\d分)([1-24]\d时)",
    "(\d{2}[-|/|.]\d{1,2}[-|/|.]\d{1,2}\s*?[0-1]?[0-9]:[0-5]?[0-9]:[0-5]?[0-9])",
    "(\d{2}[-|/|.]\d{1,2}[-|/|.]\d{1,2}\s*?[2][0-3]:[0-5]?[0-9]:[0-5]?[0-9])",
    "(\d{2}[-|/|.]\d{1,2}[-|/|.]\d{1,2}\s*?[0-1]?[0-9]:[0-5]?[0-9])",
    "(\d{2}[-|/|.]\d{1,2}[-|/|.]\d{1,2}\s*?[2][0-3]:[0-5]?[0-9])",
    "(\d{2}[-|/|.]\d{1,2}[-|/|.]\d{1,2}\s*?[1-24]\d时[0-60]\d分)([1-24]\d时)",
    "(\d{4}年\d{1,2}月\d{1,2}日\s*?[0-1]?[0-9]:[0-5]?[0-9]:[0-5]?[0-9])",
    "(\d{4}年\d{1,2}月\d{1,2}日\s*?[2][0-3]:[0-5]?[0-9]:[0-5]?[0-9])",
    "(\d{4}年\d{1,2}月\d{1,2}日\s*?[0-1]?[0-9]:[0-5]?[0-9])",
    "(\d{4}年\d{1,2}月\d{1,2}日\s*?[2][0-3]:[0-5]?[0-9])",
    "(\d{4}年\d{1,2}月\d{1,2}日\s*?[1-24]\d时[0-60]\d分)([1-24]\d时)",
    "(\d{2}年\d{1,2}月\d{1,2}日\s*?[0-1]?[0-9]:[0-5]?[0-9]:[0-5]?[0-9])",
    "(\d{2}年\d{1,2}月\d{1,2}日\s*?[2][0-3]:[0-5]?[0-9]:[0-5]?[0-9])",
    "(\d{2}年\d{1,2}月\d{1,2}日\s*?[0-1]?[0-9]:[0-5]?[0-9])",
    "(\d{2}年\d{1,2}月\d{1,2}日\s*?[2][0-3]:[0-5]?[0-9])",
    "(\d{2}年\d{1,2}月\d{1,2}日\s*?[1-24]\d时[0-60]\d分)([1-24]\d时)",
    "(\d{1,2}月\d{1,2}日\s*?[0-1]?[0-9]:[0-5]?[0-9]:[0-5]?[0-9])",
    "(\d{1,2}月\d{1,2}日\s*?[2][0-3]:[0-5]?[0-9]:[0-5]?[0-9])",
    "(\d{1,2}月\d{1,2}日\s*?[0-1]?[0-9]:[0-5]?[0-9])",
    "(\d{1,2}月\d{1,2}日\s*?[2][0-3]:[0-5]?[0-9])",
    "(\d{1,2}月\d{1,2}日\s*?[1-24]\d时[0-60]\d分)([1-24]\d时)",
    "(\d{4}[-|/|.]\d{1,2}[-|/|.]\d{1,2})",
    "(\d{2}[-|/|.]\d{1,2}[-|/|.]\d{1,2})",
    "(\d{4}年\d{1,2}月\d{1,2}日)",
    "(\d{2}年\d{1,2}月\d{1,2}日)",
    "(\d{1,2}月\d{1,2}日)"
]

最后定义一个提取方法并整合到一起,优先使用 meta 中的内容;

import re  
  
def extract_by_regex(element: HtmlElement) -> str:  
    """  
    extract datetime according to predefined regex    
    :param element:    
    :return:  
    """    
    text = ''.join(element.xpath('.//text()'))  
    for regex in REGEXES:  
        result = re.search(regex, text)  
        if result:  
            return result.group(1)  
  
  
def extract_by_meta(element: HtmlElement) -> str:  
    """  
    extract according to meta    
    :param element:    
    :return: str  
    """    
    for xpath in METAS_CONTENT:  
        datetime = element.xpath(xpath)  
        if datetime:  
            return ''.join(datetime)  
  
  
def process(element: HtmlElement):  
    """  
    extract datetime    
    :param html:    
    :return:  
    """    
    return extract_by_meta(element) or \  
        extract_by_regex(element)

二、列表页智能解析算法

列表页包含一个个详细页的标题和链接,点击其中某个链接,就可以进入对应的详细页,列表页页面主要区域里的列表都很醒目;

列表页解析的目标是从当前列表页中把详细页的标题和链接提取出来,并以列表的形式返回;

[
 {
	 "title": *************,
	 "url": *************,
 },
 {
	 "title": *************,
	 "url": *************,
 },
 {
	 "title": *************,
	 "url": *************,
 },
]

列表页中的标题以及链接并不都是按照固定的 ul,li 来排列的,因此我们需要找一个通用的提取模式,可以观察得到,列表中的标题通常是一组一组呈现的,如果进观察一组,可以发现组内包含多个连续并列的兄弟节点,如果我们把这些连续并列的兄弟节点作为寻找目标,就可以得到这样一个通用的规律:

  1. 这些节点都是同类型且连续的兄弟节点,数量至少为 2 个;
  2. 这些节点有一个共同的父节点;

为了更好的表述算法流程,把共同的父节点称为 “组节点”,同类型且连续的兄弟节点称为 “成员节点”;目标组节点和其他组节点最明显不同之处在于字数,因此我们需要规定成员节点的最小平均字符数,同时对于多个目标组节点,我们可以通过合并的方式变为一个组节点再来提取;

首先便是预处理,和详细页一样;

from lxml.html import HtmlElement, etree  
  
  
CONTENT_USELESS_TAGS = [  
    "meta",  
    "style",  
    "script",  
    "link",  
    "video",  
    "audio",  
    "iframe",  
    "source",  
    "svg",  
    "path",  
    "symbol",  
    "img",  
    "footer",  
    "header",  
]  
CONTENT_STRIP_TAGS = ["span", "blockquote"]  
CONTENT_NOISE_XPATHS = [  
    '//div[contain(@class, "comment")]',  
    '//div[contain(@class, "advertisement")]',  
    '//div[contain(@class, "advert")]',  
    '//div[contain(@class, "display:none")]',  
]  
  
  
def remove_element(element: HtmlElement):  
    # 如果有父节点那就删除,否则不处理  
    parent = element.getparent()  
    if parent is not None:  
        parent.remove(element)  
  
  
def remove_children(element: HtmlElement, xpaths=None):  
    # 删除掉目标位置的节点  
    if not xpaths:  
        return  
    for xpath in xpaths:  
        nodes = element.xpath(xpath)  
        for node in nodes:  
            remove_element(node)  
    return element  
  
  
def children(element: HtmlElement):  
    # 按html内容依次遍历所有节点  
    yield element  
    for child_element in element:  
        if isinstance(child_element, HtmlElement):  
            yield from children(child_element)  
  
  
def preprocess4content(element: HtmlElement):  
    # 删除标签和内容  
    etree.strip_elements(element, *CONTENT_USELESS_TAGS)  
    # 只删除标签对  
    etree.strip_tags(element, *CONTENT_STRIP_TAGS)  
    # 删除噪声标签  
    remove_children(element, CONTENT_NOISE_XPATHS)  
  
    for child in children(element):  
        # 把 span 和 strong 标签里面的文本呢合并到父级 p 标签里面  
        if child.tag.lower() == "p":  
            etree.strip_tags(child, "span")  
            etree.strip_tags(child, "strong")  
  
            if not (child.text and child.text.strip()):  
                remove_element(child)  
  
        # 如果 div 标签里没有任何子节点,就把它转换为 p 标签  
        if child.tag.lower() == "div" and not child.getchildren():  
            child.tag = "p"

同样的,和详细页一样,定义成一个类,继承于 HtmlElement,包含很多字段,代表某个节点的信息,例如文本密度,符号密度等,Element 的定义(GerapyAutoExtractor/gerapy_auto_extractor/schemas/element.py at master · Gerapy/GerapyAutoExtractor (github.com))如下:

from lxml.html import HtmlElement, etree
from numpy import mean


class Element(HtmlElement):
    _id: int = None
    _selector: str = None
    _parent_selector: str = None
    _alias: str = None
    _tag_name: str = None
    _path: str = None
    _path_raw: str = None
    _children = None
    _parent = None
    _siblings = None
    _descendants = None
    _text = None
    _number_of_char: int = None
    _number_of_a_char: int = None
    _number_of_punctuation: int = None
    _number_of_a_descendants: int = None
    _number_of_p_descendants: int = None
    _number_of_children: int = None
    _number_of_siblings: int = None
    _number_of_descendants: int = None
    _density_of_punctuation: int = None
    _density_of_text: float = None
    _density_score: float = None
    _similarity_with_siblings: float = None
    _a_descendants: list = None
    _a_descendants_group: dict = None
    _a_descendants_group_text_length: dict = None
    _a_descendants_group_text_min_length: float = None
    _a_descendants_group_text_max_length: float = None
    
    density_score: float = None
    
    @property
    def id(self):
        """
        get id by hashed element
        :return:
        """
        if self._id is not None:
            return self._id
        self._id = hash(self)
        return self._id
    
    @property
    def nth(self):
        """
        get nth index of this element in parent element
        :return:
        """
        return len(list(self.itersiblings(preceding=True))) + 1
    
    
    @property
    def alias(self):
        """
        get alias of element, using all attributes to construct it.
        :return: string
        """
        if self._alias is not None:
            return self._alias
        from gerapy_auto_extractor.utils.element import alias
        self._alias = alias(self)
        return self._alias
    
    @property
    def selector(self):
        """
        get id by hashed element
        :return:
        """
        if self._selector is not None:
            return self._selector
        from gerapy_auto_extractor.utils.element import selector
        self._selector = selector(self)
        return self._selector
    
    @property
    def children(self):
        """
        get children of this element
        :return: 
        """
        if self._children is not None:
            return self._children
        from gerapy_auto_extractor.utils.element import children
        self._children = list(children(self))
        return self._children
    
    @property
    def siblings(self):
        """
        get siblings of this element
        :return: 
        """
        if self._siblings is not None:
            return self._siblings
        from gerapy_auto_extractor.utils.element import siblings
        self._siblings = list(siblings(self))
        return self._siblings
    
    @property
    def descendants(self):
        """
        get descendants of this element
        :return: 
        """
        if self._descendants is not None:
            return self._descendants
        from gerapy_auto_extractor.utils.element import descendants
        self._descendants = list(descendants(self))
        return self._descendants
    
    @property
    def parent_selector(self):
        """
        get id by hashed element
        :return:
        """
        if self._parent_selector is not None:
            return self._parent_selector
        from gerapy_auto_extractor.utils.element import selector, parent
        # TODO: change parent(self) to self.parent
        p = parent(self)
        if p is not None:
            self._parent_selector = selector(p)
        return self._parent_selector
    
    @property
    def tag_name(self):
        """
        return tag name
        :return:
        """
        if self._tag_name:
            return self._tag_name
        self._tag_name = self.tag
        return self._tag_name
    
    @property
    def text(self):
        """
        get text of element
        :return:
        """
        if self._text is not None:
            return self._text
        from gerapy_auto_extractor.utils.element import text
        self._text = text(self)
        return self._text
    
    @property
    def string(self):
        """
        return string of element
        :return:
        """
        return etree.tostring(self, pretty_print=True, encoding="utf-8", method='html').decode('utf-8')
    
    @property
    def path(self):
        """
        get tag path using external path function
        :return:
        """
        if self._path is not None:
            return self._path
        from gerapy_auto_extractor.utils.element import path
        self._path = path(self)
        return self._path
    
    @property
    def path_raw(self):
        """
        get tag raw path using external path raw function
        :return:
        """
        if self._path_raw is not None:
            return self._path_raw
        from gerapy_auto_extractor.utils.element import path_raw
        self._path_raw = path_raw(self)
        return self._path_raw
    
    @property
    def number_of_char(self):
        """
        get text length
        :return:
        """
        if self._number_of_char is not None:
            return self._number_of_char
        from gerapy_auto_extractor.utils.element import number_of_char
        self._number_of_char = number_of_char(self)
        return self._number_of_char
    
    @property
    def number_of_a_descendants(self):
        """
        get number of a descendants
        :return:
        """
        if self._number_of_a_descendants is not None:
            return self._number_of_a_descendants
        from gerapy_auto_extractor.utils.element import number_of_a_descendants
        self._number_of_a_descendants = number_of_a_descendants(self)
        return self._number_of_a_descendants
    
    @property
    def number_of_a_char(self):
        """
        get a text length
        :return:
        """
        if self._number_of_a_char is not None:
            return self._number_of_a_char
        from gerapy_auto_extractor.utils.element import number_of_a_char
        self._number_of_a_char = number_of_a_char(self)
        return self._number_of_a_char
    
    @property
    def number_of_p_descendants(self):
        """
        return number of paragraph
        :return:
        """
        if self._number_of_p_descendants is not None:
            return self._number_of_p_descendants
        from gerapy_auto_extractor.utils.element import number_of_p_descendants
        self._number_of_p_descendants = number_of_p_descendants(self)
        return self._number_of_p_descendants
    
    @property
    def number_of_punctuation(self):
        """
        get number of punctuation
        :return:
        """
        if self._number_of_punctuation is not None:
            return self._number_of_punctuation
        from gerapy_auto_extractor.utils.element import number_of_punctuation
        self._number_of_punctuation = number_of_punctuation(self)
        return self._number_of_punctuation
    
    @property
    def number_of_children(self):
        """
        get children number
        :return:
        """
        if self._number_of_children is not None:
            return self._number_of_children
        self._number_of_children = len(list(self.children))
        return self._number_of_children
    
    @property
    def number_of_siblings(self):
        """
        get number of siblings
        :return:
        """
        if self._number_of_siblings is not None:
            return self._number_of_siblings
        self._number_of_siblings = len(list(self.siblings))
        return self._number_of_siblings
    
    @property
    def number_of_descendants(self):
        """
        get number of descendants
        :return:
        """
        if self._number_of_descendants is not None:
            return self._number_of_descendants
        from gerapy_auto_extractor.utils.element import number_of_descendants
        self._number_of_descendants = len(list(self.descendants))
        return self._number_of_descendants
    
    @property
    def density_of_punctuation(self):
        """
        get density of punctuation
        :return:
        """
        if self._density_of_punctuation is not None:
            return self._density_of_punctuation
        from gerapy_auto_extractor.utils.element import density_of_punctuation
        self._density_of_punctuation = density_of_punctuation(self)
        return self._density_of_punctuation
    
    @property
    def density_of_text(self):
        """
        get density of text
        :return:
        """
        if self._density_of_text is not None:
            return self._density_of_text
        from gerapy_auto_extractor.utils.element import density_of_text
        self._density_of_text = density_of_text(self)
        return self._density_of_text
    
    @property
    def similarity_with_siblings(self):
        """
        get similarity with siblings
        :return:
        """
        if self._similarity_with_siblings is not None:
            return self._similarity_with_siblings
        from gerapy_auto_extractor.utils.element import similarity_with_siblings
        self._similarity_with_siblings = similarity_with_siblings(self)
        return self._similarity_with_siblings
    
    @property
    def a_descendants(self):
        """
        get linked descendants
        :return:
        """
        if self._a_descendants is not None:
            return self._a_descendants
        from gerapy_auto_extractor.utils.element import a_descendants
        self._a_descendants = a_descendants(self)
        return self._a_descendants
    
    @property
    def a_descendants_group(self):
        """
        get linked descendants group
        :return:
        """
        if self._a_descendants_group is not None:
            return self._a_descendants_group
        from gerapy_auto_extractor.utils.element import a_descendants_group
        self._a_descendants_group = a_descendants_group(self)
        return self._a_descendants_group
    
    @property
    def a_descendants_group_text_length(self):
        """
        grouped linked text length
        :return:
        """
        if self._a_descendants_group_text_length is not None:
            return self._a_descendants_group_text_length
        result = {}
        from gerapy_auto_extractor.utils.element import text
        for path, elements in self.a_descendants_group.items():
            lengths = []
            for element in elements:
                # TODO: convert len(text(element)) to element.number_of_char
                lengths.append(len(text(element)))
            mean_length = mean(lengths) if len(lengths) else 0
            result[path] = mean_length
        return result
    
    @property
    def a_descendants_group_text_min_length(self):
        """
        get grouped linked text min length
        :return:
        """
        if self._a_descendants_group_text_min_length is not None:
            return self._a_descendants_group_text_min_length
        values = self.a_descendants_group_text_length.values()
        self._a_descendants_group_text_min_length = min(values) if values else 0
        return self._a_descendants_group_text_min_length
    
    @property
    def a_descendants_group_text_max_length(self):
        """
        get grouped linked text max length
        :return:
        """
        if self._a_descendants_group_text_max_length is not None:
            return self._a_descendants_group_text_max_length
        values = self.a_descendants_group_text_length.values()
        self._a_descendants_group_text_max_length = max(values) if values else 0
        return self._a_descendants_group_text_max_length
    
    @property
    def a_descendants_group_text_avg_length(self):
        """
        get grouped linked text avg length
        :return:
        """
        if self._a_descendants_group_text_max_length is not None:
            return self._a_descendants_group_text_max_length
        values = self.a_descendants_group_text_length.values()
        self._a_descendants_group_text_max_length = max(values) if values else 0
        return self._a_descendants_group_text_max_length
    
    def __str__(self):
        """
        rewrite str
        :return:
        """
        return f'<Element {self.tag} of {self.path}>'
    
    def __repr__(self):
        """
        rewrite repr
        :return:
        """
        return self.__str__()

最后定义一个聚类方式,聚类信息然后提取

import math  
import operator  
from loguru import logger  
import numpy as np  
from collections import defaultdict  
from urllib.parse import urljoin  
  
  
LIST_MIN_NUMBER = 5  
LIST_MIN_LENGTH = 8  
LIST_MAX_LENGTH = 44  
SIMILARITY_THRESHOLD = 0.8  
  
  
class ListExtractor:  
    """  
    extract list from index page    """  
    def __init__(self, min_number=LIST_MIN_NUMBER, min_length=LIST_MIN_LENGTH, max_length=LIST_MAX_LENGTH,  
                 similarity_threshold=SIMILARITY_THRESHOLD):  
        """  
        init list extractor        """        super(ListExtractor, self).__init__()  
        self.min_number = min_number  
        self.min_length = min_length  
        self.max_length = max_length  
        self.avg_length = (self.min_length + self.max_length) / 2  
        self.similarity_threshold = similarity_threshold  
  
    def _probability_of_title_with_length(self, length):  
        """  
        get the probability of title according to length        import matplotlib.pyplot as plt        x = np.asarray(range(5, 40))        y = list_extractor.probability_of_title_with_length(x)        plt.plot(x, y, 'g', label='m=0, sig=2')        plt.show()        :param length:        :return:  
        """        sigma = 6  
        return np.exp(-1 * ((length - self.avg_length) ** 2) / (2 * (sigma ** 2))) / (math.sqrt(2 * np.pi) * sigma)  
  
    def _build_clusters(self, element):  
        """  
        build candidate clusters according to element        :return:  
        """        descendants_tree = defaultdict(list)  
        descendants = descendants_of_body(element)  
        for descendant in descendants:  
            # if one element does not have enough siblings, it can not become a child of candidate element  
            if descendant.number_of_siblings + 1 < self.min_number:  
                continue  
            # if min length is larger than specified max length, it can not become a child of candidate element  
            if descendant.a_descendants_group_text_min_length > self.max_length:  
                continue  
            # if max length is smaller than specified min length, it can not become a child of candidate element  
            if descendant.a_descendants_group_text_max_length < self.min_length:  
                continue  
            # descendant element must have same siblings which their similarity should not below similarity_threshold  
            if descendant.similarity_with_siblings < self.similarity_threshold:  
                continue  
            descendants_tree[descendant.parent_selector].append(descendant)  
        descendants_tree = dict(descendants_tree)  
  
        # cut tree, remove parent block  
        selectors = sorted(list(descendants_tree.keys()))  
        last_selector = None  
        for selector in selectors[::-1]:  
            # if later selector  
            if last_selector and selector and last_selector.startswith(selector):  
                del descendants_tree[selector]  
            last_selector = selector  
        clusters = cluster_dict(descendants_tree)  
        return clusters  
  
    def _evaluate_cluster(self, cluster):  
        """  
        calculate score of cluster using similarity, numbers, or other info        :param cluster:        :return:  
        """        score = dict()  
  
        # calculate avg_similarity_with_siblings  
        score['avg_similarity_with_siblings'] = np.mean(  
            [element.similarity_with_siblings for element in cluster])  
  
        # calculate number of elements  
        score['number_of_elements'] = len(cluster)  
  
        # calculate probability of it contains title  
        # score['probability_of_title_with_length'] = np.mean([        #     self._probability_of_title_with_length(len(a_descendant.text)) \        #     for a_descendant in itertools.chain(*[element.a_descendants for element in cluster]) \        #     ])  
        # TODO: add more quota to select best cluster  
        score['clusters_score'] = \  
            score['avg_similarity_with_siblings'] \  
            * np.log10(score['number_of_elements'] + 1) \  
            # * clusters_score[cluster_id]['probability_of_title_with_length']  
        return score  
  
    def _extend_cluster(self, cluster):  
        """  
        extend cluster's elements except for missed children        :param cluster:        :return:  
        """        result = [element.selector for element in cluster]  
        for element in cluster:  
            path_raw = element.path_raw  
            siblings = list(element.siblings)  
            for sibling in siblings:  
                # skip invalid element  
                if not isinstance(sibling, Element):  
                    continue  
                sibling_selector = sibling.selector  
                sibling_path_raw = sibling.path_raw  
                if sibling_path_raw != path_raw:  
                    continue  
                # add missed sibling  
                if sibling_selector not in result:  
                    cluster.append(sibling)  
                    result.append(sibling_selector)  
  
        cluster = sorted(cluster, key=lambda x: x.nth)  
        logger.log('inspect', f'cluster after extend {cluster}')  
        return cluster  
  
    def _best_cluster(self, clusters):  
        """  
        use clustering algorithm to choose best cluster from candidate clusters        :param clusters:        :return:  
        """        if not clusters:  
            logger.log('inspect', 'there is on cluster, just return empty result')  
            return []  
        if len(clusters) == 1:  
            logger.log('inspect', 'there is only one cluster, just return first cluster')  
            return clusters[0]  
        # choose best cluster using score  
        clusters_score = defaultdict(dict)  
        clusters_score_arg_max = 0  
        clusters_score_max = -1  
        for cluster_id, cluster in clusters.items():  
            # calculate avg_similarity_with_siblings  
            clusters_score[cluster_id] = self._evaluate_cluster(cluster)  
            # get max score arg index  
            if clusters_score[cluster_id]['clusters_score'] > clusters_score_max:  
                clusters_score_max = clusters_score[cluster_id]['clusters_score']  
                clusters_score_arg_max = cluster_id  
        logger.log('inspect', f'clusters_score {clusters_score}')  
        best_cluster = clusters[clusters_score_arg_max]  
        return best_cluster  
  
    def _extract_cluster(self, cluster):  
        """  
        extract title and href from best cluster        :param cluster:        :return:  
        """        if not cluster:  
            return None  
        # get best tag path of title  
        probabilities_of_title = defaultdict(list)  
        for element in cluster:  
            descendants = element.a_descendants  
            for descendant in descendants:  
                path = descendant.path  
                descendant_text = descendant.text  
                probability_of_title_with_length = self._probability_of_title_with_length(len(descendant_text))  
                # probability_of_title_with_descendants = self.probability_of_title_with_descendants(descendant)  
                # TODO: add more quota to calculate probability_of_title  
                probability_of_title = probability_of_title_with_length  
                probabilities_of_title[path].append(probability_of_title)  
  
        # get most probable tag_path  
        probabilities_of_title_avg = {k: np.mean(v) for k, v in probabilities_of_title.items()}  
        if not probabilities_of_title_avg:  
            return None  
        best_path = max(probabilities_of_title_avg.items(), key=operator.itemgetter(1))[0]  
        logger.log('inspect', f'best tag path {best_path}')  
  
        # extract according to best tag path  
        result = []  
        for element in cluster:  
            descendants = element.a_descendants  
            for descendant in descendants:  
                path = descendant.path  
                if path != best_path:  
                    continue  
                title = descendant.text  
                url = descendant.attrib.get('href')  
                if not url:  
                    continue  
                if url.startswith('//'):  
                    url = 'http:' + url  
                base_url = self.kwargs.get('base_url')  
                if base_url:  
                    url = urljoin(base_url, url)  
                result.append({  
                    'title': title,  
                    'url': url  
                })  
        return result  
  
    def process(self, element: Element):  
        """  
        extract content from html        :param element:        :return:  
        """        # preprocess  
        preprocess4list_extractor(element)  
  
        # build clusters  
        clusters = self._build_clusters(element)  
        logger.log('inspect', f'after build clusters {clusters}')  
  
        # choose best cluster  
        best_cluster = self._best_cluster(clusters)  
        logger.log('inspect', f'best cluster {best_cluster}')  
  
        extended_cluster = self._extend_cluster(best_cluster)  
        logger.log('inspect', f'extended cluster {extended_cluster}')  
  
        # extract result from best cluster  
        return self._extract_cluster(best_cluster)  
  
  
list_extractor = ListExtractor()  
  
  
def extract_list(html, **kwargs):  
    """  
    extract list from index html    :param: base_url  
    :return:  
    """    return list_extractor.extract(html, **kwargs)

三、智能分辨列表页和详细页

由于结果只有两种,要么是列表页,要么是详细页;这里我们可以使用 svm 来完成分类任务;

这里有几个可以用来区别列表页和详细页的特征:

  • 文本密度:正文页通常会包含密集的文字,比如一个 p 节点内部就包含几十上百个文字,如果用单个节点内的文字数目来表示文本密度的话,那么详情页的部分内容文本密度会很高。
  • 超链接节点的数量和比例:一般来说列表页通常会包含多个超链接,而且很大比例都是超链接文本,而详情页却有很多的文字并不是超链接,比如正文。
  • 符号密度:一般来说列表页通常会是一些标题导航,一般可能都不会包含句号,而详情页的正文内容通常就会包含句号等内容,如果按照单位文字所包含的标点符号数量来表示符号密度的话,后者的符号密度也会高一些。
  • 列表簇的数目:一般来说,列表页通常会包含多个具有公共父节点的条目,多个条目构成一个列表簇,虽然说详情页侧栏也会包含一些列表,但至少这个数量也可以成为一个特征来判别。
  • meta 信息:有一些特殊的 meta 信息是列表页独有的,比如只有详情页才会有发布时间,而列表页通常是没有的。
  • 正文标题和 title 标题相似度:一般来说,详情页的正文标题和 title 标题很可能是相同的内容,而列表页通常则是站点的名称。

将现有的 HTML 文本进行预处理,把上面的一些特征提取出来,然后直接声明一个 SVM 分类模型即可。 这里声明了一个 feature 名字和对应的处理方法:

self.feature_funcs = {  
    'number_of_a_char': number_of_a_char,  
    'number_of_a_char_log10': self._number_of_a_char_log10,  
    'number_of_char': number_of_char,  
    'number_of_char_log10': self._number_of_char_log10,  
    'rate_of_a_char': self._rate_of_a_char,  
    'number_of_p_descendants': number_of_p_descendants,  
    'number_of_a_descendants': number_of_a_descendants,  
    'number_of_punctuation': number_of_punctuation,  
    'density_of_punctuation': density_of_punctuation,  
    'number_of_clusters': self._number_of_clusters,  
    'density_of_text': density_of_text,  
    'max_density_of_text': self._max_density_of_text,  
    'max_number_of_p_children': self._max_number_of_p_children,  
    'has_datetime_meta': self._has_datetime_mata,  
    'similarity_of_title': self._similarity_of_title,  
}  
self.feature_names = self.feature_funcs.keys()

以上方法就是特征和对应的获取方法,具体根据实际情况实现即可。 然后关键的部分就是对数据的处理和模型的训练了,关键代码如下:

list_file_paths = list(glob(f'{DATASETS_LIST_DIR}/*.html'))
detail_file_paths = list(glob(f'{DATASETS_DETAIL_DIR}/*.html'))

x_data, y_data = [], []

for index, list_file_path in enumerate(list_file_paths):
    logger.log('inspect', f'list_file_path {list_file_path}')
    element = file2element(list_file_path)
    if element is None:
        continue
    preprocess4list_classifier(element)
    x = self.features_to_list(self.features(element))
    x_data.append(x)
    y_data.append(1)

for index, detail_file_path in enumerate(detail_file_paths):
    logger.log('inspect', f'detail_file_path {detail_file_path}')
    element = file2element(detail_file_path)
    if element is None:
        continue
    preprocess4list_classifier(element)
    x = self.features_to_list(self.features(element))
    x_data.append(x)
    y_data.append(0)

# preprocess data
ss = StandardScaler()
x_data = ss.fit_transform(x_data)
joblib.dump(ss, self.scaler_path)
x_train, x_test, y_train, y_test = train_test_split(x_data, y_data, test_size=0.2, random_state=5)

# set up grid search
c_range = np.logspace(-5, 20, 5, base=2)
gamma_range = np.logspace(-9, 10, 5, base=2)
param_grid = [
    {'kernel': ['rbf'], 'C': c_range, 'gamma': gamma_range},
    {'kernel': ['linear'], 'C': c_range},
]
grid = GridSearchCV(SVC(probability=True), param_grid, cv=5, verbose=10, n_jobs=-1)
clf = grid.fit(x_train, y_train)
y_true, y_pred = y_test, clf.predict(x_test)
logger.log('inspect', f'n{classification_report(y_true, y_pred)}')
score = grid.score(x_test, y_test)
logger.log('inspect', f'test accuracy {score}')
# save model
joblib.dump(grid.best_estimator_, self.model_path)

这里首先对数据进行预处理,然后将每个 feature 存 存到 x_data 中,标注结果存到 y_data 中。接着我们使用 StandardScaler 对数据进行标准化处理,然后进行随机切分。最后使用 GridSearch 训练了一个 SVM 模型然后保存了下来。 以上便是基本的模型训练过程,具体的代码可以再完善一下。

四、完整的库

  1. GeneralNewsExtractor/GeneralNewsExtractor: 新闻网页正文通用抽取器 Beta 版. (github.com)
  2. Gerapy/GerapyAutoExtractor: Auto Extractor Module (github.com)

4.1 参考文献

  • 面向不规则列表的网页数据抽取技术的研究
  • 基于文本及符号密度的网页正文提取方法
  • 基于块密度加权标签路径特征的Web新闻在线抽取
  • 基于DOM树和视觉特征的网页信息自动抽取

4.2 Project

  • GeneralNewsExtractor
  • Readability

http://www.kler.cn/a/300517.html

相关文章:

  • 【大数据2025】Yarn 总结
  • 【Android】蓝牙电话HFP连接源码分析
  • 口令攻击和钓鱼攻击
  • Linux 操作二:文件映射与文件状态
  • MySQL HASH索引详解
  • Typora + PowerShell 在终端打开文件
  • 【大数据算法】一文掌握大数据算法之:空间亚线性算法。
  • windows和linux安装mysql5.7.31保姆级教程
  • C/C++程序的内存开辟
  • MySQL数据库 — Explain命令
  • hadoop分布式搭建
  • 贪心算法day29|134. 加油站(理解有难度)、135. 分发糖果、860. 柠檬水找零、406. 根据身高重建队列
  • 最佳实践-模板设计模式
  • 横版闯关手游【全明星时空阿拉德】Linux手工服务端+运营后台+双app端
  • git:认识git和基本操作(1)
  • 手写Promise
  • 《实现 HTML 图片轮播效果》
  • <<编码>> 第 5 章 绕过拐弯的通信(Seeing Around Corners) 示例电路
  • 深入浅出 Ansible 自动化运维:从入门到实战
  • C++ Primer Plus(速记版)-基本语言
  • 网络安全入门教程(非常详细)从零基础入门到精通
  • 多线程:java中的实现
  • flink中slotSharingGroup() 的详解
  • MySQL索引优化与B+树【后端 14】
  • GO 闭包
  • Python | Leetcode Python题解之第396题旋转函数