中债5年期到期收益率在数据库107端口,具体长成:
# -- 中债
# select * from biz_factor_cal_result
# where factor_id='';
# factor_value={"中债国债到期收益率":"2.4859"}
# factor_name='中债国债到期收益率'
# factor_cal_time='5.0'
# quote_time='2021-01-19'
FROO7到期收益率在数据库107端口是:
# -- fr007
# select * from biz_factor_cal_result
# where factor_id='';
# factor_value={"FR007利率互换_5Y":"2.524"}
# factor_name='FR007利率互换_5Y'
# factor_cal_time='5Y'
# quote_time='2023-08-01'
自己写的版本(能跑):
每次是一天一天获取行情数据处理,并非一次性获取完再处理,另外数据库里面的数据格式也有可能错误,字符型和数字型很混乱。
下载winSCP查看后台日志:WinSCP_官方电脑版_51下载
登陆上去主机名和密码测试链接,所有日志都在这里看到:

也可以在查看:
import json import random import traceback import numpy as np import pandas as pd from datetime import datetime, timedelta from data.logger import logger from factor.conf.status_codes import FACTOR_DEPLOY_STATUS from factor.data.db_handler_plat import query_factor_cal_result_by_quotetime, query_factor_cal_result,query_factor_cal_result_temp from factor.data.kslib_util import getYieldCurveInstance, getYieldRateByYear class FactorResult: """因子计算结果类""" def __init__(self): # 初始化 self.value = None # 因子值,取值范围[-99999, 99999] self.position = None # 持仓方向,应取'L', 'S', 'N'(分别代表多,空,不动)中的一个 self.x1 = None # 时间1,2,3,应为datetime或字符串类型 self.y1 = None # 数值1,2,3 self.x2 = None self.y2 = None self.x3 = None self.y3 = None self.message = None # 消息,长度限制100个字符 def __str__(self): # 属性名和属性值格式化为字符串,然后将它们添加到一个列表中 """打印因子计算结果对象查看参数""" from collections.abc import Collection lines = [f"{self.__class__.__name__:^40}", '=' * 40] # 创建一个包含两个元素的列表,第一个是字符串,总宽度为40个字符,#第二个是40个=的分隔线。 for key, value in vars(self).items(): # 遍历一个对象(self)的所有属性 if isinstance(value, float): value_str = f"{value:.6f}" elif isinstance(value, np.ndarray): value_str = f"{value.shape} array" elif isinstance(value, pd.DataFrame): value_str = f"{value.shape} DataFrame" elif isinstance(value, Collection) and not isinstance(value, str): # 是个集合且非字符串类型,将其格式化为长度(len)和类名的字符串。 value_str = f"len {len(value)} {value.__class__.__name__}" else: value_str = str(value) value_len_limit = 38 - len(key) if len(value_str) > value_len_limit: left = (value_len_limit - 2) // 2 # 整数除法,它返回商的整数部分 right = (value_len_limit - 3) // 2 value_str = value_str[:left] + '...' + value_str[-right:] lines.append(f"{key}: {' ' * (value_len_limit - len(value_str))}{value_str}") # 形如 key: value return '\n'.join(lines) # 将所有格式化后的字符串通过换行符连接起来 class FactorCompute: """因子计算""" def __init__(self, quote_time: datetime, job_time: str = None, symbol: str = None, exchange: str = None, quotation_type: str = None): """入参中调度时间为datetime类型""" self.quote_time = quote_time.strftime("%Y-%m-%d") # 行情时间,对应公布时间, factor_cal_time对应的是期限时间 self.factorA_id = "" # A因子ID——中债 self.factorB_id = "" # B因子ID——fr007 self.res = FactorResult() # 计算结果对象,返回格式化为字符串的属性名和属性值 def main(self) -> FactorResult: #定义一个方法main,它的返回类型是FactorResult,->用于指定函数或方法的返回值类型。 a = {"quote_time":[],"factor_cal_time": [], "factor_value": []} b = {"quote_time":[],"factor_cal_time": [], "factor_value": []} yield_data_a= {"quote_time":[],"factor_cal_time": [], "factor_value": []} # 行情日期,因子期限,因子值 yield_data_b= {"quote_time":[],"factor_cal_time": [], "factor_value": []} # 行情日期,因子期限,因子值 # 每次只获取一天的行情,所以不用加判断同一天的逻辑,d = date_list[i]逻辑不用 yield_a = None yield_b = None #每次获取、接收一个日期的数据,每次遍历后重新调用该class类 yield_data_a = self.query_factorA_yield(self.factorA_id, self.quote_time) # yield_data_a 和 yield_data_b 是字典列表 yield_data_b = self.query_factorA_yield(self.factorB_id, self.quote_time) print("中债:",yield_data_a)#在后台打印日志 print("FR007:",yield_data_b) data_list_a = yield_data_a data_list_b = yield_data_b for item in data_list_a: print("中债每个元素",item) quote_time1 = pd.to_datetime(item["quote_time"], format="%Y-%m-%d") print("中债每个元素的期限",item["factor_cal_time"]) if item["factor_cal_time"] == 5.0 or item["factor_cal_time"] == '5.0': yield_a = item["factor_value"] # a["quote_time"].append(quote_time1) # a["factor_cal_time"].append(float(item["factor_cal_time"])) # a["factor_value"].append(item["factor_value"]) else: continue # a = pd.DataFrame(a) for item in data_list_b: print("fr007每个元素",item) yield_b = item["factor_value"] # item["factor_cal_time"] = float(str(item["factor_cal_time"]).replace('Y', '')) # quote_time2 = pd.to_datetime(item["quote_time"], format="%Y-%m-%d") # b["quote_time"].append(quote_time2) # b["factor_cal_time"].append(item["factor_cal_time"])#日期 # b["factor_value"].append(item["factor_value"]) # b = pd.DataFrame(b) print('------------------')#都是打印日志 print(yield_a) print(yield_b) print('------------------') #能同时获取到值,不然获取不到值为None,类型为Nonetype,无法计算差值,转换成浮点型,不确定是字符型 if yield_a and yield_b: print('1') self.res.value = float(yield_a)-float(yield_b) else: self.res.value = None # 计算a和b同一天的利差 return self.res # date_list = sorted(list(set(a["quote_time"]).intersection(set((b["quote_time"]))))) # 取b和a的交集日期 # # date_list_str = [date.strftime("%Y-%m-%d %H:%M:%S") for date in date_list] # for i in range(len(date_list)): # d = date_list[i] # print("") # ind_a = a.loc[a['quote_time'] == d].index.item() # ind_b = b.loc[b['quote_time'] == d].index.item() # yield_a = a["factor_value"][ind_a] # yield_b = b["factor_value"][ind_b] # cal_time = float(a["factor_cal_time"][ind_a]) # res = yield_a - yield_b # # self.res.results.append({ # # "factor_value": res, # # "factor_cal_time": cal_time, # # "quote_time": d # # }) # if yield_a and yield_b: # self.res.value = yield_a - yield_b # else: # self.res.value = None # -- 中债 # select * from biz_factor_cal_result # where factor_id=''; # factor_value={"中债国债到期收益率":"2.4859"} # factor_name='中债国债到期收益率' # factor_cal_time='5.0' # quote_time='2021-01-19' # -- fr007 # select * from biz_factor_cal_result # where factor_id=''; # factor_value={"FR007利率互换_5Y":"2.524"} # factor_name='FR007利率互换_5Y' # factor_cal_time='5Y' # quote_time='2023-08-01' # 查询因子日终收盘收益率期限收益率 def query_factorA_yield(self, factor_id: str, job_time: str): df = [] select_key = ["factor_value", "factor_cal_time"]#关键字段换成factor_cal_time,不要code cal_frequency_type = "58004" try: df = query_factor_cal_result_temp(factor_id, select_key, job_time, job_time, cal_frequency_type ) for row in df: if row["factor_value"] is not None: quote_time = row["quote_time"] factor_name = row["factor_name"] factor_cal_time = row["factor_cal_time"] factor_value = float(json.loads(row["factor_value"])[factor_name]) row["factor_value"] = factor_value except: pass # logger.error(traceback.format_exc()) # if self.status_code == 200: # self.err_msg = "数据库查询因子数据错误" # self.status_code = FACTOR_DEPLOY_STATUS["Database select error"] return df
讯享网
大佬给的代码:
获取一天行情后,指定得到一天当中==5.0的到期收益率,我的是做判断,大同小异。
为了了解出错在那里,最好是在后台打印日志方便查看哪里出现错误。
讯享网# info = BondInfo( # security_id=item["bondCode"], # 债券代码 # name=item["bondShortName"], # 债券简称 # bond_type=item["bondType"], # 债权类型 # face_value=float(item["faceValue"]), # 面值 # dated_date=datetime.strptime(item["valueDate"], '%Y%m%d'), # 当前日期 # maturity_date=datetime.strptime(item["maturityDate"], '%Y%m%d'), # 到期日期 # frequency=frequency_dict.get(item["schedPayFreq"], ""), # 付息频a率, "6" # coupon_rate_type=item["couponRateType"], # 票面利率类型, "0" # coupon=float(item["fixedRate"]), # 票面利率, 100.00 # day_count=item["basis"], # 债券计息基础, "3"(ACT-365) # term_to_maturity=item["termToMaturityString"] # 到期期限, "9.3123Y" # ) """ 因子计算模板 ---------- 入参:合约代码,行情时间 出参:因子值,持仓方向,用于画图的三个坐标,消息 """ import json import random import traceback from datetime import datetime, timedelta import numpy as np import pandas as pd from data.logger import logger from factor.conf.status_codes import FACTOR_DEPLOY_STATUS from factor.data.db_handler_plat import query_factor_cal_result_by_quotetime, query_factor_cal_result, \ query_factor_cal_result_temp from factor.data.kslib_util import getYieldCurveInstance, getYieldRateByYear class FactorResult: """因子计算结果类""" def __init__(self): self.value = None # 因子值,取值范围[-99999, 99999] self.position = None # 持仓方向,应取'L', 'S', 'N'(分别代表多,空,不动)中的一个 self.x1 = None # 时间1,应为datetime或字符串类型 self.y1 = None # 数值1 self.x2 = None # 时间2,应为datetime或字符串类型 self.y2 = None # 数值2 self.x3 = None # 时间3,应为datetime或字符串类型 self.y3 = None # 数值3 self.message = None # 消息,长度限制100个字符 def __str__(self): """打印因子计算结果对象查看参数""" from collections.abc import Collection lines = [f"{self.__class__.__name__:^40}", '=' * 40] for key, value in vars(self).items(): if isinstance(value, float): value_str = f"{value:.6f}" elif isinstance(value, np.ndarray): value_str = f"{value.shape} array" elif isinstance(value, pd.DataFrame): value_str = f"{value.shape} DataFrame" elif isinstance(value, Collection) and not isinstance(value, str): value_str = f"len {len(value)} {value.__class__.__name__}" else: value_str = str(value) value_len_limit = 38 - len(key) if len(value_str) > value_len_limit: left = (value_len_limit - 2) // 2 right = (value_len_limit - 3) // 2 value_str = value_str[:left] + '...' + value_str[-right:] lines.append(f"{key}: {' ' * (value_len_limit - len(value_str))}{value_str}") return '\n'.join(lines) class FactorCompute: def __init__(self, quote_time: datetime, symbol: str = None, exchange: str = None, quotation_type: str = None): """入参中调度时间为datetime类型""" self.quote_time = quote_time.strftime("%Y-%m-%d") # 调度时间 self.factorA_id = "" # A因子ID self.factorB_id = "" self.res = FactorResult() # 计算结果对象 self.start_time = quote_time.strftime("%Y-%m-%d") self.end_time = quote_time.strftime("%Y-%m-%d") def main(self) -> FactorResult: # a = {"factor_cal_time": [], "factor_value": []} # b = {"factor_cal_time": [], "factor_value": []} yield_a = None yield_b = None yield_data_a = self.query_factorA_yield(self.factorA_id, self.quote_time) yield_data_b = self.query_factorA_yield(self.factorB_id, self.quote_time) print("中债数据库查出的值", yield_data_a) print("FR007数据库查出的值", yield_data_b) if len(yield_data_a) == 17 and len(yield_data_b) == 1: yield_a = yield_data_a[9]["factor_value"] yield_b = yield_data_b[0]["factor_value"] print("被减数", yield_a) print("减数", yield_b) # date_list = list(set(a["factor_cal_time"]).intersection(set((b["factor_cal_time"])))) # for i in range(len(date_list)): # d = date_list[i] # ind_a = a["factor_cal_time"].index(d) # ind_b = b["factor_cal_time"].index(d) # yield_a = a["factor_value"][ind_a] # yield_b = b["factor_value"][ind_b] # res = yield_a - yield_b # # 4.保存 # self.res.value.append({ # "variety_code": ".IB&.IB", # "factor_value": res, # "udf_value": d, # "job_time": self.quote_time # }) if yield_a and yield_b: self.res.value = yield_a - yield_b else: self.res.value = None return self.res # 查询因子A中的因子值 def query_factorA_yield(self, factor_id: str, job_time: str): df = [] select_key = ["factor_value", "factor_cal_time"] frequency_type = "58004" try: df = query_factor_cal_result_temp(factor_id, select_key, job_time, job_time, frequency_type ) for row in df: if row["factor_value"] is not None: quote_time = row["quote_time"] factor_name = row["factor_name"] # variety_code = row["variety_code"] factor_value = float(json.loads(row["factor_value"])[factor_name]) row["factor_value"] = factor_value except: logger.error(traceback.format_exc()) if self.status_code == 200: self.err_msg = "数据库查询因子数据错误" self.status_code = FACTOR_DEPLOY_STATUS["Database select error"] return df

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/67281.html