Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
high = mid - 1
if mid >= total:
return (0, 0)
start_pos = mid
low = mid
high = total - 1
while low <= high:
tmp_datetime = Datetime(data.index[high])
if end_datetime > tmp_datetime:
mid = high + 1
break
tmp_datetime = Datetime(data.index[low])
if tmp_datetime >= end_datetime:
mid = low
break
mid = (low + high) // 2
tmp_datetime = Datetime(data.index[mid])
if end_datetime > tmp_datetime:
low = mid + 1
else:
high = mid - 1
end_pos = total if mid >= total else mid
if start_pos >= end_pos:
return (0,0)
return (start_pos, end_pos)
:param int end_ix: 结束位置
:param KRecordListPtr out_buffer: 传入的数据缓存,读取数据后使用
out_buffer.append(krecord) 加入数据
"""
if start_ix >= end_ix or start_ix <0 or end_ix <0:
return
data = self._get_bars(market, code, ktype)
if len(data) < start_ix:
return
total = end_ix if end_ix < len(data) else len(data)
for i in range(start_ix, total):
record = KRecord()
record.datetime = Datetime(data[i].get('datetime'))
record.openPrice = data[i].get('open')
record.highPrice = data[i].get('high')
record.lowPrice = data[i].get('low')
record.closePrice = data[i].get('close')
record.transAmount = data[i].get('amount')
record.transCount = data[i].get('vol')
out_buffer.append(record)
"""转化为numpy结构数组"""
t_type = np.dtype({'names':['证券代码', '证券名称', '买入日期',
'已持仓天数', '持仓数量', '投入金额',
'当前市值', '盈亏金额', '盈亏比例'],
'formats':['U10','U20', 'datetime64[D]', 'i', 'i', 'd',
'd', 'd', 'd']})
sm = StockManager.instance()
query = Query(-1)
data = []
for pos in pos_list:
invest = pos.buyMoney - pos.sellMoney + pos.totalCost
k = pos.stock.getKData(query)
cur_val = k[0].closePrice * pos.number
bonus = cur_val - invest
date_list = sm.getTradingCalendar(QueryByDate(Datetime(pos.takeDatetime.date())))
data.append((pos.stock.market_code, pos.stock.name,
pos.takeDatetime, len(date_list), pos.number,
invest, cur_val, bonus,
100 * bonus / invest))
return np.array(data, dtype=t_type)
:param str market: 市场标识
:param str code: 证券代码
:param int pos: 指定位置(大于等于0)
:param KQuery.KType ktype: K线类型
"""
record = KRecord()
if pos < 0:
return record
data = self._get_bars(market, code, ktype)
if data is None:
return record
if pos < len(data):
record.datetime = Datetime(data[pos].get('datetime'))
record.openPrice = data[pos].get('open')
record.highPrice = data[pos].get('high')
record.lowPrice = data[pos].get('low')
record.closePrice = data[pos].get('close')
record.transAmount = data[pos].get('amount')
record.transCount = data[pos].get('vol')
return record
def indicator_getitem(data, i):
    """Look up one or more values in ``data`` by position, slice or date.

    ``data`` must support ``len(data)`` and ``data.get(index)``; date-based
    lookups additionally call ``data.getByDate(...)``.

    :param data: container to read from
    :param i: int | Datetime | slice | str

        * integer (built-in ``int`` or any ``numbers.Integral`` such as a
          numpy integer): position; negative values count from the end
        * ``slice``: resolved against ``len(data)``; yields a list
        * ``Datetime``: forwarded unchanged to ``data.getByDate``
        * ``str``: converted with ``Datetime(i)`` and then forwarded to
          ``data.getByDate``
    :return: ``data.get(index)`` for an integer, a list of ``data.get(x)``
        results for a slice, otherwise the result of ``data.getByDate``
    :raises IndexError: if an integer index is out of range, or if ``i`` is
        of an unsupported type
    """
    import numbers

    # The tuple keeps the fast path for built-in ints while also accepting
    # integer-like objects (e.g. numpy.int64) that are not ``int`` subclasses.
    if isinstance(i, (int, numbers.Integral)):
        # Normalise to a plain int so the message formatting and the
        # data.get() call always receive a built-in integer.
        position = int(i)
        length = len(data)
        index = length + position if position < 0 else position
        if index < 0 or index >= length:
            raise IndexError("index out of range: %d" % position)
        return data.get(index)

    if isinstance(i, slice):
        # slice.indices() clamps start/stop/step to the container length.
        return [data.get(x) for x in range(*i.indices(len(data)))]

    if isinstance(i, Datetime):
        return data.getByDate(i)

    if isinstance(i, str):
        return data.getByDate(Datetime(i))

    # Kept as IndexError (not TypeError) so existing callers that catch
    # IndexError keep working.
    raise IndexError("Error index type")
"""
if isinstance(i, int):
length = len(data)
index = length + i if i < 0 else i
if index < 0 or index >= length:
raise IndexError("index out of range: %d" % i)
return data.get(index)
elif isinstance(i, slice):
return [data.get(x) for x in range(*i.indices(len(data)))]
elif isinstance(i, Datetime):
return data.getByDate(i)
elif isinstance(i, str):
return data.getByDate(Datetime(i))
else:
raise IndexError("Error index type")
:param int end_ix: 结束位置
:param KRecordListPtr out_buffer: 传入的数据缓存,读取数据后使用
out_buffer.append(krecord) 加入数据
"""
if start_ix >= end_ix or start_ix <0 or end_ix <0:
return
data = self._get_bars(market, code, ktype)
if len(data) < start_ix:
return
total = end_ix if end_ix < len(data) else len(data)
for i in range(start_ix, total):
record = KRecord()
record.datetime = Datetime(data[i].get('datetime'))
record.openPrice = data[i].get('open')
record.highPrice = data[i].get('high')
record.lowPrice = data[i].get('low')
record.closePrice = data[i].get('close')
record.transAmount = data[i].get('amount')
record.transCount = data[i].get('vol')
out_buffer.append(record)