暑假写的暂时没空解释,先贴代码。
大致的接口是这样,下面这份代码里有很多地方被我写死了,需要自己去找一下特定的字段。
"""Unofficial client for the dida365 (TickTick) web API.

Several values are hard-coded placeholders (the ``t`` cookie, the
``x-device`` header, the sample task/habit payloads) and must be replaced
with values captured from a logged-in browser session before use.
"""

import json
import time
import urllib.parse
from datetime import datetime, timedelta
from random import choices, randint, sample

from hyper.contrib import HTTP20Adapter
from requests import session

from local import Local

_HEX_DIGITS = "0123456789abcdef"


class Params(object):
    """Builders for the request payloads and header presets used by the API."""

    def __init__(self):
        super().__init__()
        # Named header presets; ``prePare(headers=<name>)`` merges one of them
        # into the session headers.
        self.headers = {
            "main": {
                "sec-fetch-dest": "empty",
                "sec-fetch-mode": "cors",
                "sec-fetch-site": "same-site",
            },
            "log": {
                "sec-fetch-dest": "image",
                "sec-fetch-mode": "cors",
                "sec-fetch-site": "cross-site",
            },
        }

    def id(self, num=24):
        """Return a random lowercase hexadecimal identifier of ``num`` characters.

        The previous implementation sampled 24 characters without replacement
        from a 21-character pool, which always raised ``ValueError``.
        """
        return "".join(choices(_HEX_DIGITS, k=num))

    def projectGroup(self, name="text"):
        """Return the payload describing a new project group called ``name``."""
        return {
            "showAll": True,
            "open": False,
            "name": name,
            "teamId": None,
            "listType": "group",
            "sortType": "",
            "id": self.id(),
        }

    def task(self):
        """Return a template task payload bound to ``self.projectId``.

        Callers are expected to override fields via ``dict.update``.
        Example ``repeatFlag`` values seen in the web app:
        ``RRULE:FREQ=WEEKLY;INTERVAL=1;WKST=MO;BYDAY=SU,TU,WE,TH`` and
        ``ERULE:NAME=TT_WORKDAY;CYCLE=0``.
        """
        return {
            "items": [],
            "reminders": [],
            "exDate": [],
            "isAllDay": False,
            "repeatFrom": "0",
            # NOTE(review): these two placeholder dates look truncated
            # ("020-07-20") and do not match the timestamp format produced by
            # getTime(); confirm what the server accepts before relying on them.
            "startDate": "020-07-20",
            "dueDate": "2020-07-21T07",
            "id": self.id(),
            "projectId": self.projectId,
            "title": "测试开发",
            "content": " api测试接口 ",
            "desc": "",
            "tags": [],
            "priority": 0,
            "progress": 0,
            "assignee": None,
            "isFloating": False,
            "status": 0,
            # getTime() with no offset returns None, so this is None by default.
            "modifiedTime": self.getTime(),
            "timeZone": "Asia/Shanghai",
        }

    def habit(self):
        """Return a template habit payload."""
        return {
            "color": "#70CE62",
            "iconRes": "txt_棒",
            "createdTime": self.getTime(),
            "encouragement": "",
            "etag": "",
            "goal": 1,
            "id": self.id(),
            "modifiedTime": self.getTime(),
            "name": "太棒了",
            "recordEnable": True,
            "reminders": ["10:00"],
            "repeatRule": "RRULE:FREQ=WEEKLY;BYDAY=SU,MO,TU,WE,TH,FR,SA",
            # "sortOrder": -96,
            "status": 0,
            "step": 0,
            "totalCheckIns": 0,
            "type": "Boolean",
            "unit": "Count",
        }

    def getTime(self, day=0, hour=0, minute=0, strTime="",
                template="%Y-%m-%dT%H:%M:%S.000+0000"):
        """Return a formatted timestamp shifted from local (UTC+8) time to UTC.

        Args:
            day, hour, minute: Offsets that are *subtracted* from the current
                local time (negative values therefore point to the future).
                Ignored when ``strTime`` is given.
            strTime: Optional local time as ``"%Y-%m-%d %H:%M:%S"``; when set it
                is used instead of "now" and no extra offset is applied.
            template: ``strftime`` template for the result.

        Returns:
            The formatted string, or ``None`` when neither ``strTime`` nor any
            offset is supplied.
        """
        if strTime:
            base = datetime.strptime(strTime, "%Y-%m-%d %H:%M:%S")
            hour = 0
            minute = 0
        else:
            if not day and not hour and not minute:
                return None
            hour = int(hour)
            if day:
                hour += int(day) * 24
            base = datetime.now()
        # The machine clock is assumed to be Beijing time; subtracting a
        # further 8 hours converts it to UTC.
        hour += 8
        shifted = base - timedelta(hours=hour, minutes=minute)
        return shifted.strftime(template)

    def print(self, data):
        """Pretty-print ``data`` as indented JSON without ASCII escaping."""
        print(json.dumps(data, indent=4, ensure_ascii=False))


