.
, , R-.
Rserve/RSclient, -CRAN-.
R
R localhost. Rserve .
library(Rserve)
port = 6311:6312
invisible(sapply(port, function(port) Rserve(debug = FALSE, port = port, args = c("--no-save"))))
# populate data
set.seed(123)
x = runif(n=5e6,min=0, max=3)
y = sample(x=5e6,replace=FALSE)
factors = runif(n=5e6, min=0, max=2)
factors = floor(factors)
df = data.frame(factors, x, y)
# connect Rserve nodes
library(RSclient)
rscl = sapply(port, function(port) RS.connect(port = port))
# assign chunks to R nodes
sapply(seq_along(rscl), function(i) RS.assign(rscl[[i]], name = "x", value = df[df$factors == (i-1),]))
# assign magic function to R nodes
foo = function(df) df
sapply(rscl, RS.assign, name = "foo", value = foo)
( wait=FALSE RS.collect), .
lapply + RS.eval
l = lapply(rscl, RS.eval, foo(x))
rbindlist(l)
invisible(sapply(rscl, RS.eval, foo(x), wait=FALSE))
l = lapply(rscl, RS.collect)
rbindlist(l)
big.data.table::rscl.*
big.data.table RSclient::RS.*, R.
data.table - , data.frame, vector R, . data.frame.
library(big.data.table)
l = rscl.eval(rscl, foo(x), simplify=FALSE)
rbindlist(l)
invisible(rscl.eval(rscl, foo(x), wait=FALSE))
l = rscl.collect(rscl, simplify=FALSE)
rbindlist(l)
big.data.table
, data.tables, api .
library(big.data.table)
rscl.require(rscl, "data.table")
rscl.eval(rscl, is.data.table(setDT(x)))
bdt = big.data.table(rscl = rscl)
bdt[, foo(.SD), factors]
bdt[, foo(.SD)]
bdt[[expr = foo(x)]]
rscl.close(rscl)
l = lapply(setNames(nm = port), function(port) tryCatch(RSconnect(port = port), error = function(e) e, warning = function(w) w))
invisible(lapply(l, function(rsc) if(inherits(rsc, "sockconn")) RSshutdown(rsc)))