通过U2实现某多多微信小程序自动化处理,模拟点击访问某多多,可以实现自动分享商品给指定微信好友,完全自动化
import hashlib
import random
import time
from time import sleep
import loguru
import uiautomator2 as u2
import uiautomator2.exceptions
loguru.logger.add("log.txt", encoding="utf-8", format="{time} {level} {message}", rotation="10 MB")
"""
-995 连接设备失败
-996 启动应用失败
"""
def get_sha1_hash(info):
"""
计算给定字符串的MD5哈希值。
:param info: 字符串,需要进行MD5加密
:return: MD5哈希值的十六进制表示
"""
# 确保输入是字符串类型
if not isinstance(info, str):
raise ValueError("info参数必须是字符串类型")
# 创建MD5对象
sha1 = hashlib.sha1()
# 更新MD5对象,必须以字节形式传入
sha1.update(info.encode('utf-8'))
# 返回sha1哈希值的十六进制表示
return sha1.hexdigest()
class ui_auto2:
def __init__(self, wechat_name):
# 连接设备
# 18241FDF6002VJ
self.d = u2.connect()
self.cnt = 0
# self.we_chat_name = wechat_name
self.per_xpath = f'//*[@resource-id="com.tencent.mm:id/llt"]/android.widget.RelativeLayout/android.widget.LinearLayout'
self.search_blank_xpath = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]'
self.search_button_xpath = '//android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[2]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.view.ViewGroup[1]/android.widget.TextView[1]'
self.back_xpath = '//*[@resource-id="com.android.systemui:id/ends_group"]'
self.back_id_xpath = '//*[@resource-id="com.android.systemui:id/back"]'
self.share_xpath = '//*[@resource-id="android:id/content"]/android.widget.FrameLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[3]/android.view.View[1]'
self.share_wechat_xpath = '//*[@resource-id="android:id/content"]/android.widget.RelativeLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.HorizontalScrollView[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.ImageView[1]'
# self.share_list_xpath = '//*[@resource-id="com.tencent.mm:id/mil"]/android.widget.RelativeLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]//'
self.share_button_xpath = '//*[@resource-id="android:id/content"]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[2]/android.widget.LinearLayout[1]/android.widget.Button[2]'
self.back_to_pdd_xpath = '//*[@resource-id="android:id/content"]/android.widget.LinearLayout[1]/android.widget.LinearLayout[1]/android.widget.LinearLayout[2]/android.widget.LinearLayout[1]/android.widget.Button[1]'
self.goods_list_xpath = '//*[@resource-id="com.xunmeng.pinduoduo:id/tv_title"]'
self.ran = 5 + random.random() * (15 - 5)
if self.d.info:
loguru.logger.info(f"connected {self.d.serial} successfully")
else:
loguru.logger.error(f"connect {self.d.serial} failed")
exit(-995)
time.sleep(self.ran)
def run(self):
# 启动拼多多应用
self.d.session("com.xunmeng.pinduoduo")
# 好像不能通过查看当前app确定pdd了可能是没root
# if self.d.app_current()['package']=='com.xunmeng.pinduoduo':
# loguru.logger.info(f"start pinduoduo")
# else:
# loguru.logger.error(f"start pinduoduo failed")
# exit(-996)
sleep(self.ran)
def find_by_tag(self, tag):
tag_xpath = f'//android.widget.TextView[@text="{tag}"]'
phone = self.d.xpath(tag_xpath)
phone.click()
time.sleep(self.ran)
for i in range(0, 10):
if i == 0:
self.d.swipe_ext("up", scale=0.6)
else:
self.d.swipe_ext("up", scale=1.0)
list_name = self.d.xpath(self.goods_list_xpath)
for elem in list_name.all():
hash_info=get_sha1_hash(info=str(elem.info))
if hash_info in uni_list:
continue
uni_list.add(hash_info)
elem.click()
time.sleep(self.ran)
self.d.swipe_ext("up", scale=0.5)
self.d.swipe_ext("up", scale=0.5)
self.d.xpath(self.back_xpath).child(self.back_id_xpath).click()
time.sleep(self.ran)
def find_by_search(self, tag):
self.d.xpath(self.search_blank_xpath).click()
time.sleep(self.ran)
self.d.send_keys(tag)
time.sleep(self.ran)
self.d.xpath(self.search_button_xpath).click()
time.sleep(self.ran)
for j in range(1, 10):
if j == 1:
self.d.swipe_ext("up", scale=0.6)
else:
self.d.swipe_ext("up", scale=0.8)
list_name = self.d.xpath(self.goods_list_xpath)
time.sleep(self.ran)
for elem in list_name.all():
hash_info=get_sha1_hash(info=str(elem.info))
if hash_info in uni_list:
continue
uni_list.add(hash_info)
elem.click()
time.sleep(self.ran)
self.d.swipe_ext("up", scale=0.5)
self.d.swipe_ext("up", scale=0.5)
self.d.xpath(self.back_xpath).child(self.back_id_xpath).click()
time.sleep(self.ran)
def find_by_send(self, tag):
# 定义导航栏标签文本
tag_xpath = f'//android.widget.TextView[@text="{tag}"]'
phone = self.d.xpath(tag_xpath)
# 点击对应的文本
phone.click()
time.sleep(self.ran)
# 定义下滑次数
for i in range(0, 1000):
# time.sleep(170+random.random()*15)
# 决定下滑宽度
if i == 0:
self.d.swipe_ext("up", scale=0.6)
else:
self.d.swipe_ext("up", scale=0.8)
# 获取商品列表
list_name = self.d.xpath(self.goods_list_xpath)
for elem in list_name.all():
try:
print(elem.info)
# 去重
hash_info=get_sha1_hash(info=str(elem.info))
if hash_info in uni_list:
continue
uni_list.add(hash_info)
self.cnt+=1
time.sleep(self.ran)
loguru.logger.info(f'正在处理{self.cnt}条')
# 点击商品
elem.click()
time.sleep(self.ran)
# 分享按钮点击
self.d.xpath(self.share_xpath).click()
time.sleep(self.ran)
# 点击分享到微信
self.d.xpath(self.share_wechat_xpath).click()
time.sleep(self.ran)
# 点击微信好友
#待修改
self.d.xpath(self.per_xpath).click()
time.sleep(self.ran)
# 点击分享按钮
self.d.xpath(self.share_button_xpath).click()
time.sleep(self.ran)
# 返回到拼多多
self.d.xpath(self.back_to_pdd_xpath).click()
time.sleep(self.ran)
# 返回到商品列表
self.d.xpath(self.back_xpath).child(self.back_id_xpath).click()
time.sleep(self.ran)
except uiautomator2.exceptions.XPathElementNotFoundError:
loguru.logger.info('没有找到该标签')
self.d.xpath(self.back_xpath).child(self.back_id_xpath).click()
continue
if __name__ == '__main__':
uni_list = set()
ui = ui_auto2(wechat_name='爱冲浪的网民')
ui.run()
# ui.find_by_search('华为')
# ui.find_by_tag('手机')
ui.find_by_send('手机')