1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
import json
import os
import re
from urllib.parse import urlparse

from mitmproxy import ctx
from mitmproxy import flowfilter
from mitmproxy import http
''' Generate Python code for captured API endpoints. '''
class GenCode(object):
    '''mitmproxy addon that turns matching requests into Python method stubs.

    For every flow accepted by one of ``self.flowfilters`` the ``response``
    hook appends a generated method (headers, params, data, URL and the call
    itself) followed by the response text to
    ``<output_dir>/<function_name>.text``.
    '''

    # Directory used when no ``output_dir`` is passed to the constructor.
    DEFAULT_OUTPUT_DIR = '/Users/zhoujie/Desktop/api'

    def __init__(self, output_dir=None):
        '''Build the flow filters and remember where generated code goes.

        Args:
            output_dir: Directory that receives the generated ``*.text``
                files. Falls back to ``DEFAULT_OUTPUT_DIR`` when omitted.
        '''
        ctx.log.info('__init__')
        self.output_dir = output_dir or self.DEFAULT_OUTPUT_DIR

        # Qutoutiao
        urls = [
            r'taskcenter/getListV2',  # tab page: tasks
            r'readtimer/report',
        ]
        self.qu_tou_tiao = flowfilter.parse('|'.join(urls))

        # Baidu - Quanmin short video
        urls = [
            r'mvideo/api',  # daily check-in
        ]
        self.quan_ming = flowfilter.parse('|'.join(urls))

        self.flowfilters = [
            self.qu_tou_tiao,
            self.quan_ming,
        ]

    def load(self, loader):
        '''Log that the ``load`` event was received.'''
        ctx.log.info('event: load')

    def configure(self, updated):
        '''Log that the ``configure`` event was received.'''
        ctx.log.info('event: configure')

    def running(self):
        '''Log that the ``running`` event was received.'''
        ctx.log.info('event: running')

    def done(self):
        '''Log that the ``done`` event was received.'''
        ctx.log.info('event: done')

    def response(self, flow: 'http.HTTPFlow'):
        '''Append a generated method stub for ``flow`` to its output file.

        Flows that match none of ``self.flowfilters`` are ignored. The output
        directory is created on demand and the file is opened in append mode,
        so repeated calls to the same endpoint accumulate in one file.
        '''
        if not any(matches(flow) for matches in self.flowfilters):
            return

        request = flow.request
        url_path = urlparse(request.url).path
        function_name = self._function_name(url_path)
        url = f'{request.scheme}://{request.pretty_host}{url_path}'
        http_verb = request.method.lower()

        code = '\n'.join([
            '',
            f'    def {function_name}(self):',
            '',
            f'        {self.headers_string(flow)}',
            '',
            f'        {self.params_string(flow)}',
            '',
            f'        {self.data_string(flow)}',
            '',
            # repr() keeps the literal valid even if the path contains quotes.
            f'        url = {url!r}',
            f'        result = self._{http_verb}'
            '(url, headers=headers, params=params, data=data)',
            '        return result',
            '',
        ])

        os.makedirs(self.output_dir, exist_ok=True)
        path = os.path.join(self.output_dir, f'{function_name}.text')
        # Explicit UTF-8 so non-ASCII bodies do not depend on the locale.
        with open(path, 'a', encoding='utf-8') as f:
            print('# ---------------------', file=f)
            f.write(code)
            print('Response:', file=f)
            print(f'{flow.response.text}', file=f)
            print('# ---------------------\n\n', file=f)

    def headers_string(self, flow: 'http.HTTPFlow'):
        '''Return source text assigning the request headers to ``headers``.'''
        entries = [
            self._entry(key, value)
            for key, value in flow.request.headers.items()
        ]
        return self._dict_assignment('headers', entries)

    def params_string(self, flow: 'http.HTTPFlow'):
        '''Return source text assigning the query parameters to ``params``.'''
        entries = [
            self._entry(key, value)
            for key, value in flow.request.query.items()
        ]
        return self._dict_assignment('params', entries)

    def data_string(self, flow: 'http.HTTPFlow'):
        '''Return source text assigning the request body to ``data``.

        Handles these body kinds (a request normally carries only one):
            Content-Type: application/x-www-form-urlencoded
            multipart form data
            Content-Type: application/json; charset=utf-8

        A JSON body that is not an object is emitted as ``data = <literal>``
        when no form fields were found. A body that cannot be parsed as JSON
        is logged and skipped instead of aborting the hook.
        '''
        request = flow.request

        entries = [
            self._entry(key, value)
            for key, value in request.urlencoded_form.items()
        ]

        # Multipart keys and values are bytes and have a .decode() method.
        entries += [
            self._entry(
                key.decode('utf-8', errors='replace'),
                value.decode('utf-8', errors='replace'),
            )
            for key, value in request.multipart_form.items()
        ]

        content_type = request.headers.get('content-type', '')
        if 'application/json' in content_type.lower():
            try:
                payload = json.loads(request.text)
            except (TypeError, ValueError) as err:
                ctx.log.warn(f'GenCode: cannot parse JSON body: {err}')
            else:
                if isinstance(payload, dict):
                    # repr() renders nested dicts/lists, strings, numbers,
                    # booleans and None as valid Python literals.
                    entries += [
                        self._entry(key, value)
                        for key, value in payload.items()
                    ]
                elif not entries:
                    return f'data = {payload!r}'

        return self._dict_assignment('data', entries)

    @staticmethod
    def _function_name(url_path):
        '''Derive a valid Python identifier from a URL path.'''
        name = re.sub(r'[^0-9A-Za-z_]', '_', url_path).strip('_')
        if not name:
            return 'root'
        if name[0].isdigit():
            # Identifiers cannot start with a digit.
            return f'api_{name}'
        return name

    @staticmethod
    def _entry(key, value):
        '''Format one ``key: value,`` line of a generated dict literal.'''
        return f'\n\t\t{key!r}: {value!r},'

    @staticmethod
    def _dict_assignment(name, entries):
        '''Join formatted entries into ``name = {...}`` source text.'''
        return f'{name} = {{{"".join(entries)}\n\t}}'
# Module-level list of addon instances; mitmproxy is expected to pick up
# `addons` when it loads this script.
addons = [GenCode()]
|