当前位置: 首页 > article >正文

爬虫——playwright获取亚马逊数据

目录

  • playwright简介
  • 使用playwright初窥亚马逊
    • 安装playwright
    • 打开亚马逊页面
  • 搞数据
    • 搜索
    • 修改bug
    • 数据获取
    • 翻页
      • 优化结构
    • 简单保存

playwright简介

playwright是微软新出的一个测试工具,与selenium类似,不过与selenium比起来还是有其自身的优势的(除了教程少是弱项)。

  • 任何浏览器 • 任何平台 • 一个 API

    • 跨浏览器。 Playwright 支持所有现代渲染引擎,包括 Chromium、WebKit 和 Firefox。
    • 跨平台。 在 Windows、Linux 和 macOS 上进行本地或 CI 测试,无头或有头。
    • 跨语言。 在 TypeScript、JavaScript、Python、.NET、Java 中使用 Playwright API。
    • 测试移动网络。 适用于 Android 的 Google Chrome 和 Mobile Safari 的原生移动模拟。 相同的渲染引擎可以在桌面和云端运行。
      (上面是抄官网的, 下面才是个人的使用体验)
  • 方便

    • 配置方便。与selenium的配置相比较的话,不需要下载额外的驱动,也不需要下载对应版本的浏览器。(就是配置地址不能修改)。
    • 可以异步selenium是只能阻塞式的执行,而playwright可以执行异步。
    • 内存占用少。好像确实是比较少一点。

使用playwright初窥亚马逊

通常情况,每家电商平台都有自己的反爬措施,亚马逊也不例外,所以要尝试一下,当然如果是scrapy这个爬虫框架的话,似乎可以越过反爬措施,内部的逻辑代码是什么没有深究,现在先拿过来用用。

安装playwright

同样的,这个第三方库也要安装,直接使用pip安装就行

pip install playwright

安装完毕之后,还需要安装一些依赖,只要执行一句话即可。这样就能安装了。

playwright install

如果不能安装,或者安装卡进度条了,可以查看这个教程

打开亚马逊页面

小试牛刀一把,看看这个库怎么样

import asyncio
import time
import traceback
from playwright.async_api import async_playwright

class AmazonBrowser:
    def __init__(self, headless=True):
        self.headless = headless
        self.browser = None
        self.page = None
        self.playwright = None

    async def start(self):
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(headless=False)
        self.page = await self.browser.new_page()
        await self.page.goto("https://www.amazon.com/")
        time.sleep(60)
        await self.browser.close()

if __name__ == "__main__":
    asyncio.run(AmazonBrowser().start())

在这里插入图片描述
what!!! 不愧是美国注明的电商平台,上来就是验证码=。=还有点不好搞定。

那试试添加headers,应该可以

class AmazonBrowser:
    def __init__(self, headless=True):
        self.headless = headless
        self.browser = None
        self.page = None
        self.playwright = None
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
            "Referer": "https://www.google.com/", # 伪造,告诉亚马逊网站,说我是从Google搜索页面点击进来的。
        }

在这里插入图片描述
诶嘿! 来了!

搞数据

在第一个页面里面,没有我们想要的数据,所以我们需要输入内容,然后从新的页面中搞到要的数据。
获取元素 → 搜索 → 搞数据 应该是这个思路。

搜索

我们需要获取2个元素,一个元素是搜索框,另一个元素是搜索按钮(当然搞回车按键也行),从F12里面可以得知,搜索框是下面的内容:
在这里插入图片描述
搜索按钮是下面的:
在这里插入图片描述
先搞定输入框:

import random # 引入random库
class AmazonBrowser:
    async def start(self):
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(headless=False)
        self.page = await self.browser.new_page()
        await self.page.set_extra_http_headers(self.headers)
        await self.page.goto("https://www.amazon.com/")
		search_box = self.page.locator("input#twotabsearchtextbox")# 这里的input有个id,就是twotabsearchtextbox,这个id应该是独立的,所以直接用它。
        await search_box.wait_for(state="visible")
        await search_box.type('键盘', delay=random.uniform(50, 200)) 
        time.sleep(60)

【细节讲解】

  • locator:在Playwright中,locator的作用是用于定位页面上的元素,语法有很多:
    • "text=登录"是通过文本匹配,
    • "#username"是通过ID匹配,
    • "//button[@type='submit']"是通过xpath匹配,
    • "a.s-pagination-next.s-pagination-separator"通过css匹配,
    • "a.s-pagination-next.s-pagination-separator:not([aria-disabled="true"])",这个则是排除具有aria-disabled="true"属性的元素。
  • type:在playwright中,type的作用时将文字打入输入框中,与之类似的方法是fill,只不过前者是将文字挨个键入,而后者则是将文字一次性输入,在严格监听页面的网页中,type的的功能用的会比较多,因为可以模拟人的操作,而fill则不行。

