This version improves on the first one by adding IP-proxy support. Note, however, that when crawling through the selenium module, an IP proxy is still not very reliable; you need high-quality IPs to keep the crawl running smoothly.
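If you do want to route the Selenium session through a proxy, Chrome accepts one as a command-line switch. A minimal sketch (the address 1.2.3.4:8080 is a placeholder, not a working proxy; a paid, high-quality endpoint is assumed):

from selenium import webdriver

options = webdriver.ChromeOptions()
# All browser traffic goes through this proxy; replace the placeholder address
options.add_argument('--proxy-server=http://1.2.3.4:8080')
driver = webdriver.Chrome(options=options)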
# coding: utf-8
# Author:南岛鹋
# Blog: www.ndmiao.cn
# Date :2020/8/24 14:35
# Tool :PyCharm
from selenium import webdriver
import re
import csv
import time
import random
class Bilibili_data:
    def __init__(self):
        self.start_url = "https://www.bilibili.com/"
        self.driver = webdriver.Chrome()  # launch Chrome via ChromeDriver

    def get_item_list(self):
        # Collect the channel entries from the home-page navigation menu
        elements = self.driver.find_elements_by_xpath("//*[@id='primaryChannelMenu']/*/*/*/span")
        item_list = []
        i = 0
        for element in elements:
            item = {}
            # Strip Latin letters, digits and stray punctuation, keeping the Chinese channel name
            name = re.sub(r"[A-Za-z0-9!%\[\]+。]", "", element.text)
            item["str"] = name
            item["url"] = element.find_element_by_xpath("./..")  # the parent <a> element
            item_list.append(item)
            i = i + 1
            if i == 15:
                break
        return item_list

    def get_item_detail(self, url):
        url.click()
        elements = self.driver.find_elements_by_xpath("//ul[@class='clearfix']/*[position()>1]/*")
        i = 0
        item_detail = []
        for element in elements:
            item = {}
            item["str"] = str(i) + ':' + element.text
            i = i + 1
            item["url"] = element
            item_detail.append(item)
        return item_detail

    def choose_time(self, url):
        # URL fragment template: #/all/{sort key}/0/1/{date range}
        url_last = "#/all/{}/0/1/{}"
        item = ['click', 'scores', 'stow', 'coin', 'dm']
        cn_item = ['views', 'comments', 'favorites', 'coins', 'danmaku']
        num = 0
        for i in cn_item:
            print(str(num) + ':' + i)
            num = num + 1
        item_choice = int(input('Enter the sort key you want: '))
        time_choice = input('Enter the date range (e.g. 2020-01-01,2020-01-07): ')
        url = url + url_last.format(item[item_choice], time_choice)
        self.driver.get(url)

    def get_content_list(self):
        li_list = self.driver.find_elements_by_xpath("//ul[@class='vd-list mod-2']/li")
        content_list = []
        for li in li_list:
            video_detail = {}
            video_detail['title'] = li.find_element_by_xpath(".//div[@class='r']/a").text
            video_detail['author'] = li.find_element_by_xpath(".//div[@class='up-info']/a").text
            video_detail['href'] = li.find_element_by_xpath(".//div[@class='r']/a").get_attribute("href")
            author_href = li.find_element_by_xpath(".//div[@class='up-info']/a").get_attribute("href")
            video_detail['mid'] = re.findall(r'\d+', author_href)[0]  # uploader id from the profile URL
            content_list.append(video_detail)
        print(content_list)
        next_url = self.driver.find_elements_by_xpath("//button[@class='nav-btn iconfont icon-arrowdown3']")
        next_url = next_url[0] if len(next_url) > 0 else None  # None once the last page is reached
        return content_list, next_url

    def save_content_list(self, content_list):
        header = ['title', 'author', 'href', 'mid']
        with open('video.csv', 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=header)  # fieldnames maps each dict key to its column
            writer.writerows(content_list)  # append the rows for this page

    def random_sleep(self, mu=3, sigma=0.4):
        '''Sleep for a normally distributed random time.
        :param mu: mean in seconds
        :param sigma: standard deviation, controls the spread
        '''
        secs = random.normalvariate(mu, sigma)
        if secs <= 0:
            secs = mu  # fall back to the mean if the sample is non-positive
        time.sleep(secs)

    def run(self):
        header = ['title', 'author', 'href', 'mid']
        with open('video.csv', 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=header)
            writer.writeheader()  # write the column names once
        self.driver.get(self.start_url)
        item_list = self.get_item_list()
        num = 0
        for i in item_list:
            print(str(num) + ':' + i['str'])
            num = num + 1
        choice1 = int(input("Enter the channel you want: "))
        item_detail = self.get_item_detail(item_list[choice1]['url'])
        for detail in item_detail:
            print(detail['str'])
        choice2 = int(input("Enter the category you want: "))
        url_detail = item_detail[choice2]['url'].get_attribute("href")
        self.choose_time(url_detail)
        content_list, next_url = self.get_content_list()
        self.save_content_list(content_list)
        while next_url is not None:
            next_url.click()
            self.random_sleep()
            content_list, next_url = self.get_content_list()
            self.save_content_list(content_list)
        self.driver.quit()


if __name__ == "__main__":
    data_get = Bilibili_data()
    data_get.run()
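Run the Selenium script above first; it produces video.csv. The second script below then reads each collected link back, converts its BV id to an AV id, and queries Bilibili's web API for per-video statistics, rotating the proxy whenever a request fails.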
# coding: utf-8
# Author:南岛鹋
# Blog: www.ndmiao.cn
# Date :2020/8/25 10:29
# Tool :PyCharm
import requests
import json
from bs4 import BeautifulSoup
import re
import bs4
import csv
import random
import time
class detail_data:
    def __init__(self):
        # Lookup table for Bilibili's base-58 BV encoding
        self.alphabet = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF'

    def dec(self, x):  # convert a BV id to the corresponding AV id
        r = 0
        for i, v in enumerate([11, 10, 3, 8, 4, 6]):
            r += self.alphabet.find(x[v]) * 58 ** i
        return (r - 0x2_0840_07c0) ^ 0x0a93_b324
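    # Sanity check for the decoder (a commonly cited test pair, assuming the
    # pre-2021 base-58 scheme): BV17x411w7KC corresponds to av170001, so
    # dec('BV17x411w7KC') should return 170001.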
    def url_deal(self, url):
        url = url[-12:]  # keep the trailing 12-character BV id
        return url

    def random_headers(self, path):  # pick a random request header from file
        with open(path, 'r') as f:
            data = f.readlines()
        reg = []
        for i in data:
            k = eval(i)  # each line is a dict literal; ast.literal_eval would be the safer choice
            reg.append(k)
        header = random.choice(reg)
        return header
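    # headers.txt is expected to hold one Python dict literal per line, e.g.
    # (an illustrative line, not one from the original post):
    # {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}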
    def save_content_list(self, video_dict):
        header = ['title', 'author', 'href', 'bvid', 'view', 'danmu', 'reply_num', 'like_num', 'coin_num', 'favorite_num', 'share_num', 'follow-num', 'video_type', 'video_time', 'video_rank', 'video_tag']
        with open('video_data.csv', 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=header)  # fieldnames maps each dict key to its column
            writer.writerow(video_dict)  # append this video's row
    def get_ip(self):
        print('Switching IP...')
        url = 'fill in your proxy provider\'s extraction API here'
        ip = requests.get(url).text
        # These two responses are the provider's "extracting too often" / "no machines available" errors
        if ip in ['{"ERRORCODE":"10055","RESULT":"提取太频繁,请按规定频率提取!"}', '{"ERRORCODE":"10098","RESULT":"可用机器数量不足"}']:
            time.sleep(7.5)
            ip = requests.get(url).text
            print(ip)
        else:
            print(ip)
        proxies = {
            'https': 'http://' + ip,  # the API is expected to return host:port
            'http': 'http://' + ip
        }
        return proxies
    def get_time(self, url, proxy):
        headers = self.random_headers('headers.txt')
        try:
            r = requests.get(url, timeout=3, headers=headers, proxies=proxy)
        except requests.exceptions.RequestException as e:
            print(e)
            try:
                # Retry without the proxy, then rotate to a fresh one
                r = requests.get(url, timeout=5, headers=headers)
                proxy = self.get_ip()
            except requests.exceptions.RequestException as e:
                print(e)
                return ['None', 'None', 'None', 'None'], proxy
        print(1)  # progress marker: video page fetched
        soup = BeautifulSoup(r.text, "html.parser")
        result = soup.find(class_='video-data')
        try:
            result2 = soup.find(class_='default-btn follow-btn b-gz not-follow').find('span').find('span')
            followers = result2.text
        except AttributeError:  # the follow button is missing or laid out differently
            followers = 'none'
        timedata = []
        timedata.append(followers)
        try:
            for i in result:
                if type(i) == bs4.element.Tag:
                    timedata.append(re.sub(r'\s', ' ', i.text))
                else:
                    timedata.append('None')
        except TypeError:  # result is None when the video-data block did not render
            return ['None', 'None', 'None', 'None'], proxy
        return timedata, proxy
    def get_view(self, BV, url_video, proxy, title, author):
        bid = BV
        aid = self.dec(bid)  # the stat API takes the AV id
        url = r'https://api.bilibili.com/x/web-interface/archive/stat?aid=' + str(aid)
        url2 = r'https://api.bilibili.com/x/web-interface/view/detail/tag?aid=' + str(aid)
        headers = self.random_headers('headers.txt')
        timedata, proxy = self.get_time(url_video, proxy)
        # Placeholder row, returned whenever a request or the JSON parsing fails
        none_dict = {'title': title, 'author': author, 'href': url_video, 'bvid': 'None',
                     'view': 'None', 'danmu': 'None', 'reply_num': 'None', 'like_num': 'None',
                     'coin_num': 'None', 'favorite_num': 'None', 'share_num': 'None',
                     'follow-num': 'None', 'video_type': 'None', 'video_time': 'None',
                     'video_rank': 'None', 'video_tag': 'None'}
        try:
            response = requests.get(url, timeout=3, headers=headers, proxies=proxy)
        except requests.exceptions.RequestException as e:
            print(e)
            try:
                response = requests.get(url, timeout=5, headers=headers)
                proxy = self.get_ip()
            except requests.exceptions.RequestException as e:
                print(e)
                return none_dict, proxy
        print(2)  # progress marker: stat API fetched
        headers2 = self.random_headers('headers.txt')
        try:
            response2 = requests.get(url2, timeout=3, headers=headers2, proxies=proxy)
        except requests.exceptions.RequestException as e:
            print(e)
            try:
                response2 = requests.get(url2, timeout=5, headers=headers2)
                proxy = self.get_ip()
            except requests.exceptions.RequestException as e:
                print(e)
                return none_dict, proxy
        print(3)  # progress marker: tag API fetched
        jsonobj = json.loads(response.text)
        jsonobj2 = json.loads(response2.text)
        video_tags = ''
        # Pull the basic statistics out of the JSON responses into one dict
        try:
            for tags in jsonobj2['data']:
                video_tags = video_tags + tags['tag_name'] + ' '
            video_dict = {'title': title,
                          'author': author,
                          'href': url_video,
                          'bvid': BV,
                          'view': jsonobj['data']['view'],
                          'danmu': jsonobj['data']['danmaku'],
                          'reply_num': jsonobj['data']['reply'],
                          'like_num': jsonobj['data']['like'],
                          'coin_num': jsonobj['data']['coin'],
                          'favorite_num': jsonobj['data']['favorite'],
                          'share_num': jsonobj['data']['share'],
                          'follow-num': timedata[0],
                          'video_type': timedata[1],
                          'video_time': timedata[2],
                          'video_rank': timedata[3],
                          'video_tag': video_tags}
        except (KeyError, TypeError, IndexError):  # API error payload or incomplete page data
            video_dict = none_dict
        return video_dict, proxy
    def run(self):
        header = ['title', 'author', 'href', 'bvid', 'view', 'danmu', 'reply_num', 'like_num', 'coin_num',
                  'favorite_num', 'share_num', 'follow-num', 'video_type', 'video_time', 'video_rank', 'video_tag']
        proxy = self.get_ip()
        with open('video_data.csv', 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=header)
            writer.writeheader()  # write the column names once
        with open(r'video.csv', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                url = row['href']
                BV = self.url_deal(url)
                video_dict, proxy = self.get_view(BV, url, proxy, row['title'], row['author'])
                if video_dict['bvid'] == 'None':
                    # Retry once with a fresh proxy before accepting the placeholder row
                    proxy = self.get_ip()
                    video_dict, proxy = self.get_view(BV, url, proxy, row['title'], row['author'])
                print(video_dict)
                self.save_content_list(video_dict)


if __name__ == '__main__':
    video = detail_data()
    video.run()
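For reference, the stat endpoint used above can be exercised on its own. A minimal sketch, assuming the endpoint and its response fields are unchanged (av170001 is just a sample id):

import requests

aid = 170001  # sample AV id
resp = requests.get('https://api.bilibili.com/x/web-interface/archive/stat?aid=' + str(aid), timeout=5)
data = resp.json()['data']
print(data['view'], data['danmaku'], data['like'])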