在第一版的基础上进行了改进,增加了 IP 代理的功能。但是在使用 selenium 模块爬取时,加上 IP 代理也不太可靠,需要优质的 IP 才能保证爬取流畅地进行。

# coding: utf-8
# Author:南岛鹋 
# Blog: www.ndmiao.cn
# Date :2020/8/24 14:35
# Tool :PyCharm

from selenium import webdriver
import re
import csv
import time
import random
class Bilibili_data:
    """Interactive Selenium scraper for bilibili video listings.

    The user picks a channel, a sub-category, a sort order and a date range
    on the console; every listed video is then appended to ``video.csv``
    with the columns ``title``, ``author``, ``href`` and ``mid``.
    """

    # Output file and column order shared by every CSV write.
    CSV_PATH = 'video.csv'
    CSV_HEADER = ['title', 'author', 'href', 'mid']
    # Only this many entries of the channel menu are offered to the user.
    MAX_CHANNELS = 15

    def __init__(self):
        self.start_url = "https://www.bilibili.com/"
        self.driver = webdriver.Chrome()  # drive a Chrome browser

    def get_item_list(self):
        """Return the first ``MAX_CHANNELS`` entries of the channel menu.

        :return: list of dicts; ``"str"`` is the span text with ASCII
            letters, digits and the characters ``! % [ ] + 。`` removed,
            ``"url"`` is the span's parent element.
        """
        spans = self.driver.find_elements_by_xpath(
            "//*[@id='primaryChannelMenu']/*/*/*/span")
        item_list = []
        for element in spans[:self.MAX_CHANNELS]:
            item_list.append({
                "str": re.sub(r"[A-Za-z0-9!%\[\]+。]", "", element.text),
                "url": element.find_element_by_xpath("./.."),
            })
        return item_list

    def get_item_detail(self, url):
        """Click a channel element and list the entries found afterwards.

        :param url: element to click (the ``"url"`` value produced by
            :meth:`get_item_list`).
        :return: list of dicts; ``"str"`` is ``"<index>:<text>"`` and
            ``"url"`` is the matched element itself.
        """
        url.click()
        entries = self.driver.find_elements_by_xpath(
            "//ul[@class='clearfix']/*[position()>1]/*")
        return [
            {"str": str(index) + ':' + element.text, "url": element}
            for index, element in enumerate(entries)
        ]

    def choose_time(self, url):
        """Ask for sort order and date range, then open the resulting page.

        :param url: base URL of the chosen sub-category; the fragment
            ``#/all/<order>/0/1/<date range>`` is appended to it.
        :raises ValueError: if the sort choice is not an integer.
        :raises IndexError: if the sort choice is out of range.
        """
        url_last = "#/all/{}/0/1/{}"
        item = ['click', 'scores', 'stow', 'coin', 'dm']
        cn_item = ['播放数','评论数','收藏数','硬币数','弹幕数']
        for num, label in enumerate(cn_item):
            print(str(num) + ':' + label)
        item_choice = int(input('请输入你选择的排序:'))
        time_choice = input('请输入时间段(例如 2020-01-01,2020-01-07):')
        self.driver.get(url + url_last.format(item[item_choice], time_choice))

    def get_content_list(self):
        """Read every video on the current page.

        :return: ``(content_list, next_url)``; ``content_list`` holds one
            dict per video (``title``, ``author``, ``href``, ``mid``) and
            ``next_url`` is the first matching next-page button element,
            or ``None`` when no such button is found.
        """
        li_list = self.driver.find_elements_by_xpath("//ul[@class='vd-list mod-2']/li")
        content_list = []
        for li in li_list:
            # Look each link up once and read both text and href from it.
            title_link = li.find_element_by_xpath(".//div[@class='r']/a")
            author_link = li.find_element_by_xpath(".//div[@class='up-info']/a")
            content_list.append({
                'title': title_link.text,
                'author': author_link.text,
                'href': title_link.get_attribute("href"),
                # 'mid' is the first run of digits in the author link.
                'mid': re.findall(r'\d+', author_link.get_attribute("href"))[0],
            })
        print(content_list)
        buttons = self.driver.find_elements_by_xpath("//button[@class='nav-btn iconfont icon-arrowdown3']")
        next_url = buttons[0] if buttons else None
        return content_list, next_url

    def save_content_list(self, content_list):
        """Append rows to the CSV file, writing the header only once.

        The header row is emitted only when the file is still empty, so
        appending across several calls (or several runs) never produces
        repeated header lines in the middle of the data.

        :param content_list: iterable of dicts keyed by ``CSV_HEADER``;
            may be empty, in which case only the header is ensured.
        """
        with open(self.CSV_PATH, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=self.CSV_HEADER)
            # In append mode the position starts at end-of-file, so a
            # position of 0 means the file is new or empty.
            if f.tell() == 0:
                writer.writeheader()
            writer.writerows(content_list)

    def random_sleep(self, mu=3, sigma=0.4):
        """Sleep for a normally distributed number of seconds.

        :param mu: mean of the distribution
        :param sigma: standard deviation, controls the spread
        """
        secs = random.normalvariate(mu, sigma)
        if secs <= 0:
            secs = mu  # non-positive draw: fall back to the mean
        time.sleep(secs)

    def run(self):
        """Run the whole interactive session and save every result page.

        The browser is always closed, even when a step fails or the user
        enters an invalid choice.
        """
        try:
            self.save_content_list([])  # make sure the CSV exists with a header
            self.driver.get(self.start_url)
            channels = self.get_item_list()
            for num, channel in enumerate(channels):
                print(str(num) + ':' + channel['str'])
            choice1 = int(input("请输入你选择的分区:"))
            item_detail = self.get_item_detail(channels[choice1]['url'])
            for detail in item_detail:
                print(detail['str'])
            choice2 = int(input("请输入你选择的分类:"))
            url_detail = item_detail[choice2]['url'].get_attribute("href")
            self.choose_time(url_detail)
            content_list, next_url = self.get_content_list()
            self.save_content_list(content_list)
            while next_url is not None:
                next_url.click()
                self.random_sleep()  # pause between page turns
                content_list, next_url = self.get_content_list()
                self.save_content_list(content_list)
        finally:
            self.driver.quit()

