使用Python刷某博客平台的阅读量

近期重新拾起来了久违的爬虫,有时候就想练练手,正好想起来了某个博客平台,虽然这个平台的评价不高,但是也是有一定存在意义的。那就用Python的方式的方式刷一下阅读量吧。

目标分析

获取目标数据

首先要获取所有文章的列表,一般只有两个方式:

  1. 从个人主页获取
  2. 从创作中心也就是个人后台获取。

很明显从个人主页的位置获取更简单,因为不需要登录,完全没有任何必要给自己找麻烦。

我们要的是文章的版块所以选择文章,在开发者工具中看到有个接口get-business-list提供了我们需要的数据:

  • 文章ID
  • 文章URL
  • 文章标题

其实到这里,我们能拿到文章ID或者URL就可以了。

分析接口

接下来我们具体分析这个接口情况

载荷部分很简单,定义了页数索引和size以及username等数据。

然后再看一眼请求头,如果没有特殊字段就好办了。

很明显,只有一个cookie,但是这个cookie是必须的吗,也不一定,待会试试就可以了。直接复制完整URLhttps://blog.csdn.net/community/home-api/v1/get-business-list?page=1&size=20&businessType=blog&orderby=&noMore=false&year=&month=&username=u013046615 发起get请求。

的确可以直接请求,可以没有cookie。

那么到这里,我们就清晰了获取文章ID的整个过程,接下来就可以考虑代码的思路了。

代码思路

代码的整体思路如下:

  1. 首先获取文章ID或URL(URL其实也可以自己去拼接)
  2. 根据文章信息进行requests请求

获取文章信息的代码

1
2
3
4
5
6
7
8
def get_id_from_api():
api_url = 'https://blog.csdn.net/community/home-api/v1/get-business-list?page=1&size=20&businessType=blog' \
'&orderby=&noMore=false&year=&month=&username=u013046615'
resp = requests.get(api_url, headers=headers)
all_ids = list()
for i in resp.json()['data']['list']:
all_ids.append(i['articleId'])
return all_ids

将数据进行提取,主要是最后的文章ID,也可以直接使用URL,这都无所谓。

进行文章访问的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def pv(id):
async with semaphore:
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
# 我这里加了代理,因人而异
async with session.get(f'{url}{id}', headers=headers,proxy=proxy) as resp:
await asyncio.sleep(random.uniform(1.0, 3.0))
if resp.status == 200:
return True
else:
return False

except Exception as e:
return False

这里使用的是aiohttp,使用requests也可以。将文章ID传入,然后进行访问,至于返回结果无所属,只要阅读数上升了,就算有一些异常也无关紧要。这里建议使用一些代理,否则可能接口返回异常的可能性大一些。

最后还是要sleep以下,否则容易被ban掉,也算是对自己负责吧。

整体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""
@time:2023/10/13
@file:csdn.py
@author:medivh
@mail:admin@econow.cn
@IDE:PyCharm
"""
# !/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import random
import ssl
import time
import aiohttp
import requests

from tqdm import tqdm
from multiprocessing import freeze_support

freeze_support()
user_agent_list = [
"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; …) Gecko/20100101 Firefox/61.0",
"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.186 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.62 Safari/537.36",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)",
"Mozilla/5.0 (Macintosh; U; PPC Mac OS X 10.5; en-US; rv:1.9.2.15) Gecko/20110303 Firefox/3.6.15",
]

headers = {"User-Agent": random.choice(user_agent_list)}

host = "https://blog.csdn.net"
url = "https://blog.csdn.net/u013046615/article/details/"



def get_id_from_api():
api_url = 'https://blog.csdn.net/community/home-api/v1/get-business-list?page=1&size=20&businessType=blog' \
'&orderby=&noMore=false&year=&month=&username=u013046615'
resp = requests.get(api_url, headers=headers)
all_ids = list()
for i in resp.json()['data']['list']:
all_ids.append(i['articleId'])
return all_ids


async def pv(id):
async with semaphore:
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(f'{url}{id}', headers=headers,proxy=proxy) as resp:
await asyncio.sleep(random.uniform(1.0, 3.0))
if resp.status == 200:
return True
else:
return False

except Exception as e:
return False


async def main():
i = 0
total_lines = 9999999 # 你需要提供总共的循环次数
pbar = tqdm(total=total_lines, desc="进度")
id_list = get_id_from_api()
# 为每个元素分配权重,可以根据索引设置权重,这里使用倒序索引
weights = [1 / (i + 1) for i in range(len(id_list))]

while i < total_lines:
num_list = [num for num in range(1, 14)]

# 随机选择一个元素,带有权重
selected_id = random.choices(id_list, weights=weights, k=1)[0]
tasks = [asyncio.create_task(pv(selected_id)) for _ in num_list]
await asyncio.wait(tasks)
# 使用 tqdm 更新进度
pbar.update(1)
i += 1


if __name__ == "__main__":
start_ts = time.time()
sslgen = SSLFactory()
semaphore = asyncio.Semaphore(10) # 限制同时进行的请求数量为10
timeout = aiohttp.ClientTimeout(total=10) # 设置总超时时间为10秒
asyncio.run(main())
print(f'耗时:{time.time() - start_ts}')

测试结果

可以对比前面的截图中的viewCount 174多了好几百,在个人中心也可以看到同样的数据,真实有效!

总结

实现刷阅读量的功能很简单,思路也很简单,没有什么逆向的过程。当然代码也有一些优化的潜力,比如:

  • 目前是获取了第一页的文章列表,可以根据第一次请求的总数算出需要请求多少页
  • 也可以加上定时任务,比如只有工作日的时候刷,毕竟很少有人大半夜闲的没事干读技术文章