用Python选一个自己的股票池!堪比资深的炒股选手!_python形态选股

用Python选一个自己的股票池!堪比资深的炒股选手!_python形态选股1 Python 所有方向的学习路线 新版 这是我花了几天的时间去把 Python 所有方向的技术点做的整理 形成各个领域的知识点汇总 它的用处就在于 你可以按照上面的知识点去找对应的学习资源 保证自己学得较为全面 最近我才对这些路线做了一下新的更新

大家好,我是讯享网,很高兴认识大家。

(1)Python所有方向的学习路线(新版)

这是我花了几天的时间去把Python所有方向的技术点做的整理,形成各个领域的知识点汇总,它的用处就在于,你可以按照上面的知识点去找对应的学习资源,保证自己学得较为全面。

最近我才对这些路线做了一下新的更新,知识体系更全面了。

在这里插入图片描述
讯享网

(2)Python学习视频

包含了Python入门、爬虫、数据分析和web开发的学习视频,总共100多个,虽然没有那么全面,但是对于入门来说是没问题的,学完这些之后,你可以按照我上面的学习路线去网上找其他的知识资源进行进阶。

在这里插入图片描述

(3)100多个练手项目

我们在看视频学习的时候,不能光动眼动脑不动手,比较科学的学习方法是在理解之后运用它们,这时候练手项目就很适合了,只是里面的项目比较多,水平也是参差不齐,大家可以挑自己能做的项目去练练。

在这里插入图片描述

网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。

需要这份系统化学习资料的朋友,可以戳这里获取

一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

 第一种方式 

讯享网

def download_by_trade_date(start_date, end_date, data_path=“by_trade_date”, worker_size=2, debug=False):
“”"
通过交易日来遍历时间范围内的数据,当交易日的个数小于股票的数量时,效率较高.

