R: Quickly perform operations on subsets of a data frame and then re-aggregate the result without an inner function

We have a very large data frame df that can be split by factors. On each subset of the data frame created by this split, we need to perform an operation that increases the number of rows of that subset until it reaches a certain length. Afterwards, we rbind the subsets to get a larger version of df.

Is there a way to do this quickly without using an inner function?

Say our subset operation is (in a separate .R file):

foo <- function(df) { magic }

We came up with several ways to do this:

1)

df <- split(df, df$factor)   # one data frame per factor level
df <- lapply(df, foo)        # grow each subset
df <- rbindlist(df)          # requires library(data.table)
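As a minimal, self-contained sketch of approach 1, the following uses base R only. The helper `pad_subset` is a hypothetical stand-in for `foo` (it pads each subset with zero-valued rows up to an assumed target length of 4), and `do.call(rbind, ...)` stands in for `rbindlist` so that no package is needed.

```r
# Hypothetical stand-in for foo(): pad a subset with zero-valued rows
# until it has n rows (n = 4 is an arbitrary choice for illustration).
pad_subset <- function(df.subset, n = 4L) {
  n.extra <- n - nrow(df.subset)
  if (n.extra <= 0L) return(df.subset)
  filler <- data.frame(factors = rep(df.subset$factors[1], n.extra),
                       x = rep(0, n.extra))
  rbind(df.subset, filler)
}

df <- data.frame(factors = c(0, 0, 1, 1, 1),
                 x = c(0.5, 1.5, 2.5, 3.5, 4.5))

pieces <- split(df, df$factors)       # one data frame per factor level
pieces <- lapply(pieces, pad_subset)  # grow each subset
result <- do.call(rbind, pieces)      # base R counterpart of rbindlist()
rownames(result) <- NULL
```

With this toy input, each of the two groups is padded to 4 rows, so `result` has 8 rows.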

2)

assign('list.df', list(), envir=.GlobalEnv)
assign('i', 1, envir=.GlobalEnv)

grouped <- dplyr::group_by(df, factor)
dplyr::mutate(grouped, foo.list(df.col))  # called only for foo.list's side effects
df <- rbindlist(list.df)
rm('list.df', envir=.GlobalEnv)
rm('i', envir=.GlobalEnv)

(In a separate file)
foo.list <- function(df.cols) {
    magic; 
    list.df[[i]] <<- magic.df
    i <<- i + 1
    return(dummy)
}

The problem with the first approach is time. The lapply simply takes too long to be desirable (about an hour with our data set).

- . , , .

, substitute ( , R ).

SO- (R, , , R: Pass by reference ..), .

Here is a small example:

 x <- runif(n=10, min=0, max=3)
 y <- sample(x=10, replace=FALSE)
 factors <- runif(n=10, min=0, max=2)
 factors <- floor(factors)
 df <- data.frame(factors, x, y)

Original df (10 rows)

 ## We group by factor, then run foo on the groups.

 foo <- function(df.subset) {
   min <- min(df.subset$y)
   max <- max(df.subset$y)

   ## We fill out df.subset to have everything between the min and
   ## max values of y. Then we assign the old values of df.subset
   ## to the corresponding spots.

   df.fill <- data.frame(x=rep(0, max-min+1),
                         y=min:max,
                         factors=rep(df.subset$factors[1], max-min+1))
   df.fill$x[which(df.subset$y %in%(min:max))] <- df.subset$x
   df.fill
 }

New df (18 rows)


With data.table, the split-apply-combine step can be written as:

library(data.table)
system.time(
df2 <- setDT(df)[, foo(.SD), by = factors]  # .SD holds the rows of the current group
)
#   user  system elapsed 
#   1.63    0.39    2.03
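To illustrate how grouping with `by` and `.SD` behaves, here is a small sketch on toy data. The helper `fill_range` is hypothetical (it is not the `foo` from the question) and assumes that `y` holds unique integers within each group. Inside `j`, `.SD` contains the current group's rows without the `by` column, and data.table prepends the `by` column to the combined result.

```r
library(data.table)

dt <- data.table(factors = c(0, 0, 1, 1, 1),
                 x = c(0.5, 1.5, 2.5, 3.5, 4.5),
                 y = c(1L, 4L, 2L, 3L, 6L))

# Hypothetical helper: expand one group to every y between min and max,
# keeping known x values and using 0 for the gaps.
fill_range <- function(sub) {
  y.full <- min(sub$y):max(sub$y)
  x.full <- numeric(length(y.full))
  x.full[match(sub$y, y.full)] <- sub$x
  data.table(x = x.full, y = y.full)
}

# Split by factors, apply fill_range to each group, combine the pieces.
res <- dt[, fill_range(.SD), by = factors]
```

Here group 0 expands to y = 1..4 and group 1 to y = 2..6, so `res` has 9 rows and the columns `factors`, `x`, `y`.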

The work can also be spread across several R processes.
The examples below use Rserve/RSclient and a non-CRAN package.

Starting R nodes

Start the R nodes on localhost using Rserve.

# start R nodes
library(Rserve)
port = 6311:6312
invisible(sapply(port, function(port) Rserve(debug = FALSE, port = port, args = c("--no-save"))))

# populate data
set.seed(123)
x = runif(n=5e6,min=0, max=3)
y = sample(x=5e6,replace=FALSE)
factors = runif(n=5e6, min=0, max=2)
factors = floor(factors)
df = data.frame(factors, x, y)

# connect Rserve nodes
library(RSclient)
rscl = sapply(port, function(port) RS.connect(port = port))

# assign chunks to R nodes
sapply(seq_along(rscl), function(i) RS.assign(rscl[[i]], name = "x", value = df[df$factors == (i-1),]))

# assign magic function to R nodes
foo = function(df) df
sapply(rscl, RS.assign, name = "foo", value = foo)

Evaluation on the nodes can run sequentially or in parallel (using wait=FALSE and RS.collect), as in the examples that follow.


lapply + RS.eval

# sequentially
l = lapply(rscl, RS.eval, foo(x))
rbindlist(l)

# in parallel
invisible(sapply(rscl, RS.eval, foo(x), wait=FALSE))
l = lapply(rscl, RS.collect)
rbindlist(l)

big.data.table::rscl.*

big.data.table provides rscl.* wrappers around the RSclient::RS.* functions that operate on a list of connections to R nodes.
data.table - , data.frame, vector R, . data.frame.

library(big.data.table)

# sequentially
l = rscl.eval(rscl, foo(x), simplify=FALSE)
rbindlist(l)

# in parallel
invisible(rscl.eval(rscl, foo(x), wait=FALSE))
l = rscl.collect(rscl, simplify=FALSE)
rbindlist(l)

big.data.table

, data.tables, api .

library(big.data.table)
rscl.require(rscl, "data.table")
rscl.eval(rscl, is.data.table(setDT(x))) # is.data.table to suppress collection of `setDT` results

bdt = big.data.table(rscl = rscl)
# runs in parallel by default
bdt[, foo(.SD), factors]
# considering we have data partitioned using `factors` field, the `by` is redundant in that case
bdt[, foo(.SD)]
# optionally use `[[` to access R nodes environment directly
bdt[[expr = foo(x)]]

# disconnect
rscl.close(rscl)

# shutdown nodes started from R
l = lapply(setNames(nm = port), function(port) tryCatch(RSconnect(port = port), error = function(e) e, warning = function(w) w))
invisible(lapply(l, function(rsc) if(inherits(rsc, "sockconn")) RSshutdown(rsc)))

Another variation using data.table. First generate the min(y):max(y) part, and then join + update:

require(data.table)
ans = setDT(df)[, .(x=0, y=min(y):max(y)), by=factors
              ][df, x := i.x, on=c("factors", "y")][]
ans
#     factors          x  y
#  1:       0 1.25104362  1
#  2:       0 0.16729068  2
#  3:       0 0.00000000  3
#  4:       0 0.02533907  4
#  5:       0 0.00000000  5
#  6:       0 0.00000000  6
#  7:       0 1.80547980  7
#  8:       1 0.34043937  3
#  9:       1 0.00000000  4
# 10:       1 1.51742163  5
# 11:       1 0.15709287  6
# 12:       1 0.00000000  7
# 13:       1 1.26282241  8
# 14:       1 2.88292354  9
# 15:       1 1.78573288 10
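The same idea (build the full y range per group, then bring in the known x values) can be sketched in base R with `merge()`. This is an illustrative alternative, not the answer's method. The toy data is made up, and the sketch assumes `y` is a unique integer within each group.

```r
df <- data.frame(factors = c(0, 0, 1, 1),
                 x = c(1.2, 0.4, 2.1, 0.9),
                 y = c(1L, 3L, 6L, 4L))

# Build the complete y range for every factor level.
grid <- do.call(rbind, lapply(split(df, df$factors), function(s) {
  data.frame(factors = s$factors[1], y = min(s$y):max(s$y))
}))

# Left join keeps every grid row; rows without a match get NA in x.
ans <- merge(grid, df, by = c("factors", "y"), all.x = TRUE)
ans$x[is.na(ans$x)] <- 0
ans <- ans[order(ans$factors, ans$y), c("factors", "x", "y")]
rownames(ans) <- NULL
```

The join replaces the positional assignment, so the order of rows in `df` does not matter.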

I do not think that your function works as intended. It relies on y being ordered.
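One possible way to remove the dependence on row order, sketched here under the assumption that y values are unique integers within each group, is to compute target positions with `match()`. The name `foo_by_match` is hypothetical and the toy input is made up for illustration.

```r
# Sketch of an order-independent variant of foo(): match() maps each
# original y to its position in the full min:max range.
foo_by_match <- function(df.subset) {
  y.full <- min(df.subset$y):max(df.subset$y)
  df.fill <- data.frame(x = rep(0, length(y.full)),
                        y = y.full,
                        factors = rep(df.subset$factors[1], length(y.full)))
  df.fill$x[match(df.subset$y, y.full)] <- df.subset$x
  df.fill
}

# Rows deliberately out of order in y.
unordered <- data.frame(factors = c(1, 1, 1),
                        x = c(0.5, 0.7, 0.9),
                        y = c(5L, 2L, 3L))
filled <- foo_by_match(unordered)
```

For this input, `filled` covers y = 2..5 with x values 0.7, 0.9, 0, 0.5, so each x stays attached to its own y.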

Try using a data.table join with grouping:

library(data.table)
setDT(df)
df2 <- df[, .SD[data.table(y=seq(.SD[, min(y)], .SD[, max(y)], by = 1)), .SD, 
                  on = "y"], #data.table join
                    by = factors] #grouping
df2[is.na(x), x:= 0]
setkey(df2, factors, y, x)

Source: https://habr.com/ru/post/1626365/

