import pandas as pd
import numpy as np
from scipy.stats import norm
import random
from tqdm.notebook import tqdm
# Shorthand aliases for the standard normal distribution from scipy.stats:
# n is the probability density function, N the cumulative distribution function.
# NOTE(review): neither alias is referenced in the visible part of this file.
n = norm.pdf
N = norm.cdf
def update_epsilon(e, min_e, decay):
    """Apply one multiplicative decay step to an exploration rate.

    Args:
        e: Current exploration rate.
        min_e: Floor for the exploration rate.
        decay: Multiplicative factor applied on each call.

    Returns:
        ``e * decay`` while that stays above ``min_e``; ``min_e`` once the
        decayed value would fall below the floor. A value that is already at
        or below ``min_e`` is returned unchanged.
    """
    if e > min_e:
        # Clamp so that a single decay step cannot undershoot the floor.
        e = max(e * decay, min_e)
    return e
# Number of simulated bid-ask paths generated for training and for testing.
# Each path is one row of M spreads.
TRAINING_SAMPLE = 10000000
TESTING_SAMPLE = 100000
# Number of trading periods in each path (first axis of the Q table).
M = 10
# Number of possible holding levels (holdings 0 through 10 index the Q table).
N_POSITION = 11
# Number of actions: the quantity traded in one period, drawn from 0 through 10.
N_ACTION = 11
# Market impact: every unit traded in a period raises the per-unit cost of that
# period's trade by this amount (cost = (spread + MARKET_IMPACT * qty) * qty).
MARKET_IMPACT = 0.05
# Variables for the epsilon-greedy reinforcement learning training.
# Minimum epsilon: decay stops once epsilon is no longer above this value.
MIN_e = 0.05
# Initial epsilon (probability of taking a random action):
e = 1
# Multiplicative epsilon decay factor.
# NOTE(review): assuming epsilon is decayed once per training path, 10,000,000
# decays at this factor only bring it from 1 down to roughly 0.905, so MIN_e is
# never approached -- confirm that this much exploration is intended.
DECAY = 0.99999999
# Parameters for the Q table update.
# NOTE(review): C1 and C2 are not referenced in the visible code; ALPHA is the
# step size applied to the Q-value correction.
C1 = 1
C2 = 0.1
ALPHA = 0.01
# Number of discrete bid-ask spread states.
BID_ASK_STATE = 11
# Uniformly sample the bid-ask spread for every training path and period from
# 0.5% to 1.5% in 0.1% increments (11 equally likely levels).
bid_ask_table = (np.random.randint(-5,6,[TRAINING_SAMPLE,M]) * 10 + 100)/100
print("\n Bid-Ask spread training samples have been generated")
# Convert each spread into a state index 0..10, with 0 meaning a spread of 0.5%
# and 10 meaning a spread of 1.5%.  The arithmetic runs in floating point, so
# round to the nearest integer before casting: astype(int) truncates, and a
# result such as 5.999999 would otherwise land in the wrong state.
bid_ask_state_table = np.rint((bid_ask_table*100 - 100)/10 + 5).astype(int)
# Initialise the Q table once, with Q-value = 0 for every
# (period, spread state, holding, action) combination.
q = np.zeros((M,BID_ASK_STATE,N_POSITION,N_ACTION))
print("\n Training is in progress:")
# Training
# Every row of bid_ask_table is one episode: the agent starts with 10 units and
# decides how many to sell in each of the M periods.  "reward" is the execution
# cost of a trade, so lower is better and the greedy choice is the action with
# the smallest Q-value.
#
# The episode runs over all M periods and the last period always sells whatever
# is still held.  Without that forced, fully costed final trade, leftover
# inventory would be written off for free and the learned costs would not be
# comparable with a strategy that liquidates the whole holding.
for i in tqdm(range(len(bid_ask_table))):
    bid_ask = bid_ask_table[i]
    bid_ask_state = bid_ask_state_table[i]
    position = 10
    # Per-period records for this episode: entry t holds the quantity sold in
    # period t, its cost, and the holding left afterwards.
    position_list = np.zeros(M, dtype=int)
    action_list = np.zeros(M, dtype=int)
    reward_list = np.zeros(M)
    for t in range(M):
        if t == M - 1:
            # Final period: liquidate the remaining holding.
            action = position
        elif np.random.rand() <= e:
            # Explore with probability e.
            action = random.randrange(0, N_ACTION)
        else:
            # Exploit: pick the cheapest action among those whose Q-value has
            # been updated (a Q-value of exactly 0 is treated as "not learned
            # yet").  Fall back to a random action when nothing is learned.
            q_row = q[t, bid_ask_state[t], position]
            visited = np.flatnonzero(q_row)
            if visited.size:
                action = int(visited[np.argmin(q_row[visited])])
            else:
                action = random.randrange(0, N_ACTION)
        # The agent cannot sell more than it holds.
        action = min(action, position)
        new_position = position - action
        reward = (bid_ask[t] + MARKET_IMPACT * action) * action
        position_list[t] = new_position
        action_list[t] = action
        reward_list[t] = reward
        position = new_position
    # Total execution cost of this episode (overwritten on every episode).
    total_reward = np.array([reward_list.sum()])
    # Update the Q table along the realised path.  The target is the cost paid
    # now plus the current Q-value of the state/action actually taken in the
    # next period; the last period has no successor, so its target is the cost.
    for t in range(M):
        lookup_position = 10 if t == 0 else position_list[t - 1]
        cell = (t, bid_ask_state[t], lookup_position, action_list[t])
        if t == M - 1:
            qnew = reward_list[t]
        else:
            qnew = reward_list[t] + q[t + 1, bid_ask_state[t + 1], position_list[t], action_list[t + 1]]
        q[cell] += ALPHA * (qnew - q[cell])
    # Decay the exploration rate once per episode until it reaches the floor.
    if e > MIN_e:
        e = e * DECAY
