1905电影网 (1905.com) China Movie Data Analysis (Part 1) - Data Collection, Cleaning, and Storage
Table of Contents
- Preface
- I. Data collection steps and Python library versions
- 1. Python library versions
- 2. Data collection steps
- II. Analyzing the pages to scrape
- 1. Analyzing the fields and URLs to collect
- 1.1 Analyzing the data fields to scrape
- 1.2 Analyzing each movie's URL
- 1.3 Analyzing each list page's URL
- 2. Locating the field elements
- III. Data collection code
- 1. Scraping 1905.com category information
- 2. Scraping each movie's detail-page HTML
- 3. Parsing the HTML and saving the data to CSV
- IV. Data cleaning and storage code
Preface
This project scrapes movie data from 1905.com to show how Python and a few common libraries can be used for web data collection. It walks through the collection process in detail, including the required library versions, page analysis, data extraction, and saving the results. We use the requests library for HTTP requests, BeautifulSoup for HTML parsing, and save the final data to CSV files for later analysis and processing.
I. Data Collection Steps and Python Library Versions
1. Python library versions
| | python | requests | bs4 | beautifulsoup4 | soupsieve | lxml | pandas | sqlalchemy | mysql-connector-python | selenium |
|---|---|---|---|---|---|---|---|---|---|---|
| Version | 3.8.5 | 2.31.0 | 0.0.2 | 4.12.3 | 2.6 | 4.9.3 | 2.0.3 | 2.0.36 | 9.0.0 | 4.15.2 |
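If you want to confirm that your local environment matches the table, each of these libraries exposes its version at runtime (a minimal sketch; it only inspects what is already installed, and falls back to 'unknown' where a package does not define __version__):

import sys
import bs4, lxml, pandas, requests, selenium, soupsieve, sqlalchemy
import mysql.connector

# Minimal sketch: print the interpreter and library versions used in this series
print('python', sys.version.split()[0])
for mod in (requests, bs4, soupsieve, lxml, pandas, sqlalchemy, mysql.connector, selenium):
    print(mod.__name__, getattr(mod, '__version__', 'unknown'))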
2. Data collection steps
The overall flow is: fetch the China-region list pages, extract each movie's detail-page URL, save the detail-page HTML locally, parse it into a CSV file, then clean the data and write it to MySQL.
II. Analyzing the Pages to Scrape
1. Analyzing the fields and URLs to collect
1.1 Analyzing the data fields to scrape
As shown in the figure below, the red boxes mark the data to scrape: the movie title (page heading and film name), genre, duration, alternative titles, release date, screenwriter, director, lead actors, and plot synopsis.
1.2 Analyzing each movie's URL
Start from the China-region movie list: https://www.1905.com/mdb/film/list/country-China/
As shown in the figure below, the movies are paginated, with several movies per page. Clicking a movie jumps to its own detail page, which contains the data we want, so we need to parse each movie's URL out of every list page.
As shown in the figure below, inspecting a single movie's markup reveals the corresponding URL.
Copying that movie's URL gives: https://www.1905.com/mdb/film/2248201/ (2248201 is the movie's ID).
So each movie's URL can be obtained by parsing the list pages, as in the short sketch below.
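A minimal sketch of that idea (it reuses the list-page <ul> selector from the full crawler later in this article; a realistic User-Agent header is advisable):

import requests
from bs4 import BeautifulSoup

# Minimal sketch: collect detail-page URLs from the first list page
html = requests.get('https://www.1905.com/mdb/film/list/country-China/',
                    headers={'User-Agent': 'Mozilla/5.0'}, timeout=10).text
soup = BeautifulSoup(html, 'lxml')
tag_ul = soup.select_one('body > div.layout.mainCont.clear > div.leftArea > ul')
movie_urls = ['https://www.1905.com' + li.find('a').attrs.get('href')
              for li in tag_ul.select('li') if li.find('a') is not None]
print(movie_urls[:5])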
1.3 Analyzing each list page's URL
As shown in the figure below, inspecting the source reveals the following pattern:
Page 2: https://www.1905.com/mdb/film/list/country-China/o0d0p2.html
Page 3: https://www.1905.com/mdb/film/list/country-China/o0d0p3.html
Page 4: https://www.1905.com/mdb/film/list/country-China/o0d0p4.html
Page 5: https://www.1905.com/mdb/film/list/country-China/o0d0p5.html
From this we can infer:
Page 1: https://www.1905.com/mdb/film/list/country-China/o0d0p1.html (the o0d0p1.html part can be omitted)
Page n: https://www.1905.com/mdb/film/list/country-China/o0d0p{n}.html
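That makes generating the list-page URLs trivial (a minimal sketch, shown here for the first five pages only):

# Minimal sketch: build list-page URLs from the pattern above
base = 'https://www.1905.com/mdb/film/list/country-China/'
page_urls = [f'{base}o0d0p{n}.html' for n in range(1, 6)]
for url in page_urls:
    print(url)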
2. Locating the field elements
Example: locating the movie title element
The resulting CSS selector is:
body > div.topModule.normalCommon.normal_oneLine > div > div > div.topModule_title.clearfix > div.topModule_title_left.fl > h3 > span
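A minimal sketch of applying that selector to a saved detail page ('2248201.html' is just an example file name):

from pathlib import Path
from bs4 import BeautifulSoup

# Minimal sketch: extract the movie title from a saved detail page
html = Path('2248201.html').read_text(encoding='utf-8')
soup = BeautifulSoup(html, 'lxml')
tag = soup.select_one('body > div.topModule.normalCommon.normal_oneLine > div > div > '
                      'div.topModule_title.clearfix > div.topModule_title_left.fl > h3 > span')
print(tag.text.strip() if tag is not None else None)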
III. Data Collection Code
1. Scraping 1905.com category information
import random
import time
from pathlib import Path
import pandas as pd
import requests
from bs4 import BeautifulSoup
'''
Scrape the 1905.com category information (main category main_category, subcategory sub_category, link sub_category_link).
'''
def get_request(url, **kwargs):
time.sleep(random.uniform(0.1, 2))
    print(f'===============================Requesting: {url} ===============================')
    # A pool of User-Agent strings to rotate through
user_agents = [
# Chrome
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36',
# Firefox
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/117.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/117.0',
'Mozilla/5.0 (X11; Linux i686; rv:109.0) Gecko/20100101 Firefox/117.0',
# Edge
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36 Edg/117.0.2040.0',
# Safari
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Safari/605.1.15',
]
    # Request headers
headers = {
'User-Agent': random.choice(user_agents)
}
    # Username/password authentication (for a private/dedicated proxy)
username = ""
password = ""
proxies = {
"http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password,
"proxy": '36.25.243.5:11768'},
"https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password,
"proxy": '36.25.243.5:11768'}
}
max_retries = 3
for attempt in range(max_retries):
try:
response = requests.get(url=url, timeout=10, headers=headers, **kwargs)
# response = requests.get(url=url, timeout=10, headers=headers, proxies=proxies, **kwargs)
if response.status_code == 200:
return response
else:
print(f"请求失败,状态码: {response.status_code},正在重新发送请求 (尝试 {attempt + 1}/{max_retries})")
except requests.exceptions.RequestException as e:
print(f"请求过程中发生异常: {e},正在重新发送请求 (尝试 {attempt + 1}/{max_retries})")
# 如果不是最后一次尝试,则等待一段时间再重试
if attempt < max_retries - 1:
time.sleep(random.uniform(1, 2))
    print('================All retries failed; please investigate================')
    return None  # or return the last response, depending on your needs
def get_soup(markup):
return BeautifulSoup(markup=markup, features='lxml')
def save_categories_to_csv(response, csv_file_dir='./data_csv/', csv_file_name='category.csv'):
"""
从HTML响应中提取分类信息并保存到CSV文件。
参数:
response (requests.Response): 包含HTML内容的响应对象。
csv_file_dir (str): CSV文件存储目录,默认为'./data_csv/'。
csv_file_name (str): CSV文件名,默认为'category.csv'。
"""
    # Make sure the output directory exists
csv_file_dir_path = Path(csv_file_dir)
csv_file_dir_path.mkdir(parents=True, exist_ok=True)
    # Parse the HTML document
soup = get_soup(response.text)
    # Extract the category information
data_list = []
tag_srh_group = soup.select("body > div.layout.mainCont.clear > div.leftArea > div > div.col-l-bd > dl.srhGroup.clear")
for tag_srh in tag_srh_group:
tag_dt = tag_srh.select_one('dt')
main_category = tag_dt.text.strip() if tag_dt is not None else None
tag_a_list = tag_srh.select('a')
        print('===========================Parsed data:===========================')
for tag_a in tag_a_list:
if tag_a is not None:
sub_category = tag_a.text.strip()
sub_category_link = 'https://www.1905.com' + tag_a.get('href', '')
data_dict = {
'main_category': main_category,
'sub_category': sub_category,
'sub_category_link': sub_category_link
}
data_list.append(data_dict)
print(data_dict)
    # Create a DataFrame and clean the data
df = pd.DataFrame(data_list)
df_cleaned = df[df['sub_category'].notna() & (df['sub_category'] != '')]
    print(f'===========================Saving file to: {csv_file_dir + csv_file_name}===========================')
    # Save to a CSV file
df_cleaned.to_csv(csv_file_dir + csv_file_name, index=False, encoding='utf-8-sig')
if __name__ == '__main__':
res = get_request("https://www.1905.com/mdb/film/search/")
save_categories_to_csv(res)
The contents of the saved file are shown in the figure below:
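To sanity-check the result without opening the file by hand, it can be read back with pandas (a minimal sketch; the three columns come from save_categories_to_csv above):

import pandas as pd

# Minimal sketch: read back the category CSV produced above
df = pd.read_csv('./data_csv/category.csv', encoding='utf-8-sig')
print(df.shape)
print(df[['main_category', 'sub_category', 'sub_category_link']].head())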
2. Scraping each movie's detail-page HTML
import random
import time
from pathlib import Path
import requests
from bs4 import BeautifulSoup
def get_request(url, **kwargs):
time.sleep(random.uniform(0.1, 2))
    print(f'===============================Requesting: {url} ===============================')
    # A pool of User-Agent strings to rotate through
user_agents = [
# Chrome
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36',
# Firefox
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/117.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/117.0',
'Mozilla/5.0 (X11; Linux i686; rv:109.0) Gecko/20100101 Firefox/117.0',
# Edge
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36 Edg/117.0.2040.0',
# Safari
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Safari/605.1.15',
]
    # Request headers
headers = {
'User-Agent': random.choice(user_agents)
}
    # Username/password authentication (for a private/dedicated proxy)
username = ""
password = ""
proxies = {
"http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password,
"proxy": '36.25.243.5:11768'},
"https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password,
"proxy": '36.25.243.5:11768'}
}
max_retries = 3
for attempt in range(max_retries):
try:
response = requests.get(url=url, timeout=10, headers=headers, **kwargs)
# response = requests.get(url=url, timeout=10, headers=headers, proxies=proxies, **kwargs)
if response.status_code == 200:
return response
else:
print(f"请求失败,状态码: {response.status_code},正在重新发送请求 (尝试 {attempt + 1}/{max_retries})")
except requests.exceptions.RequestException as e:
print(f"请求过程中发生异常: {e},正在重新发送请求 (尝试 {attempt + 1}/{max_retries})")
# 如果不是最后一次尝试,则等待一段时间再重试
if attempt < max_retries - 1:
time.sleep(random.uniform(1, 2))
    print('================All retries failed; please investigate================')
    return None  # or return the last response, depending on your needs
def get_soup(markup):
return BeautifulSoup(markup=markup, features='lxml')
def save_html_file(save_dir, file_name, content):
dir_path = Path(save_dir)
    # Make sure the target directory exists, creating any missing parent directories
dir_path.mkdir(parents=True, exist_ok=True)
    # Use a 'with' statement so the file handle is closed properly
with open(save_dir + file_name, 'w', encoding='utf-8') as fp:
print(f"==============================={save_dir + file_name} 文件已保存===============================")
fp.write(str(content))
def save_rough_html_file():
i = 0
save_dir = './rough_html/china/'
while True:
i = i + 1
file_name = f'o0d0p{i}.html'
file_path = Path(save_dir + file_name)
if file_path.exists() and file_path.is_file():
            print(f'===============================File {file_path} already exists===============================')
continue
url = f'https://www.1905.com/mdb/film/list/country-China/o0d0p{i}.html'
        response = get_request(url)
        if response is None:
            break
        soup = get_soup(response.text)
        tag_ul = soup.select_one('body > div.layout.mainCont.clear > div.leftArea > ul')
        if tag_ul is None or tag_ul.text.strip() == '':
            print('===============================Finished crawling all list pages===============================')
break
save_html_file(save_dir, file_name, response.text)
def save_detail_info_html_file():
i = 0
save_dir = './detail_html/china/'
while True:
i = i + 1
url = f'https://www.1905.com/mdb/film/list/country-China/o0d0p{i}.html'
        response = get_request(url)
        if response is None:
            break
        soup = get_soup(response.text)
        tag_ul = soup.select_one('body > div.layout.mainCont.clear > div.leftArea > ul')
        if tag_ul is None or tag_ul.text.strip() == '':
            print('===============================Finished crawling all list pages===============================')
break
tag_li_list = tag_ul.select('li')
for tag_li in tag_li_list:
tag_a_href = tag_li.find('a').attrs.get('href')
movie_url = f'https://www.1905.com{tag_a_href}'
movie_id = tag_a_href.split('/')[-2]
file_name = f'{movie_id}.html'
file_path = Path(save_dir + file_name)
if file_path.exists() and file_path.is_file():
                print(f'===============================File {file_path} already exists===============================')
continue
detail_response = get_request(movie_url)
if detail_response is None:
continue
save_html_file(save_dir, file_name, detail_response.text)
if __name__ == '__main__':
# save_rough_html_file()
save_detail_info_html_file()
Some of the saved HTML files are shown in the figure below:
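Because existing files are skipped, the crawl can be stopped and resumed at any time; a quick way to see how far it has progressed (a minimal sketch):

from pathlib import Path

# Minimal sketch: count how many detail pages have been saved so far
detail_dir = Path('./detail_html/china/')
print(f'{len(list(detail_dir.rglob("*.html")))} detail pages saved')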
3. Parsing the HTML and saving the data to CSV
from pathlib import Path
import pandas as pd
from bs4 import BeautifulSoup
def get_soup(markup):
return BeautifulSoup(markup=markup, features='lxml')
def parse_detail_html_to_csv():
    # Define the CSV file path
csv_file_dir = '../1905movie/data_csv/'
csv_file_name = 'detail_1905movie_dataset.csv'
csv_file_path = Path(csv_file_dir + csv_file_name)
csv_file_dir_path = Path(csv_file_dir)
csv_file_dir_path.mkdir(parents=True, exist_ok=True)
detail_dir = Path('./detail_html/china/')
detail_file_list = detail_dir.rglob('*.html')
movie_data_list = []
i = 0
count = 0
for detail_file in detail_file_list:
        movie_id = detail_file.stem  # the file name (without extension) is the movie ID
movie_url = f'https://www.1905.com/mdb/film/{movie_id}/'
        soup = get_soup(detail_file.read_text(encoding='utf-8'))
tag_img_url = soup.select_one('div.topModule_bottom_poster.picHover.fl img')
movie_img_url = tag_img_url.attrs.get('src') if tag_img_url is not None else None
tag_div_topmodule_title_right = soup.select_one('div.topModule_title_right.fr')
tag_evaluation_name = tag_div_topmodule_title_right.select_one('div.evaluation-name')
tag_judge_soon_fl = tag_div_topmodule_title_right.select_one('div.judge-soon.fl')
movie_rating = tag_evaluation_name.text if tag_evaluation_name is not None else None
movie_status = tag_judge_soon_fl.text if tag_judge_soon_fl is not None else '已上映'
tag_topmodule_title_left_fl = soup.select_one('div.topModule_title_left.fl')
tag_h3_span = tag_topmodule_title_left_fl.select_one('h3 > span')
movie_title = tag_h3_span.text if tag_h3_span is not None else None
tag_li = tag_topmodule_title_left_fl.select_one('li.topModule_line')
movie_genres = str(tag_li.find_next_sibling('li').text.strip()).split() if tag_li is not None else None
tag_li5 = tag_topmodule_title_left_fl.select_one('div > ul > li:nth-child(5)')
movie_duration = tag_li5.text.strip() if tag_li5 is not None else None
tag_div_left_top = soup.select_one('div#left_top')
tag_ul_consmodule_infos = tag_div_left_top.select_one(
'ul.consModule_infos.consModule_infos_l.fixedWidth.fl') if tag_div_left_top is not None else None
tag_li_em_release_date = tag_ul_consmodule_infos.find(name='span',
string='上映时间') if tag_ul_consmodule_infos is not None else None
movie_release_date = tag_li_em_release_date.find_next_sibling().text.strip() if tag_li_em_release_date is not None else None
tag_li_em_director = tag_ul_consmodule_infos.select_one(
'li > em > a') if tag_ul_consmodule_infos is not None else None
movie_director = tag_li_em_director.text.strip() if tag_li_em_director is not None else None
tag_ul_consmodule_infos_r = tag_div_left_top.select_one(
'ul.consModule_infos.consModule_infos_r.fl') if tag_div_left_top is not None else None
tag_alternative_titles = tag_ul_consmodule_infos_r.select_one(
'li > em') if tag_ul_consmodule_infos_r is not None else None
movie_alternative_titles = tag_alternative_titles.text if tag_alternative_titles is not None else None
tag_adaptation_source = tag_ul_consmodule_infos_r.find(name='span',
string='改编来源') if tag_ul_consmodule_infos_r is not None else None
movie_adaptation_source = tag_adaptation_source.find_next_sibling().text if tag_adaptation_source is not None else None
tag_screenwriter = tag_ul_consmodule_infos_r.select_one(
'li > em > a') if tag_ul_consmodule_infos_r is not None else None
movie_screenwriter = tag_screenwriter.text if tag_screenwriter is not None else None
tag_lead_actors = soup.select_one('#left_top > div > ul > li')
tag_lead_actors_a_list = tag_lead_actors.select('a') if tag_lead_actors is not None else None
movie_lead_actors = [tag.text for tag in tag_lead_actors_a_list] if tag_lead_actors_a_list is not None else []
tag_plot = soup.select_one('#left_top > ul > li.plotItem.borderStyle > div > a')
movie_plot = tag_plot.text if tag_plot is not None else None
movie_data_dict = {
'movie_id': movie_id,
'movie_url': movie_url,
'movie_img_url': movie_img_url,
'movie_duration': movie_duration,
'movie_title': movie_title,
'movie_director': movie_director,
'movie_release_date': movie_release_date,
'movie_status': movie_status,
'movie_rating': movie_rating,
'movie_genres': movie_genres,
'movie_lead_actors': movie_lead_actors,
'movie_alternative_titles': movie_alternative_titles,
'movie_adaptation_source': movie_adaptation_source,
'movie_screenwriter': movie_screenwriter,
'movie_plot': movie_plot
}
i = i + 1
        print(f'===============================Row {i}, parsed data:===============================')
print(movie_data_dict)
print('=============================================================================================')
movie_data_list.append(movie_data_dict)
count = count + 1
if count == 200:
df = pd.DataFrame(movie_data_list)
if not csv_file_path.exists():
df.to_csv(csv_file_dir + csv_file_name, index=False, encoding='utf-8-sig')
else:
df.to_csv(csv_file_dir + csv_file_name, index=False, encoding='utf-8-sig', mode='a', header=False)
movie_data_list = []
count = 0
            print(
                f'===============================Parsed movie data saved to {csv_file_dir + csv_file_name}===============================')
if count != 0:
df = pd.DataFrame(movie_data_list)
df.to_csv(csv_file_dir + csv_file_name, index=False, encoding='utf-8-sig', mode='a', header=False)
        print(
            f'===============================All parsed movie data has been saved to {csv_file_dir + csv_file_name}===============================')
if __name__ == '__main__':
parse_detail_html_to_csv()
The contents of the saved file are shown in the figure below:
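Note that movie_genres and movie_lead_actors are Python lists, so to_csv writes them out as their string representations. If you need real lists again when reading the CSV back, something along these lines works (a minimal sketch):

import ast

import pandas as pd

# Minimal sketch: turn the stringified list columns back into Python lists
df = pd.read_csv('../1905movie/data_csv/detail_1905movie_dataset.csv', encoding='utf-8-sig')
for col in ['movie_genres', 'movie_lead_actors']:
    df[col] = df[col].apply(lambda v: ast.literal_eval(v) if isinstance(v, str) and v.startswith('[') else v)
print(df[['movie_title', 'movie_genres', 'movie_lead_actors']].head())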
IV. Data Cleaning and Storage Code
import re
from datetime import datetime
import pandas as pd
from sqlalchemy import create_engine
def read_csv_to_df(file_path):
    # Load the CSV file into a DataFrame
return pd.read_csv(file_path, encoding='utf-8')
def contains_hours(text):
    if pd.isna(text):  # check for NaN or None
return False
pattern = r'\d+\s*(小时|h|hours?|hrs?)'
return bool(re.search(pattern, text))
def convert_to_minutes(duration_str):
parts = str(duration_str).replace('小时', ' ').replace('分钟', '').split()
hours = int(parts[0]) if len(parts) > 0 else 0
minutes = int(parts[1]) if len(parts) > 1 else 0
return hours * 60 + minutes
# Clean and standardize a release-date string
def clean_and_standardize_date(date_str):
date_str_cleaned = str(date_str)
    # Strip a full-width parenthesis and everything after it
    if '(' in date_str_cleaned:
        date_str_cleaned = date_str_cleaned.split('(')[0]
    # Try to parse the date, from the most specific format to the least
if "年" in date_str_cleaned and "月" in date_str_cleaned and "日" in date_str_cleaned:
date_obj = datetime.strptime(date_str_cleaned, "%Y年%m月%d日")
elif "年" in date_str_cleaned and "月" in date_str_cleaned:
date_obj = datetime.strptime(date_str_cleaned, "%Y年%m月")
        date_obj = date_obj.replace(day=1)  # default to the first day of that month
elif "年" in date_str_cleaned:
date_obj = datetime.strptime(date_str_cleaned, "%Y年")
        date_obj = date_obj.replace(month=1, day=1)  # default to the first day of that year
else:
        return None  # no known pattern matched; return None (or some other default)
    return date_obj.strftime("%Y-%m-%d")  # return the date as a standardized string
# Clean and transform the data
def clean_and_transform(df):
    # Keep only movies whose status is "已上映" (already released)
    df = df[df['movie_status'] == '已上映'].copy()
    # Drop rows with an empty movie title
    df.dropna(subset=['movie_title'], inplace=True)
    # Drop rows with duplicate movie IDs
    df.drop_duplicates(subset=['movie_id'], inplace=True)
    # Movie duration
df['movie_duration'] = df['movie_duration'].apply(lambda x: x if contains_hours(x) else None)
if df['movie_duration'].isnull().sum() != 0:
df['movie_duration'] = df['movie_duration'].fillna(method='ffill')
df['movie_duration'] = df['movie_duration'].apply(convert_to_minutes)
    # Release date
df['movie_release_date'] = df['movie_release_date'].apply(clean_and_standardize_date)
if df['movie_release_date'].isnull().sum() != 0:
df['movie_release_date'] = df['movie_release_date'].fillna(method='ffill')
    # Rating
df['movie_rating'] = df['movie_rating'].astype('float').round(1)
if df['movie_rating'].isnull().sum() != 0:
df['movie_rating'] = df['movie_rating'].interpolate()
    # Genres
if df['movie_genres'].isnull().sum() != 0:
df['movie_genres'] = df['movie_genres'].fillna(method='ffill')
    # Fill any remaining missing values (with '未知', i.e. "unknown")
df = df.fillna('未知')
return df
def save_df_to_db(df):
    # Database connection settings
db_user = 'root'
db_password = 'zxcvbq'
    db_host = '127.0.0.1'  # or your database host
    db_port = '3306'  # MySQL's default port is 3306
db_name = 'movie1905'
    # Create the database engine
engine = create_engine(f'mysql+mysqlconnector://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')
    # Write the DataFrame to a MySQL table
df.to_sql(name='movie1905_china', con=engine, if_exists='replace', index=False)
print("所有csv文件的数据已成功清洗并写入MySQL数据库")
if __name__ == '__main__':
csv_file = r'./data_csv/detail_1905movie_dataset.csv'
dataframe = read_csv_to_df(csv_file)
dataframe = clean_and_transform(dataframe)
save_df_to_db(dataframe)
Part of the cleaned and stored data is shown in the figure below:
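To confirm the write succeeded, the table can be queried back through the same engine (a minimal sketch; the credentials are the placeholders used above):

import pandas as pd
from sqlalchemy import create_engine

# Minimal sketch: read a few rows back from the movie1905_china table
engine = create_engine('mysql+mysqlconnector://root:zxcvbq@127.0.0.1:3306/movie1905')
sample = pd.read_sql('SELECT movie_id, movie_title, movie_rating, movie_release_date FROM movie1905_china LIMIT 5', con=engine)
print(sample)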