if __name__ == "__main__":
    # Script entry point: open the browser and start the interactive scrape.
    scraper = Bilibili_data()
    scraper.run()
# coding: utf-8
# Author:南岛鹋 
# Blog: www.ndmiao.cn
# Date :2020/8/25 10:29
# Tool :PyCharm

import requests
import json
from bs4 import BeautifulSoup
import re
import bs4
import csv
import random
import time
class detail_data:
    """Fetch per-video statistics for every row of ``video.csv``.

    For each video the page itself and two bilibili API endpoints are
    requested (through a rotating proxy, with a direct request as a
    fallback) and one row is appended to ``video_data.csv``.
    """

    # Output file and column order shared by every CSV write.
    CSV_PATH = 'video_data.csv'
    CSV_HEADER = ['title', 'author', 'href', 'bvid', 'view', 'danmu', 'reply_num', 'like_num', 'coin_num',
                  'favorite_num', 'share_num', 'follow-num', 'video_type', 'video_time', 'video_rank', 'video_tag']
    # Placeholder: fill in the URL of the service that hands out proxy IPs.
    IP_API_URL = '填入获取IP的接口'
    # Replies of the proxy service after which one delayed retry is made.
    _IP_API_BUSY = ('{"ERRORCODE":"10055","RESULT":"提取太频繁,请按规定频率提取!"}',
                    '{"ERRORCODE":"10098","RESULT":"可用机器数量不足"}')

    def __init__(self):
        # Base-58 alphabet used by the BV id encoding.
        self.alphabet = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF'

    def dec(self, x):
        """Convert a BV id into the numeric AV id.

        :param x: BV id string, at least 12 characters long.
        :return: the AV id as an int.
        :raises IndexError: if ``x`` is shorter than 12 characters.
        """
        # Six characters of the id, read in this fixed order, are the
        # base-58 digits (least significant first).
        r = sum(self.alphabet.find(x[pos]) * 58 ** power
                for power, pos in enumerate([11, 10, 3, 8, 4, 6]))
        return (r - 0x2_0840_07c0) ^ 0x0a93_b324

    def url_deal(self, url):
        """Extract the BV id from a video URL.

        The id is located by pattern, so a trailing ``/`` or a query
        string after the id does not corrupt the result. When no id is
        recognised the last 12 characters are returned, as before.

        :param url: video page URL.
        :return: the 12-character BV id.
        """
        match = re.search(r'BV[0-9A-Za-z]{10}', url)
        if match:
            return match.group(0)
        return url[-12:]

    def random_headers(self, path):
        """Return one randomly chosen header dict from a text file.

        Each non-blank line of the file must be a Python dict literal.

        :param path: path of the headers file.
        :return: the parsed dict of one randomly selected line.
        :raises IndexError: if the file holds no usable line.
        """
        import ast  # local import: only this method needs it

        with open(path, 'r') as f:
            lines = f.readlines()
        # NOTE: the lines were previously passed to eval(), which executes
        # arbitrary code found in the file. literal_eval only accepts
        # literals, which is all a header dict needs. Blank lines are
        # skipped instead of raising SyntaxError.
        candidates = [ast.literal_eval(line.strip()) for line in lines if line.strip()]
        return random.choice(candidates)

    def _append_rows(self, rows):
        """Append ``rows`` to the CSV, writing the header only if it is empty."""
        with open(self.CSV_PATH, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=self.CSV_HEADER)
            # In append mode the position starts at end-of-file, so a
            # position of 0 means the file is new or empty.
            if f.tell() == 0:
                writer.writeheader()
            writer.writerows(rows)

    def save_content_list(self, video_dict):
        """Append one video record to the CSV file.

        :param video_dict: dict keyed by ``CSV_HEADER``.
        """
        self._append_rows([video_dict])

    def get_ip(self):
        """Request a proxy address and return it as a ``proxies`` mapping.

        If the service answers with one of the known "busy" replies, one
        more request is made after a 7.5 second pause.

        :return: dict with ``'https'`` and ``'http'`` proxy URLs.
        """
        print('切换IP中.......')
        url = self.IP_API_URL
        # Strip surrounding whitespace so a trailing newline in the reply
        # does not end up inside the proxy URL.
        ip = requests.get(url).text.strip()
        if ip in self._IP_API_BUSY:
            time.sleep(7.5)
            ip = requests.get(url).text.strip()
        print(ip)
        return {
            'https': 'http://' + ip,
            'http': 'http://' + ip
        }

    def _fetch(self, url, headers, proxy):
        """GET ``url`` via ``proxy``, falling back to a direct request.

        :return: ``(response, proxy)``; ``response`` is ``None`` when both
            attempts failed. After a successful direct fallback a new
            proxy is requested and returned.
        """
        try:
            return requests.get(url, timeout=3, headers=headers, proxies=proxy), proxy
        except requests.exceptions.RequestException as e:
            print(e)
        try:
            response = requests.get(url, timeout=5, headers=headers)
        except requests.exceptions.RequestException as e:
            print(e)
            return None, proxy
        # The proxied attempt failed, so ask for a fresh proxy for the
        # following requests; keep the old one if that request fails too.
        try:
            proxy = self.get_ip()
        except requests.exceptions.RequestException as e:
            print(e)
        return response, proxy

    def get_time(self, url, proxy):
        """Scrape follower count and the ``video-data`` block of a page.

        :param url: video page URL.
        :param proxy: ``proxies`` mapping to use for the request.
        :return: ``(timedata, proxy)``; ``timedata`` starts with the
            follower text (``'none'`` if not found) followed by one entry
            per child of the ``video-data`` element. When the page cannot
            be fetched or has no such element, ``timedata`` is four
            ``'None'`` strings.
        """
        headers = self.random_headers('headers.txt')
        r, proxy = self._fetch(url, headers, proxy)
        if r is None:
            return ['None', 'None', 'None', 'None'], proxy
        print(1)
        soup = BeautifulSoup(r.text, "html.parser")
        result = soup.find(class_='video-data')
        try:
            followers = soup.find(class_='default-btn follow-btn b-gz not-follow').find('span').find('span').text
        except AttributeError:
            # One of the lookups returned None: the button is not there.
            followers = 'none'
        if result is None:
            return ['None', 'None', 'None', 'None'], proxy
        timedata = [followers]
        for child in result:
            if isinstance(child, bs4.element.Tag):
                timedata.append(re.sub(r'\s', ' ', child.text))
            else:
                timedata.append('None')
        return timedata, proxy

    def get_view(self, BV, url_video, proxy, title, author):
        """Collect all statistics of one video.

        :param BV: BV id of the video.
        :param url_video: video page URL.
        :param proxy: ``proxies`` mapping to use for the requests.
        :param title: video title, copied into the result.
        :param author: uploader name, copied into the result.
        :return: ``(video_dict, proxy)``. When any request fails or a
            reply lacks the expected data, every field except ``title``,
            ``author`` and ``href`` is the string ``'None'`` (including
            ``bvid``, which callers use as the failure marker).
        """
        aid = self.dec(BV)
        url = r'https://api.bilibili.com/x/web-interface/archive/stat?aid=' + str(aid)
        url2 = r'https://api.bilibili.com/x/web-interface/view/detail/tag?aid=' + str(aid)
        headers = self.random_headers('headers.txt')
        timedata, proxy = self.get_time(url_video, proxy)
        response, proxy = self._fetch(url, headers, proxy)
        print(2)
        headers2 = self.random_headers('headers.txt')
        response2, proxy = self._fetch(url2, headers2, proxy)
        print(3)

        try:
            # A failed fetch leaves the response as None (AttributeError);
            # a non-JSON body raises ValueError; missing or null 'data'
            # raises KeyError/TypeError; a short timedata raises IndexError.
            tags = json.loads(response2.text)['data']
            stat = json.loads(response.text)['data']
            video_tags = ''.join(tag['tag_name'] + ' ' for tag in tags)
            video_dict = {'title': title,
                          'author': author,
                          'href': url_video,
                          'bvid': BV,
                          'view': stat['view'],
                          'danmu': stat['danmaku'],
                          'reply_num': stat['reply'],
                          'like_num': stat['like'],
                          'coin_num': stat['coin'],
                          'favorite_num': stat['favorite'],
                          'share_num': stat['share'],
                          'follow-num': timedata[0],
                          'video_type': timedata[1],
                          'video_time': timedata[2],
                          'video_rank': timedata[3],
                          'video_tag': video_tags
                          }
        except (AttributeError, ValueError, KeyError, TypeError, IndexError):
            video_dict = dict.fromkeys(self.CSV_HEADER, 'None')
            video_dict.update(title=title, author=author, href=url_video)

        return video_dict, proxy

    def run(self):
        """Process every row of ``video.csv`` and save the results.

        A video whose first attempt fails is retried once with a newly
        requested proxy; the outcome of the retry is saved either way.
        """
        proxy = self.get_ip()
        self._append_rows([])  # make sure the CSV exists with a header
        with open(r'video.csv', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                url = row['href']
                # video.csv may contain repeated header lines when it was
                # appended to across several runs; those are not videos.
                if url == 'href':
                    continue
                BV = self.url_deal(url)
                video_dict, proxy = self.get_view(BV, url, proxy, row['title'], row['author'])
                if video_dict['bvid'] == 'None':
                    proxy = self.get_ip()
                    video_dict, proxy = self.get_view(BV, url, proxy, row['title'], row['author'])
                print(video_dict)
                self.save_content_list(video_dict)

if __name__ == '__main__':
    # Script entry point: enrich every row of video.csv with statistics.
    collector = detail_data()
    collector.run()
Last modification:September 3rd, 2020 at 02:42 pm
如果觉得我的文章对你有用,请随意赞赏