在第一版的基础上进行了改进,增加了 IP 代理功能。但在使用 selenium 模块爬取时,IP 代理也不太可靠,需要优质的 IP 才能保证爬取流畅地进行。
# coding: utf-8 # Author:南岛鹋 # Blog: www.ndmiao.cn # Date :2020/8/24 14:35 # Tool :PyCharm from selenium import webdriver import re import csv import time import random class Bilibili_data: def __init__(self): self.start_url="https://www.bilibili.com/" self.driver=webdriver.Chrome() # 调用谷歌浏览器 def get_item_list(self): list = self.driver.find_elements_by_xpath("//*[@id='primaryChannelMenu']/*/*/*/span") item_list = [] i = 0 for element in list: item = {} str = re.sub("[A-Za-z0-9\!\%\[\]\+\。]", "", element.text) item["str"] = str item["url"] = element.find_element_by_xpath("./..") item_list.append(item) i = i+1 if i == 15: break return item_list def get_item_detail(self,url): url.click() list = self.driver.find_elements_by_xpath("//ul[@class='clearfix']/*[position()>1]/*") i = 0 item_detail = [] for element in list: item = {} item["str"] = str(i) + ':' + element.text i = i + 1 item["url"] = element item_detail.append(item) return item_detail def choose_time(self,url): url_last = "#/all/{}/0/1/{}" item = ['click', 'scores', 'stow', 'coin', 'dm'] cn_item = ['播放数','评论数','收藏数','硬币数','弹幕数'] num = 0 for i in cn_item: print(str(num) + ':' + i) num = num+1 item_choice = int(input('请输入你选择的排序:')) time_choice = input('请输入时间段(例如 2020-01-01,2020-01-07):') url = url + url_last.format(item[item_choice],time_choice) self.driver.get(url) def get_content_list(self): li_list = self.driver.find_elements_by_xpath("//ul[@class='vd-list mod-2']/li") content_list = [] for li in li_list: video_detail = {} video_detail['title'] = li.find_element_by_xpath(".//div[@class='r']/a").text video_detail['author'] = li.find_element_by_xpath(".//div[@class='up-info']/a").text video_detail['href'] = li.find_element_by_xpath(".//div[@class='r']/a").get_attribute("href") author_href = li.find_element_by_xpath(".//div[@class='up-info']/a").get_attribute("href") video_detail['mid'] = re.findall(r'\d+',author_href)[0] content_list.append(video_detail) print(content_list) next_url = 
self.driver.find_elements_by_xpath("//button[@class='nav-btn iconfont icon-arrowdown3']") next_url = next_url[0] if len(next_url) > 0 else None return content_list,next_url def save_content_list(self,content_list): header = ['title','author','href','mid'] with open('video.csv', 'a', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=header) # 提前预览列名,当下面代码写入数据时,会将其一一对应。 writer.writerows(content_list) # 写入数据 def random_sleep(self,mu=3, sigma=0.4): '''正态分布随机睡眠 :param mu: 平均值 :param sigma: 标准差,决定波动范围 ''' secs = random.normalvariate(mu, sigma) if secs <= 0: secs = mu # 太小则重置为平均值 time.sleep(secs) def run(self): header = ['title','author','href','mid'] with open('video.csv', 'a', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=header) # 提前预览列名,当下面代码写入数据时,会将其一一对应。 writer.writeheader() # 写入列名 self.driver.get(self.start_url) list = self.get_item_list() num = 0 for i in list: print(str(num) + ':' + i['str']) num = num+1 choice1 = int(input("请输入你选择的分区:")) item_detail = self.get_item_detail(list[choice1]['url']) for detail in item_detail: print(detail['str']) choice2 = int(input("请输入你选择的分类:")) url_detail = item_detail[choice2]['url'].get_attribute("href") self.choose_time(url_detail) content_list,next_url = self.get_content_list() self.save_content_list(content_list) while next_url is not None: next_url.click() self.random_sleep() content_list, next_url=self.get_content_list() self.save_content_list(content_list) self.driver.quit() if __name__=="__main__": data_get=Bilibili_data() data_get.run()
# coding: utf-8
# Author:南岛鹋
# Blog: www.ndmiao.cn
# Date :2020/8/25 10:29
# Tool :PyCharm
import ast
import csv
import json
import random
import re
import time