讯享网一年一般220个交易日左右,但是股票却有3800多个,那么通过交易日来下载数据就高效的多了 """ now = datetime.now() start\_time = now try: start\_date\_ = datetime.strptime(start\_date, DATE\_FORMAT) end\_date\_ = datetime.strptime(end\_date, DATE\_FORMAT) if end\_date\_ < start\_date\_: sys.exit("起始时间应该大于结束时间") if start\_date\_ > now: sys.exit("起始时间应该大于当前时间") if end\_date\_ > now: end\_date = now.strftime(DATE\_FORMAT) except Exception: traceback.print\_exc("") sys.exit("传入的start\_date\[%s\]或end\_date\[%s\]时间格式不正确, 格式应该像" % (start\_date, end\_date)) # 获取交易日历 try: trade\_cal = pro.trade\_cal(exchange="SSE", is\_open="1", start\_date=start\_date, end\_date=end\_date, fields="cal\_date") except Exception: sys.exit("获取交易日历失败") trade\_date\_lst = trade\_cal.cal\_date pool = ThreadPoolExecutor(max\_workers=worker\_size) print("准备开始获取 %s到%s 的股票数据" % (start\_date, end\_date)) def worker(trade\_date): # 用偏函数包装一下 # pro = ts.pro\_api(TS\_TOKEN) fn = partial(pro.daily, trade\_date=trade\_date) return retry(fn) # 最终保存到一个列表中 ret = defaultdict(list) # future 列表 fs\_lst = \[\] # 通过线程并发获取数据 for trade\_date in trade\_date\_lst: # print(trade\_date) # 这里不使用pool.map的原因是, map返回的future列表会乱序 # submit的位置参数不需要需要放到可迭代对象里面(一般是元组), 卧槽。。。 fs = pool.submit(worker, trade\_date) fs\_lst.append(fs) # break # \*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\* # 获取每个交易日的股票数据 # \*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\* for trade\_date, fs in zip(trade\_date\_lst, fs\_lst): if debug: print("开始获取交易日\[%s\]的数据" % trade\_date) # 如果有异常或者结果为空的话 if fs.exception() or not isinstance(fs.result(), pd.DataFrame): print(fs.exception()) sys.exit("在交易日\[%s\]超过重试最大的次数也没有获取到数据" % trade\_date) day\_df = fs.result() columns = day\_df.columns # 遍历一个交易日的所有股票数据 # print(datetime.now()) # 遍历day\_df.values 大概2ms # 2 ms ± 63.2 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each) # 遍历day\_df.iterrows() 大概285ms # 285 ms ± 2.64 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) for row in day\_df.values: ts\_code = row\[0\] ret\[ts\_code\].append(row) # print(datetime.now()) merge\_start\_time = datetime.now() new\_ret = {} for key, value in ret.items(): new\_ret\[key\] = pd.DataFrame(value, columns=columns) merge\_end\_time = datetime.now() # 组合\[series...\] 需要142ms # 142 ms ± 1.12 ms per loop (mean ± std. dev. of 7 runs, 10 loops each) # 组合\[array....\] 需要6.56ms # 6.56 ms ± 66.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each) print("合并共花费时间: %s" % (merge\_end\_time - merge\_start\_time)) # \*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\* # 获取将结果保存到本地的csv文件中 # \*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\* print("数据已经获取完毕准备保存到本地") save\_to\_csv(new\_ret, data\_path=data\_path) end\_time = datetime.now() print("\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*") print("下载完成, 共花费时间%s" % (end\_time - start\_time)) print("\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*") 
 第二种方式 

def download_by_ts_code(start_date, end_date, data_path=“by_ts_code”, debug=False, worker_size=3):
“”“因为按股票代码的方式实在太慢了(如果你宽带速度比较快的话), 也就没必要多线程了”“”
now = datetime.now()
start_time = now
try:
start_date_ = datetime.strptime(start_date, DATE_FORMAT)
end_date_ = datetime.strptime(end_date, DATE_FORMAT)

讯享网 if end\_date\_ < start\_date\_: sys.exit("起始时间应该大于结束时间") if start\_date\_ > now: sys.exit("起始时间应该大于当前时间") if end\_date\_ > now: end\_date = now.strftime(DATE\_FORMAT) except Exception: traceback.print\_exc("") sys.exit("传入的start\_date\[%s\]或end\_date\[%s\]时间格式不正确, 格式应该像" % (start\_date, end\_date)) def worker(ts\_code): fn = partial(ts.pro\_bar, ts\_code=ts\_code, adj='qfq', start\_date=start\_date, end\_date=end\_date) return retry(fn) pool = ThreadPoolExecutor(max\_workers=worker\_size) print("准备开始获取 %s到%s 的股票数据" % (start\_date, end\_date)) # 不指定任何参数会获取5000条最近的数据,ts\_code会重复 day = pro.daily() # 固定顺序,set是无法包装有序的 all\_ts\_code = list(set(day.ts\_code)) fs\_lst = \[\] for ts\_code in all\_ts\_code: fs\_lst.append(pool.submit(worker, ts\_code)) # break for ts\_code, fs in zip(all\_ts\_code, fs\_lst): # 如果有异常或者结果为空的话 if fs.exception() or not isinstance(fs.result(), pd.DataFrame): print(fs.exception()) sys.exit("在交易日\[%s\]超过重试最大的次数也没有获取到数据" % trade\_date) df = fs.result() # %timeit df.sort\_index() # 192 µs ± 3.73 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each) # %timeit df.sort\_index(inplace=True) # 2.54 µs ± 177 ns per loop (mean ± std. dev. of 7 runs,  loops each) df.sort\_index(inplace=True, ascending=False) if not isinstance(df, pd.DataFrame): sys.exit("在股票\[%s\]超过重试最大的次数也没有获取到数据" % ts\_code) save\_to\_csv({ts\_code: df}, data\_path=data\_path) if debug: print("股票\[%s\]历史数据下载保存完成" % ts\_code) end\_time = datetime.now() print("\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*") print("下载完成, 共花费时间%s" % (end\_time - start\_time)) print("\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*") 
 我感觉我写了好多注释以及一些方法之间性能对比,关于各个方法之间的性能对比大家还是需要注意的,因为虽然平时感受不出来,但是在较多次数的循环的时候就会发现性能差异了。 至此需要的历史数据就得到了。 如果对读写速度很在意的话,存储在sqlite数据库或者其他其他数据会快很多。 股票池的选择条件 无论是主观交易还是量化交易,选股的方式个人认为大概分为以下三类。 1. 技术选股 通过技术指标比如MACD, KDJ或者K线形态等技术指标来选股。 2. 基本面选股 通过财务报表或者一些金融指标来选股。 3. 消息选股 新闻消息或者小道消息选股。 本文主要集中在以下几种方式 1. 形态选股 通过k线的形态找到股票中的十字星等形态。 2. 交易额/流通股本选股 通过对交易额或者流通股本排序选择前面的股票。 3. 相似度选股 通过选定基准股票趋势,发现相似的股票。 4. 趋势选股 挑选最近六个月股票趋势向上的股票。 形态选股 股票的形态多种多样,这里以选择股票中的黄昏(早晨)十字星为例。 黄昏十字星与早晨十字星的区别在于所处趋势不一样。 图示如下: ![](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9wMS10dC5ieXRlaW1nLmNvbS9sYXJnZS9wZ2MtaW1hZ2UvYjlkZjE0ODA2NTliNDJiOWFkMzQ0YmVkNmMxODM0YmY?x-oss-process=image/format,png) 如果需要程序判断,那么这些长度需要量化,不能模棱两可。所以根据十字星的定义,量化如下: 实体长度: 超过2.5%才算长实体,而且上下影线不能超过0.3% 十字星: 实体长度不超过1.5% 趋势: 包括十字星在内往前数6根k线,其中的第1根k线收盘价均小于它后面4根收盘价为向上趋势,反之趋势向下,并且幅度超过2.5%。 这里我只是主观的量化没有任何交易经验的选择,如果不认同的话,可以自行修改代码。 代码如下: 

def select_doji():
# fp = “by_ts_code/.SH-.csv”
fp = “by_ts_code/.SZ-.csv”
df = pd.read_csv(fp, index_col=“trade_date”, parse_dates=[“trade_date”])
df = df[[“open”, “high”, “low”, “close”]]

讯享网# k线数量 k\_size = 6 # 起始幅度大小 treand\_threshold = 0.025 # 长实体最小长度 entity\_length\_threshold = 0.025 # 长实体上下最大影线长度 entity\_shadow\_line\_length\_threshold = 0.03 # 十字星实体长度最大长度 doji\_entity\_length\_threshold = 0.015 # 十字星上下影线长度最小长度 # doji\_shadow\_line\_length\_threshold = 0.005 trend\_map = {1: "向上", -1: "向下"} def up\_or\_down\_trend(row): """1代表向上, -1代表向下, 0代表震荡""" first = row\[0\] last = row\[-1\] if all(first > row\[1:\]) and first > (last \* treand\_threshold): return -1 elif all(first < row\[1:\]) and last > (first \* treand\_threshold): return 1 else: return 0 df\["trend"\] = df.close.rolling(k\_size).apply(up\_or\_down\_trend, raw=True) df.fillna(value=0, inplace=True) def k\_sharp(k): """返回k线的上下影线长度, 实体长度""" open\_, high, low, close = k\[:4\] if open\_ > close: upper\_line\_length = (high - open\_) / high lower\_line\_length = (close - low) / close entity\_length = (open\_ - close) / open\_ else: upper\_line\_length = (high - close) / high lower\_line\_length = (open\_ - low) / open\_ entity\_length = (close - open\_) / close return upper\_line\_length, lower\_line\_length, entity\_length def is\_up\_or\_down\_doji(k\_lst, trend): # open, high, low, close if len(k\_lst) != 3: sys.exit("判断十字星需要三根K线") is\_ok = False k1, k2, k3 = k\_lst # 判断是否跳空 # 通过high, close过于严格 if trend > 0: # 趋势向上时,最低点是否大于两个实体的最高价 if k2\[0\] < k1\[1\] or k2\[0\] < k3\[1\]: # if k2\[0\] < k1\[1\] : return is\_ok else: # 趋势向下时,最高点是否小于两个实体的最高价 if k2\[0\] > k1\[2\] or k2\[0\] > k3\[2\]: return is\_ok k1\_sharp = k\_sharp(k1) # print("k1 sharp") # print(k1\_sharp) # 判断是否为长实体 if (k1\_sharp\[2\] < entity\_length\_threshold or k1\_sharp\[0\] > entity\_shadow\_line\_length\_threshold or k1\_sharp\[1\] > entity\_shadow\_line\_length\_threshold): return is\_ok k3\_sharp = k\_sharp(k3) # print("k3 sharp") # print(k3\_sharp) if (k3\_sharp\[2\] < entity\_length\_threshold or k3\_sharp\[0\] > entity\_shadow\_line\_length\_threshold or k3\_sharp\[1\] > entity\_shadow\_line\_length\_threshold): return is\_ok # print("ok") # 判断是否为十字星 k2\_sharp = k\_sharp(k2) # print("k2 sharp") # print(k2\_sharp) # 实体长度不超过0.2%, 上下影线长度超过0.6%, 如果规定上下影线的长度不太好找 # if (k2\_sharp\[2\] > doji\_entity\_length\_threshold # or k2\_sharp\[0\] < doji\_shadow\_line\_length\_threshold # or k2\_sharp\[1\] < doji\_shadow\_line\_length\_threshold): if k2\_sharp\[2\] > doji\_entity\_length\_threshold: return is\_ok return True df\_values = df.values ret = \[\] for index in range(len(df\_values)): if index < k\_size: continue trend = df\_values\[index - 1\]\[-1\] 

最后

🍅 硬核资料:关注即可领取PPT模板、简历模板、行业经典书籍PDF。
🍅 技术互助:技术群大佬指点迷津,你的问题可能不是问题,求资源在群里喊一声。
🍅 面试题库:由技术群里的小伙伴们共同投稿,热乎的大厂面试真题,持续更新中。
🍅 知识体系:含编程语言、算法、大数据生态圈组件(Mysql、Hive、Spark、Flink)、数据仓库、Python、前端等等。

网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。

需要这份系统化学习资料的朋友,可以戳这里获取

一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

小讯
上一篇 2025-03-30 18:57
下一篇 2025-02-05 14:16

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/58357.html