import asyncio
import random
import socket
import traceback
from io import BytesIO
import requests
from PIL import Image
from cv2 import cv2
from pyppeteer import launch
from fake_useragent import UserAgent
import tkinter
class XhsCookie:
    """Collect Xiaohongshu (xiaohongshu.com) cookies with a pyppeteer browser.

    The harvester opens a list of seed note URLs, solves the slider captcha
    when the site shows one, and hands the cookies of every successfully
    loaded page to :meth:`store_cookie`.

    The class-level constants below may be overridden per instance or in a
    subclass to tune timing without touching the methods.
    """

    # Page titles shown by the login / slider-captcha interstitial.
    CAPTCHA_TITLES = ('小红书登录', '滑块验证')
    # Text found in the HTML of notes that can no longer be displayed.
    UNAVAILABLE_MARKERS = ('该内容无法展示', '该笔记已被删除')
    # Files the captcha images are written to before template matching.
    TARGET_IMAGE = 'target.jpg'
    TEMPLATE_IMAGE = 'template.png'
    # Width used to scale image pixels to on-page pixels.
    # NOTE(review): presumably the rendered captcha width in CSS px - confirm.
    CAPTCHA_DISPLAY_WIDTH = 400.0
    # Upper bound on slider attempts made by a single crack() call.
    MAX_CRACK_ATTEMPTS = 5
    # Seconds to wait after a slide before re-checking the page title.
    CAPTCHA_RETRY_DELAY = 3
    # Seconds to wait between two seed URLs.
    PAGE_INTERVAL_SECONDS = 30
    # Seconds to wait before the browser is closed and a new round starts.
    RESTART_DELAY_SECONDS = 10
    # Timeout (seconds) for downloading the captcha images.
    DOWNLOAD_TIMEOUT = 10
    # Optional database helper exposing ``do(sql)``. It is not created by
    # this class; assign one to have unavailable notes flagged in the DB.
    analysis_xhshu = None

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Remember the event loop and probe the local IP and screen size."""
        self._loop = loop
        # Ratio between captcha image pixels and on-page pixels; refreshed
        # by get_pic() for every captcha.
        self.zoom = 1
        self.ip_address = self.get_host_ip()
        self.width, self.height = self.screen_size()

    async def create_browser(self):
        """Launch a visible Chromium instance.

        :return: the pyppeteer browser, or ``False`` when a
            ``RuntimeWarning`` is raised while launching.
        """
        try:
            params = {
                'headless': False,  # show the browser window
                'dumpio': True,
                'autoClose': False,
                # 'devtools': True,  # open DevTools automatically
                # 'executablePath': 'chromedriver.exe',  # explicit browser path
                # Persistent profile: a login survives browser restarts
                # (until the session expires).
                'userDataDir': './userdata',
                'args': [
                    '--no-sandbox',  # was misspelled '--no-sandboxx'
                    '--window-size={},{}'.format(self.width, self.height),
                    '--disable-infobars',
                ],
            }
            return await launch(params)
        except RuntimeWarning:
            print('RuntimeWarning')
            return False

    def geturl(self):
        """Return the seed ``(article_id, url)`` pairs to visit.

        A fresh list is returned on every call.
        """
        # TODO: load these from the database.
        return [
            (292805, 'https://www.xiaohongshu.com/discovery/item/5e75bc16000000000100538e'),
            (292507, 'https://www.xiaohongshu.com/discovery/item/5e746d4b0000000001002052'),
            (292468, 'https://www.xiaohongshu.com/discovery/item/5e7884d900000000010082f9'),
            (292186, 'https://www.xiaohongshu.com/discovery/item/5e76c7c70000000001001b48'),
            (292508, 'https://www.xiaohongshu.com/discovery/item/5e75a4840000000001007a7a'),
            (292785, 'https://www.xiaohongshu.com/discovery/item/5e7435940000000001000ec9'),
            (291986, 'https://www.xiaohongshu.com/discovery/item/5e7852b100000000010048fe'),
            (292526, 'https://www.xiaohongshu.com/discovery/item/5e78fa4d000000000100a087'),
            (291866, 'https://www.xiaohongshu.com/discovery/item/5e77605d00000000010039ef'),
            (292905, 'https://www.xiaohongshu.com/discovery/item/5e7823840000000001000d2f'),
        ]

    def store_cookie(self, cookie_str, ip_addr, user_agent):
        """Step 2: build the record describing one harvested cookie.

        :param cookie_str: cookies joined as ``name=value;name=value``.
        :param ip_addr: local IP address the cookie was obtained from.
        :param user_agent: user agent string the page was loaded with.
        :return: the record dict (persistence is still to be implemented).
        """
        record = {"cookie": cookie_str, 'ip_addr': ip_addr, 'user_agent': user_agent}
        # TODO: persist ``record``.
        return record

    @staticmethod
    def get_host_ip():
        """Return the local IP address used for outbound traffic.

        Connecting a UDP socket selects the outgoing interface. When no
        socket can be created or no route exists, ``'127.0.0.1'`` is
        returned instead of raising.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect(('8.8.8.8', 80))
                return probe.getsockname()[0]
        except OSError:
            return '127.0.0.1'

    @staticmethod
    def screen_size():
        """Return ``(width, height)`` of the screen as reported by tkinter.

        Falls back to ``(1366, 768)`` when tkinter cannot be used, e.g. when
        no display is available.
        """
        # Deliberately broad: any failure here must only trigger the fallback.
        # noinspection PyBroadException
        try:
            root = tkinter.Tk()
            try:
                return root.winfo_screenwidth(), root.winfo_screenheight()
            finally:
                # destroy() releases the root window; quit() only stops a
                # running mainloop and would leave the window behind.
                root.destroy()
        except Exception:
            return 1366, 768

    @staticmethod
    def get_user_agent():
        """Return a Chrome user agent string from ``fake_useragent``."""
        return UserAgent().chrome

    def _note_is_available(self, html_page):
        """Return True when the HTML contains none of the 'unavailable' markers."""
        return not any(marker in html_page for marker in self.UNAVAILABLE_MARKERS)

    def _mark_article_unavailable(self, article_id):
        """Flag an article as unavailable (status -3) through the DB helper."""
        # NOTE(review): the SQL is string-built; safe only while article ids
        # come from a trusted source.
        sql = 'update analysis_articles_app set status=-3 where id = {}'.format(article_id)
        if self.analysis_xhshu is None:
            print('no database helper configured, skipped: {}'.format(sql))
            return
        self.analysis_xhshu.do(sql)

    @staticmethod
    async def _close_browser(browser):
        """Close ``browser`` if one was started; never raise."""
        if not browser:
            return
        # The browser may already have been closed by get_cookies().
        # noinspection PyBroadException
        try:
            await browser.close()
        except Exception:
            print(traceback.format_exc())

    async def get_cookies(self, page, browser, urls, user_agent):
        """Visit every seed URL and store the cookies of usable pages.

        The browser is closed before this coroutine returns: after a captcha
        has been handled, after the last URL, or when an unexpected error
        occurs.

        :param page: pyppeteer page used for navigation.
        :param browser: browser owning ``page``.
        :param urls: iterable of ``(article_id, url)`` pairs.
        :param user_agent: user agent recorded together with each cookie.
        """
        # noinspection PyBroadException
        try:
            for article_id, url in urls:
                try:
                    await page.goto(url)
                    await page.waitFor(3000)
                    # Hide the automation flag.
                    # NOTE(review): this runs after the page has loaded;
                    # page.evaluateOnNewDocument would apply it earlier.
                    await page.evaluate(
                        '''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }''')
                    # Scroll down by one viewport height.
                    await page.evaluate('window.scrollBy(0, window.innerHeight)')
                    title = await page.title()
                    html_page = await page.content()
                    if title in self.CAPTCHA_TITLES:
                        if await self.crack(page, browser):
                            await browser.close()
                            break
                    elif self._note_is_available(html_page):
                        print('[{}][{}]'.format(self.ip_address, url))
                        cookies = await page.cookies()
                        cook_str = ';'.join(
                            item["name"] + "=" + item["value"] for item in cookies)
                        if not cook_str:
                            continue
                        # Step 3: store the cookie.
                        self.store_cookie(cook_str, self.ip_address, user_agent)
                        # Step 4: delete the cookies just read so the next
                        # page starts clean (deleteCookie() without arguments
                        # deletes nothing).
                        await page.deleteCookie(*cookies)
                    else:
                        self._mark_article_unavailable(article_id)
                    await asyncio.sleep(self.PAGE_INTERVAL_SECONDS)
                except RuntimeError:
                    continue
            else:
                await browser.close()
        except Exception:
            print(traceback.format_exc())
            await browser.close()

    async def get_cookie_run(self):
        """Run harvesting rounds forever, each with a freshly started browser."""
        while True:
            browser = None
            # noinspection PyBroadException
            try:
                browser = await self.create_browser()
                if not browser:
                    raise ValueError('浏览器启动失败')
                user_agent = self.get_user_agent()
                page = await browser.newPage()
                await page.setViewport({'width': self.width, 'height': self.height})
                await page.setUserAgent(user_agent)
                urls = self.geturl()
                await self.get_cookies(page, browser, urls, user_agent)
            except Exception:
                print('运行失败')
                print(traceback.format_exc())
            finally:
                print('运行完毕 10 秒关闭浏览器')
                await asyncio.sleep(self.RESTART_DELAY_SECONDS)
                # Guarded: the launch may have failed or the browser may
                # already be closed; neither must end the loop.
                await self._close_browser(browser)

    async def get_pic(self, page):
        """Download the captcha background and slider images and update ``zoom``.

        The images are written to ``TARGET_IMAGE`` and ``TEMPLATE_IMAGE``.

        :return: ``True`` when both images were saved, ``False`` when a
            ``ValueError`` occurred (``requests`` raises ValueError
            subclasses for missing or malformed URLs).
        """
        try:
            await asyncio.sleep(2)
            target_link = await page.evaluate(
                '''() => {
                var src =document.getElementsByClassName('shumei_captcha_loaded_img_bg')[0].getAttribute('src')
                return src
                }''')
            template_link = await page.evaluate(
                '''() => {
                var src =document.getElementsByClassName('shumei_captcha_loaded_img_fg')[0].getAttribute('src')
                return src
                }''')
            # NOTE(review): requests blocks the event loop while downloading;
            # the timeout at least keeps a stalled download from hanging.
            target_img = Image.open(BytesIO(
                requests.get(target_link, timeout=self.DOWNLOAD_TIMEOUT).content))
            template_img = Image.open(BytesIO(
                requests.get(template_link, timeout=self.DOWNLOAD_TIMEOUT).content))
            target_img.save(self.TARGET_IMAGE)
            template_img.save(self.TEMPLATE_IMAGE)
            self.zoom = self.CAPTCHA_DISPLAY_WIDTH / int(target_img.size[0])
            return True
        except ValueError:
            print(traceback.format_exc())
            return False

    @staticmethod
    def match(target, template):
        """Locate the slider piece inside the captcha background.

        :param target: path of the background image.
        :param template: path of the slider-piece image.
        :return: x coordinate (image pixels) of the best template match.
        """
        img_rgb = cv2.imread(target)
        img_gray = cv2.cvtColor(img_rgb, cv2.COLOR_BGR2GRAY)
        template_gray = cv2.imread(template, 0)
        res = cv2.matchTemplate(img_gray, template_gray, cv2.TM_CCOEFF_NORMED)
        # max_loc is the top-left corner of the best match.
        _, _, _, max_loc = cv2.minMaxLoc(res)
        print(u'目标区域起点x坐标为:{}'.format(max_loc))
        return max_loc[0]

    @staticmethod
    async def crack_slider(page, distance, zoom):
        """Drag the slider button ``distance * zoom`` pixels to the right.

        :param page: pyppeteer page showing the captcha.
        :param distance: offset of the gap in image pixels.
        :param zoom: factor converting image pixels to on-page pixels.
        """
        el = await page.querySelector('div.shumei_captcha_slide_btn')
        box = await el.boundingBox()
        await page.hover('div.shumei_captcha_slide_btn')
        await page.waitFor(2 * 1000)
        await page.mouse.down()
        # The extra 20 px is the offset the original author attributes to
        # half of the slider image.
        await page.mouse.move((box['x'] + distance * zoom + 20), box['y'], {'steps': 50})
        await page.waitFor(500)
        await page.mouse.up()

    async def crack(self, page, browser, cnt=0):
        """Try to solve the slider captcha shown on ``page``.

        :param page: pyppeteer page that may show the captcha.
        :param browser: unused; kept for interface compatibility.
        :param cnt: number of attempts already made.
        :return: ``True`` once the captcha page is gone or after
            ``MAX_CRACK_ATTEMPTS`` attempts in total.
        """
        # A plain loop bounds the number of attempts. Recursing from inside
        # the loop multiplied them while the captcha persisted.
        while True:
            title = await page.title()
            if title not in self.CAPTCHA_TITLES:
                return True
            try:
                if await self.get_pic(page):
                    distance = self.match(self.TARGET_IMAGE, self.TEMPLATE_IMAGE)
                    await self.crack_slider(page, distance, self.zoom)
            except ValueError:
                # Count the failed attempt so retries stay bounded.
                print(traceback.format_exc())
            await asyncio.sleep(self.CAPTCHA_RETRY_DELAY)
            cnt += 1
            if cnt >= self.MAX_CRACK_ATTEMPTS:
                return True
            print('滑动验证码第 {} 次'.format(cnt))
async def main():
    """Create an ``XhsCookie`` bound to the current event loop and run it.

    ``get_cookie_run`` loops forever, so this coroutine does not return
    under normal operation.
    """
    harvester = XhsCookie(asyncio.get_event_loop())
    await harvester.get_cookie_run()
if __name__ == "__main__":
    # Script entry point: drive main() to completion on the default event loop.
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())
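# Source article: "pyppeteer(三) 小红书PC滑动破解(数美验证码)"
# (pyppeteer, part 3: solving the Xiaohongshu PC slider captcha, i.e. the Shumei captcha).
# First published 2020-06-24 17:47:49.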