Compare commits

...

6 commits

Author SHA1 Message Date
28cf4d7065 fix:订单时间读取错误造成误判漏单
All checks were successful
ci/woodpecker/push/check_format Pipeline was successful
2024-10-05 16:37:56 +08:00
890efff953 整理requirements.in
All checks were successful
ci/woodpecker/push/check_format Pipeline was successful
2024-10-05 15:18:52 +08:00
ff8a7dd020 补充依赖
All checks were successful
ci/woodpecker/push/check_format Pipeline was successful
2024-10-05 11:25:19 +08:00
cbd6bf6cce Merge branch 'main' of https://git.zhaozuohong.vip/mower-ng/mower-ng
All checks were successful
ci/woodpecker/push/check_format Pipeline was successful
2024-10-05 11:24:27 +08:00
92e866e195 修复了由于YJ修改登陆接口导致无法正确登陆的问题/扫仓库同时使用图像识别/整理森空岛部分的代码 2024-10-05 11:24:05 +08:00
5be6361f97 perf:更新刷理智周计划的说明
All checks were successful
ci/woodpecker/push/check_format Pipeline was successful
2024-10-05 09:58:34 +08:00
10 changed files with 545 additions and 337 deletions

View file

@ -10,8 +10,7 @@ from typing import Literal
from mower.data import agent_list, base_room_list
from mower.solvers.credit import CreditSolver
from mower.solvers.credit_fight import CreditFight
# from mower.solvers.cultivate_depot import cultivate as cultivateDepotSolver
from mower.solvers.cultivate_depot import cultivate as cultivateDepotSolver
from mower.solvers.depotREC import depotREC as DepotSolver
from mower.solvers.infra.base_mixin import BaseMixin
from mower.solvers.infra.clue import ClueSolver
@ -2870,7 +2869,7 @@ class BaseSchedulerSolver(SceneGraphSolver, BaseMixin):
def 仓库扫描(self):
try:
# cultivateDepotSolver().start()
cultivateDepotSolver().start()
DepotSolver().run()
except Exception as e:
logger.exception(f"先不运行 出bug了 : {e}")

View file

