Parallel * layer inside functions

I want to use the parallel functionality of the plyr package inside functions.

I would think that the correct way to export objects that were created in the body of the function (in this example, the df_2 object) is as follows

 # rm(list=ls()) library(plyr) library(doParallel) workers=makeCluster(2) registerDoParallel(workers,core=2) plyr_test=function() { df_1=data.frame(type=c("a","b"),x=1:2) df_2=data.frame(type=c("a","b"),x=3:4) #export df_2 via .paropts ddply(df_1,"type",.parallel=TRUE,.paropts=list(.export="df_2"),.fun=function(y) { merge(y,df_2,all=FALSE,by="type") }) } plyr_test() stopCluster(workers) 

However this causes an error

 Error in e$fun(obj, substitute(ex), parent.frame(), e$data) : unable to find variable "df_2" 

So, I did some research and found out that it works if I export df_2 manually

 workers=makeCluster(2) registerDoParallel(workers,core=2) plyr_test_2=function() { df_1=data.frame(type=c("a","b"),x=1:2) df_2=data.frame(type=c("a","b"),x=3:4) #manually export df_2 clusterExport(cl=workers,varlist=list("df_2"),envir=environment()) ddply(df_1,"type",.parallel=TRUE,.fun=function(y) { merge(y,df_2,all=FALSE,by="type") }) } plyr_test_2() stopCluster(workers) 

It gives the correct result.

  type xx xy 1 a 1 3 2 b 2 4 

But I also found that the following code works

 workers=makeCluster(2) registerDoParallel(workers,core=2) plyr_test_3=function() { df_1=data.frame(type=c("a","b"),x=1:2) df_2=data.frame(type=c("a","b"),x=3:4) #no export at all! ddply(df_1,"type",.parallel=TRUE,.fun=function(y) { merge(y,df_2,all=FALSE,by="type") }) } plyr_test_3() stopCluster(workers) 

plyr_test_3() also gives the correct result, and I don't understand why. I would think that I need to export df_2 ...

My question is: what is the correct way to solve parallel *ply functions inside? Obviously plyr_test() is incorrect. I somehow feel that manual export to plyr_test_2() useless. But I also think plyr_test_3() is kind of a bad coding style. Can anyone comment on this? Thanks guys!

+5
source share
3 answers

The problem with plyr_test is that df_2 is defined in plyr_test , which is not available from the doParallel package, and therefore it fails when it tries to export df_2 . So this is a problem. plyr_test2 fixes this problem because it does not try to use the .export parameter, but, as you might guess, calling clusterExport not required.

The reason for the success of plyr_test2 and plyr_test3 is that df_2 serialized along with an anonymous function, which is passed to the ddply function using the .fun argument. In fact, both df_1 and df_2 serialized along with an anonymous function, because this function is defined inside plyr_test2 and plyr_test3 . It is useful that df_2 included in this case, but the inclusion of df_1 not necessary and may harm your work.

As long as df_2 is df_2 in an anonymous function environment, no other df_2 value will ever be used, no matter what you export. If you can not prevent his capture, it is pointless to export it either via .export , or clusterExport , because it will use the captured value. You may get into trouble (as you did .export ) by trying to export it to workers.

Note that in this case, foreach will not auto- df_2 because it cannot df_2 body of the anonymous function to see which characters are referenced. If you call foreach directly without using an anonymous function, then it will see the link and automatically export it, which makes it unnecessary to explicitly export it using .export .

You can prevent serialization of the plyr_test environment with an anonymous function by changing its environment before passing it to ddply :

 plyr_test=function() { df_1=data.frame(type=c("a","b"),x=1:2) df_2=data.frame(type=c("a","b"),x=3:4) clusterExport(cl=workers,varlist=list("df_2"),envir=environment()) fun=function(y) merge(y, df_2, all=FALSE, by="type") environment(fun)=globalenv() ddply(df_1,"type",.parallel=TRUE,.fun=fun) } 

One of the advantages of the foreach package is that it does not encourage you to create a function inside another function that can accidentally capture a bunch of variables.


This issue tells me that foreach should include the option .exportenv , which is similar to option clusterExport envir . This would be very useful for plyr , as it would allow df_2 to be correctly exported using .export . However, this exported value will still not be used if the environment containing df_2 not been removed from the .fun function.

+1
source

It looks like a problem with the area.

Here is my "test suite" that allows me to export various variables or not create df_2 inside a function. I add and remove dumm_df_2 and df_3 outside the function and compare.

 library(plyr) library(doParallel) workers=makeCluster(2) registerDoParallel(workers,core=2) plyr_test=function(exportvar,makedf_2) { df_1=data.frame(type=c("a","b"),x=1:2) if(makedf_2){ df_2=data.frame(type=c("a","b"),x=3:4) } print(ls()) ddply(df_1,"type",.parallel=TRUE,.paropts=list(.export=exportvar,.verbose = TRUE),.fun=function(y) { z <- merge(y,df_2,all=FALSE,by="type") }) } ls() rm(df_2,df_3) plyr_test("df_2",T) plyr_test("df_2",F) plyr_test("df_3",T) plyr_test("df_3",F) plyr_test(NULL,T) #ok plyr_test(NULL,F) df_2='hi' ls() plyr_test("df_2",T) #ok plyr_test("df_2",F) plyr_test("df_3",T) plyr_test("df_3",F) plyr_test(NULL,T) #ok plyr_test(NULL,F) df_3 = 'hi' ls() plyr_test("df_2",T) #ok plyr_test("df_2",F) plyr_test("df_3",T) #ok plyr_test("df_3",F) plyr_test(NULL,T) #ok plyr_test(NULL,F) rm(df_2) ls() plyr_test("df_2",T) plyr_test("df_2",F) plyr_test("df_3",T) #ok plyr_test("df_3",F) plyr_test(NULL,T) #ok plyr_test(NULL,F) 

I don’t know why, but .export is looking for df_2 in the global environment outside the function (I saw parent.env () in the code, which may be “more correct” than the global environment), while the calculation requires the variable to be found in the same environment as ddply, and automatically exported it.

Using a dummy variable for df_2 outside the function allows .export to work, and the calculation uses df_2 inside.

When .export cannot find the variable outside the function, it returns:

 Error in e$fun(obj, substitute(ex), parent.frame(), e$data) : unable to find variable "df_2" 

With dm_2 a dummy variable outside the function, but without an internal one, .export is fine, but ddply outputs:

 Error in do.ply(i) : task 1 failed - "object 'df_2' not found" 

Perhaps, since this is a small example or perhaps not parallelized, it actually runs on a single core and avoids the need to export anything. A larger example might fail without .export, but someone might try this.

+1
source

Thanks @ARobertson for your help! It is very interesting that plyr_test("df_2",T) works when the dummy object df_2 was defined outside the function body.

It seems that ddply ultimately calls llply , which in turn calls foreach(...) %dopar% {...} .

I also tried to reproduce the problem with foreach , but foreach works fine.

 library(plyr) library(doParallel) workers=makeCluster(2) registerDoParallel(workers,core=2) foreach_test=function() { df_1=data.frame(type=c("a","b"),x=1:2) df_2=data.frame(type=c("a","b"),x=3:4) foreach(y=split(df_1,df_1$type),.combine="rbind",.export="df_2") %dopar% { #also print process ID to be sure that we really use different R script processes cbind(merge(y,df_2,all=FALSE,by="type"),Sys.getpid()) } } foreach_test() stopCluster(workers) 

He gives a warning

 Warning message: In e$fun(obj, substitute(ex), parent.frame(), e$data) : already exporting variable(s): df_2 

but returns the correct result

  type xx xy Sys.getpid() 1 a 1 3 216 2 b 2 4 1336 

So foreach will automatically export df_2 . Indeed, foreach vignette claims that

Function

...% dopar% noticed that these variables are referenced and that they were defined in the current environment. In this case,% dopar% automatically exports them in parallel execution of workers once, and use them for all expression evaluations for this foreach execution ....

Therefore, we can omit .export="df_2" and use

 library(plyr) library(doParallel) workers=makeCluster(2) registerDoParallel(workers,core=2) foreach_test_2=function() { df_1=data.frame(type=c("a","b"),x=1:2) df_2=data.frame(type=c("a","b"),x=3:4) foreach(y=split(df_1,df_1$type),.combine="rbind") %dopar% { #also print process ID to be sure that we really use different R script processes cbind(merge(y,df_2,all=FALSE,by="type"),Sys.getpid()) } } foreach_test_2() stopCluster(workers) 

instead of this. This is appreciated without warning.

An example of the dummy variable ARobertson and the fact that foreach works fine makes me now think that there is a problem with how * ply handles environments.

My conclusion:

Both plyr_test_3() and foreach_test_2() (which do not explicitly export df_2 ) both work without errors and give the same result. Therefore ddply with parallel=TRUE basically works. BUT, using a more “detailed” coding style (that is, explicitly exporting df_2 ), for example, in plyr_test() , it generates an error, while foreach(...) %dopar% {...} generates only a warning.

0
source

Source: https://habr.com/ru/post/1209156/


All Articles