Using dask to schedule tasks for running machine learning models in parallel

So basically I want to run ML pipelines in parallel. I use scikit-learn and I decided to use DaskGridSearchCV.

I have a list of objects, each created as gridSearchCV = DaskGridSearchCV(pipeline, grid, scoring=evaluator), and I run them sequentially:

for gridSearchCV in list:
    gridSearchCV.fit(train_data, train_target)
    predicted = gridSearchCV.predict(test_data)

If I have N different GridSearch objects, I want to use as many resources as possible. If there are enough resources to run 2, 3, 4, ... or N of them at the same time, I want to do that.

So I started trying a few things based on the dask documentation. At first I tried dask.threaded and dask.multiprocessing, but both end up slower, and I keep getting this warning:

/Library/Python/2.7/site-packages/sklearn/externals/joblib/parallel.py:540: UserWarning: Multiprocessing backed parallel loops cannot be nested below threads, setting n_jobs=1

This is a piece of code:

def run_pipeline(self, gs, data):

    train_data, test_data, train_target, expected = train_test_split(data, target, test_size=0.25, random_state=33)

    model = gs.fit(train_data, train_target)
    predicted = gs.predict(test_data)


values = [delayed(run_pipeline)(gs, df) for gs in gs_list]
compute(*values, get=dask.threaded.get)

Any ideas, suggestions, or tips?

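If I understand correctly, you have several GridSearch objects, one per estimator type (DecisionTree, RandomForest, and so on), and you want to run them all in parallel.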

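If so, you do not need a separate search object for each estimator. Scikit-Learn Pipelines support grid-searching over their steps, so a single GridSearchCV can cover every estimator (see the scikit-learn documentation for details). Wrap the estimator in a one-step Pipeline and treat the step itself as a parameter to search over. For example: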

from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
import dask_searchcv as dcv

pipeline = Pipeline([('est', DecisionTreeClassifier())])

grid = [
    {'est': [DecisionTreeClassifier()],
     # parameters of a pipeline step are addressed as <step>__<param>
     'est__max_features': ['sqrt', 'log2'],
     # more parameters for DecisionTreeClassifier
    },
    {'est': [RandomForestClassifier()],
     'est__max_features': ['sqrt', 'log2'],
     # more parameters for RandomForestClassifier
    },
    # more estimator/parameter subsets
]

gs = dcv.GridSearchCV(pipeline, grid)
gs.fit(train_data, train_target)
gs.predict(test_data)
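
To see why one search object is enough, it can help to look at how a list of sub-grids expands into individual candidates. The sketch below is plain Python and only mimics the expansion; expand_grid is a hypothetical helper, not a scikit-learn or dask function. Strings stand in for the estimator instances, the step-prefixed name est__max_features assumes a pipeline step called 'est', and est__n_estimators is an extra parameter added purely for illustration.

```python
from itertools import product


def expand_grid(grid):
    """Expand a list of parameter sub-grids into individual candidates.

    Hypothetical helper for illustration only; it is not part of
    scikit-learn or dask.
    """
    candidates = []
    for sub_grid in grid:
        # Sort the keys so the order of candidates is deterministic.
        keys = sorted(sub_grid)
        # Take every combination of values within this sub-grid only.
        for values in product(*(sub_grid[k] for k in keys)):
            candidates.append(dict(zip(keys, values)))
    return candidates


# Strings stand in for estimator instances to keep the sketch dependency-free.
example_grid = [
    {'est': ['DecisionTreeClassifier'],
     'est__max_features': ['sqrt', 'log2']},
    {'est': ['RandomForestClassifier'],
     'est__max_features': ['sqrt', 'log2'],
     'est__n_estimators': [10, 100]},  # illustrative extra parameter
]

candidates = expand_grid(example_grid)
for params in candidates:
    print(params)
```

Each sub-grid is expanded on its own, so parameters that belong to only one estimator never get combined with the other. All candidates end up in one search, which is what lets a single fit call schedule them together.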

Alternatively, if all of the estimators take the same parameters, the grid can be a single dictionary:

grid = {'est': [DecisionTreeClassifier(), RandomForestClassifier()],
        'est__max_features': ['sqrt', 'log2'],
        # more parameters for all estimators
        }

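As for why your dask.delayed approach is slow: the fit method of dask_searchcv.GridSearchCV already builds and runs its own dask graph, so calling it inside a delayed function (which also runs under dask) nests one dask computation inside another, and that does not run efficiently.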

Source: https://habr.com/ru/post/1676619/