@ -1,166 +1,42 @@
import hashlib
import hmac
import json
import time
from urllib import parse
import requests
from mower.utils import config
from mower.utils.log import logger
from mower.utils.path import get_path
app_code = "4ca99fa6b56cc2ba"
# 签到url
sign_url = "https://zonai.skland.com/api/v1/game/attendance"
# 绑定的角色url
binding_url = "https://zonai.skland.com/api/v1/game/player/binding"
# 验证码url
login_code_url = "https://as.hypergryph.com/general/v1/send_phone_code"
# 验证码登录
token_phone_code_url = "https://as.hypergryph.com/user/auth/v2/token_by_phone_code"
# 密码登录
token_password_url = "https://as.hypergryph.com/user/auth/v1/token_by_phone_password"
# 使用token获得认证代码
grant_code_url = "https://as.hypergryph.com/user/oauth2/v2/grant"
# 使用认证代码获得cred
cred_code_url = "https://zonai.skland.com/api/v1/user/auth/generate_cred_by_code"
from mower.utils.skland import (
get_binding_list,
get_cred_by_token,
get_sign_header,
header,
log,
)
class cultivate:
def __init__(self):
self.record_path = get_path("@app/tmp/cultivate.json")
self.header = {
"cred": "",
"User-Agent": "Skland/1.0.1 (com.hypergryph.skland; build:100001014; Android 31; ) Okhttp/4.11.0",
"Accept-Encoding": "gzip",
"Connection": "close",
}
self.header_login = {
"User-Agent": "Skland/1.0.1 (com.hypergryph.skland; build:100001014; Android 31; ) Okhttp/4.11.0",
"Accept-Encoding": "gzip",
"Connection": "close",
}
self.reward = []
self.header_for_sign = {"platform": "", "timestamp": "", "dId": "", "vName": ""}
self.sign_token = ""
self.all_recorded = True
def start(self):
for item in config.conf.skland_info:
if item.isCheck:
self.save_param(self.get_cred_by_token(self.log(item)))
for i in self.get_binding_list():
self.save_param(get_cred_by_token(log(item)))
for i in get_binding_list(self.sign_token):
if item.cultivate_select == i.get("isOfficial"):
body = {"gameId": 1, "uid": i.get("uid")}
ingame = f"https://zonai.skland.com/api/v1/game/cultivate/player?uid={i.get('uid')}"
resp = requests.get(
ingame,
headers=self.get_sign_header(
ingame, "get", body, self.header
headers=get_sign_header(
ingame, "get", body, self.sign_token
),
).json()
with open(self.record_path, "w", encoding="utf-8") as file:
json.dump(resp, file, ensure_ascii=False, indent=4)
def save_param(self, cred_resp):
self.header["cred"] = cred_resp["cred"]
header["cred"] = cred_resp["cred"]
self.sign_token = cred_resp["token"]
def log(self, account):
r = requests.post(
token_password_url,
json={"phone": account.account, "password": account.password},
headers=self.header_login,
).json()
if r.get("status") != 0:
raise Exception(f'获得token失败{r["msg"]}')
return r["data"]["token"]
def get_cred_by_token(self, token):
return self.get_cred(self.get_grant_code(token))
def get_grant_code(self, token):
response = requests.post(
grant_code_url,
json={"appCode": app_code, "token": token, "type": 0},
headers=self.header_login,
)
resp = response.json()
if response.status_code != 200:
raise Exception(f"获得认证代码失败:{resp}")
if resp.get("status") != 0:
raise Exception(f'获得认证代码失败:{resp["msg"]}')
return resp["data"]["code"]
def get_cred(self, grant):
resp = requests.post(
cred_code_url, json={"code": grant, "kind": 1}, headers=self.header_login
).json()
if resp["code"] != 0:
raise Exception(f'获得cred失败{resp["message"]}')
return resp["data"]
def get_binding_list(self):
v = []
resp = requests.get(
binding_url,
headers=self.get_sign_header(binding_url, "get", None, self.header),
).json()
if resp["code"] != 0:
logger.warning(f"请求角色列表出现问题:{resp['message']}")
if resp.get("message") == "用户未登录":
logger.warning("用户登录可能失效了,请重新运行此程序!")
return []
for i in resp["data"]["list"]:
if i.get("appCode") != "arknights":
continue
v.extend(i.get("bindingList"))
return v
def get_sign_header(self, url: str, method, body, old_header):
h = json.loads(json.dumps(old_header))
p = parse.urlparse(url)
if method.lower() == "get":
h["sign"], header_ca = self.generate_signature(
self.sign_token, p.path, p.query
)
else:
h["sign"], header_ca = self.generate_signature(
self.sign_token, p.path, json.dumps(body)
)
for i in header_ca:
h[i] = header_ca[i]
return h
def generate_signature(self, token: str, path, body_or_query):
"""
获得签名头
接口地址+方法为Get请求用query否则用body+时间戳+ 请求头的四个重要参数dIdplatformtimestampvName.toJSON()
将此字符串做HMAC加密算法为SHA-256密钥token为请求cred接口会返回的一个token值
再将加密后的字符串做MD5即得到sign
:param token: 拿cred时候的token
:param path: 请求路径不包括网址
:param body_or_query: 如果是GET则是它的queryPOST则为它的body
:return: 计算完毕的sign
"""
# 总是说请勿修改设备时间怕不是yj你的服务器有问题吧所以这里特地-2
t = str(int(time.time()) - 2)
token = token.encode("utf-8")
header_ca = json.loads(json.dumps(self.header_for_sign))
header_ca["timestamp"] = t
header_ca_str = json.dumps(header_ca, separators=(",", ":"))
s = path + body_or_query + t + header_ca_str
hex_s = hmac.new(token, s.encode("utf-8"), hashlib.sha256).hexdigest()
md5 = (
hashlib.md5(hex_s.encode("utf-8"))
.hexdigest()
.encode("utf-8")
.decode("utf-8")
)
return md5, header_ca

View file

@ -59,7 +59,7 @@ class GetOrderRemainingTimeSolver(SceneGraphSolver, BaseMixin):
return value
def read_remain_time(self, pos) -> int:
h, m, s = self.number(pos, 19, 90).split("::")
h, m, s = self.number(pos, 19, 100).split("::")
return int(h) * 3600 + int(m) * 60 + int(s)
def timeout(self) -> bool:

View file

@ -1,10 +1,5 @@
import datetime
import hashlib
import hmac
import json
import os
import time
from urllib import parse
import pandas as pd
import requests
@ -12,45 +7,24 @@ import requests
from mower.utils import config
from mower.utils.log import logger
from mower.utils.path import get_path
app_code = "4ca99fa6b56cc2ba"
# 签到url
sign_url = "https://zonai.skland.com/api/v1/game/attendance"
# 绑定的角色url
binding_url = "https://zonai.skland.com/api/v1/game/player/binding"
# 验证码url
login_code_url = "https://as.hypergryph.com/general/v1/send_phone_code"
# 验证码登录
token_phone_code_url = "https://as.hypergryph.com/user/auth/v2/token_by_phone_code"
# 密码登录
token_password_url = "https://as.hypergryph.com/user/auth/v1/token_by_phone_password"
# 使用token获得认证代码
grant_code_url = "https://as.hypergryph.com/user/oauth2/v2/grant"
# 使用认证代码获得cred
cred_code_url = "https://zonai.skland.com/api/v1/user/auth/generate_cred_by_code"
from mower.utils.skland import (
get_binding_list,
get_cred_by_token,
get_sign_header,
header,
header_login,
log,
sign_url,
token_password_url,
)
class SKLand:
def __init__(self):
self.record_path = get_path("@app/tmp/skland.csv")
self.header = {
"cred": "",
"User-Agent": "Skland/1.0.1 (com.hypergryph.skland; build:100001014; Android 31; ) Okhttp/4.11.0",
"Accept-Encoding": "gzip",
"Connection": "close",
}
self.header_login = {
"User-Agent": "Skland/1.0.1 (com.hypergryph.skland; build:100001014; Android 31; ) Okhttp/4.11.0",
"Accept-Encoding": "gzip",
"Connection": "close",
}
self.reward = []
# 签名请求头一定要这个顺序,否则失败
# timestamp是必填的,其它三个随便填,不要为none即可
self.header_for_sign = {"platform": "", "timestamp": "", "dId": "", "vName": ""}
self.sign_token = ""
self.all_recorded = True
@ -59,13 +33,14 @@ class SKLand:
if self.has_record(item.account):
continue
self.all_recorded = False
self.save_param(self.get_cred_by_token(self.log(item)))
for i in self.get_binding_list():
self.save_param(get_cred_by_token(log(item)))
for i in get_binding_list(self.sign_token):
body = {"gameId": 1, "uid": i.get("uid")}
# list_awards(1, i.get('uid'))
resp = requests.post(
sign_url,
headers=self.get_sign_header(sign_url, "post", body, self.header),
headers=get_sign_header(
sign_url, "post", body, self.sign_token, header
),
json=body,
).json()
if resp["code"] != 0:
@ -93,104 +68,19 @@ class SKLand:
return False
def save_param(self, cred_resp):
self.header["cred"] = cred_resp["cred"]
header["cred"] = cred_resp["cred"]
self.sign_token = cred_resp["token"]
def log(self, account):
r = requests.post(
token_password_url,
json={"phone": account.account, "password": account.password},
headers=self.header_login,
headers=header_login,
).json()
if r.get("status") != 0:
raise Exception(f'获得token失败{r["msg"]}')
return r["data"]["token"]
def get_cred_by_token(self, token):
return self.get_cred(self.get_grant_code(token))
def get_grant_code(self, token):
response = requests.post(
grant_code_url,
json={"appCode": app_code, "token": token, "type": 0},
headers=self.header_login,
)
resp = response.json()
if response.status_code != 200:
raise Exception(f"获得认证代码失败:{resp}")
if resp.get("status") != 0:
raise Exception(f'获得认证代码失败:{resp["msg"]}')
return resp["data"]["code"]
def get_cred(self, grant):
resp = requests.post(
cred_code_url, json={"code": grant, "kind": 1}, headers=self.header_login
).json()
if resp["code"] != 0:
raise Exception(f'获得cred失败{resp["message"]}')
return resp["data"]
def get_binding_list(self):
v = []
resp = requests.get(
binding_url,
headers=self.get_sign_header(binding_url, "get", None, self.header),
).json()
if resp["code"] != 0:
print(f"请求角色列表出现问题:{resp['message']}")
if resp.get("message") == "用户未登录":
print("用户登录可能失效了,请重新运行此程序!")
return []
for i in resp["data"]["list"]:
if i.get("appCode") != "arknights":
continue
v.extend(i.get("bindingList"))
return v
def get_sign_header(self, url: str, method, body, old_header):
h = json.loads(json.dumps(old_header))
p = parse.urlparse(url)
if method.lower() == "get":
h["sign"], header_ca = self.generate_signature(
self.sign_token, p.path, p.query
)
else:
h["sign"], header_ca = self.generate_signature(
self.sign_token, p.path, json.dumps(body)
)
for i in header_ca:
h[i] = header_ca[i]
return h
def generate_signature(self, token: str, path, body_or_query):
"""
获得签名头
接口地址+方法为Get请求用query否则用body+时间戳+ 请求头的四个重要参数dIdplatformtimestampvName.toJSON()
将此字符串做HMAC加密算法为SHA-256密钥token为请求cred接口会返回的一个token值
再将加密后的字符串做MD5即得到sign
:param token: 拿cred时候的token
:param path: 请求路径不包括网址
:param body_or_query: 如果是GET则是它的queryPOST则为它的body
:return: 计算完毕的sign
"""
# 总是说请勿修改设备时间怕不是yj你的服务器有问题吧所以这里特地-2
t = str(int(time.time()) - 2)
token = token.encode("utf-8")
header_ca = json.loads(json.dumps(self.header_for_sign))
header_ca["timestamp"] = t
header_ca_str = json.dumps(header_ca, separators=(",", ":"))
s = path + body_or_query + t + header_ca_str
hex_s = hmac.new(token, s.encode("utf-8"), hashlib.sha256).hexdigest()
md5 = (
hashlib.md5(hex_s.encode("utf-8"))
.hexdigest()
.encode("utf-8")
.decode("utf-8")
)
return md5, header_ca
def record_log(self):
date_str = datetime.datetime.now().strftime("%Y/%m/%d")
logger.info(f"存入{date_str}的数据{self.reward}")
@ -227,8 +117,8 @@ class SKLand:
for item in config.conf.skland_info:
if item.isCheck:
try:
self.save_param(self.get_cred_by_token(self.log(item)))
for i in self.get_binding_list():
self.save_param(get_cred_by_token(log(item)))
for i in get_binding_list(self.sign_token):
if i["uid"]:
res.append(
"{}连接成功".format(

318
mower/utils/SecuritySm.py Normal file
View file

@ -0,0 +1,318 @@
# from https://gitee.com/FancyCabbage/skyland-auto-sign
import base64
import gzip
import hashlib
# 数美加密方法类
import json
import time
import uuid
import requests
from cryptography.hazmat.decrepit.ciphers.algorithms import TripleDES
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.ciphers.base import Cipher
from cryptography.hazmat.primitives.ciphers.modes import CBC, ECB
# 查询dId请求头
devices_info_url = "https://fp-it.portal101.cn/deviceprofile/v4"
# 数美配置
SM_CONFIG = {
"organization": "UWXspnCCJN4sfYlNfqps",
"appId": "default",
"publicKey": "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCmxMNr7n8ZeT0tE1R9j/mPixoinPkeM+k4VGIn/s0k7N5rJAfnZ0eMER+QhwFvshzo0LNmeUkpR8uIlU/GEVr8mN28sKmwd2gpygqj0ePnBmOW4v0ZVwbSYK+izkhVFk2V/doLoMbWy6b+UnA8mkjvg0iYWRByfRsK2gdl7llqCwIDAQAB",
"protocol": "https",
"apiHost": "fp-it.portal101.cn",
}
PK = serialization.load_der_public_key(base64.b64decode(SM_CONFIG["publicKey"]))
DES_RULE = {
"appId": {
"cipher": "DES",
"is_encrypt": 1,
"key": "uy7mzc4h",
"obfuscated_name": "xx",
},
"box": {"is_encrypt": 0, "obfuscated_name": "jf"},
"canvas": {
"cipher": "DES",
"is_encrypt": 1,
"key": "snrn887t",
"obfuscated_name": "yk",
},
"clientSize": {
"cipher": "DES",
"is_encrypt": 1,
"key": "cpmjjgsu",
"obfuscated_name": "zx",
},
"organization": {
"cipher": "DES",
"is_encrypt": 1,
"key": "78moqjfc",
"obfuscated_name": "dp",
},
"os": {
"cipher": "DES",
"is_encrypt": 1,
"key": "je6vk6t4",
"obfuscated_name": "pj",
},
"platform": {
"cipher": "DES",
"is_encrypt": 1,
"key": "pakxhcd2",
"obfuscated_name": "gm",
},
"plugins": {
"cipher": "DES",
"is_encrypt": 1,
"key": "v51m3pzl",
"obfuscated_name": "kq",
},
"pmf": {
"cipher": "DES",
"is_encrypt": 1,
"key": "2mdeslu3",
"obfuscated_name": "vw",
},
"protocol": {"is_encrypt": 0, "obfuscated_name": "protocol"},
"referer": {
"cipher": "DES",
"is_encrypt": 1,
"key": "y7bmrjlc",
"obfuscated_name": "ab",
},
"res": {
"cipher": "DES",
"is_encrypt": 1,
"key": "whxqm2a7",
"obfuscated_name": "hf",
},
"rtype": {
"cipher": "DES",
"is_encrypt": 1,
"key": "x8o2h2bl",
"obfuscated_name": "lo",
},
"sdkver": {
"cipher": "DES",
"is_encrypt": 1,
"key": "9q3dcxp2",
"obfuscated_name": "sc",
},
"status": {
"cipher": "DES",
"is_encrypt": 1,
"key": "2jbrxxw4",
"obfuscated_name": "an",
},
"subVersion": {
"cipher": "DES",
"is_encrypt": 1,
"key": "eo3i2puh",
"obfuscated_name": "ns",
},
"svm": {
"cipher": "DES",
"is_encrypt": 1,
"key": "fzj3kaeh",
"obfuscated_name": "qr",
},
"time": {
"cipher": "DES",
"is_encrypt": 1,
"key": "q2t3odsk",
"obfuscated_name": "nb",
},
"timezone": {
"cipher": "DES",
"is_encrypt": 1,
"key": "1uv05lj5",
"obfuscated_name": "as",
},
"tn": {
"cipher": "DES",
"is_encrypt": 1,
"key": "x9nzj1bp",
"obfuscated_name": "py",
},
"trees": {
"cipher": "DES",
"is_encrypt": 1,
"key": "acfs0xo4",
"obfuscated_name": "pi",
},
"ua": {
"cipher": "DES",
"is_encrypt": 1,
"key": "k92crp1t",
"obfuscated_name": "bj",
},
"url": {
"cipher": "DES",
"is_encrypt": 1,
"key": "y95hjkoo",
"obfuscated_name": "cf",
},
"version": {"is_encrypt": 0, "obfuscated_name": "version"},
"vpw": {
"cipher": "DES",
"is_encrypt": 1,
"key": "r9924ab5",
"obfuscated_name": "ca",
},
}
BROWSER_ENV = {
"plugins": "MicrosoftEdgePDFPluginPortableDocumentFormatinternal-pdf-viewer1,MicrosoftEdgePDFViewermhjfbmdgcfjbbpaeojofohoefgiehjai1",
"ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Edg/129.0.0.0",
"canvas": "259ffe69", # 基于浏览器的canvas获得的值不知道复用行不行
"timezone": -480, # 时区,应该是固定值吧
"platform": "Win32",
"url": "https://www.skland.com/", # 固定值
"referer": "",
"res": "1920_1080_24_1.25", # 屏幕宽度_高度_色深_window.devicePixelRatio
"clientSize": "0_0_1080_1920_1920_1080_1920_1080",
"status": "0011", # 不知道在干啥
}
# // 将浏览器环境对象的key全部排序然后对其所有的值及其子对象的值加入数字并字符串相加。若值为数字则乘以10000(0x2710)再将其转成字符串存入数组,最后再做md5,存入tn变量tn变量要做加密
# //把这个对象用加密规则进行加密然后对结果做GZIP压缩结果是对象应该有序列化最后做AES加密加密细节目前不清除密钥为变量priId
# //加密规则新对象的key使用相对应加解密规则的obfuscated_name值value为字符串化后进行进行DES加密再进行btoa加密
# 通过测试
def _DES(o: dict):
result = {}
for i in o.keys():
if i in DES_RULE.keys():
rule = DES_RULE[i]
res = o[i]
if rule["is_encrypt"] == 1:
c = Cipher(TripleDES(rule["key"].encode("utf-8")), ECB())
data = str(res).encode("utf-8")
# 补足字节
data += b"\x00" * 8
res = base64.b64encode(c.encryptor().update(data)).decode("utf-8")
result[rule["obfuscated_name"]] = res
else:
result[i] = o[i]
return result
# 通过测试
def _AES(v: bytes, k: bytes):
iv = "0102030405060708"
key = AES(k)
c = Cipher(key, CBC(iv.encode("utf-8")))
c.encryptor()
# 填充明文
v += b"\x00"
while len(v) % 16 != 0:
v += b"\x00"
return c.encryptor().update(v).hex()
def GZIP(o: dict):
# 这个压缩结果似乎和前台不太一样,不清楚是否会影响
json_str = json.dumps(o, ensure_ascii=False)
stream = gzip.compress(json_str.encode("utf-8"), 2, mtime=0)
return base64.b64encode(stream)
# 获得tn的值,后续做DES加密用
# 通过测试
def get_tn(o: dict):
sorted_keys = sorted(o.keys())
result_list = []
for i in sorted_keys:
v = o[i]
if isinstance(v, (int, float)):
v = str(v * 10000)
elif isinstance(v, dict):
v = get_tn(v)
result_list.append(v)
return "".join(result_list)
def get_smid():
t = time.localtime()
_time = "{}{:0>2d}{:0>2d}{:0>2d}{:0>2d}{:0>2d}".format(
t.tm_year, t.tm_mon, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec
)
uid = str(uuid.uuid4())
v = _time + hashlib.md5(uid.encode("utf-8")).hexdigest() + "00"
smsk_web = hashlib.md5(("smsk_web_" + v).encode("utf-8")).hexdigest()[0:14]
return v + smsk_web + "0"
def get_d_id():
# storageName = '.thumbcache_' + md5(SM_CONFIG['organization']) // 用于从本地存储获得值
# uid = uuid()
# priId=md5(uid)[0:16]
# ep=rsa(uid,publicKey)
# SMID = localStorage.get(storageName);// 获得本地存储存的值
# _0x30b2eb为递归md5
uid = str(uuid.uuid4()).encode("utf-8")
priId = hashlib.md5(uid).hexdigest()[0:16]
# ep不一定对先走走看
ep = PK.encrypt(uid, padding.PKCS1v15())
ep = base64.b64encode(ep).decode("utf-8")
browser = BROWSER_ENV.copy()
current_time = int(time.time() * 1000)
browser.update(
{
"vpw": str(uuid.uuid4()),
"svm": current_time,
"trees": str(uuid.uuid4()),
"pmf": current_time,
}
)
des_target = {
**browser,
"protocol": 102,
"organization": SM_CONFIG["organization"],
"appId": SM_CONFIG["appId"],
"os": "web",
"version": "3.0.0",
"sdkver": "3.0.0",
"box": "", # 似乎是个SMID但是第一次的时候是空,不过不影响结果
"rtype": "all",
"smid": get_smid(),
"subVersion": "1.0.0",
"time": 0,
}
des_target["tn"] = hashlib.md5(get_tn(des_target).encode()).hexdigest()
des_result = _AES(GZIP(_DES(des_target)), priId.encode("utf-8"))
response = requests.post(
devices_info_url,
json={
"appId": "default",
"compress": 2,
"data": des_result,
"encode": 5,
"ep": ep,
"organization": SM_CONFIG["organization"],
"os": "web", # 固定值
},
)
resp = response.json()
if resp["code"] != 1100:
raise Exception("did计算失败请联系作者")
# 开头必须是B
return "B" + resp["detail"]["deviceId"]

View file

@ -6,6 +6,7 @@ import pandas as pd
# from .log import logger
from mower.data import key_mapping
from mower.utils.log import logger
# from typing import Dict, List, Union
from mower.utils.path import get_path
@ -17,13 +18,10 @@ def 读取仓库():
创建json()
with open(path, "r", encoding="utf-8") as f:
depotinfo = json.load(f)
depotinfo = {
"code": 0,
"message": "OK",
"timestamp": "0",
"data": {"items": [{"id": "0", "count": "0"}]},
}
物品数量 = depotinfo["data"]["items"]
logger.info(depotinfo["timestamp"])
time = int(depotinfo["timestamp"])
新物品1 = {
key_mapping[item["id"]][2]: int(item["count"])
for item in 物品数量
@ -43,7 +41,7 @@ def 读取仓库():
新物品json = {}
for item in 新物品:
新物品json[key_mapping[item][0]] = 新物品[item]
time = depotinfo.iloc[-1, 0]
# time = depotinfo.iloc[-1, 0]
sort = {
"A常用": [

148
mower/utils/skland.py Normal file
View file

@ -0,0 +1,148 @@
import hashlib
import hmac
import json
import time
from urllib import parse
import requests
from mower.utils.log import logger
from mower.utils.SecuritySm import get_d_id
app_code = "4ca99fa6b56cc2ba"
# 签到url
sign_url = "https://zonai.skland.com/api/v1/game/attendance"
# 绑定的角色url
binding_url = "https://zonai.skland.com/api/v1/game/player/binding"
# 验证码url
login_code_url = "https://as.hypergryph.com/general/v1/send_phone_code"
# 验证码登录
token_phone_code_url = "https://as.hypergryph.com/user/auth/v2/token_by_phone_code"
# 密码登录
token_password_url = "https://as.hypergryph.com/user/auth/v1/token_by_phone_password"
# 使用token获得认证代码
grant_code_url = "https://as.hypergryph.com/user/oauth2/v2/grant"
# 使用认证代码获得cred
cred_code_url = "https://zonai.skland.com/web/v1/user/auth/generate_cred_by_code"
header = {
"cred": "",
"User-Agent": "Skland/1.0.1 (com.hypergryph.skland; build:100001014; Android 31; ) Okhttp/4.11.0",
"Accept-Encoding": "gzip",
"Connection": "close",
}
header_login = {
"User-Agent": "Skland/1.0.1 (com.hypergryph.skland; build:100001014; Android 31; ) Okhttp/4.11.0",
"Accept-Encoding": "gzip",
"Connection": "close",
"dId": get_d_id(),
}
header_for_sign = {"platform": "", "timestamp": "", "dId": "", "vName": ""}
def generate_signature(token: str, path, body_or_query):
"""
获得签名头
接口地址+方法为Get请求用query否则用body+时间戳+ 请求头的四个重要参数dIdplatformtimestampvName.toJSON()
将此字符串做HMAC加密算法为SHA-256密钥token为请求cred接口会返回的一个token值
再将加密后的字符串做MD5即得到sign
:param token: 拿cred时候的token
:param path: 请求路径不包括网址
:param body_or_query: 如果是GET则是它的queryPOST则为它的body
:return: 计算完毕的sign
"""
# 总是说请勿修改设备时间怕不是yj你的服务器有问题吧所以这里特地-2
t = str(int(time.time()) - 2)
token = token.encode("utf-8")
header_ca = json.loads(json.dumps(header_for_sign))
header_ca["timestamp"] = t
header_ca_str = json.dumps(header_ca, separators=(",", ":"))
s = path + body_or_query + t + header_ca_str
hex_s = hmac.new(token, s.encode("utf-8"), hashlib.sha256).hexdigest()
md5 = hashlib.md5(hex_s.encode("utf-8")).hexdigest().encode("utf-8").decode("utf-8")
return md5, header_ca
def get_sign_header(url: str, method, body, sign_token, old_header=header):
h = json.loads(json.dumps(old_header))
p = parse.urlparse(url)
if method.lower() == "get":
h["sign"], header_ca = generate_signature(sign_token, p.path, p.query)
else:
h["sign"], header_ca = generate_signature(sign_token, p.path, json.dumps(body))
for i in header_ca:
h[i] = header_ca[i]
return h
def get_grant_code(token):
response = requests.post(
grant_code_url,
json={"appCode": app_code, "token": token, "type": 0},
headers=header_login,
)
resp = response.json()
if response.status_code != 200:
raise Exception(f"获得认证代码失败:{resp}")
if resp.get("status") != 0:
raise Exception(f'获得认证代码失败:{resp["msg"]}')
return resp["data"]["code"]
def get_cred(grant):
"""
获取cred
:param cred_code_url: 获取cred的URL
:param grant: 授权代码
:param header_login: 登录请求头
:return: cred
"""
resp = requests.post(
cred_code_url, json={"code": grant, "kind": 1}, headers=header_login
).json()
if resp["code"] != 0:
raise Exception(f'获得cred失败{resp["message"]}')
return resp["data"]
def get_binding_list(sign_token):
v = []
resp = requests.get(
binding_url,
headers=get_sign_header(
binding_url,
"get",
None,
sign_token,
),
).json()
if resp["code"] != 0:
logger.info(f"请求角色列表出现问题:{resp['message']}")
if resp.get("message") == "用户未登录":
logger.warning("用户登录可能失效了,请重新运行此程序!")
return []
for i in resp["data"]["list"]:
if i.get("appCode") != "arknights":
continue
v.extend(i.get("bindingList"))
return v
def get_cred_by_token(token):
return get_cred(get_grant_code(token))
def log(account):
r = requests.post(
token_password_url,
json={"phone": account.account, "password": account.password},
headers=header_login,
).json()
if r.get("status") != 0:
raise Exception(f'获得token失败{r["msg"]}')
logger.info("森空岛登陆成功")
return r["data"]["token"]

View file

@ -55,3 +55,6 @@ Jinja2==3.1.4
# 启动模拟器后按快捷键
PyAutoGUI==0.9.54
# 森空岛检测
cryptography==43.0.1

View file

@ -14,14 +14,27 @@ bottle==0.12.25
# via pywebview
certifi==2024.7.4
# via requests
cffi==1.17.1
# via
# clr-loader
# cryptography
charset-normalizer==3.3.2
# via requests
click==8.1.7
# via flask
clr-loader==0.2.6
# via pythonnet
colorama==0.4.6
# via
# click
# colorlog
# qrcode
coloredlogs==15.0.1
# via onnxruntime
colorlog==6.8.2
# via -r requirements.in
cryptography==43.0.1
# via -r requirements.in
evalidate==2.0.2
# via -r requirements.in
flask==3.0.3
@ -108,6 +121,8 @@ pyautogui==0.9.54
# via -r requirements.in
pyclipper==1.3.0.post5
# via rapidocr-onnxruntime
pycparser==2.22
# via cffi
pydantic==2.8.2
# via -r requirements.in
pydantic-core==2.20.1
@ -120,6 +135,8 @@ pyperclip==1.9.0
# via mouseinfo
pypng==0.20220715.0
# via qrcode
pyreadline3==3.5.4
# via humanfriendly
pyrect==0.2.0
# via pygetwindow
pyscreeze==0.1.30
@ -128,12 +145,8 @@ pystray==0.19.5
# via -r requirements.in
python-dateutil==2.9.0.post0
# via pandas
python-xlib==0.33
# via pystray
python3-xlib==0.15
# via
# mouseinfo
# pyautogui
pythonnet==3.0.4
# via pywebview
pytweening==1.2.0
# via pyautogui
pytz==2024.1
@ -171,7 +184,6 @@ six==1.16.0
# via
# pystray
# python-dateutil
# python-xlib
# rapidocr-onnxruntime
sympy==1.13.0
# via onnxruntime
@ -186,7 +198,9 @@ typing-extensions==4.12.2
# pywebview
# qrcode
tzdata==2024.1
# via pandas
# via
# pandas
# tzlocal
tzlocal==5.2
# via -r requirements.in
urllib3==2.2.1

View file

@ -353,17 +353,12 @@ function drop(plan, ev) {
<n-card>
<n-checkbox v-model:checked="maa_enable" class="card-title">刷理智周计划</n-checkbox>
有问题先看问号2023年12月6日更新了 请点击下方按钮
有问题先看问号2024年10月5日更新了 请点击下方按钮
<help-text>
<div>支持第一章第八章第十二章主线关卡</div>
<div>支持所有主线关卡</div>
<div>如何不打本<b>把所有的"打"都点了</b>或者点一次<b>清除当前配置以匹配最新表格</b></div>
<div>建议每次更新后 点一次<b>清除当前配置以匹配最新表格</b></div>
<div>如果觉得这个表有什么要改进的 at群管理</div>
<div>前三行的空行用来写 一些自定义关卡</div>
<div>
新用法<n-tag closable>SSReopen-XX</n-tag> <n-tag closable>SSReopen-FC</n-tag
>可以刷照我以火的所有复刻本(FC-1到FC-8)
</div>
<ul>
<li>
在第一行填入<n-tag closable class="tag-mr">HE-7</n-tag>
@ -372,12 +367,7 @@ function drop(plan, ev) {
则会刷活动关HE-7若活动未开放或刷完HE-7有剩余的理智则刷1-7
</li>
<li><n-tag closable>12-17标准</n-tag> 表示12-17标准难度</li>
<li><n-tag closable>12-17磨难</n-tag> 表示12-17磨难难度</li>
<li>
<b>信用作战</b>若信用作战选项已开启且当日计划不包含
<n-tag closable>上次作战</n-tag>则自动进行信用作战
</li>
<li>含磨难的主线关卡会自动选择可以代理的难度刷理智仅需要写关卡名<n-tag closable>12-17</n-tag> </li>
</ul>
</help-text>
@ -443,54 +433,26 @@ function drop(plan, ev) {
<tbody>
<tr v-for="(plan, index) in maa_weekly_plan1" :key="plan.weekday1">
<td>
<n-button
:v-model="plan"
@click="() => togglePlan(plan)"
quaternary
style="width: 100%; height: 100%"
class="class1"
></n-button>
<n-button :v-model="plan" @click="() => togglePlan(plan)" quaternary style="width: 100%; height: 100%"
class="class1"></n-button>
</td>
<td>
<template v-if="index > 1 && index < 5">
<n-select
v-if="Array.isArray(plan.stage) && plan.stage.length === 0"
placeholder="关卡"
v-model:value="plan.stage"
filterable
multiple
tag
:show="false"
:show-arrow="false"
:on-create="create_tag"
@update:value="(value) => changestage(plan, value)"
@dragover.prevent
@drop="drop(plan, $event)"
/>
<n-tag
v-else
closable
class="custom-tag"
@close="changestage(plan, [])"
@dragover.prevent
@drop="drop(plan, $event)"
>
<n-select v-if="Array.isArray(plan.stage) && plan.stage.length === 0" placeholder="关卡"
v-model:value="plan.stage" filterable multiple tag :show="false" :show-arrow="false"
:on-create="create_tag" @update:value="(value) => changestage(plan, value)" @dragover.prevent
@drop="drop(plan, $event)" />
<n-tag v-else closable class="custom-tag" @close="changestage(plan, [])" @dragover.prevent
@drop="drop(plan, $event)">
{{ plan.stage[0] }}
</n-tag>
</template>
<span v-else>{{ showstage(plan.stage) }}</span>
</td>
<td
v-for="day in daysOfWeek"
:class="{ class2: plan[day] === 2, class1: plan[day] === 1 }"
>
<td v-for="day in daysOfWeek" :class="{ class2: plan[day] === 2, class1: plan[day] === 1 }">
<template v-if="plan[day] !== 0">
<n-button
:v-model="plan[day]"
@click="() => togglePlanAndStage(plan, day)"
quaternary
style="width: 100%; height: 100%"
>
<n-button :v-model="plan[day]" @click="() => togglePlanAndStage(plan, day)" quaternary
style="width: 100%; height: 100%">
<span v-if="plan[day] === 2"></span>
<span v-if="plan[day] === 1"></span>
</n-button>