再执行看看~
在这里插入图片描述
芜湖~
在这里插入图片描述
同样的,找到搜索按钮并点击就能跳转过去了。

class AmazonBrowser:
    async def start(self):
    # ...
        await search_box.type("键盘", delay=random.uniform(50, 200))
        await self.page.locator("input#nav-search-submit-button").click()

在这里插入图片描述

修改bug

有的人可能会遇到另外一种情况:
在这里插入图片描述
这个时候搜索输入框和搜索按钮的id就已经不一样了,这种情况的搜索输入框是id="nav-bb-search",搜索按钮是没有id的只有class="nav-bb-button",所以要修改上面的代码

class AmazonBrowser:
    async def start(self):
		#...
		# 同时检测两种搜索框的存在
        twotab_exists = await self.page.locator("input#twotabsearchtextbox").count()
        navbb_exists = await self.page.locator("input#nav-bb-search").count()
        if twotab_exists:
            search_box = self.page.locator("input#twotabsearchtextbox")
            search_btn = self.page.locator("input#nav-search-submit-button")
        elif navbb_exists:
            search_box = self.page.locator("input#nav-bb-search")
            search_btn = self.page.locator("input.nav-bb-button")
        else:
            raise Exception("No search box found")
        await search_box.wait_for(state="visible")
        await search_box.type("键盘", delay=random.uniform(50, 200))
        await search_btn.click()

数据获取

我们先来看看要搞哪些数据:
在这里插入图片描述
从这个截图上看,有图片,标题,评分,价格。不过有一点是:第二个图片里面有2个价格,到底那个是真实的价格呢?点进去发现,第一个价格是原价,第二个价格好像是二手价格。
在这里插入图片描述
那就弄原价吧,不然太麻烦了。可是他的标题有了,要是标题重复了怎么办?从F12里面可以发现data-asin应该是唯一的,那我们就把这个ASIN弄到手,当然后面也有一个uuid,弄那个也行。
在这里插入图片描述

# 新增依赖库
from urllib.parse import urljoin

class AmazonBrowser:
    async def start(self):
    	#...
    	await self.page.wait_for_selector('div[role="listitem"][data-asin]', timeout=15000)
        items = await self.page.locator('div[role="listitem"][data-asin]').all()
        batch_data = []
        for index, item in enumerate(items):
        	asin = await item.get_attribute("data-asin")
        	uuid = await item.get_attribute("data-uuid")
        	img = item.locator('img.s-image').first
        	await img.scroll_into_view_if_needed()
        	await self.page.wait_for_timeout(500)
        	src = await img.get_attribute('src')
			href = await item.locator('a.a-link-normal.s-line-clamp-2').get_attribute('href')
			product = await item.locator('h2.a-size-medium > span').first.inner_text()
			price_section = item.locator('span.a-price[data-a-color="base"]')
			if await price_section.count() > 0:
				price = await price_section.locator('span.a-offscreen').first.inner_text()
			else:
				price_section_secondary = item.locator('div.a-row.a-size-base.a-color-secondary')
                if await price_section_secondary.count() > 0:
            		price = await price_section_secondary.locator('span.a-color-base').first.inner_text()
            	else:
            		price = '暂无价格'
            batch_data.append({
                        'Product': product,
                        'UUID': uuid,
                        'ASIN': asin,
                        'ImageURL': src,
                        'URL': urljoin(self.target_url, href) if href else None,
                        'Price': price,
                    })

【细节讲解】

  • wait_for_selector:是等待页面上符合选择器的元素出现,最长等待15秒。
    • div[role="listitem"][data-asin]:是选择<div>元素,要求这个元素里面有[role="listitem"]以及[data-asin]这两个东西。
  • await,异步标志,有些情况下需要使用,有些情况下则不需要使用:
    • 需要使用的情况:
      • 涉及网络请求:如 goto()fetch()
      • 获取异步返回的内容:如 text_content()inner_text()
      • 涉及页面交互的操作:如点击、输入、等待
      • 需要时间完成的操作:如 wait_for_selector()
    • 不需要使用的情况:
      • locator()对象
      • 只定义选择器,不交互
      • 直接获取属性,如browser.name

翻页

一页当然是不行的,一页才16条数据,所以肯定要翻页的。同理从F12里面找到翻页标签,那就是
在这里插入图片描述
然后仅有这个也不够,我们还要知道终止,也就是不能点击的情况下,是什么情况的,所以翻到20页得到
在这里插入图片描述
所以需要设计选择器,来判断下一页按钮能不能按下去,或者是能不能检测到终止的<span>

