菜单

Administrator
发布于 2025-02-28 / 52 阅读
0
0

u2实现多多自动点击链接处理

通过 uiautomator2(U2)实现某多多微信小程序的自动化处理:脚本模拟人工点击聊天消息中的某多多链接并访问页面,从而实现自动点击。

import os
import random
import subprocess
import time
from time import sleep
import loguru
import pyautogui
import redis
import uiautomator2 as u2
from retrying import retry


# def adb_screenshot(output_path='screenshot.png'):
#     # 截图并保存到设备的内部存储
#     adb_command = "adb shell screencap -p /sdcard/screenshot.png"
#     subprocess.run(adb_command, shell=True, check=True)
#
#     # 将截图从设备拉取到本地
#     pull_command = f"adb pull /sdcard/screenshot.png {output_path}"
#     subprocess.run(pull_command, shell=True, check=True)
#
#     # 可选:删除设备上的截图文件
#     rm_command = "adb shell rm /sdcard/screenshot.png"
#     subprocess.run(rm_command, shell=True, check=True)
#
#     print(f"Screenshot saved to {output_path}")

def random_5_10(low=5, high=15):
    """Return a random delay, in seconds, drawn uniformly from ``low`` to ``high``.

    NOTE: despite the function name, the default range has always been
    5 to 15 seconds (not 5 to 10); the defaults keep that behaviour so
    existing callers see the same delays.

    Args:
        low: Lower bound of the delay. Defaults to 5.
        high: Upper bound of the delay. Defaults to 15.

    Returns:
        A float ``low + r * (high - low)`` where ``r`` comes from
        ``random.random()``.
    """
    return low + random.random() * (high - low)


