class DaskLocalExecutor(BaseExecutor):
    """Dask executor with a local cluster.

    Creates a `LocalCluster <https://docs.dask.org/en/stable/deploying-python.html#localcluster>`_
    and uses it to parallelize execution over local CPU cores
    (same node as the benchmark).
    """

    def __init__(self, **kwargs):
        """Create a ``LocalCluster`` and a ``Client`` connected to it.

        :param kwargs: optional settings. Only ``n_workers`` (default ``1``)
            is read here; it is the number of workers to wait for before
            the constructor returns.

        :meta public:
        """
        # disable GPU diagnostics to prevent Dask from crashing
        dask.config.set({"distributed.diagnostics.nvml": False})

        # NOTE: the cluster is created with Dask's default sizing;
        # ``n_workers`` is only passed to ``wait_for_workers`` below and is
        # not forwarded to ``LocalCluster``.
        self.cluster = LocalCluster()
        self.client = Client(self.cluster)

        n_workers = kwargs.get("n_workers", 1)
        # ``wait_for_workers`` is not defined in this class (presumably
        # inherited from ``BaseExecutor``).
        self.wait_for_workers(n_workers)

    def __del__(self):
        """Shut down the client and the cluster in the end of benchmarking.

        The client is closed before the cluster so that it disconnects from
        a scheduler that is still alive. The cluster is closed even if
        closing the client fails.

        :meta public:
        """
        # ``__init__`` may have failed before either attribute was set, so
        # both lookups tolerate a missing attribute.
        client = getattr(self, "client", None)
        cluster = getattr(self, "cluster", None)
        try:
            if client is not None:
                client.close()
        finally:
            if cluster is not None:
                cluster.close()

    def _execute(self, func, args, **kwargs):
        """Execute ``func`` over ``args`` in parallel using ``distributed.Client::submit()``.

        :param func: callable submitted once per element of ``args``.
        :param args: iterable of arguments; each element is passed to
            ``func`` as its single positional argument.
        :param kwargs: keyword arguments forwarded unchanged to every
            ``Client.submit`` call.
        :returns: list with the gathered results of all submitted futures.

        :meta public:
        """
        # Arguments are submitted as-is: pre-distributing them with
        # ``self.client.scatter(args)`` was found not to work well.
        futures = [self.client.submit(func, arg, **kwargs) for arg in args]
        return list(self.client.gather(futures))
class DaskGatewayExecutor(BaseExecutor):
    """Dask Gateway executor.

    Searches for an existing Gateway cluster and uses it to parallelize
    execution over multiple nodes using a batch system defined in Dask
    Gateway's backend (e.g. Slurm).
    """

    def __init__(self, **kwargs):
        """Connect to an existing Dask Gateway cluster and wait for workers.

        Sets ``self.gateway``, ``self.cluster``, ``self.client`` and
        ``self.worker_cores`` (the ``nthreads`` value of the first worker
        listed in the scheduler info).

        :param kwargs: optional settings. Only ``n_workers`` (default ``1``)
            is read here; it is the number of workers to wait for before
            the constructor returns.
        :raises RuntimeError: if no Dask Gateway cluster is found.
        """
        self.gateway = Gateway(
            "http://dask-gateway-k8s.geddes.rcac.purdue.edu/",
            proxy_address="traefik-dask-gateway-k8s.cms.geddes.rcac.purdue.edu:8786",
        )
        self._find_gateway_client()

        n_workers = kwargs.get("n_workers", 1)
        # ``wait_for_workers`` is not defined in this class (presumably
        # inherited from ``BaseExecutor``).
        self.wait_for_workers(n_workers)

        # Only the first listed worker is inspected for its thread count.
        workers = self.cluster.scheduler_info["workers"]
        self.worker_cores = list(workers.values())[0]["nthreads"]

    def _find_gateway_client(self):
        """Search for an existing Dask Gateway cluster and connect to it.

        If more than one Gateway cluster is found, a notice is printed and
        the first one found is used. On success ``self.cluster`` and
        ``self.client`` are set.

        :raises RuntimeError: if no Gateway clusters are found.
        """
        clusters = self.gateway.list_clusters()
        if not clusters:
            # RuntimeError is still caught by callers handling ``Exception``.
            raise RuntimeError("No Dask Gateway clusters found")

        first_cluster_name = clusters[0].name
        if len(clusters) > 1:
            print(
                f"More than 1 Dask Gateway clusters found, will connect to the 1st one: {first_cluster_name}"
            )

        self.cluster = self.gateway.connect(first_cluster_name)
        self.client = self.cluster.get_client()

    def _execute(self, func, args, **kwargs):
        """Execute ``func`` over ``args`` in parallel using ``distributed.Client::submit()``.

        :param func: callable submitted once per element of ``args``.
        :param args: iterable of arguments; each element is passed to
            ``func`` as its single positional argument.
        :param kwargs: keyword arguments forwarded unchanged to every
            ``Client.submit`` call.
        :returns: list with the gathered results of all submitted futures.

        :meta public:
        """
        # Arguments are submitted as-is: pre-distributing them with
        # ``self.client.scatter(args)`` was found not to work well.
        futures = [self.client.submit(func, arg, **kwargs) for arg in args]
        return list(self.client.gather(futures))