# Testing
# Generate test data: same spread distribution as the training paths.
bid_ask_table_test = (np.random.randint(-5,6,[TESTING_SAMPLE,M]) * 10 + 100)/100
print("\n Bid-Ask spread test samples have been generated")
print("\n Test Result is in progress:")
# Spread state index 0..10 for every test path and period.  Round before the
# integer cast so floating-point noise cannot truncate into the wrong state.
bid_ask_state_table_test = np.rint((bid_ask_table_test * 100 - 100)/10 + 5).astype(int)
# Total execution cost of the learned policy on each test path.  Preallocated:
# growing the array with np.append would copy it once per path.
total_rl_reward = np.zeros(len(bid_ask_table_test))
for i in tqdm(range(len(bid_ask_table_test))):
    bid_ask = bid_ask_table_test[i]
    bid_ask_state = bid_ask_state_table_test[i]
    position = 10
    # Per-period records for this path: quantity sold, its cost, and the
    # holding left afterwards.
    position_list = np.zeros(M, dtype=int)
    action_list = np.zeros(M, dtype=int)
    reward_list = np.zeros(M)
    # Run all M periods.  The last period always sells the remaining holding,
    # so every path liquidates the full 10 units and its cost is comparable
    # with the linear-execution benchmark.
    for t in range(M):
        if t == M - 1:
            action = position
        else:
            # Greedy policy: cheapest action among those with a learned
            # (non-zero) Q-value; random action when nothing has been learned
            # for this state.
            q_row = q[t, bid_ask_state[t], position]
            visited = np.flatnonzero(q_row)
            if visited.size:
                action = int(visited[np.argmin(q_row[visited])])
            else:
                action = random.randrange(0, N_ACTION)
        # The agent cannot sell more than it holds.
        action = min(action, position)
        new_position = position - action
        reward = (bid_ask[t] + MARKET_IMPACT * action) * action
        position_list[t] = new_position
        action_list[t] = action
        reward_list[t] = reward
        position = new_position
    total_rl_reward[i] = reward_list.sum()
# Report cost per unit: totals are divided by the initial holding of 10.
rl_cost_per_unit = total_rl_reward / 10
print('RL: Average bid-ask spread paid(%):', np.mean(rl_cost_per_unit))
print('RL: Standard Deviation of bid-ask spread paid(%):', np.std(rl_cost_per_unit))
# Benchmark: sell exactly one unit in each of the M periods, which costs
# (spread + MARKET_IMPACT) per period.
linear_cost_per_unit = np.sum(bid_ask_table_test + MARKET_IMPACT, axis=1) / 10
print('Linear Execution: Average bid-ask spread paid(%):', np.mean(linear_cost_per_unit))
print('Linear Execution: Standard Deviation of bid-ask spread paid(%):', np.std(linear_cost_per_unit))
# Learned decision at the first period (t = 0), where the holding is always 10:
# one row per spread state, showing the action with the smallest non-zero
# Q-value.
table_index = [f"Bid-Ask Spread = {(10 * level + 50) / 100}%" for level in range(11)]
table_columns = ['Holding = 10']
decision_table_t0 = pd.DataFrame(index=table_index, columns=table_columns)
t = 0
for state in range(11):
    q_row = q[t][state][10]
    cheapest = np.min(q_row[np.nonzero(q_row)])
    decision_table_t0.iloc[state, 0] = np.where(q_row == cheapest)[0][0]
decision_table_t0

# Learned decision at the second period (t = 1): rows are spread states,
# columns are holdings 1 through 10.
t = 1
table_index = [f"Bid-Ask Spread = {(10 * level + 50) / 100}%" for level in range(11)]
table_columns = [f"Holding = {holding}" for holding in range(1, 11)]
decision_table = pd.DataFrame(index=table_index, columns=table_columns)
for column, holding in enumerate(range(1, 11)):
    for state in range(11):
        q_row = q[t][state][holding]
        cheapest = np.min(q_row[np.nonzero(q_row)])
        decision_table.iloc[state, column] = np.where(q_row == cheapest)[0][0]
decision_table