class ui_auto2:
    """Drive WeChat through uiautomator2 and open goods links found in chat messages.

    The workflow implemented by :meth:`run` is:

    1. Restart WeChat and open the entry whose text is "好玩".
    2. Repeatedly collect the on-screen message elements, and for every
       message containing ``GOODS_LINK_MARKER`` that has not been recorded
       in today's visited log, click it, scroll the opened page a random
       number of times, take a screenshot and go back.

    Attributes are documented in ``__init__``.
    """

    # Substring that marks a chat message as a goods link worth opening.
    GOODS_LINK_MARKER = 'mobile.yangkeduo.com/goods1.html'
    # XPath matching the message elements inside the chat page.
    MESSAGE_XPATH = '//*[@resource-id="com.tencent.mm:id/bkl"]'

    def __init__(self):
        """Connect to the device and configure logging.

        Raises:
            SystemExit: with code -995 when the connected device reports no info.
        """
        # Running count of processed messages; also used as the screenshot file name.
        self.cnt = 0
        # uiautomator2 device handle (connect() with no arguments picks the default device).
        self.d = u2.connect()
        self.today = time.strftime("%Y%m%d", time.localtime())
        # Plain-text file holding the text of every message already visited today.
        self.log_file = f'{self.today}.log'
        # Separate loguru sink, also named after the current date.
        loguru.logger.add(f"loguru{self.today}.log", rotation="1 day", encoding="utf-8", level="INFO", compression="zip")
        # Not referenced anywhere else in this class; kept for external users.
        self.back_xpath = '//*[@resource-id="com.android.systemui:id/ends_group"]'
        # Start-up delay; same distribution as random_5_10().
        self.ran = random_5_10()
        if self.d.info:
            loguru.logger.info(f"connected {self.d.serial} successfully")
        else:
            loguru.logger.error(f"connect {self.d.serial} failed")
            # The builtin-looking exit() is injected by the `site` module and is
            # not always available; raising SystemExit is what it does anyway.
            raise SystemExit(-995)
        time.sleep(self.ran)

    def get_page_step(self):
        """Classify the current screen by which known resource id is present.

        Returns:
            2 if an element with id ``com.tencent.mm:id/bkl`` (the id used
            for message elements in ``run``) exists, otherwise 1 if
            ``com.tencent.mm:id/cj1`` exists, otherwise 0.
            NOTE(review): presumably 2 = inside a chat, 1 = conversation
            list, 0 = some other page (e.g. an opened link) - confirm.
        """
        if self.d(resourceId="com.tencent.mm:id/bkl").exists:
            return 2
        if self.d(resourceId="com.tencent.mm:id/cj1").exists:
            return 1
        return 0

    @retry(stop_max_attempt_number=5)
    def click_man(self, message):
        """Click ``message`` and pause 2 seconds; retried up to 5 attempts on any exception."""
        message.click()
        time.sleep(2)

    def run(self):
        """Restart WeChat, open the target chat and process its messages forever.

        This method never returns normally; stop it by interrupting the process.
        """
        self._open_target_chat()
        time.sleep(5)
        while True:
            message_list = self.d.xpath(self.MESSAGE_XPATH).all()
            loguru.logger.info(f'找到{len(message_list)}条消息')
            time.sleep(5)
            for message in message_list:
                self.cnt += 1
                self._handle_message(message)
            # Bring another batch of messages into view before the next pass.
            time.sleep(3)
            self.d.swipe_ext("down", scale=0.8)
            time.sleep(5)

    def _find_chat_entry(self):
        """Return the selector for the list entry whose text is "好玩"."""
        return self.d(resourceId="com.tencent.mm:id/kbq").sibling(text="好玩")

    def _open_target_chat(self):
        """Restart WeChat and click the "好玩" entry, swiping up to 10 times to find it."""
        self.d.app_stop("com.tencent.mm")
        self.d.app_start("com.tencent.mm")
        time.sleep(random_5_10())
        name = self._find_chat_entry()
        if not name.exists:
            for _ in range(10):
                self.d.swipe(500, 1000, 500, 500, 0.6)
                time.sleep(1)
                name = self._find_chat_entry()
                if name.exists:
                    break
        time.sleep(2)
        if self.get_page_step() == 1:
            name.click()
            time.sleep(2)

    def _read_visited_log(self):
        """Return the full content of the visited log, or '' if it does not exist yet."""
        try:
            with open(self.log_file, 'r', encoding='utf-8') as f:
                return f.read()
        except FileNotFoundError:
            return ''

    @staticmethod
    def _is_visited(text, log_content):
        """Tell whether ``text`` was already appended to the visited log.

        Records are written as ``text + '\\n'``. Matching against the whole
        file content (at the start of the file or right after a newline)
        gives the same answer as a line-by-line comparison for single-line
        texts, and additionally recognises texts that themselves contain
        newlines, which a ``readlines()`` comparison can never match.
        """
        record = text + '\n'
        return log_content.startswith(record) or ('\n' + record) in log_content

    def _browse_page(self):
        """Scroll the opened page 0 to 3 rounds with random distances and directions."""
        for _ in range(random.randint(0, 3)):
            scale = random.uniform(0.4, 0.8)
            time.sleep(2)
            self.d.swipe_ext("up", scale=scale)
            # Randomly either scroll back down or keep scrolling up.
            if random.randint(0, 1) == 1:
                time.sleep(3)
                self.d.swipe_ext("down", scale=scale)
            else:
                time.sleep(4)
                self.d.swipe_ext("up", scale=scale)

    def _handle_message(self, message):
        """Open one message's goods link if it has one and was not visited today.

        Args:
            message: An element returned by ``self.d.xpath(...).all()``.
        """
        info = message.info
        text = info['text']
        print(text)
        time.sleep(5)
        # A missing/empty text cannot contain the marker; treat it as "not found"
        # instead of failing on the substring test.
        if not text or self.GOODS_LINK_MARKER not in text:
            loguru.logger.info('未找到')
            time.sleep(1)
            return

        visited = self._read_visited_log()
        time.sleep(5)
        if self._is_visited(text, visited):
            loguru.logger.info('已存在')
            time.sleep(1)
            return

        loguru.logger.info(info)
        loguru.logger.info(message.bounds)
        # Only click while the message elements are still on screen.
        if self.get_page_step() == 2:
            self.click_man(message)
        # Step 0 means neither known page marker is present any more, which is
        # taken as the link page having opened; otherwise nothing is recorded.
        if self.get_page_step() != 0:
            return

        time.sleep(3.1)
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(text + '\n')
        time.sleep(3.6)
        self.d.screenshot(filename=f'{self.cnt}.jpg')
        self._browse_page()
        time.sleep(15.65)
        self.d.press("back")
        # Long pause (324 s) before moving on to the next message.
        time.sleep(324)



if __name__ == '__main__':
    # Script entry point: build the automation object (connects to the
    # device) and start the message-processing loop.
    ui_auto2().run()


评论