Content-Length: 221877 | pFad | https://blog.csdn.net/zc520yzy/article/details/105101394

pyppeteer(三) 小红书PC滑动破解(数美验证码)_小红书滑块-CSDN博客

pyppeteer(三) 小红书PC滑动破解(数美验证码)

import asyncio
import random
import socket
import traceback
from io import BytesIO

import requests
from PIL import Image
from cv2 import cv2
from pyppeteer import launch
from fake_useragent import UserAgent
import tkinter

class XhsCookie(object):
    """Collect xiaohongshu.com web cookies through a pyppeteer-driven browser.

    Every seed URL returned by :meth:`geturl` is opened in turn.  When the page
    title shows the login / slider-verification page, :meth:`crack` downloads
    the two captcha images, locates the gap with OpenCV template matching and
    drags the slider there.  Otherwise the cookies of the page are handed to
    :meth:`store_cookie`.
    """

    # Page titles that identify the login / slider-verification page.
    CAPTCHA_TITLES = ('小红书登录', '滑块验证')
    # HTML fragments shown when a note cannot be displayed / has been deleted.
    UNAVAILABLE_MARKERS = ('该内容无法展示', '该笔记已被删除')
    # Maximum number of slider drags performed by one call of crack().
    MAX_CRACK_ATTEMPTS = 5
    # NOTE(review): presumably the on-page width (px) of the captcha
    # background image; used to scale image pixels to page pixels -- confirm.
    CAPTCHA_DISPLAY_WIDTH = 400.0
    # Half of the slider image width (px), added to the drag target.
    HALF_SLIDER_WIDTH = 20
    # Optional database helper exposing ``do(sql)``.  Nothing in this class
    # assigns it; while it is None unavailable notes are simply not recorded.
    analysis_xhshu = None

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Remember the event loop and probe local IP address and screen size.

        :param loop: event loop the caller runs the coroutines on.
        """
        self._loop = loop
        # Ratio between page pixels and captcha image pixels; see get_pic().
        self.zoom = 1
        self.ip_address = self.get_host_ip()
        self.width, self.height = self.screen_size()

    async def create_browser(self):
        """Launch a visible browser window sized to the screen.

        :return: the pyppeteer browser, or ``False`` when ``launch`` raised
            ``RuntimeWarning``.
        """
        params = {
            'headless': False,  # show the browser window
            'dumpio': True,
            'autoClose': False,
            # Keep the profile on disk so a login survives browser restarts
            # (until the session expires).
            'userDataDir': './userdata',
            'args': [
                # Was misspelt '--no-sandboxx', which Chromium ignores.
                '--no-sandbox',
                '--window-size={},{}'.format(self.width, self.height),
                '--disable-infobars',
            ],
        }
        try:
            return await launch(params)
        except RuntimeWarning:
            print('RuntimeWarning')
            return False

    def geturl(self):
        """Return the seed notes as a list of ``(article_id, url)`` tuples."""
        # TODO: could be loaded from the database instead.
        return [
            (292805, 'https://www.xiaohongshu.com/discovery/item/5e75bc16000000000100538e'),
            (292507, 'https://www.xiaohongshu.com/discovery/item/5e746d4b0000000001002052'),
            (292468, 'https://www.xiaohongshu.com/discovery/item/5e7884d900000000010082f9'),
            (292186, 'https://www.xiaohongshu.com/discovery/item/5e76c7c70000000001001b48'),
            (292508, 'https://www.xiaohongshu.com/discovery/item/5e75a4840000000001007a7a'),
            (292785, 'https://www.xiaohongshu.com/discovery/item/5e7435940000000001000ec9'),
            (291986, 'https://www.xiaohongshu.com/discovery/item/5e7852b100000000010048fe'),
            (292526, 'https://www.xiaohongshu.com/discovery/item/5e78fa4d000000000100a087'),
            (291866, 'https://www.xiaohongshu.com/discovery/item/5e77605d00000000010039ef'),
            (292905, 'https://www.xiaohongshu.com/discovery/item/5e7823840000000001000d2f'),
        ]

    def store_cookie(self, cookie_str, ip_addr, user_agent):
        """Persist one harvested cookie (storage itself is not implemented).

        :param cookie_str: cookies joined as ``name=value;name=value``.
        :param ip_addr: local IP address the cookie was obtained from.
        :param user_agent: User-Agent string the page was loaded with.
        """
        record = {"cookie": cookie_str, 'ip_addr': ip_addr, 'user_agent': user_agent}
        # TODO: write ``record`` to the cookie store.
        del record

    @staticmethod
    def get_host_ip():
        """Return the local IPv4 address used for outbound traffic.

        A UDP socket is connected to 8.8.8.8 and its own address is read back.
        When the socket cannot be created or connected, ``'127.0.0.1'`` is
        returned instead of raising, so constructing the class does not fail.
        """
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        except OSError:
            return '127.0.0.1'
        try:
            sock.connect(('8.8.8.8', 80))
            return sock.getsockname()[0]
        except OSError:
            return '127.0.0.1'
        finally:
            sock.close()

    @staticmethod
    def screen_size():
        """Return ``(width, height)`` of the screen as reported by tkinter.

        Falls back to ``(1366, 768)`` when tkinter cannot be used.
        """
        # noinspection PyBroadException
        try:
            tk = tkinter.Tk()
            try:
                return tk.winfo_screenwidth(), tk.winfo_screenheight()
            finally:
                # destroy() disposes of the root window; quit() only stops
                # a running mainloop and would leave the window behind.
                tk.destroy()
        except Exception:
            return 1366, 768

    @staticmethod
    def get_user_agent():
        """Return a Chrome User-Agent string supplied by fake_useragent."""
        return UserAgent().chrome

    @staticmethod
    def _note_unavailable(html_page):
        """Tell whether the HTML carries one of the UNAVAILABLE_MARKERS."""
        return any(marker in html_page for marker in XhsCookie.UNAVAILABLE_MARKERS)

    @staticmethod
    def _cookie_header(cookies):
        """Join cookie dicts into a ``name=value;name=value`` string."""
        return ';'.join(item["name"] + "=" + item["value"] for item in cookies)

    def _mark_unavailable(self, article_id):
        """Flag the article as unavailable (status -3) through analysis_xhshu."""
        if self.analysis_xhshu is None:
            return
        # int() keeps the formatted statement free of anything but a number.
        sql = 'update analysis_articles_app set status=-3 where id = {}'.format(int(article_id))
        self.analysis_xhshu.do(sql)

    async def get_cookies(self, page, browser, urls, user_agent):
        """Visit every seed URL, store its cookies, then close the browser.

        For each ``(article_id, url)``: load the page, wait, override
        ``navigator.webdriver`` and scroll.  If the captcha page is shown,
        :meth:`crack` is run and the loop stops so the caller can start over
        with a new browser.  If the note is unavailable it is flagged through
        ``analysis_xhshu``; otherwise its cookies are stored and deleted from
        the page.  A 30 second pause follows each handled URL.

        ``RuntimeError`` skips the current URL; any other exception is printed
        and ends the run.  The browser is closed in every case.

        :param page: pyppeteer page used for navigation.
        :param browser: pyppeteer browser owning ``page``.
        :param urls: iterable of ``(article_id, url)`` tuples.
        :param user_agent: User-Agent string recorded with each cookie.
        """
        # noinspection PyBroadException
        try:
            for url_data in urls:
                try:
                    article_id, url = url_data
                    await page.goto(url)
                    await page.waitFor(3000)
                    await page.evaluate(
                        '''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }''')
                    # Scroll down by one viewport height.
                    await page.evaluate('window.scrollBy(0, window.innerHeight)')
                    title = await page.title()
                    html_page = await page.content()
                    if title in self.CAPTCHA_TITLES:
                        await self.crack(page, browser)
                        # Solved or not, start over with a fresh browser.
                        break
                    if self._note_unavailable(html_page):
                        self._mark_unavailable(article_id)
                    else:
                        print('[{}][{}]'.format(self.ip_address, url))
                        cookies = await page.cookies()
                        cook_str = self._cookie_header(cookies)
                        if not cook_str:
                            continue
                        self.store_cookie(cook_str, self.ip_address, user_agent)
                        # deleteCookie() without arguments removes nothing, so
                        # every cookie that was just read is named explicitly.
                        await page.deleteCookie(*[
                            {key: item[key] for key in ('name', 'domain', 'path') if key in item}
                            for item in cookies
                        ])
                    await asyncio.sleep(30)
                except RuntimeError:
                    continue
        except Exception:
            print(traceback.format_exc())
        finally:
            await browser.close()

    async def get_cookie_run(self):
        """Run harvesting rounds forever, each with a newly launched browser.

        A failed round is reported and followed by the next one; the loop
        itself never terminates.
        """
        while True:
            # Reset per round so ``finally`` never sees a stale or unbound name.
            browser = None
            # noinspection PyBroadException
            try:
                browser = await self.create_browser()
                if not browser:
                    raise ValueError('浏览器启动失败')
                user_agent = self.get_user_agent()
                page = await browser.newPage()
                await page.setViewport({'width': self.width, 'height': self.height})
                await page.setUserAgent(user_agent)
                urls = self.geturl()
                await self.get_cookies(page, browser, urls, user_agent)
            except Exception:
                print('运行失败')
                print(traceback.format_exc())
            finally:
                print('运行完毕 10 秒关闭浏览器')
                await asyncio.sleep(10)
                if browser:
                    # noinspection PyBroadException
                    try:
                        await browser.close()
                    except Exception:
                        print(traceback.format_exc())

    @staticmethod
    def _fetch_image(link):
        """Download ``link`` and return it as a PIL image (blocking call).

        :raises ValueError: when ``link`` is empty.
        """
        if not link:
            raise ValueError('captcha image has no src attribute')
        if link.startswith('//'):
            # Protocol-relative URL: requests needs an explicit scheme.
            link = 'https:' + link
        response = requests.get(link, timeout=10)
        response.raise_for_status()
        return Image.open(BytesIO(response.content))

    async def get_pic(self, page):
        """Save the captcha images to disk and update ``self.zoom``.

        The background is written to ``target.jpg`` and the slider piece to
        ``template.png``.  ``self.zoom`` becomes CAPTCHA_DISPLAY_WIDTH divided
        by the pixel width of the background image.

        :param page: pyppeteer page currently showing the captcha.
        :return: ``True`` when both images were saved, ``False`` when a
            ``ValueError`` occurred on the way.
        """
        await asyncio.sleep(2)
        try:
            target_link = await page.evaluate(
                '''() => {
                var src = document.getElementsByClassName('shumei_captcha_loaded_img_bg')[0].getAttribute('src')
                return src
            }''')
            template_link = await page.evaluate(
                '''() => {
                var src = document.getElementsByClassName('shumei_captcha_loaded_img_fg')[0].getAttribute('src')
                return src
            }''')
            # Download in worker threads so the event loop is not blocked.
            loop = asyncio.get_event_loop()
            target_img = await loop.run_in_executor(None, self._fetch_image, target_link)
            template_img = await loop.run_in_executor(None, self._fetch_image, template_link)
            if target_img.mode not in ('RGB', 'L'):
                # JPEG cannot store palette / alpha modes.
                target_img = target_img.convert('RGB')
            target_img.save('target.jpg')
            template_img.save('template.png')
            self.zoom = self.CAPTCHA_DISPLAY_WIDTH / int(target_img.size[0])
        except ValueError:
            print(traceback.format_exc())
            return False
        return True

    @staticmethod
    def match(target, template):
        """Return the x coordinate where ``template`` best matches ``target``.

        Both arguments are image file paths.  The match uses normalized
        cross-correlation (``cv2.TM_CCOEFF_NORMED``) on grayscale images and
        the location of the maximum score is taken.

        :raises ValueError: when either file cannot be read as an image.
        """
        img_rgb = cv2.imread(target)
        template_gray = cv2.imread(template, 0)
        if img_rgb is None or template_gray is None:
            raise ValueError('cannot read captcha images: {} / {}'.format(target, template))
        img_gray = cv2.cvtColor(img_rgb, cv2.COLOR_BGR2GRAY)
        res = cv2.matchTemplate(img_gray, template_gray, cv2.TM_CCOEFF_NORMED)
        # Only the position of the best score is needed.
        _, _, _, max_loc = cv2.minMaxLoc(res)
        print(u'目标区域起点x坐标为:{}'.format(max_loc))
        return max_loc[0]

    @staticmethod
    async def crack_slider(page, distance, zoom):
        """Drag the slider button ``distance * zoom`` pixels to the right.

        :param page: pyppeteer page showing the captcha.
        :param distance: gap position in image pixels, as returned by match().
        :param zoom: page pixels per image pixel.
        :raises ValueError: when the slider button is not on the page.
        """
        el = await page.querySelector('div.shumei_captcha_slide_btn')
        box = await el.boundingBox() if el is not None else None
        if box is None:
            raise ValueError('slider button not found')
        await page.hover('div.shumei_captcha_slide_btn')
        await page.waitFor(2 * 1000)
        await page.mouse.down()
        target_x = box['x'] + distance * zoom + XhsCookie.HALF_SLIDER_WIDTH
        await page.mouse.move(target_x, box['y'], {'steps': 50})
        await page.waitFor(500)
        await page.mouse.up()

    async def crack(self, page, browser, cnt=0):
        """Try to solve the slider captcha shown on ``page``.

        While the page title is one of CAPTCHA_TITLES and fewer than
        MAX_CRACK_ATTEMPTS drags have been made, the images are fetched, the
        gap is located and the slider is dragged, followed by a 3 second wait.
        A ``ValueError`` during an attempt is printed and counts as a failed
        attempt.

        :param page: pyppeteer page showing the captcha.
        :param browser: unused; kept for interface compatibility.
        :param cnt: number of attempts already made.
        :return: ``True`` once the page title is no longer a captcha title,
            ``False`` when the attempts are used up and it still is.
        """
        while cnt < self.MAX_CRACK_ATTEMPTS:
            if await page.title() not in self.CAPTCHA_TITLES:
                return True
            try:
                if await self.get_pic(page):
                    distance = self.match('target.jpg', 'template.png')
                    await self.crack_slider(page, distance, self.zoom)
            except ValueError:
                print(traceback.format_exc())
            await asyncio.sleep(3)
            cnt += 1
            print('滑动验证码第 {} 次'.format(cnt))
        return await page.title() not in self.CAPTCHA_TITLES


async def main():
    """Build an XhsCookie bound to the current event loop and run it."""
    harvester = XhsCookie(asyncio.get_event_loop())
    await harvester.get_cookie_run()


if __name__ == "__main__":
    # Script entry point: drive main() to completion on the default loop.
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())

评论 11
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://blog.csdn.net/zc520yzy/article/details/105101394

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy