当前位置: 首页 > article >正文

JS爬虫实战之Tiktok中sec_id获取

本文章是使用DP对Tk网页进行抓取

本文仅用于学术交流, 未经允许请勿转载
NOTE: 代码进行了部分阉割, 不能直接使用

import threading, os, queue, shutil, requests, traceback, random, json, pymysql, redis, time, logging



# 创建一个任务队列
task_queue = queue.Queue()

def console_out(logFilename):
    ''''' Output log to file and console '''
    # Define a Handler and set a format which output to file
    logging.basicConfig(
        level=logging.DEBUG,  # 定义输出到文件的log级别,大于此级别的都被输出
        format='%(asctime)s  %(filename)s : %(levelname)s  %(message)s',  # 定义输出log的格式
        datefmt='%Y-%m-%d %A %H:%M:%S',  # 时间
        filename=logFilename,  # log文件名
        filemode='w')  # 写入模式“w”或“a”
    # Define a Handler and set a format which output to console
    console = logging.StreamHandler()  # 定义console handler
    console.setLevel(logging.INFO)  # 定义该handler级别
    formatter = logging.Formatter('%(asctime)s  %(filename)s : %(levelname)s  %(message)s')  # 定义该handler格式
    console.setFormatter(formatter)
    # Create an instance
    logging.getLogger().addHandler(console)  # 实例化添加handler


def parse(resp):
    logging.info("=========================== 开始解析页面 ===========================")
    itemList = resp['itemList']

    logging.info("本次解析有 {} 条数据。".format(len(itemList)))
    result_list = []
    for ind, item in enumerate(itemList):
        json_dict = {}
        author = item['author']
        author_stats = item['authorStats']
        author_stats_friendCount = author_stats['friendCount']
        author_stats_heart = author_stats['heart']
        video_music = item['music']                                         # 视频背景音乐
        video_stats = item['stats']                                         # 视频状态

        if "textExtra" in item:
            video_textExtra = item['textExtra'][0]
            video_textExtra_hashtagName = video_textExtra['hashtagName']        # 话题名称
            video_textExtra_hashtagId = video_textExtra['hashtagId']            # 话题ID
        else:
            video_textExtra_hashtagName = ''
            video_textExtra_hashtagId = ''

        # 构建json
        json_dict['video_id'] = item['id']                                                                              # 视频id
        json_dict['video_desc'] = item['desc']                                                                          # 视频描述
        json_dict['author_info_avatar'] = author['avatarLarger']                                                        # 达人头像(大)
        json_dict['author_info_id'] = author['id']                                                                      # 达人索引id
        json_dict['author_info_nickname'] = author['nickname']                                                          # 达人名称
        json_dict['author_info_signature'] = author['signature']                                                        # 达人描述
        json_dict['author_info_uniqueId'] = author['uniqueId']                                                          # 达人id
        json_dict['author_stats_diggCount'] = author_stats['diggCount']                                                 # 达人点赞数
        json_dict['author_stats_followerCount'] = author_stats['followerCount']                                         # 达人粉丝数
        json_dict['author_stats_followingCount'] = author_stats['followingCount']                                       # 达人关注的人数
        json_dict['author_stats_videoCount'] = author_stats['videoCount']                                               # 达人视频数
        json_dict['author_stats_heartCount'] = author_stats['heartCount']                                               # 达人获赞数
        json_dict['video_music_id'] = video_music['id']                                                                 # 视频背景音乐id
        json_dict['video_music_title'] = video_music['title']                                                           # 视频背景音乐id
        json_dict['video_stats_collectCount'] = video_stats['collectCount']                                             # 视频收藏数
        json_dict['video_stats_diggCount'] = video_stats['diggCount']                                                   # 视频点赞数
        json_dict['video_stats_playCount'] = video_stats['playCount']                                                   # 视频播放量
        json_dict['video_stats_shareCount'] = video_stats['shareCount']                                                 # 视频转发量
        json_dict['video_stats_commentCount'] = video_stats['commentCount']                                             # 视频评论数
        json_dict['video_textExtra_hashtagName'] = video_textExtra_hashtagName
        json_dict['video_textExtra_hashtagId'] = video_textExtra_hashtagId
        result_list.append(json_dict)

    return result_list, len(itemList)


