# ************** Cookbook / Practical Examples / Python **************
# ************** Copyright 2019 Deephaven Data Labs, LLC **************

# This notebook is part of the Deephaven Query Cookbook. For more information,
# please refer to the Deephaven Documentation Portal at
# https://docs.deephaven.io/latest/Content/quickReference/cookbook/cookbook.htm


# Downsampling Trades

trades = db.t("LearnIris", "StockTrades")\
    .where("Date=`2017-08-25`")\
    .view("Sym", "Last", "Size", "ExchangeTimestamp")

tradesByMin = trades.updateView("MinuteTimeBin=lowerBin(ExchangeTimestamp, MINUTE)")

downsample = tradesByMin.firstBy("Sym", "MinuteTimeBin")


# Price Volatility

summaries = db.t("LearnIris", "EODTrades").where("ImportDate=`2017-11-01`")

returns = summaries.updateView("Return=(Ticker=Ticker_[i-1]) ? log(Close/Close_[i-1]) : NULL_DOUBLE")

std = returns.view("Ticker", "Std=Return").stdBy("Ticker")

trading_days = 252
annualization = trading_days ** 0.5

vol = std.updateView("Volatility=annualization*Std")


# Returns

trades = db.t("LearnIris", "StockTrades")\
    .where("Date=`2017-08-25`")\
    .view("Sym", "Last", "ExchangeTimestamp")

tradesDayBefore = db.t("LearnIris", "StockTrades")\
    .where("Date=`2017-08-24`")\
    .view("Sym", "Last", "ExchangeTimestamp")

currentDayLast = trades.lastBy("Sym")
dayBeforeLast = tradesDayBefore.lastBy("Sym")

dayDifference = currentDayLast.exactJoin(dayBeforeLast, "Sym", "YesterdayPrice=Last")\
    .updateView("Change=Last-YesterdayPrice")

dayReturns = dayDifference.updateView("SimpleReturn=Change/YesterdayPrice", "LogReturn=log(Last/YesterdayPrice)")\
    .formatColumns("SimpleReturn=Decimal(`0.000%`)", "LogReturn=Decimal(`0.000%`)")

tradesHourBefore = trades.updateView("HourBefore=ExchangeTimestamp - HOUR")

priceHourBefore = tradesHourBefore.aj(tradesHourBefore, "Sym, HourBefore=ExchangeTimestamp", "PriceHourBefore=Last")

removeEmpty = priceHourBefore.where("!isNull(PriceHourBefore)")

hourDifference = removeEmpty.updateView("Change=Last-PriceHourBefore", "SimpleReturn=Change/PriceHourBefore", "LogReturn=log(Last/PriceHourBefore)")\
    .formatColumns("SimpleReturn=Decimal(`0.000%`)", "LogReturn=Decimal(`0.000%`)")


# Pair Trading Analysis

def rollingAvg(rows, values):
    calculations = jpy.array('double', values.size())  # create an array of doubles
    sum = 0
    n = 0
    for i in range(values.size()):
        if i < rows:
            n += 1                           # n increments with i until n = rows
        sum += values.get(i)                 # add each value to sum
        if i >= rows:
            sum -= values.get(i - rows)      # subtract when needed
        avg = sum/n                          # get updated average every iteration
        calculations[i] = avg                # store running average
    return calculations                      # return an array of rolling averages

def rollingStd(rows, values):
    # Can we use numpy and convolution here instead?
    # some other kind of vectorization?
    calculations = jpy.array('double', values.size())  # create an array of doubles
    sum2 = 0
    sum = 0
    n = 0
    for i in range(values.size()):
        if i < rows:
            n += 1                           # n increments with i until n = rows
        val = values.get(i)
        sum += val
        sum2 += val*val
        if i >= rows:                        # subtract when needed
            val = values.get(i - rows)
            sum -= val
            sum2 -= val*val
        variance = sum2/n - (sum*sum/(n*n))
        calculations[i] = variance**0.5 if variance > 0 else 0  # account for numerical precision near 0
    return calculations
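# The question above ("can we use numpy?") suggests a vectorized alternative. A minimal
# sketch of that idea follows, purely for illustration; the helper names (rollingSumsNp,
# rollingAvgNp, rollingStdNp) are hypothetical and nothing below is used by the queries.
# It assumes the column values have first been pulled into a plain Python/numpy sequence.
import numpy as np

def rollingSumsNp(rows, data):
    csum = np.cumsum(np.asarray(data, dtype=float))
    shifted = np.zeros_like(csum)
    if rows < len(csum):
        shifted[rows:] = csum[:-rows]
    return csum - shifted                               # trailing-window sums (window expands until it reaches `rows`)

def rollingAvgNp(rows, data):
    n = np.minimum(np.arange(len(data)) + 1, rows)      # effective window size at each index, as in rollingAvg above
    return rollingSumsNp(rows, data) / n

def rollingStdNp(rows, data):
    data = np.asarray(data, dtype=float)
    n = np.minimum(np.arange(len(data)) + 1, rows)
    mean = rollingSumsNp(rows, data) / n
    meanSq = rollingSumsNp(rows, data * data) / n
    variance = np.maximum(meanSq - mean * mean, 0.0)    # clip tiny negatives from floating-point error
    return np.sqrt(variance)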
import illumon.iris.ComboAggregateFactory as caf

pair = jpy.array('java.lang.String', 2)
pair[0] = "MSFT"
pair[1] = "GOOG"

latestDate = "2017-08-21"

trades = db.t("LearnIris", "StockTrades")\
    .where("Date>=latestDate", "Sym in pair")\
    .view("Date", "Sym", "Last")

lastPrices = trades.by(caf.AggCombo(caf.AggLast("Last")), "Date", "Sym")

sym0 = lastPrices.where("Sym=pair[0]")\
    .renameColumns("SymA=Sym", "PriceA=Last")
sym1 = lastPrices.where("Sym=pair[1]")\
    .renameColumns("SymB=Sym", "PriceB=Last")

sideBySide = sym0.naturalJoin(sym1, "Date")

returns = sideBySide.updateView("ReturnA=log(PriceA/PriceA_[i-1])", "ReturnB=log(PriceB/PriceB_[i-1])")\
    .where("!isNull(ReturnA) && !isNull(ReturnB)")

calculations1 = returns.updateView("SquareA=ReturnA*ReturnA", "SquareB=ReturnB*ReturnB", "Product=ReturnA*ReturnB")
calculations2 = calculations1.by(caf.AggCombo(caf.AggCount("N"), caf.AggSum("ReturnA", "ReturnB", "SquareA", "SquareB", "Product")))

# Pearson correlation from the summed terms:
# r = (N*sum(xy) - sum(x)*sum(y)) / sqrt((N*sum(x^2) - sum(x)^2) * (N*sum(y^2) - sum(y)^2))
correlation = calculations2.view("Correlation=(N * Product - ReturnA * ReturnB) / sqrt((N * SquareA - pow(ReturnA, 2)) * (N * SquareB - pow(ReturnB, 2)))")\
    .formatColumns("Correlation=Decimal(`0.000%`)")
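# A quick, self-contained sanity check of that sum-based Pearson algebra against
# numpy.corrcoef, using made-up data; nothing here feeds the tables above.
import numpy as np

xDemo = np.random.normal(size=1000)
yDemo = 0.5 * xDemo + np.random.normal(size=1000)
nDemo = len(xDemo)
sx, sy = xDemo.sum(), yDemo.sum()
sxx, syy, sxy = (xDemo * xDemo).sum(), (yDemo * yDemo).sum(), (xDemo * yDemo).sum()
rFromSums = (nDemo * sxy - sx * sy) / np.sqrt((nDemo * sxx - sx**2) * (nDemo * syy - sy**2))
rFromNumpy = np.corrcoef(xDemo, yDemo)[0, 1]            # agrees with rFromSums to floating-point precision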
priceRatio = sideBySide.updateView("PriceRatio=PriceA/PriceB")

rollingCalc = priceRatio.by()\
    .updateView("RatioAvg=(double[])rollingAvg.call(20, PriceRatio)", "RatioStd=(double[])rollingStd.call(20, PriceRatio)")\
    .ungroup()

zScore = rollingCalc.updateView("Zscore=(PriceRatio-RatioAvg)/RatioStd", "UpperThreshold=RatioAvg+2*RatioStd", "LowerThreshold=RatioAvg-2*RatioStd")

dataFinal = zScore.updateView("Date=convertDateTime(Date+`T17:30 NY`)")


# Mean Reversion Simulation

def rollingAvg(rows, values):
    calculations = jpy.array('double', values.size())  # create an array of doubles
    sum = 0
    n = 0
    for i in range(values.size()):
        if i < rows:
            n += 1                           # n increments with i until n = rows
        sum += values.get(i)                 # add each value to sum
        if i >= rows:
            sum -= values.get(i - rows)      # subtract when needed
        avg = sum/n                          # get updated average every iteration
        calculations[i] = avg                # store running average
    return calculations                      # return an array of rolling averages

def rollingStd(rows, values):
    # Can we use numpy and convolution here instead?
    # some other kind of vectorization?
    calculations = jpy.array('double', values.size())  # create an array of doubles
    sum2 = 0
    sum = 0
    n = 0
    for i in range(values.size()):
        if i < rows:
            n += 1                           # n increments with i until n = rows
        val = values.get(i)
        sum += val
        sum2 += val*val
        if i >= rows:                        # subtract when needed
            val = values.get(i - rows)
            sum -= val
            sum2 -= val*val
        variance = sum2/n - (sum*sum/(n*n))
        calculations[i] = variance**0.5 if variance > 0 else 0  # account for numerical precision near 0
    return calculations

trades = db.t("LearnIris", "StockTrades")\
    .where("Date=`2017-08-25`")\
    .view("Sym", "Last", "Size", "ExchangeTimestamp")

trades30min = trades.updateView("TimeBin=lowerBin(ExchangeTimestamp, 30*MINUTE)")\
    .firstBy("Sym", "TimeBin")

rollingCalc = trades30min.by("Sym")\
    .update("Avg=(double[])rollingAvg.call(30, Last)", "Std=(double[])rollingStd.call(30, Last)")\
    .ungroup()

minEdge = 0.5
maxPos = 3.0
liquidity = 1e6

targetPos = rollingCalc.updateView("Zscore=Std > 0 ? (Avg-Last)/Std : NULL_DOUBLE", "AdjZscore=signum(Zscore) * min(maxPos, max(abs(Zscore)-minEdge, 0.0))", "TargetPosition=(int)(liquidity*AdjZscore/Last)")\
    .dropColumns("ExchangeTimestamp", "Avg", "Std", "Zscore", "AdjZscore")

timeBinIndexes = targetPos.leftJoin(trades30min, "Sym", "Times=ExchangeTimestamp, SharesTraded=Size")\
    .updateView("StartIndex=binSearchIndex(Times, TimeBin-30*MINUTE, BS_LOWEST)", "EndIndex=binSearchIndex(Times, TimeBin, BS_HIGHEST)")\
    .dropColumns("Times")

shares30min = timeBinIndexes.updateView("SharesTraded30Min=sum(SharesTraded.subArray(StartIndex, EndIndex))")\
    .dropColumns("SharesTraded", "StartIndex", "EndIndex")

from math import copysign

class SimulatorState:
    def __init__(self):
        self.hm = {}  # per-symbol state: [last position change, current position]

    def __call__(self, sym, targetPos, sharesTraded):
        if sym not in self.hm:
            self.hm[sym] = [0.0, 0.0]
        tradedAndPosition = self.hm[sym]
        # move toward the target position, filling at most 10% of the shares traded in the bin
        if targetPos is None:
            tradedAndPosition[0] = 0.0
        else:
            tradedAndPosition[0] = copysign(1, targetPos - tradedAndPosition[1]) \
                * min(abs(targetPos - tradedAndPosition[1]), sharesTraded * 0.1)
        tradedAndPosition[1] += tradedAndPosition[0]
        return jpy.array('double', list(tradedAndPosition))

ss = SimulatorState()

simulation = shares30min.update("Values=(double[])ss.call(Sym, TargetPosition, SharesTraded30Min)", "PositionChange=Values[0]", "Position=Values[1]")\
    .dropColumns("Values")
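# Purely illustrative: how SimulatorState throttles the simulated fills. Each call moves the
# position toward the target by at most 10% of the shares traded in the bin. This uses a
# throwaway instance and made-up numbers, so it does not disturb the `ss` state used above.
demoState = SimulatorState()
demoStep1 = demoState("DEMO", 1000.0, 4000.0)   # cap is 400 shares -> change 400, position 400
demoStep2 = demoState("DEMO", 1000.0, 4000.0)   # another 400       -> change 400, position 800
demoStep3 = demoState("DEMO", 1000.0, 4000.0)   # only 200 remain   -> change 200, position 1000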