import bs4
import requests
from bs4 import BeautifulSoup
class detail_data:
    """Fetch per-video statistics from the Bilibili web API for every row in video.csv.

    Reads the video list produced by the first-stage crawler, converts each
    BV id to its numeric AV id, queries the stat/tag APIs (through a rotating
    HTTP proxy) and appends one row per video to video_data.csv.
    """

    def __init__(self):
        # Base-58 alphabet of Bilibili's BV <-> AV id encoding.
        self.alphabet = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF'

    def dec(self, x):
        """Convert a BV id (e.g. 'BV17x411w7KC') to its numeric AV id."""
        r = 0
        # These character positions carry the base-58 digits, least significant first.
        for i, v in enumerate([11, 10, 3, 8, 4, 6]):
            r += self.alphabet.find(x[v]) * 58 ** i
        # Undo the additive offset and the XOR mask of the BV encoding.
        return (r - 0x2_0840_07c0) ^ 0x0a93_b324

    def url_deal(self, url):
        """Return the BV id: always the last 12 characters of the video URL."""
        return url[-12:]

    def random_headers(self, path):
        """Read one random request-header dict from *path* (one dict literal per line).

        Uses ast.literal_eval instead of eval so only Python literals are
        accepted from the file.
        """
        with open(path, 'r') as f:
            lines = f.readlines()
        candidates = [ast.literal_eval(line) for line in lines]
        return random.choice(candidates)

    def save_content_list(self, video_dict):
        """Append one row of video statistics to video_data.csv."""
        header = ['title', 'author', 'href', 'bvid', 'view', 'danmu', 'reply_num',
                  'like_num', 'coin_num', 'favorite_num', 'share_num', 'follow-num',
                  'video_type', 'video_time', 'video_rank', 'video_tag']
        with open('video_data.csv', 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=header)
            writer.writerow(video_dict)

    def get_ip(self):
        """Fetch a fresh proxy IP from the (user-supplied) extraction API.

        Retries once after a short sleep when the provider reports rate
        limiting or pool exhaustion.

        :return: a requests-style proxies dict
        """
        print('切换IP中.......')
        url = '填入获取IP的接口'
        ip = requests.get(url).text
        # Provider error payloads: extraction too frequent / no machines available.
        if ip in ['{"ERRORCODE":"10055","RESULT":"提取太频繁,请按规定频率提取!"}',
                  '{"ERRORCODE":"10098","RESULT":"可用机器数量不足"}']:
            time.sleep(7.5)
            ip = requests.get(url).text
            print(ip)
        else:
            print(ip)
        proxies = {
            'https': 'http://' + ip,
            'http': 'http://' + ip
        }
        return proxies

    def _get_with_retry(self, url, headers, proxy):
        """GET *url* through *proxy*; on failure retry once directly and rotate the proxy.

        :return: (response_or_None, proxy) -- response is None only when both
                 attempts raised.
        """
        try:
            return requests.get(url, timeout=3, headers=headers, proxies=proxy), proxy
        except requests.exceptions.RequestException as e:
            print(e)
        response = None
        try:
            response = requests.get(url, timeout=5, headers=headers)
            proxy = self.get_ip()
        except requests.exceptions.RequestException as e:
            print(e)
        return response, proxy

    def get_time(self, url, proxy):
        """Scrape follower count, type, upload time and rank from a video page.

        :return: (timedata, proxy); timedata is [followers, type, time, rank]
                 with 'None' placeholders on failure, and the proxy may have
                 been rotated after a request failure.
        """
        headers = self.random_headers('headers.txt')
        r, proxy = self._get_with_retry(url, headers, proxy)
        if r is None:
            return ['None', 'None', 'None', 'None'], proxy
        print(1)
        soup = BeautifulSoup(r.text, "html.parser")
        result = soup.find(class_='video-data')
        try:
            result2 = soup.find(class_='default-btn follow-btn b-gz not-follow').find('span').find('span')
            followers = result2.text
        except AttributeError:
            # Follow button absent (or page layout changed).
            followers = 'none'
        timedata = [followers]
        try:
            for node in result:
                if isinstance(node, bs4.element.Tag):
                    timedata.append(re.sub(r'\s', ' ', node.text))
                else:
                    timedata.append('None')
        except TypeError:
            # result is None when the page lacks the video-data section.
            return ['None', 'None', 'None', 'None'], proxy
        return timedata, proxy

    def get_view(self, BV, url_video, proxy, title, author):
        """Fetch play/like/coin/... statistics and tags for one video.

        :param BV: BV id of the video
        :param url_video: full video URL (also scraped for follower/rank data)
        :param proxy: current requests proxies dict; rotated on failure
        :param title: video title carried through from video.csv
        :param author: uploader name carried through from video.csv
        :return: (video_dict, proxy) -- the dict has all-'None' stat fields
                 when the requests or the JSON decoding failed.
        """
        aid = self.dec(BV)
        url = r'https://api.bilibili.com/x/web-interface/archive/stat?aid=' + str(aid)
        url2 = r'https://api.bilibili.com/x/web-interface/view/detail/tag?aid=' + str(aid)
        headers = self.random_headers('headers.txt')
        timedata, proxy = self.get_time(url_video, proxy)
        response, proxy = self._get_with_retry(url, headers, proxy)
        print(2)
        headers2 = self.random_headers('headers.txt')
        response2, proxy = self._get_with_retry(url2, headers2, proxy)
        print(3)
        video_tags = ''
        try:
            # Parsing stays inside the try: a response may be None (both
            # attempts failed) or carry an error payload; any failure here
            # falls through to the all-'None' row instead of crashing.
            jsonobj = json.loads(response.text)
            jsonobj2 = json.loads(response2.text)
            for tags in jsonobj2['data']:
                video_tags = video_tags + tags['tag_name'] + ' '
            video_dict = {'title': title,
                          'author': author,
                          'href': url_video,
                          'bvid': BV,
                          'view': jsonobj['data']['view'],
                          'danmu': jsonobj['data']['danmaku'],
                          'reply_num': jsonobj['data']['reply'],
                          'like_num': jsonobj['data']['like'],
                          'coin_num': jsonobj['data']['coin'],
                          'favorite_num': jsonobj['data']['favorite'],
                          'share_num': jsonobj['data']['share'],
                          'follow-num': timedata[0],
                          'video_type': timedata[1],
                          'video_time': timedata[2],
                          'video_rank': timedata[3],
                          'video_tag': video_tags
                          }
        except Exception:
            video_dict = {'title': title,
                          'author': author,
                          'href': url_video,
                          'bvid': 'None',
                          'view': 'None',
                          'danmu': 'None',
                          'reply_num': 'None',
                          'like_num': 'None',
                          'coin_num': 'None',
                          'favorite_num': 'None',
                          'share_num': 'None',
                          'follow-num': 'None',
                          'video_type': 'None',
                          'video_time': 'None',
                          'video_rank': 'None',
                          'video_tag': 'None'
                          }
        return video_dict, proxy

    def run(self):
        """Read video.csv and append per-video statistics to video_data.csv."""
        header = ['title', 'author', 'href', 'bvid', 'view', 'danmu', 'reply_num', 'like_num', 'coin_num',
                  'favorite_num', 'share_num', 'follow-num', 'video_type', 'video_time', 'video_rank', 'video_tag']
        proxy = self.get_ip()
        with open('video_data.csv', 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=header)
            writer.writeheader()  # column names written once per run
        with open(r'video.csv', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                url = row['href']
                BV = self.url_deal(url)
                video_dict, proxy = self.get_view(BV, url, proxy, row['title'], row['author'])
                if video_dict['bvid'] == 'None':
                    # One retry with a fresh proxy when the fetch failed.
                    proxy = self.get_ip()
                    video_dict, proxy = self.get_view(BV, url, proxy, row['title'], row['author'])
                print(video_dict)
                self.save_content_list(video_dict)
if __name__ == '__main__':
    # Script entry point: crawl statistics for every video listed in video.csv.
    video = detail_data()
    video.run()