class AmazonBrowser:
	async def start(self):
		#...
		disabled_selector = 'span.s-pagination-next.s-pagination-disabled'
		enabled_selector = 'a.s-pagination-next:not([aria-disabled="true"])'
		# 当检测到disable_selector时就继续
		while True:
			if await self.page.locator(disabled_selector).count() > 0:
				break
			next_btn = self.page.locator(enabled_selector)
			await next_btn.scroll_into_view_if_needed()
			        # 点击前确保元素可交互
        	await next_btn.wait_for(state="visible")
        	await next_btn.click()
        	await self.page.wait_for_timeout(2000)  # 基础等待

在这里插入图片描述

优化结构

但是仅有循环,没有获取数据,这也不是我们想要的,所以接下来是优化结构。将数据获取也翻页分开。

class AmazonBrowser:
	# ...
	async def data_collection(self):
        await self.page.wait_for_selector('div[role="listitem"][data-asin]', timeout=15000)
        items = await self.page.locator('div[role="listitem"][data-asin]').all()
        for index, item in enumerate(items):
            asin = await item.get_attribute("data-asin")
            uuid = await item.get_attribute("data-uuid")
            img = item.locator('img.s-image').first
            await img.scroll_into_view_if_needed()
            await self.page.wait_for_timeout(500)
            src = await img.get_attribute('src')
            href = await item.locator('a.a-link-normal.s-line-clamp-2').get_attribute('href')
            product = await item.locator('h2.a-size-medium > span').first.inner_text()
            price_section = item.locator('span.a-price[data-a-color="base"]')
            if await price_section.count() > 0:
                price = await price_section.locator('span.a-offscreen').first.inner_text()
            else:
                price_section_secondary = item.locator('div.a-row.a-size-base.a-color-secondary')
                if await price_section_secondary.count() > 0:
                    price = await price_section_secondary.locator('span.a-color-base').first.inner_text()
                else:
                    price = '暂无价格'
			# self.batch_data要提前声明哦
            self.batch_data.append({
                'Product': product,
                'UUID': uuid,
                'ASIN': asin,
                'ImageURL': src,
                'URL': urljoin(self.target_url, href) if href else None,
                'Price': price,
            })
	async def start(self):
		#...
		disabled_selector = 'span.s-pagination-next.s-pagination-disabled'
		enabled_selector = 'a.s-pagination-next:not([aria-disabled="true"])'
		# 当检测到disable_selector时就继续
		while True:
			if await self.page.locator(disabled_selector).count() > 0:
				break
			# 在这里添加
			await self.data_collection()
			next_btn = self.page.locator(enabled_selector)
			await next_btn.scroll_into_view_if_needed()
			        # 点击前确保元素可交互
        	await next_btn.wait_for(state="visible")
        	await next_btn.click()
        	await self.page.wait_for_timeout(2000)  # 基础等待

这样就搞定啦,但是数据要怎么保存呢?可以有两种方式,一种方式是保存在数据库里面,比如mongodb,mysql等等,还可以保存在csv文件里面。

简单保存

在这里就简单的保存成csv文件的样子吧,也就是直接使用pandas保存就行了,下一篇章就搞一下mongodb

# 导入依赖库
import pandas as pd
class AmazonBrowser:
	def __init__(self):
		self.df = pd.DataFrame()
	#... 
	async def start(self):
		while True:
			#...while循环内容
		new_df = pd.DataFrame(self.batch_data)
        self.df = pd.concat([self.df, new_df], ignore_index=True)
        self.df.to_csv(path)

就酱~
在这里插入图片描述


http://www.kler.cn/a/596473.html

相关文章:

  • 2025年3月AI搜索发展动态与趋势分析:从技术革新到生态重构
  • 清洁机器人垃圾物识别与智能分类回收系统研究(大纲)
  • 【 <二> 丹方改良:Spring 时代的 JavaWeb】之 Spring Boot 中的 RESTful API 设计:从上手到骨折
  • MyBatis 中 #{} 和 ${} 的区别详解
  • [思考记录]两则:宏观视角、理想化
  • 高等数学-第七版-上册 选做记录 习题5-2
  • 手机怎么换网络IP有什么用?操作指南与场景应用‌
  • MySQL高频八股——索引
  • Cables Finance 即将发布新的积分奖励计划及代币空投
  • 项目日记 -云备份 -服务器配置信息模块
  • Kafka consumer_offsets 主题深度剖析
  • SSE详解面试常考问题详解
  • HTTP 失败重试(重发)方案
  • PHP 应用留言板功能超全局变量数据库操作第三方插件引用
  • PRODIGY: “不折腾人”的蛋白-蛋白/蛋白-小分子结合能计算工具
  • 多线程14(哈希表与文件操作IO)
  • 数据结构(排序(上)):冒泡、选择、插入
  • Vue.js 模板语法全解析:从基础到实战应用
  • Java8 流式分组(groupingBy)与分区(partitioningBy)深度解析
  • 复现关于图片重构方向的项目