class Ticktick_api(Params):
    """Low-level wrapper around the dida365 REST endpoints."""

    def __init__(self, base_url="https://api.dida365.com"):
        super().__init__()
        self.base_url = base_url
        self.requests = session()
        self.local = Local("ticktick_api.pkl")
        self.requests.headers["user-agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36"
        # Placeholder: the fourth field must be replaced with the device string
        # taken from your own browser session.
        self.requests.headers["x-device"] = "web_app, Chrome 83.0.4103.116, 3700, 这有一串字符 你自己去找, web_app"
        self.requests.headers["referer"] = "https://dida365.com/webapp/"
        # Placeholder: replace with your own long-lived "t" cookie.
        cookies = {
            "t": "你的cookie 一直都不会改变",
        }
        for name, value in cookies.items():
            self.requests.cookies.set(name, value)
        self.user = self.status()
        # status() returns {} when the session is not authenticated, in which
        # case the lookup below raises KeyError.
        self.projectId = self.user["inboxId"]
        self.projects = {}
        self.getProjects()
        print("Welcome ", self.user["username"])

    def _url(self, path):
        """Return the absolute URL for an API ``path`` under ``self.base_url``."""
        return self.base_url + path

    def prePare(self, url, method="GET", headers=""):
        """Mount an HTTP/2 adapter for ``url`` and set the per-request headers.

        Args:
            url: Full request URL.
            method: ``"GET"`` or ``"POST"``; POST adds a JSON content-type,
                anything else removes it.
            headers: Name of a preset in ``self.headers`` to merge in, if any.
        """
        # The adapter is (re)mounted on every call; an earlier guard that
        # skipped this when ":authority" already matched is disabled.
        self.requests.mount(url, HTTP20Adapter())
        parsed = urllib.parse.urlparse(url)
        self.requests.headers.update({
            ":authority": parsed.hostname,
            ":method": method,
            ":scheme": parsed.scheme,
            ":path": parsed.path + "?" + parsed.query,
            "cache-control": "no-cache",
            "pragma": "no-cache",
        })
        if method == "POST":
            self.requests.headers.update({"content-type": "application/json;charset=UTF-8"})
        elif "content-type" in self.requests.headers:
            del self.requests.headers["content-type"]
        self.requests.headers.update({"accept": "application/json, text/plain, */*"})
        if headers:
            self.requests.headers.update(self.headers.get(headers, {}))

    def subTask(self, tasks=None, status=0):
        """Return check-list item payloads for the given titles (1-based order)."""
        if not tasks:
            return []
        return [
            {"id": self.id(), "status": status, "title": title, "sortOrder": order}
            for order, title in enumerate(tasks, 1)
        ]

    def reminders(self, schedule=None):
        """Return reminder payloads for trigger offsets such as ``"-PT15M"``."""
        if not schedule:
            return []
        return [
            {"id": self.id(), "trigger": "TRIGGER:%s" % trigger}
            for trigger in schedule
        ]

    def login(self):
        """Not implemented; authentication relies on the preset cookie."""
        pass

    def changeStatus(self, items=None, status=1):
        """Set ``status`` on every item (in place) and submit them as updates."""
        items = [] if items is None else items
        for item in items:
            item.update({"status": status})
        data = {"add": [], "update": items, "delete": []}
        return self.submit(data)

    def add(self, params=None, project=""):
        """Create a task from the template overridden by ``params``.

        Args:
            params: Field overrides applied on top of ``task()``.
            project: Optional project name or id resolved via ``getProjects``.

        Returns:
            The response text from ``submit``.
        """
        task = self.task()
        task.update(params or {})
        if project:
            task["projectId"] = self.getProjects(key=project, default="")
        # The web app stores the body in "desc" for check-list tasks and in
        # "content" for plain ones, so move the text to the matching field.
        if task.get("content") and task["items"]:
            task["desc"] += " " + task["content"]
            task["content"] = ""
        elif task.get("desc") and not task["items"]:
            task["content"] += " " + task["desc"]
            task["desc"] = ""
        data = {"add": [task], "update": [], "delete": []}
        return self.submit(data)

    def delete(self, tasks=None, item=""):
        """Delete ``tasks`` belonging to the project named ``item``.

        Returns the cached project map unchanged when ``item`` is unknown,
        otherwise the response text from ``submit``. Each task dict is
        modified in place (``taskId`` and ``projectId`` are filled in).
        Expected entry shape:
        ``{"taskId": "5f12bcdaa3503f747e78a4d8", "projectId": "5f09032f715c121b752a9db6"}``.
        """
        if item not in self.projects:
            return self.projects
        tasks = [] if tasks is None else tasks
        for task in tasks:
            task["taskId"] = task["id"]
            task["projectId"] = self.getProjects(key=item, default=task["id"])
        data = {"add": [], "update": [], "delete": tasks}
        return self.submit(data)

    def update(self, params=None):
        """Update a task; ``params`` must carry the existing task ``id``."""
        task = self.task()
        task.update(params or {})
        data = {"add": [], "update": [task], "delete": []}
        return self.submit(data)

    def move(self):
        """Unfinished stub: builds a sample move request but never sends it."""
        url = self._url("/api/v2/batch/taskProject")
        # "-08" in the original is not a valid Python 3 integer literal.
        data = [{"fromProjectId": "5f11096e967ee416e7b856e4", "toProjectId": "5f09032f715c121b752a9db6", "sortOrder": -8, "taskId": "5f12bcdaa3503f747e78a4d8"}]

    def submit(self, data):
        """POST a batch ``{"add", "update", "delete"}`` payload; return the body text."""
        url = self._url("/api/v2/batch/task")
        self.prePare(url, "POST", headers="main")
        res = self.requests.post(url, data=json.dumps(data))
        print(res.text)
        return res.text

    def search(self, params=None):
        """Run a server-side task search; return the decoded JSON (``[]`` if no params)."""
        if not params:
            return []
        query = urllib.parse.urlencode(params)
        url = self._url("/api/v2/search/task?") + query
        print("搜索", params)
        self.prePare(url, headers="main")
        res = self.requests.get(url)
        return res.json()

    def getRemote(self, status=0):
        """Fetch ``/batch/check/<status>`` and return ``syncTaskBean.update``."""
        url = self._url("/api/v2/batch/check/%s" % status)
        self.prePare(url, headers="main")
        res = self.requests.get(url)
        return res.json().get("syncTaskBean", {}).get("update", {})

    def search_by_project(self, project, key="", day=0):
        """Return remote tasks in ``project`` whose string form contains ``key``.

        Matching tasks get a ``taskId`` field added in place. An empty ``key``
        yields an empty list. ``day`` is currently unused.
        """
        projectId = self.getProjects(key=project)
        result = self.getRemote()
        targetItems = []
        # Cheap pre-check on the whole payload before scanning task by task.
        if key and key in str(result):
            for item in result:
                content = "%s %s %s" % (item["title"], item.get("content", ""), item.get("desc", ""))
                print(projectId == item.get("projectId"), content, 1)
                if projectId == item.get("projectId") and key in str(item):
                    item["taskId"] = item["id"]
                    targetItems.append(item)
                    print(content)
        return targetItems

    def search_by_key(self, key="", append_dict=None):
        """Return tasks from ``getRemote(1)`` whose string form contains ``key``.

        Args:
            key: Substring to look for; when empty the raw result is returned.
            append_dict: Optional fields merged into every matching task.

        Matching tasks are modified in place (``taskId`` plus ``append_dict``).
        """
        result = self.getRemote(1)
        if not key:
            return result
        targetItems = []
        if key in str(result):
            for item in result:
                if key in str(item):
                    item["taskId"] = item["id"]
                    if append_dict:
                        item.update(append_dict)
                    targetItems.append(item)
        return targetItems

    def getProjects(self, update=False, key="", default=""):
        """Look up a project in the cache, or refresh the cache from the server.

        The cache maps both name -> id and id -> name.

        Args:
            update: Force a refresh when the key is missing from the cache.
            key: Project name or id to resolve.
            default: Value returned when the cache is populated, ``key`` is not
                found and ``update`` is false.

        Returns:
            The cached value for ``key``, ``default``, or - after a refresh -
            the whole project map.
        """
        if self.projects:
            if key and key in self.projects:
                return self.projects.get(key)
            if not update:
                return default
        url = self._url("/api/v2/projects")
        self.prePare(url, headers="main")
        res = self.requests.get(url)
        self.projects = {}
        for project in res.json():
            self.projects[project["name"]] = project["id"]
            self.projects[project["id"]] = project["name"]
        print(self.projects)
        return self.projects

    def status(self):
        """Fetch the user status; return it, or ``{}`` when not logged in.

        On success the ``username -> "t=<cookie>"`` pair is stored via
        ``self.local.add``.
        """
        url = self._url("/api/v2/user/status")
        self.prePare(url, headers="main")
        res = self.requests.get(url).json()
        if "username" not in res:
            return {}
        result = {res.get("username"): "t=%s" % self.requests.cookies.get("t")}
        self.local.add(result)
        return res

    def check(self, username):
        """Return whether ``username`` is the account this session belongs to."""
        # self.user is the decoded JSON dict, so attribute access would fail.
        return username == self.user.get("username")

    def getRecent(self, day=1):
        """Fetch tasks completed within the last ``day`` days and print them."""
        # Alternative endpoint (trash):
        # /api/v2/project/all/trash/pagination?start=0&limit=1000&status=1
        url = self._url("/api/v2/project/all/completedInAll/")
        # NOTE(review): the template embeds a literal "%20" which urlencode
        # escapes again ("%2520"), and getTime() without an offset returns
        # None, so "to" is sent as "None". Behaviour is kept as-is; confirm
        # against the server before changing it.
        params = {
            "from": self.getTime(day, template="%Y-%m-%d%%20%H:%M:%S"),
            "to": self.getTime(template="%Y-%m-%d%%20%H:%M:%S"),
            "limit": "50",
        }
        url += "?" + urllib.parse.urlencode(params)
        self.prePare(url, headers="main")
        res = self.requests.get(url)
        result = res.json()
        print(json.dumps(result, indent=4, ensure_ascii=False))
        return result

    def habits(self):
        """Create one habit from the ``habit()`` template and print the reply."""
        url = self._url("/api/v2/habits/batch")
        data = {"add": [self.habit()], "update": [], "delete": []}
        print(data)
        self.prePare(url, "POST", headers="main")
        res = self.requests.post(url, data=json.dumps(data))
        print(res.text)


class Ticktick(Ticktick_api):
    """Convenience operations that locate tasks by keyword."""

    def __init__(self):
        super().__init__()

    def _find_items(self, key, project):
        """Return tasks matching ``key``, restricted to ``project`` when it is known."""
        if project in self.projects:
            self.getProjects(update=True)
            return self.search_by_project(project=project, key=key)
        # search_by_key has no project filter; passing one raised TypeError.
        return self.search_by_key(key=key)

    def addTask(self, data):
        """Create a task from ``data`` (see ``Ticktick_api.add``)."""
        self.add(data)

    def complete_by_key(self, key="", project=""):
        """Mark every task matching ``key`` as completed."""
        # An empty key would make search_by_key return every task.
        if not key:
            return
        items = self._find_items(key, project)
        if items:
            self.changeStatus(items, 1)

    def redo_by_key(self, key="", project=""):
        """Re-open every task matching ``key`` (``project`` is not used for lookup)."""
        if not key:
            return
        items = self.search_by_key(key=key)
        if items:
            self.changeStatus(items, 0)

    def delete_by_key(self, key="", project=""):
        """Delete every task matching ``key``; return the response text, if any."""
        # An empty key would make search_by_key return every task.
        if not key:
            return
        items = self._find_items(key, project)
        data = {"add": [], "update": [], "delete": items}
        if items:
            return self.submit(data)

    def update_by_key(self, key, update=None):
        """Apply the ``update`` fields to every task matching ``key`` and submit."""
        if not key:
            return
        items = self.search_by_key(key=key, append_dict=update)
        data = {"add": [], "update": items, "delete": []}
        return self.submit(data)


if __name__ == '__main__':
    TaskList = Ticktick()

    # Create:
    # items = TaskList.subTask(["eat", "drink", "go go go", "sleep", "study", "exam"])
    # reminders = TaskList.reminders(["-PT11M", "-PT15M", "-PT33M", "-PT41M", "-PT61M"])
    # TaskList.add({"reminders": reminders, "items": items, "title": "my routine",
    #               "content": "only the essentials", "startDate": TaskList.getTime(hour=-6),
    #               "dueDate": TaskList.getTime(hour=-16)}, project="study helper")

    # Delete:
    # TaskList.delete_by_key(key="my")

    # Update (the keyword argument is ``update``; ``data=`` raised TypeError):
    res = TaskList.update_by_key("背单词", update={
        "remaind": ["PT0S", "-PT30M", "-PT13H40M", "-PT10H"],
        "title": "数据结构",
        "content": "数据结构",
        "project": "学习",
        "priority": "1",
    })
    print(res)
    # TaskList.update_by_key("batch", {"title": "edited again, batch supported", "desc": "pretty cool",
    #     "content": "works fine", "items": TaskList.subTask(["eat", "sleep", "study", "drink", "go go go", "exam"])})
    # TaskList.complete_by_key("batch", "shampoo")
    # TaskList.redo_by_key("batch", "shampoo")

    # Query:
    result = TaskList.search_by_key(key="学习强国")
    # TaskList.print(result)
讯享网

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/49819.html