Rollapply for big data using sparklyr

I want to estimate a rolling Value at Risk (VaR) for a data set of about 22.5 million rows, so I want to use sparklyr for fast computation. Here is what I did (using an example data set):

    library(PerformanceAnalytics)
    library(reshape2)
    library(dplyr)

    data(managers)
    data <- zerofill(managers)
    data <- as.data.frame(data)
    class(data)  # "data.frame"
    data$date <- row.names(data)
    lmanagers <- melt(data, id.vars = c('date'))

Now I compute the rolling VaR with the dplyr and PerformanceAnalytics packages:

    library(zoo)  # for rollapply()

    var <- lmanagers %>%
      group_by(variable) %>%
      arrange(variable, date) %>%
      mutate(var = rollapply(
        value, 10,
        FUN = function(x) VaR(x, p = .95, method = "modified"),
        partial = TRUE, align = "right"  # align is a rollapply() argument, not a VaR() argument
      ))

It works great. Now I am doing this to use sparklyr:

    library(sparklyr)

    sc <- spark_connect(master = "local")
    lmanagers_sp <- copy_to(sc, lmanagers)
    src_tbls(sc)

    var_sp <- lmanagers_sp %>%
      group_by(variable) %>%
      arrange(variable, date) %>%
      mutate(var = rollapply(
        value, 10,
        FUN = function(x) VaR(x, p = .95, method = "modified"),
        partial = TRUE, align = "right"
      )) %>%
      collect

But this gives the following error:

 Error: Unknown input type: pairlist 

Can someone tell me where the error is and what the correct code should be? Any other fast way to compute rolling VaR would also be appreciated.

2 answers

For custom dplyr backends such as sparklyr, mutate() does not currently support arbitrary R functions defined in other packages; therefore rollapply() is not currently supported.

To calculate Value at Risk in sparklyr, one approach is to extend sparklyr with Scala and R, following an approach similar to: Assess financial risk with Apache Spark.
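A possible pure-R workaround, not mentioned in this answer, is sparklyr::spark_apply(), which ships an arbitrary R closure to each worker, so rollapply() and VaR() can run worker-side per group. A minimal sketch, assuming R together with the zoo and PerformanceAnalytics packages is available on every worker node and that each group's partition fits in worker memory:

    # Sketch only: spark_apply() runs the closure once per group of
    # `variable`; zoo and PerformanceAnalytics must exist on each worker.
    var_sp <- lmanagers_sp %>%
      spark_apply(
        function(df) {
          library(zoo)
          library(PerformanceAnalytics)
          df <- df[order(df$date), ]          # restore date order within the group
          df$var <- rollapply(
            df$value, 10,
            FUN = function(x) VaR(x, p = .95, method = "modified"),
            partial = TRUE, align = "right"
          )
          df
        },
        group_by = "variable",
        packages = c("zoo", "PerformanceAnalytics")
      ) %>%
      collect()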


Let me break your question down into two tasks:

  • how to perform a sliding self-join (i.e. a.manager_id = b.manager_id and a.date < b.date and b.date <= a.date + 10) with the sparklyr interface
  • how to use a custom function (i.e. VaR()) with the sparklyr interface

The first task may be possible with dplyr verbs, which support a limited set of window functions, including lead() and lag(). You will probably end up with something really ugly, along the lines of (lag(return,1) + lag(return,2) + lag(return,3)) / (3 - is.na(lag(return,1)) - is.na(lag(return,2)) - is.na(lag(return,3))), which is just a generic example. (Unfortunately, conditional joins such as date windows are still unsupported in dplyr; this question seems to come up frequently, for example this one.)
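For concreteness, here is a minimal sketch of that lag()-based pattern, written as a 3-period rolling mean over the question's columns; whether each translation (coalesce(), as.integer(), is.na()) is supported depends on your dbplyr/sparklyr versions:

    library(dplyr)
    library(dbplyr)  # window_order() sets the ORDER BY for window functions

    rolling3 <- lmanagers_sp %>%
      group_by(variable) %>%
      window_order(date) %>%
      mutate(
        # average of the previous 3 values; the denominator subtracts
        # the NULL lags so the first rows of each group are not biased
        roll_mean = (coalesce(lag(value, 1L), 0) +
                     coalesce(lag(value, 2L), 0) +
                     coalesce(lag(value, 3L), 0)) /
                    (3 - as.integer(is.na(lag(value, 1L)))
                       - as.integer(is.na(lag(value, 2L)))
                       - as.integer(is.na(lag(value, 3L))))
      )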

It would be much easier to write the first task directly in Spark SQL (with the conditional self-join described above) wrapped in DBI::dbGetQuery().
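A sketch of that SQL route, assuming copy_to() registered the table as "lmanagers" and that date is numeric and orderable (a real date column would need date arithmetic instead of + 10); AVG() stands in for the statistic here, since VaR itself is the second task:

    library(DBI)

    rolled <- dbGetQuery(sc, "
      SELECT a.variable,
             a.date,
             AVG(b.value) AS window_stat
      FROM   lmanagers a
      JOIN   lmanagers b
        ON   a.variable = b.variable
       AND   a.date < b.date
       AND   b.date <= a.date + 10
      GROUP BY a.variable, a.date
    ")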

The second task is a statistical computation that cannot be expressed in plain dplyr or direct SQL, and it has a library dependency that sparklyr does not support, so you need a Scala (or Python) user-defined function (UDF) to calculate VaR, as in the link already given in the other answer.

tl;dr The first task is doable with sparklyr (but via SQL, not dplyr). The second task requires an external UDF, which you can then call via invoke() through sparklyr.
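Purely illustrative of that last step: once such a UDF is compiled into a jar on the cluster classpath, sparklyr can reach it with invoke(). The class com.example.RollingVaR and its compute() method below are hypothetical placeholders; only invoke_new(), invoke(), and spark_dataframe() are real sparklyr API:

    # Hypothetical Scala class; the names are placeholders for illustration.
    udf <- invoke_new(sc, "com.example.RollingVaR")
    sdf <- spark_dataframe(lmanagers_sp)  # the underlying Java DataFrame
    out <- invoke(udf, "compute", sdf, 10L, 0.95)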


Source: https://habr.com/ru/post/1271472/
