Practical Examples
In addition to the code snippets included below, all of the Groovy and Python code for these recipes can be downloaded as premade Deephaven Notebooks. To learn more about using Deephaven Notebooks, please refer to the Notebook section of the User Manual.
Download the Query Cookbook Notebooks
Downsampling Trades
Step 1: Pull trades for a certain date and remove some columns.
trades = db.t("LearnDeephaven", "StockTrades")
.where("Date=`2017-08-25`")
.view("Sym", "Last", "Size", "ExchangeTimestamp")
Step 2: Decide on the downsampling interval and create a new column to hold the time bins.
To downsample our trades, we first need to choose the time interval to downsample to. For this example, we will use one minute.
tradesByMin = trades.updateView("MinuteTimeBin=lowerBin(ExchangeTimestamp, MINUTE)")
We haven't downsampled yet. We've simply added a column that assigns each trade to a one-minute time bin: the trade's timestamp rounded down to the nearest whole minute.
Step 3: Downsample trades by keeping the first trade for each symbol in each one-minute bin.
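To make the binning concrete, here is a minimal plain-Python sketch of rounding a timestamp down to its one-minute bin. It assumes timestamps are integer nanoseconds since the epoch; `lower_bin` and `MINUTE_NANOS` are hypothetical names used only for illustration, not Deephaven functions.

```python
# Hypothetical helper illustrating lowerBin-style binning; not the Deephaven API.
# Assumes timestamps are integer nanoseconds since the epoch.
MINUTE_NANOS = 60 * 1_000_000_000

def lower_bin(ts_nanos: int, interval_nanos: int) -> int:
    """Round ts_nanos down to the start of its interval."""
    return ts_nanos - ts_nanos % interval_nanos

# A trade 09:30:45 after midnight lands in the 09:30:00 bin.
trade_ts = (9 * 3600 + 30 * 60 + 45) * 1_000_000_000
print(lower_bin(trade_ts, MINUTE_NANOS) == (9 * 3600 + 30 * 60) * 1_000_000_000)  # True
```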
downsample = tradesByMin.firstBy("Sym", "MinuteTimeBin")
This query keeps the first trade for each symbol in every minute and discards any other trades for that symbol during the same minute. The table now shows one trade per symbol for each minute in which that symbol traded.
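The effect of firstBy on the binned table can be sketched in plain Python as a dictionary keyed by (symbol, bin) that keeps only the first trade it sees. This is an illustration under the assumption that trades arrive in time order; `first_trade_per_minute` and the tuple layout are hypothetical.

```python
# Hypothetical plain-Python sketch of firstBy("Sym", "MinuteTimeBin").
MINUTE_NANOS = 60 * 1_000_000_000

def lower_bin(ts_nanos, interval_nanos):
    return ts_nanos - ts_nanos % interval_nanos

def first_trade_per_minute(trades, interval_nanos=MINUTE_NANOS):
    """trades: (sym, price, size, ts_nanos) tuples in time order.

    Keeps the first trade seen for each (sym, bin) key and drops the rest.
    The output follows the order in which each key first appears.
    """
    first = {}
    for sym, price, size, ts in trades:
        key = (sym, lower_bin(ts, interval_nanos))
        if key not in first:
            first[key] = (sym, price, size, ts)
    return list(first.values())

S = 1_000_000_000
trades = [("AAPL", 160.0, 100, 5 * S), ("MSFT", 72.0, 50, 10 * S),
          ("AAPL", 160.5, 200, 30 * S), ("AAPL", 161.0, 10, 65 * S)]
for row in first_trade_per_minute(trades):
    print(row)
```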
Note: This recipe downsamples trades to the first trade of each minute by using lowerBin and firstBy(). However, one could downsample to the last trade of each minute by using upperBin and lastBy().
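A sketch of the alternative from the note: label each trade with the upper edge of its minute and keep the last trade per key. It assumes the upper bin rounds up to the next boundary and leaves timestamps that already sit on a boundary unchanged; that is an assumption about upperBin rather than confirmed behavior, and the function names are hypothetical.

```python
# Hypothetical plain-Python sketch of the upperBin + lastBy variant.
# Assumption: upper_bin rounds up to the next interval boundary and leaves
# timestamps that already sit on a boundary unchanged.
MINUTE_NANOS = 60 * 1_000_000_000

def upper_bin(ts_nanos, interval_nanos):
    rem = ts_nanos % interval_nanos
    return ts_nanos if rem == 0 else ts_nanos + (interval_nanos - rem)

def last_trade_per_minute(trades, interval_nanos=MINUTE_NANOS):
    """trades: (sym, price, size, ts_nanos) tuples in time order."""
    last = {}
    for sym, price, size, ts in trades:
        # Later trades overwrite earlier ones with the same (sym, bin) key.
        last[(sym, upper_bin(ts, interval_nanos))] = (sym, price, size, ts)
    return last
```

Under that assumption, the choice of lower or upper bin mainly changes how each bin is labeled, while firstBy versus lastBy decides which trade survives.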
Complete Code Blocks for the Downsampling Trades Recipe
trades = db.t("LearnDeephaven", "StockTrades")
.where("Date=`2017-08-25`")
.view("Sym", "Last", "Size", "ExchangeTimestamp")
tradesByMin = trades.updateView("MinuteTimeBin=lowerBin(ExchangeTimestamp, MINUTE)")
downsample = tradesByMin.firstBy("Sym", "MinuteTimeBin")
# Initial import
from deephaven import *
print("Provides:\n"
"\tdeephaven.Calendars as cals\n"
"\tdeephaven.ComboAggregateFactory as caf\n"
"\tdeephaven.DBTimeUtils as dbtu\n"
"\tdeephaven.Plot as plt\n"
"\tdeephaven.Plot.figure_wrapper as figw\n"
"\tdeephaven.QueryScope as qs\n"
"\tdeephaven.TableManagementTools as tmt\n"
"\tdeephaven.TableTools as ttools\n"
"See print(sorted(dir())) for the full namespace contents.")
# Downsampling Trades
trades = db.t("LearnDeephaven", "StockTrades")\
.where("Date=`2017-08-25`")\
.view("Sym", "Last", "Size", "ExchangeTimestamp")
tradesByMin = trades.updateView("MinuteTimeBin=lowerBin(ExchangeTimestamp, MINUTE)")
downsample = tradesByMin.firstBy("Sym", "MinuteTimeBin")
Price Volatility
Step 1: Pull day-by-day trade summaries for each symbol over a year.
summaries = db.t("LearnDeephaven", "EODTrades").where("ImportDate=`2017-11-01`")
Step 2: Calculate the daily log return, using only rows whose previous row has the same ticker.
returns = summaries.updateView("Return=(Ticker==Ticker_[i-1]) ? log(Close/Close_[i-1]) : NULL_DOUBLE")
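The ternary in Step 2 computes a return only when the previous row belongs to the same ticker, so the first row for each ticker gets a null rather than a return spanning two tickers. Below is a plain-Python sketch of the same logic, assuming rows are sorted by ticker and then by date (the formula depends on row order through Close_[i-1]); `daily_log_returns` is a hypothetical name and `None` stands in for NULL_DOUBLE.

```python
import math

def daily_log_returns(tickers, closes):
    """Return log(close / previous close) when the previous row has the same ticker.

    Rows are assumed sorted by ticker, then date. The first row of each
    ticker has no previous close for that ticker, so it gets None.
    """
    out = []
    for i, (ticker, close) in enumerate(zip(tickers, closes)):
        if i > 0 and tickers[i - 1] == ticker:
            out.append(math.log(close / closes[i - 1]))
        else:
            out.append(None)  # plays the role of NULL_DOUBLE
    return out

print(daily_log_returns(["A", "A", "B", "B"], [100.0, 110.0, 50.0, 25.0]))
```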
Step 3: Calculate standard deviation of returns for each symbol.
std = returns.view("Ticker", "Std=Return").stdBy("Ticker")
Step 4: Create a constant for the annualization factor, the square root of the number of trading days in a year.
trading_days = 252
annualization = sqrt(trading_days)
Step 5: Calculate volatility.
vol = std.updateView("Volatility=annualization*Std")
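Steps 3 through 5 scale the standard deviation of daily returns by the square root of 252 trading days. A minimal plain-Python sketch follows. It uses `statistics.stdev`, the sample (n - 1) standard deviation; whether stdBy uses the same convention is an assumption worth checking for your Deephaven version. `annualized_volatility` is a hypothetical name.

```python
import math
import statistics

TRADING_DAYS = 252

def annualized_volatility(daily_returns):
    """Standard deviation of daily log returns, scaled by sqrt(TRADING_DAYS).

    None entries (null returns) are skipped, as stdBy skips nulls.
    Uses the sample standard deviation; the Deephaven convention is an assumption.
    """
    clean = [r for r in daily_returns if r is not None]
    return statistics.stdev(clean) * math.sqrt(TRADING_DAYS)

print(annualized_volatility([None, 0.01, -0.01, 0.01, -0.01]))
```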
Complete Code Blocks for the Price Volatility Recipe
summaries = db.t("LearnDeephaven", "EODTrades").where("ImportDate=`2017-11-01`")
returns = summaries.updateView("Return=(Ticker==Ticker_[i-1]) ? log(Close/Close_[i-1]) : NULL_DOUBLE")
std = returns.view("Ticker", "Std=Return").stdBy("Ticker")
trading_days = 252
annualization = sqrt(trading_days)
vol = std.updateView("Volatility=annualization*Std")
# Initial import
from deephaven import *
# Price Volatility
summaries = db.t("LearnDeephaven", "EODTrades").where("ImportDate=`2017-11-01`")
returns = summaries.updateView("Return=(Ticker==Ticker_[i-1]) ? log(Close/Close_[i-1]) : NULL_DOUBLE")
std = returns.view("Ticker", "Std=Return").stdBy("Ticker")
trading_days = 252
annualization = trading_days ** 0.5
vol = std.updateView("Volatility=annualization*Std")
Returns
Step 1: Pull trades for a certain date and remove some columns.
trades = db.t("LearnDeephaven", "StockTrades")
.where("Date=`2017-08-25`")
.view("Sym", "Last", "ExchangeTimestamp")
Step 2: Pull trades for the day before and remove some columns.
tradesDayBefore = db.t("LearnDeephaven", "StockTrades")
.where("Date=`2017-08-24`")
.view("Sym", "Last", "ExchangeTimestamp")
Step 3: Get the last trade for each symbol on both days.
currentDayLast = trades.lastBy("Sym")
dayBeforeLast = tradesDayBefore.lastBy("Sym")
Step 4: For each symbol, join the previous day's last trade price to the current day's last trade and compute the price change.
dayDifference = currentDayLast.exactJoin(dayBeforeLast, "Sym", "YesterdayPrice=Last")
.updateView("Change=Last-YesterdayPrice")
Step 5: Calculate simple and log returns relative to the previous day's last price and format them as percentages.
dayReturns = dayDifference.updateView("SimpleReturn=Change/YesterdayPrice", "LogReturn=log(Last/YesterdayPrice)")
.formatColumns("SimpleReturn=Decimal(`0.000%`)", "LogReturn=Decimal(`0.000%`)")
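Step 5 computes two kinds of return from the same pair of prices. The sketch below reproduces the arithmetic in plain Python for a price that moves from 100 to 110; `simple_and_log_return` is a hypothetical helper. Python's `%` format multiplies by 100, which is also how a `%` suffix behaves in a Java DecimalFormat pattern such as `0.000%`.

```python
import math

def simple_and_log_return(last, yesterday):
    change = last - yesterday                # Change = Last - YesterdayPrice
    simple = change / yesterday              # SimpleReturn = Change / YesterdayPrice
    log_ret = math.log(last / yesterday)     # LogReturn = log(Last / YesterdayPrice)
    return simple, log_ret

simple, log_ret = simple_and_log_return(110.0, 100.0)
print(f"{simple:.3%} {log_ret:.3%}")
```

For small moves the two returns are close; they diverge as the move grows.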
Next, we need to calculate the hourly returns. This is accomplished over the next four steps.
Step 6: Add a column that holds the time one hour before each trade's ExchangeTimestamp.
tradesHourBefore = trades.updateView("HourBefore=ExchangeTimestamp - HOUR")
Step 7: For each trade, use an as-of join (aj) to find the most recent price for the same symbol at or before the HourBefore time.
priceHourBefore = tradesHourBefore.aj(tradesHourBefore, "Sym, HourBefore=ExchangeTimestamp", "PriceHourBefore=Last")
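The as-of join in Step 7 looks, for each trade, for the latest trade of the same symbol whose ExchangeTimestamp is at or before HourBefore. Here is a plain-Python sketch of that lookup using binary search over each symbol's sorted timestamps; `price_hour_before` and the tuple layout are hypothetical, and trades are assumed sorted by time within each symbol.

```python
import bisect
from collections import defaultdict

HOUR_NANOS = 3600 * 1_000_000_000

def price_hour_before(trades):
    """trades: (sym, price, ts_nanos) tuples, sorted by ts within each sym.

    For each trade, return the price of the latest same-symbol trade at or
    before ts - 1 hour, or None when no such trade exists.
    """
    times = defaultdict(list)
    prices = defaultdict(list)
    for sym, price, ts in trades:
        times[sym].append(ts)
        prices[sym].append(price)
    out = []
    for sym, _, ts in trades:
        # bisect_right finds the first time > target; the match is just before it
        idx = bisect.bisect_right(times[sym], ts - HOUR_NANOS) - 1
        out.append(prices[sym][idx] if idx >= 0 else None)
    return out
```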
Step 8: To get a better look at the result, let's remove any rows where no price from an hour earlier was found, that is, rows where PriceHourBefore is null.
Note: The exclamation point (!) negates a boolean expression. In the query below, rather than checking whether PriceHourBefore is null, you're checking whether it is not null.
removeEmpty = priceHourBefore.where("!isNull(PriceHourBefore)")
Step 9: Calculate hourly returns.
hourDifference = removeEmpty.updateView("Change=Last-PriceHourBefore", "SimpleReturn=Change/PriceHourBefore", "LogReturn=log(Last/PriceHourBefore)")
.formatColumns("SimpleReturn=Decimal(`0.000%`)", "LogReturn=Decimal(`0.000%`)")
Complete Code Blocks for the Returns Recipe
trades = db.t("LearnDeephaven", "StockTrades")
.where("Date=`2017-08-25`")
.view("Sym", "Last", "ExchangeTimestamp")
tradesDayBefore = db.t("LearnDeephaven", "StockTrades")
.where("Date=`2017-08-24`")
.view("Sym", "Last", "ExchangeTimestamp")
currentDayLast = trades.lastBy("Sym")
dayBeforeLast = tradesDayBefore.lastBy("Sym")
dayDifference = currentDayLast.exactJoin(dayBeforeLast, "Sym", "YesterdayPrice=Last")
.updateView("Change=Last-YesterdayPrice")
dayReturns = dayDifference.updateView("SimpleReturn=Change/YesterdayPrice", "LogReturn=log(Last/YesterdayPrice)")
.formatColumns("SimpleReturn=Decimal(`0.000%`)", "LogReturn=Decimal(`0.000%`)")
tradesHourBefore = trades.updateView("HourBefore=ExchangeTimestamp - HOUR")
priceHourBefore = tradesHourBefore.aj(tradesHourBefore, "Sym, HourBefore=ExchangeTimestamp", "PriceHourBefore=Last")
removeEmpty = priceHourBefore.where("!isNull(PriceHourBefore)")
hourDifference = removeEmpty.updateView("Change=Last-PriceHourBefore", "SimpleReturn=Change/PriceHourBefore", "LogReturn=log(Last/PriceHourBefore)")
.formatColumns("SimpleReturn=Decimal(`0.000%`)", "LogReturn=Decimal(`0.000%`)")
# Initial import
from deephaven import *
# Returns
trades = db.t("LearnDeephaven", "StockTrades")\
.where("Date=`2017-08-25`")\
.view("Sym", "Last", "ExchangeTimestamp")
tradesDayBefore = db.t("LearnDeephaven", "StockTrades")\
.where("Date=`2017-08-24`")\
.view("Sym", "Last", "ExchangeTimestamp")
currentDayLast = trades.lastBy("Sym")
dayBeforeLast = tradesDayBefore.lastBy("Sym")
dayDifference = currentDayLast.exactJoin(dayBeforeLast, "Sym", "YesterdayPrice=Last")\
.updateView("Change=Last-YesterdayPrice")
dayReturns = dayDifference.updateView("SimpleReturn=Change/YesterdayPrice", "LogReturn=log(Last/YesterdayPrice)")\
.formatColumns("SimpleReturn=Decimal(`0.000%`)", "LogReturn=Decimal(`0.000%`)")
tradesHourBefore = trades.updateView("HourBefore=ExchangeTimestamp - HOUR")
priceHourBefore = tradesHourBefore.aj(tradesHourBefore, "Sym, HourBefore=ExchangeTimestamp", "PriceHourBefore=Last")
removeEmpty = priceHourBefore.where("!isNull(PriceHourBefore)")
hourDifference = removeEmpty.updateView("Change=Last-PriceHourBefore", "SimpleReturn=Change/PriceHourBefore", "LogReturn=log(Last/PriceHourBefore)")\
.formatColumns("SimpleReturn=Decimal(`0.000%`)", "LogReturn=Decimal(`0.000%`)")
Pair Trading Analysis
Step 1: Write custom code to calculate rolling averages and standard deviations.
Custom code will be needed later to calculate rolling averages and standard deviations. Let's write these functions first.
rollingAvg = { rows, values ->
calculations = new double[values.size()]
sum = 0
n = 0
for (long i = 0; i < values.size(); ++i)
{
if (i < rows) n++ //n increments with i until n=rows
sum += values.get(i) //add each value to sum
if (i >= rows) sum -= values.get((long)(i - rows)) //subtract when needed
calculations[(int)i] = sum/n //store running average
}
return calculations //return an array of rolling averages
}
rollingStd = { rows, values ->
// NOTE: variance(X) is most efficiently calculated as E(X^2) - E(X)^2
calculations = new double[values.size()]
n = 0
sum = 0
sum2 = 0
for (long i = 0; i < values.size(); ++i)
{
if (i < rows) n++ //n increments with i until n=rows
val = values.get(i)
sum2 += val*val
sum += val
if (i >= rows){
val = values.get((long)(i - rows))
sum -= val //subtract when needed
sum2 -= val*val
}
variance = sum2/n - (sum*sum)/(n*n)
calculations[(int)i] = (variance > 0) ? Math.sqrt(variance): 0 // account for numerical imprecision near 0
}
return calculations
}
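To check the logic of rollingAvg and rollingStd outside Deephaven, here is a plain-Python sketch that runs the same sliding-window updates on an ordinary list. The window grows from 1 row until it reaches `rows`, and the standard deviation is the population form computed as E(X^2) - E(X)^2, clamped at zero. This shortcut can lose precision when values are large relative to their spread. `rolling_avg` and `rolling_std` are hypothetical names.

```python
import math

def rolling_avg(rows, values):
    """Trailing mean over up to `rows` values; the window grows until it reaches `rows`."""
    out, total, n = [], 0.0, 0
    for i, v in enumerate(values):
        if i < rows:
            n += 1                      # window size grows until n == rows
        total += v
        if i >= rows:
            total -= values[i - rows]   # drop the value leaving the window
        out.append(total / n)
    return out

def rolling_std(rows, values):
    """Trailing population std via E[X^2] - E[X]^2, clamped at 0 for round-off."""
    out, s, s2, n = [], 0.0, 0.0, 0
    for i, v in enumerate(values):
        if i < rows:
            n += 1
        s += v
        s2 += v * v
        if i >= rows:
            old = values[i - rows]
            s -= old
            s2 -= old * old
        var = s2 / n - (s / n) ** 2
        out.append(math.sqrt(var) if var > 0 else 0.0)
    return out

print(rolling_avg(2, [1.0, 2.0, 3.0, 4.0]), rolling_std(2, [1.0, 3.0, 5.0]))
```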
Step 2: Start the query script by selecting the two symbols to compare and the earliest date to include in the analysis.
pair = ["MSFT", "GOOG"]
latestDate = "2017-08-21"
Step 3: Pull all trades on or after that date for the specified symbols and reduce the table.
trades = db.t("LearnDeephaven", "StockTrades")
.where("Date>=latestDate", "Sym in pair")
.view("Date", "Sym", "Last")
Step 4: Get the last trade price for each day and each symbol.
lastPrices = trades.by(AggCombo(AggLast("Last")), "Date", "Sym")
Step 5: Create two tables, each containing the daily last prices for a single symbol, and rename their columns.
sym0 = lastPrices.where("Sym=pair[0]")
.renameColumns("SymA=Sym", "PriceA=Last")
sym1 = lastPrices.where("Sym=pair[1]")
.renameColumns("SymB=Sym", "PriceB=Last")
Step 6: Join these tables together so their prices can be side-by-side.
sideBySide = sym0.naturalJoin(sym1, "Date")
Step 7: Calculate daily log returns and drop the rows without a return.
These returns will show us how correlated the two symbols are before we analyze the pair's prices.
returns = sideBySide.updateView("ReturnA=log(PriceA/PriceA_[i-1])", "ReturnB=log(PriceB/PriceB_[i-1])")
.where("!isNull(ReturnA) && !isNull(ReturnB)")
Step 8: Calculate the square and product for each row.
calculations1 = returns.updateView("SquareA=ReturnA*ReturnA", "SquareB=ReturnB*ReturnB", "Product=ReturnA*ReturnB")
Step 9: Calculate the count and the sum of returns, squares, and products.
calculations2 = calculations1.by(AggCombo(AggCount("N"), AggSum("ReturnA", "ReturnB", "SquareA", "SquareB", "Product")))
All the information needed to calculate the correlation is now available.
Step 10: Calculate the correlation.
correlation = calculations2.view("Correlation=((N * Product) - (ReturnA * ReturnB)) / sqrt((N * SquareA - pow(ReturnA, 2)) * (N * SquareB - pow(ReturnB, 2)))")
.formatColumns("Correlation=Decimal(`0.000%`)")
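Step 10 is the computational form of the Pearson correlation coefficient, which needs only the count N and the sums from Step 9: r = (N*Sab - Sa*Sb) / sqrt((N*Saa - Sa^2) * (N*Sbb - Sb^2)), where Sa and Sb are the sums of the returns, Saa and Sbb the sums of their squares, and Sab the sum of their products. A plain-Python sketch of the same formula follows; `correlation_from_sums` is a hypothetical name.

```python
import math

def correlation_from_sums(xs, ys):
    """Pearson correlation from N and the raw sums, as in the Step 10 formula."""
    n = len(xs)
    sx, sy = sum(xs), sum(ys)                  # ReturnA, ReturnB sums
    sxx = sum(x * x for x in xs)               # SquareA sum
    syy = sum(y * y for y in ys)               # SquareB sum
    sxy = sum(x * y for x, y in zip(xs, ys))   # Product sum
    return (n * sxy - sx * sy) / math.sqrt((n * sxx - sx ** 2) * (n * syy - sy ** 2))

print(correlation_from_sums([1.0, 2.0, 3.0], [2.0, 4.0, 6.0]))
```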
Step 11: Return to the pair analysis. Calculate the price ratio from the sideBySide table.
priceRatio = sideBySide.updateView("PriceRatio=PriceA/PriceB")
Step 12: Calculate rolling average and standard deviation using our custom functions.
rollingCalc = priceRatio.by()
.updateView("RatioAvg=(double[])rollingAvg.call(20, PriceRatio)", "RatioStd=(double[])rollingStd.call(20, PriceRatio)")
.ungroup()
Step 13: Calculate the z-score, as well as upper and lower thresholds two standard deviations above and below the rolling average.
zScore = rollingCalc.updateView("Zscore=(RatioStd != 0) ? (PriceRatio-RatioAvg)/RatioStd: NULL_DOUBLE",
"UpperThreshold=RatioAvg+2*RatioStd", "LowerThreshold=RatioAvg-2*RatioStd")
Step 14: To create time plots, we first need to convert the Date strings into date-time values; here each date is set to 17:30 New York time.
dataFinal = zScore.updateView("Date=convertDateTime(Date+`T17:30 NY`)")
Step 15: Create a dual time plot for the symbols' prices.
pricePlot = plot(pair[0], dataFinal, "Date", "PriceA")
.twinX().plot(pair[1], dataFinal, "Date", "PriceB").show()
Step 16: Finally, create a time plot for our price ratio, average, and upper/lower thresholds.
ratioPlot = plot("Ratio", dataFinal, "Date", "PriceRatio")
.plot("Average", dataFinal, "Date", "RatioAvg")
.plot("Upper", dataFinal, "Date", "UpperThreshold")
.lineStyle(lineStyle([4,4]))
.plot("Lower", dataFinal, "Date", "LowerThreshold")
.lineStyle(lineStyle([4,4]))
.show()
Complete Code Blocks for the Pair Trading Analysis Recipe
rollingAvg = { rows, values ->
calculations = new double[values.size()]
sum = 0
n = 0
for (long i = 0; i < values.size(); ++i)
{
if (i < rows) n++ //n increments with i until n=rows
sum += values.get(i) //add each value to sum
if (i >= rows) sum -= values.get((long)(i - rows)) //subtract when needed
calculations[(int)i] = sum/n //store running average
}
return calculations //return an array of rolling averages
}
rollingStd = { rows, values ->
// NOTE: variance(X) is most efficiently calculated as E(X^2) - E(X)^2
calculations = new double[values.size()]
n = 0
sum = 0
sum2 = 0
for (long i = 0; i < values.size(); ++i)
{
if (i < rows) n++ //n increments with i until n=rows
val = values.get(i)
sum2 += val*val
sum += val
if (i >= rows){
val = values.get((long)(i - rows))
sum -= val //subtract when needed
sum2 -= val*val
}
variance = sum2/n - (sum*sum)/(n*n)
calculations[(int)i] = (variance > 0) ? Math.sqrt(variance): 0 // account for numerical imprecision near 0
}
return calculations
}
pair = ["MSFT", "GOOG"]
latestDate = "2017-08-21"
trades = db.t("LearnDeephaven", "StockTrades")
.where("Date>=latestDate", "Sym in pair")
.view("Date", "Sym", "Last")
lastPrices = trades.by(AggCombo(AggLast("Last")), "Date", "Sym")
sym0 = lastPrices.where("Sym=pair[0]")
.renameColumns("SymA=Sym", "PriceA=Last")
sym1 = lastPrices.where("Sym=pair[1]")
.renameColumns("SymB=Sym", "PriceB=Last")
sideBySide = sym0.naturalJoin(sym1, "Date")
returns = sideBySide.updateView("ReturnA=log(PriceA/PriceA_[i-1])", "ReturnB=log(PriceB/PriceB_[i-1])")
.where("!isNull(ReturnA) && !isNull(ReturnB)")
calculations1 = returns.updateView("SquareA=ReturnA*ReturnA", "SquareB=ReturnB*ReturnB", "Product=ReturnA*ReturnB")
calculations2 = calculations1.by(AggCombo(AggCount("N"), AggSum("ReturnA", "ReturnB", "SquareA", "SquareB", "Product")))
correlation = calculations2.view("Correlation=((N * Product) - (ReturnA * ReturnB)) / sqrt((N * SquareA - pow(ReturnA, 2)) * (N * SquareB - pow(ReturnB, 2)))")
.formatColumns("Correlation=Decimal(`0.000%`)")
priceRatio = sideBySide.updateView("PriceRatio=PriceA/PriceB")
rollingCalc = priceRatio.by()
.updateView("RatioAvg=(double[])rollingAvg.call(20, PriceRatio)", "RatioStd=(double[])rollingStd.call(20, PriceRatio)")
.ungroup()
zScore = rollingCalc.updateView("Zscore=(RatioStd != 0) ? (PriceRatio-RatioAvg)/RatioStd: NULL_DOUBLE",
"UpperThreshold=RatioAvg+2*RatioStd", "LowerThreshold=RatioAvg-2*RatioStd")
dataFinal = zScore.updateView("Date=convertDateTime(Date+`T17:30 NY`)")
pricePlot = plot(pair[0], dataFinal, "Date", "PriceA")
.twinX()
.plot(pair[1], dataFinal, "Date", "PriceB")
.show()
ratioPlot = plot("Ratio", dataFinal, "Date", "PriceRatio")
.plot("Average", dataFinal, "Date", "RatioAvg")
.plot("Upper", dataFinal, "Date", "UpperThreshold")
.lineStyle(lineStyle([4,4]))
.plot("Lower", dataFinal, "Date", "LowerThreshold")
.lineStyle(lineStyle([4,4]))
.show()
# Initial import
from deephaven import *
# Pair Trading Analysis
def rollingAvg(rows, values):
    calculations = jpy.array('double', values.size()) # create an array of doubles
    sum = 0
    n = 0
    for i in range(values.size()):
        if (i < rows): n += 1 # n increments with i until n = rows
        sum += values.get(i) # add each value to sum
        if (i >= rows): sum -= values.get(i - rows) # subtract when needed
        avg = sum/n # get updated average every iteration
        calculations[i] = avg # store running average
    return calculations # return an array of rolling averages
def rollingStd(rows, values):
    # variance(X) is calculated as E(X^2) - E(X)^2
    calculations = jpy.array('double', values.size()) # create an array of doubles
    sum2 = 0
    sum = 0
    n = 0
    for i in range(values.size()):
        if (i < rows): n += 1 # n increments with i until n=rows
        val = values.get(i)
        sum += val
        sum2 += val*val
        if i >= rows:
            # subtract when needed
            val = values.get(i-rows)
            sum -= val
            sum2 -= val*val
        variance = sum2/n - (sum*sum/(n*n))
        calculations[i] = variance**0.5 if variance > 0 else 0 # account for numerical imprecision near 0
    return calculations
pair = jpy.array('java.lang.String', 2)
pair[0] = "MSFT"
pair[1] = "GOOG"
latestDate = "2017-08-21"
trades = db.t("LearnDeephaven", "StockTrades")\
.where("Date>=latestDate", "Sym in pair")\
.view("Date", "Sym", "Last")
lastPrices = trades.by(caf.AggCombo(caf.AggLast("Last")), "Date", "Sym")
sym0 = lastPrices.where("Sym=pair[0]")\
.renameColumns("SymA=Sym", "PriceA=Last")
sym1 = lastPrices.where("Sym=pair[1]")\
.renameColumns("SymB=Sym", "PriceB=Last")
sideBySide = sym0.naturalJoin(sym1, "Date")
returns = sideBySide.updateView("ReturnA=log(PriceA/PriceA_[i-1])", "ReturnB=log(PriceB/PriceB_[i-1])")\
.where("!isNull(ReturnA) && !isNull(ReturnB)")
calculations1 = returns.updateView("SquareA=ReturnA*ReturnA", "SquareB=ReturnB*ReturnB", "Product=ReturnA*ReturnB")
calculations2 = calculations1.by(caf.AggCombo(caf.AggCount("N"), caf.AggSum("ReturnA", "ReturnB", "SquareA", "SquareB", "Product")))
correlation = calculations2.view("Correlation=((N * Product) - (ReturnA * ReturnB)) / sqrt((N * SquareA - pow(ReturnA, 2)) * (N * SquareB - pow(ReturnB, 2)))")\
.formatColumns("Correlation=Decimal(`0.000%`)")
priceRatio = sideBySide.updateView("PriceRatio=PriceA/PriceB")
rollingCalc = priceRatio.by()\
.updateView("RatioAvg=(double[])rollingAvg.call(20, PriceRatio)", "RatioStd=(double[])rollingStd.call(20, PriceRatio)")\
.ungroup()
zScore = rollingCalc.updateView("Zscore=(RatioStd != 0) ? (PriceRatio-RatioAvg)/RatioStd : NULL_DOUBLE", "UpperThreshold=RatioAvg+2*RatioStd", "LowerThreshold=RatioAvg-2*RatioStd")
dataFinal = zScore.updateView("Date=convertDateTime(Date+`T17:30 NY`)")
Mean Reversion Simulation
Step 1: Write functions to calculate rolling averages and standard deviations.
Custom code will be needed later to calculate rolling averages and standard deviations. Let's write these functions first.
rollingAvg = { rows, values ->
calculations = new double[values.size()]
sum = 0
n = 0
for (long i = 0; i < values.size(); ++i)
{
if (i < rows) n++ //n increments with i until n=rows
sum += values.get(i) //add each value to sum
if (i >= rows) sum -= values.get((long)(i - rows)) //subtract when needed
calculations[(int)i] = sum/n //store running average
}
return calculations //return an array of rolling averages
}
rollingStd = { rows, values ->
// NOTE: variance(X) is most efficiently calculated as E(X^2) - E(X)^2
calculations = new double[values.size()]
n = 0
sum = 0
sum2 = 0
for (long i = 0; i < values.size(); ++i)
{
if (i < rows) n++ //n increments with i until n=rows
val = values.get(i)
sum2 += val*val
sum += val
if (i >= rows){
val = values.get((long)(i - rows))
sum -= val //subtract when needed
sum2 -= val*val
}
variance = sum2/n - (sum*sum)/(n*n)
calculations[(int)i] = (variance > 0) ? Math.sqrt(variance): 0 // account for numerical imprecision near 0
}
return calculations
}
Step 2: Pull trades from a certain date and reduce the table:
trades = db.t("LearnDeephaven", "StockTrades")
.where("Date=`2017-08-25`")
.view("Sym", "Last", "Size", "ExchangeTimestamp")
Step 3: Downsample trades to the first trade per symbol in each thirty-minute bin:
trades30min = trades.updateView("TimeBin=lowerBin(ExchangeTimestamp, 30*MINUTE)")
.firstBy("Sym", "TimeBin")
Step 4: Calculate rolling averages and standard deviations:
rollingCalc = trades30min.by("Sym")
.update("Avg=(double[])rollingAvg.call(30, Last)", "Std=(double[])rollingStd.call(30, Last)")
.ungroup()
Step 5: Set some constants used to calculate target positions:
minEdge = 0.5d
maxPos = 3.0d
liquidity = 1e6d
Step 6: Calculate target positions:
targetPos = rollingCalc.updateView("Zscore=(Std > 0) ? (Avg-Last)/Std : NULL_DOUBLE", "AdjZscore=signum(Zscore) * min(maxPos, max(abs(Zscore)-minEdge, 0.0))", "TargetPosition=(int)(liquidity*AdjZscore/Last)")
.dropColumns("ExchangeTimestamp", "Avg", "Std", "Zscore", "AdjZscore")
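Step 6 turns the z-score into a position: the first minEdge of the z-score is ignored, the remainder is capped at maxPos, and the result is scaled by liquidity and divided by the price. Here is a plain-Python sketch of that per-row arithmetic; the names are hypothetical and `None` stands in for null values.

```python
import math

# Hypothetical constants mirroring minEdge, maxPos and liquidity in the recipe.
MIN_EDGE = 0.5
MAX_POS = 3.0
LIQUIDITY = 1e6

def target_position(last, avg, std):
    """Return the target share position for one row, or None when std is not positive."""
    if not std > 0:
        return None                      # plays the role of NULL_DOUBLE / NULL_INT
    z = (avg - last) / std               # positive when the price sits below its rolling mean
    # Ignore the first MIN_EDGE of the z-score, then cap its size at MAX_POS.
    size = min(MAX_POS, max(abs(z) - MIN_EDGE, 0.0))
    adj = math.copysign(size, z)         # restore the sign of the z-score
    return int(LIQUIDITY * adj / last)   # int() truncates toward zero, like a Java (int) cast

print(target_position(100.0, 102.0, 1.0))
```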
Step 7: Calculate start and end indexes so we can later calculate shares traded every thirty minutes:
timeBinIndexes = targetPos.leftJoin(trades30min, "Sym", "Times=ExchangeTimestamp, SharesTraded=Size")
.updateView("StartIndex=binSearchIndex(Times, TimeBin-30*MINUTE, BS_LOWEST)", "EndIndex=binSearchIndex(Times, TimeBin, BS_HIGHEST)")
.dropColumns("Times")
Step 8: Calculate the amount of shares traded every thirty minutes:
shares30min = timeBinIndexes.updateView("SharesTraded30Min=sum(SharesTraded.subArray(StartIndex, EndIndex))")
.dropColumns("SharesTraded", "StartIndex", "EndIndex")
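Steps 7 and 8 locate a slice of each symbol's sorted trade times with binSearchIndex and sum the matching sizes with subArray. The sketch below shows the same idea with Python's bisect module. It assumes both window ends are inclusive, which may not match the exact boundary behavior of BS_LOWEST, BS_HIGHEST, or subArray; `shares_in_window` is a hypothetical name.

```python
import bisect

def shares_in_window(times, sizes, start, end):
    """Sum sizes of trades with start <= t <= end; times must be sorted ascending."""
    lo = bisect.bisect_left(times, start)   # first index with t >= start
    hi = bisect.bisect_right(times, end)    # first index with t > end
    return sum(sizes[lo:hi])

print(shares_in_window([0, 10, 20, 30], [1, 2, 3, 4], 10, 20))
```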
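Step 9: Create a class that, for each symbol, tracks the shares traded and the resulting position row by row. Each row trades toward the target position, but by no more than 10% of SharesTraded30Min: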
class SimulatorState
{
private HashMap<String, double[]> hm = new HashMap<>()
double[] update(String sym, int targetPos, int shares10s){
if (!hm.containsKey(sym)) hm.put(sym, new double[2])
double[] tradedAndPosition = hm.get(sym)
tradedAndPosition[0] = isNull(targetPos) ? 0.0 : signum(targetPos - tradedAndPosition[1]) * min(abs(targetPos - tradedAndPosition[1]), shares10s * 0.1d)
tradedAndPosition[1] += tradedAndPosition[0]
return Arrays.copyOf(tradedAndPosition, tradedAndPosition.length)
}
}
ss = new SimulatorState()
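The per-symbol state that SimulatorState keeps can be sketched in plain Python: each call moves the position toward the target, trading at most a fixed fraction of the shares traded in the window (the `0.1` factor in the recipe). `PositionSimulator`, `participation`, and `step` are hypothetical names, and `None` stands in for a null target.

```python
import math

class PositionSimulator:
    """Tracks a running position per symbol, as SimulatorState does."""

    def __init__(self, participation=0.1):
        self.participation = participation   # max fraction of window volume to trade
        self.position = {}

    def step(self, sym, target_pos, shares_traded):
        pos = self.position.get(sym, 0.0)
        if target_pos is None:
            traded = 0.0                      # no target: do nothing this row
        else:
            gap = target_pos - pos
            # Move toward the target, capped by participation * shares traded.
            traded = math.copysign(1.0, gap) * min(abs(gap), shares_traded * self.participation)
        pos += traded
        self.position[sym] = pos
        return traded, pos

sim = PositionSimulator()
print(sim.step("A", 1000, 5000), sim.step("A", 1000, 5000))
```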
Step 10: Use the class to add columns for position changes and positions:
simulation = shares30min.update("Values=(double[])ss.update(Sym, TargetPosition, SharesTraded30Min)", "PositionChange=Values[0]", "Position=Values[1]")
.dropColumns("Values")
Complete Code Blocks for the Mean Reversion Simulation Recipe
rollingAvg = { rows, values ->
calculations = new double[values.size()]
sum = 0
n = 0
for (long i = 0; i < values.size(); ++i)
{
if (i < rows) n++ //n increments with i until n=rows
sum += values.get(i) //add each value to sum
if (i >= rows) sum -= values.get((long)(i - rows)) //subtract when needed
calculations[(int)i] = sum/n //store running average
}
return calculations //return an array of rolling averages
}
rollingStd = { rows, values ->
// NOTE: variance(X) is most efficiently calculated as E(X^2) - E(X)^2
calculations = new double[values.size()]
n = 0
sum = 0
sum2 = 0
for (long i = 0; i < values.size(); ++i)
{
if (i < rows) n++ //n increments with i until n=rows
val = values.get(i)
sum2 += val*val
sum += val
if (i >= rows){
val = values.get((long)(i - rows))
sum -= val //subtract when needed
sum2 -= val*val
}
variance = sum2/n - (sum*sum)/(n*n)
calculations[(int)i] = (variance > 0) ? Math.sqrt(variance): 0 // account for numerical imprecision near 0
}
return calculations
}
trades = db.t("LearnDeephaven", "StockTrades")
.where("Date=`2017-08-25`")
.view("Sym", "Last", "Size", "ExchangeTimestamp")
trades30min = trades.updateView("TimeBin=lowerBin(ExchangeTimestamp, 30*MINUTE)")
.firstBy("Sym", "TimeBin")
rollingCalc = trades30min.by("Sym")
.update("Avg=(double[])rollingAvg.call(30, Last)","Std=(double[])rollingStd.call(30, Last)")
.ungroup()
minEdge = 0.5d
maxPos = 3.0d
liquidity = 1e6d
targetPos = rollingCalc.updateView("Zscore=(Std > 0) ? (Avg-Last)/Std : NULL_DOUBLE", "AdjZscore=signum(Zscore) * min(maxPos, max(abs(Zscore)-minEdge, 0.0))", "TargetPosition=(int)(liquidity*AdjZscore/Last)")
.dropColumns("ExchangeTimestamp", "Avg", "Std", "Zscore", "AdjZscore")
timeBinIndexes = targetPos.leftJoin(trades30min, "Sym", "Times=ExchangeTimestamp, SharesTraded=Size")
.updateView("StartIndex=binSearchIndex(Times, TimeBin-30*MINUTE, BS_LOWEST)", "EndIndex=binSearchIndex(Times, TimeBin, BS_HIGHEST)")
.dropColumns("Times")
shares30min = timeBinIndexes.updateView("SharesTraded30Min=sum(SharesTraded.subArray(StartIndex, EndIndex))")
.dropColumns("SharesTraded", "StartIndex", "EndIndex")
class SimulatorState
{
private HashMap<String, double[]> hm = new HashMap<>()
double[] update(String sym, int targetPos, int shares10s){
if (!hm.containsKey(sym)) hm.put(sym, new double[2])
double[] tradedAndPosition = hm.get(sym)
tradedAndPosition[0] = isNull(targetPos) ? 0.0 : signum(targetPos - tradedAndPosition[1]) * min(abs(targetPos - tradedAndPosition[1]), shares10s * 0.1d)
tradedAndPosition[1] += tradedAndPosition[0]
return Arrays.copyOf(tradedAndPosition, tradedAndPosition.length)
}
}
ss = new SimulatorState()
simulation = shares30min.update("Values=(double[])ss.update(Sym, TargetPosition, SharesTraded30Min)", "PositionChange=Values[0]", "Position=Values[1]")
.dropColumns("Values")
# Initial import
from deephaven import *
# Mean Reversion Simulation
def rollingAvg(rows, values):
    calculations = jpy.array('double', values.size()) # create an array of doubles
    sum = 0
    n = 0
    for i in range(values.size()):
        if i < rows:
            n += 1 # n increments with i until n = rows
        sum += values.get(i) # add each value to sum
        if i >= rows:
            sum -= values.get(i - rows) # subtract when needed
        avg = sum/n # get updated average every iteration
        calculations[i] = avg # store running average
    return calculations # return an array of rolling averages
def rollingStd(rows, values):
    # variance(X) is calculated as E(X^2) - E(X)^2
    calculations = jpy.array('double', values.size()) # create an array of doubles
    sum2 = 0
    sum = 0
    n = 0
    for i in range(values.size()):
        if i < rows:
            n += 1 # n increments with i until n=rows
        val = values.get(i)
        sum += val
        sum2 += val*val
        if i >= rows:
            # subtract when needed
            val = values.get(i-rows)
            sum -= val
            sum2 -= val*val
        variance = sum2/n - (sum*sum/(n*n))
        calculations[i] = variance**0.5 if variance > 0 else 0 # account for numerical imprecision near 0
    return calculations
trades = db.t("LearnDeephaven", "StockTrades")\
.where("Date=`2017-08-25`")\
.view("Sym", "Last", "Size", "ExchangeTimestamp")
trades30min = trades.updateView("TimeBin=lowerBin(ExchangeTimestamp, 30*MINUTE)")\
.firstBy("Sym", "TimeBin")
rollingCalc = trades30min.by("Sym")\
.update("Avg=(double[])rollingAvg.call(30, Last)", "Std=(double[])rollingStd.call(30, Last)")\
.ungroup()
minEdge = 0.5
maxPos = 3.0
liquidity = 1e6
targetPos = rollingCalc.updateView("Zscore= Std > 0 ? (Avg-Last)/Std : NULL_DOUBLE",
"AdjZscore=signum(Zscore) * min(maxPos, max(abs(Zscore)-minEdge, 0.0))",
"TargetPosition=(int)(liquidity*AdjZscore/Last)")\
.dropColumns("ExchangeTimestamp", "Avg", "Std", "Zscore", "AdjZscore")
timeBinIndexes = targetPos.leftJoin(trades30min, "Sym", "Times=ExchangeTimestamp, SharesTraded=Size")\
.updateView("StartIndex=binSearchIndex(Times, TimeBin-30*MINUTE, BS_LOWEST)",
"EndIndex=binSearchIndex(Times, TimeBin, BS_HIGHEST)")\
.dropColumns("Times")
shares30min = timeBinIndexes.updateView("SharesTraded30Min=sum(SharesTraded.subArray(StartIndex, EndIndex))")\
.dropColumns("SharesTraded", "StartIndex", "EndIndex")
from math import copysign
NULL_INT = -2147483648 # Deephaven's null value for int columns (Integer.MIN_VALUE)
class SimulatorState:
    def __init__(self):
        self.hm = {}
    def __call__(self, sym, targetPos, shares10s):
        if sym not in self.hm:
            self.hm[sym] = [0.0, ] * 2
        tradedAndPosition = self.hm[sym]
        # a null TargetPosition may arrive as None or as the NULL_INT sentinel
        if targetPos is None or targetPos == NULL_INT:
            tradedAndPosition[0] = 0.0
        else:
            tradedAndPosition[0] = copysign(1, targetPos - tradedAndPosition[1]) * min(abs(targetPos - tradedAndPosition[1]), shares10s * 0.1)
        tradedAndPosition[1] += tradedAndPosition[0]
        return jpy.array('double', list(tradedAndPosition))
ss = SimulatorState()
simulation = shares30min.update("Values=(double[])ss.call(Sym, TargetPosition, SharesTraded30Min)",
"PositionChange=Values[0]", "Position=Values[1]")\
.dropColumns("Values")
Last Updated: 28 February 2020 12:20 -05:00 UTC Deephaven v.1.20200121
Deephaven Documentation Copyright 2016-2020 Deephaven Data Labs, LLC All Rights Reserved