# 生产者线程类
class ProducerThread(threading.Thread):
    def __init__(self):
        super().__init__()

    def run(self):
        # 数据计数
        sum_count = 0
        # 打开浏览器
        driver = ChromiumPage()

        # 监听数据包
        driver.listen.start('https://www.*.com/api/*/item_list')

        # 访问网址
        driver.get(f'https://www.*.com/')

        count = 1
        while True:
            # 等待数据包加载
            logging.info("=========================== start listening ===========================")
            while True:
                resp = driver.listen.wait(timeout=5)
                if resp:
                    logging.info("=========================== 有数据 ===========================")
                    break
                logging.info("=========================== 刷新失败 ===========================")
            logging.info("=========================== listening complate ===========================")
            json_data = resp.response.body
            if json_data:
                json_datas, count_num = parse(json_data)
                task_queue.put(json_datas)
                sum_count += count_num
                time.sleep(10)
            else:
                logging.info("=========================== 划不动了,疯狂下滑中 ===========================")

            driver.scroll.to_bottom()
            logging.info("我们划了 {} 次, 应该获取 {} 条数据".format(count, sum_count))
            print("我们划了 {} 次, 应该获取 {} 条数据".format(count, sum_count))
            time.sleep(10)
            count += 1


# 消费者线程类
class ConsumerThread(threading.Thread):
    def run(self):
        mysql_obj = MysqlClass()
        while True:
            # 从队列获取任务
            result_data = task_queue.get()
            for json_dict in result_data:
                # 达人入库
                daren_sql_str = """
                        INSERT IGNORE INTO user_info_total (tk_id, unique_id, nick_name, avatar, signature, bio_link, region, 
                        follower_count, following_count, heart, video_count) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) 
                    """

                daren_sql_value = (
                    json_dict['author_info_id'], json_dict['author_info_uniqueId'], json_dict['author_info_nickname'],
                    json_dict['author_info_avatar'], json_dict['author_info_signature'], '', '',
                    json_dict['author_stats_followerCount'], json_dict['author_stats_followingCount'],
                    json_dict['author_stats_heartCount'], json_dict['author_stats_videoCount'])
                print(daren_sql_value)
                mysql_obj.insert_mysql(daren_sql_str, value=daren_sql_value)

                # 视频入库

                video_sql_str = "INSERT IGNORE INTO video_total (tk_id, video_id, video_desc, digg_count, collect_count, comment_count, " \
                                "share_count, play_count) " "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
                video_sql_value = (json_dict['author_info_id'],
                                   json_dict['video_id'], json_dict['video_desc'], json_dict['video_stats_diggCount'],
                                   json_dict['video_stats_collectCount'], json_dict['video_stats_commentCount'],
                                   json_dict['video_stats_shareCount'], json_dict['video_stats_playCount'])
                print(video_sql_value)
                mysql_obj.insert_mysql(video_sql_str, value=video_sql_value)

            # 标记任务完成
            task_queue.task_done()
            print('----------------------------------------------------------------')


if __name__ == '__main__':
    # 创建生产者线程
    producer_thread = ProducerThread()
    producer_thread.start()
    time.sleep(60)
    # 创建消费者线程
    consumer_threads = []
    for i in range(50):  # 创建100个消费者线程
        consumer_thread = ConsumerThread()
        consumer_threads.append(consumer_thread)
        consumer_thread.start()

    # 等待所有任务处理完成
    task_queue.join()

    # 终止所有线程
    producer_thread.join()

    for thread in consumer_threads:
        thread.join()


打个广告。 测试时获取了部分sec_id。如需沟通,请看主页联系博主


http://www.kler.cn/news/355033.html

相关文章:

  • JavaScript网页设计案例:构建动态交互的在线图书管理系统
  • 3万字66道Java基础面试题总结(2024版本)
  • 个人用数据挖掘笔记(待补充)
  • vb6 MSHFlexGrid1表格导出数据到电子表格 解决只能导出一次问题
  • ubuntu安装mysql5.7
  • RAID 矩阵
  • YOLO系列入门:1、YOLO V11环境搭建
  • python中不变的数据类型有哪些
  • 使用 nrm 管理 npm 镜像源
  • 0基础学Java之Day09(上午完整版)
  • 从零开始了解云WAF,您的网站安全升级指南
  • 直播美颜平台架构设计:基于视频美颜SDK的开发实践
  • 微信小程序引入组件教程
  • 通过比较list与vector在简单模拟实现时的不同进一步理解STL的底层
  • 标准C库总结
  • 第1节 入门
  • WLAN技术
  • 【前端】Matter:基础概念与入门
  • 基于51单片机的PID直流电机调速系统(程序+Proteus仿真+报告+原理图)
  • Rider + xmake DX12 开发环境