| Title: | Orchestrate Functional Workflows |
|---|---|
| Description: | Make it easy for users to adopt a functional approach to their analysis. Pipelines are defined using a reduced subset of R and, when run, use an intelligent approach to work planning and object caching. Whilst the goal is to be "good enough" for many pieces of analysis, by encouraging a functional approach, it remains easy to move to more advanced alternatives such as targets (<doi:10.32614/CRAN.package.targets>). |
| Authors: | Tim Taylor [aut, cre, cph] (ORCID: <https://orcid.org/0000-0002-8587-7113>) |
| Maintainer: | Tim Taylor <[email protected]> |
| License: | GPL-2 \| GPL-3 |
| Version: | 0.0.0.9165 |
| Built: | 2026-05-22 16:29:08 UTC |
| Source: | https://codeberg.org/TimTaylor/pipeline |
These functions allow users to attach (and detach) the pipeline functions
and scenario to the R search path which can be useful during development.
They are attached as local:pipelinedevenv. Whether this is a good idea or
not is to be determined.
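To make "attaching to the search path" concrete, here is a minimal base R sketch. It is not the package's implementation: the environment `dev_env` and its contents are made up for illustration, and only the name `local:pipelinedevenv` is taken from the text above.

```r
# Hypothetical stand-in for what a pipeline might expose: a CONFIG list and
# one user-defined function. Neither comes from the pipeline package.
dev_env <- new.env()
assign("CONFIG", list(rows = 10L), envir = dev_env)
assign("load_dat", function(path) read.csv(path), envir = dev_env)

# base::attach() copies the objects into a new entry on the search path.
attach(dev_env, name = "local:pipelinedevenv", warn.conflicts = FALSE)
attached <- "local:pipelinedevenv" %in% search()
found <- exists("load_dat") && is.function(load_dat)

# base::detach() removes the entry again, so the objects are no longer found.
detach("local:pipelinedevenv", character.only = TRUE)
still_attached <- "local:pipelinedevenv" %in% search()
```

Because attached objects are found by name from anywhere in the session, they can mask or be masked by objects elsewhere on the search path, which is one reason to treat this as a development convenience only.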
    pipeline_attach(
        scenario = NULL,
        scenario_default = getOption("pipeline.scenario_default", "default"),
        pipeline_dir = ".",
        relative_config_file = getOption("pipeline.config_file", "config.R"),
        relative_function_dir = getOption("pipeline.function_dir", "R"),
        relative_output_dir = getOption("pipeline.output_dir", "output")
    )

    pipeline_detach()
| Argument | Description |
|---|---|
| scenario | Configuration you wish to attach. If Character input will be treated as configuration you wish to consider. In this case the configuration file will first be parsed looking for a named-list entry corresponding to the value of the |
| scenario_default | The default scenario to consider. If |
| pipeline_dir | The directory you wish to run the pipeline relative to. |
| relative_config_file | The configuration file. Must be non-nested and given relative to |
| relative_function_dir | The directory to look for user-defined pipeline functions. Must be non-nested and given relative to |
| relative_output_dir | The directory for output. Must be non-nested and given relative to |
NULL (invisibly). Called only for side effects.
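Several defaults in the usage above are read with `getOption()`, so they can be changed for a whole session rather than on every call. A minimal sketch, assuming the option is not already set in your session; the directory name `"results"` is made up for illustration:

```r
# Set the option for this session, keeping the previous value for restoring.
old <- options(pipeline.output_dir = "results")

# This is the expression used as the default for relative_output_dir.
during <- getOption("pipeline.output_dir", "output")

# Restore the previous state; the fallback value applies again.
options(old)
after <- getOption("pipeline.output_dir", "output")
```

Here `during` is `"results"`, and `after` falls back to `"output"` once the option is restored to its unset state.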
    # generate a demo pipeline with multiple scenarios
    dir <- pipeline_skeleton(tempfile())

    ## Not run:
    # If we attach the 'default' scenario then both the CONFIG and the
    # defined functions will be available to us.
    pipeline_attach(scenario = "default", pipeline_dir = dir)
    CONFIG
    exists("load_dat") && is.function(load_dat)

    pipeline_attach(scenario = "production", pipeline_dir = dir)
    CONFIG

    pipeline_detach()
    exists("load_dat")
    ## End(Not run)

    # recursive = TRUE is needed for unlink() to remove a directory
    unlink(dir, recursive = TRUE)
pipeline_run() manages the running of a user-defined pipeline, simplifying
object caching and configuration management across different scenarios.
    pipeline_run(
        x,
        ...,
        scenario = NULL,
        scenario_default = getOption("pipeline.scenario_default", "default"),
        pipeline_dir = ".",
        relative_config_file = getOption("pipeline.config_file", "config.R"),
        relative_function_dir = getOption("pipeline.function_dir", "R"),
        relative_output_dir = getOption("pipeline.output_dir", "output"),
        force = FALSE,
        saveRDS_args = list(),
        readRDS_args = list(),
        return = TRUE,
        gc = FALSE
    )
| Argument | Description |
|---|---|
| x | R expression of pipeline assignments. Normally this will involve multiple assignments, which need to be wrapped in braces to form a single expression. |
| ... | Not currently used. |
| scenario | If Character input will be treated as configurations you wish to loop over. In this case the configuration file will first be parsed looking for a named-list entry corresponding to the value of the |
| scenario_default | The default scenario to consider. If |
| pipeline_dir | The directory you wish to run the pipeline relative to. |
| relative_config_file | The configuration file. Must be non-nested and given relative to |
| relative_function_dir | The directory to look for user-defined pipeline functions. Must be non-nested and given relative to |
| relative_output_dir | The directory for output. Must be non-nested and given relative to |
| force | Whether to force a run of the pipeline. If TRUE, cached objects are removed and the pipeline is (re)run. If a character vector, the corresponding object(s) are removed from the cache and the pipeline is rerun. |
| saveRDS_args | List of additional arguments passed to This argument allows you to pass additional arguments to that function (e.g. |
| readRDS_args | List of additional arguments passed to |
| return | Should the output be returned. Defaults to TRUE, but when running across multiple scenarios and with outputs that use a large amount of memory it can be useful to set If |
| gc | Should we force calls to For pipelines creating large objects, setting this to TRUE may help reduce memory consumption. |
A named list of outputs for each configuration.
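The roles of `force`, `saveRDS_args` and `readRDS_args` can be pictured with a small file-backed cache built on base R's `saveRDS()` and `readRDS()`. This is only a sketch under stated assumptions: the helper `cache_or_compute()` and the one-file-per-object layout are hypothetical, and the package's actual caching may work differently.

```r
# Hypothetical helper: NOT part of the pipeline package.
cache_or_compute <- function(name, compute, cache_dir, force = FALSE,
                             saveRDS_args = list(), readRDS_args = list()) {
    path <- file.path(cache_dir, paste0(name, ".rds"))

    # force = TRUE drops the cached copy; a character vector drops named objects.
    if (isTRUE(force) || (is.character(force) && name %in% force)) {
        unlink(path)
    }

    # Reuse the cached object when one exists.
    if (file.exists(path)) {
        return(do.call(readRDS, c(list(file = path), readRDS_args)))
    }

    # Otherwise compute, store (forwarding any extra saveRDS arguments), return.
    value <- compute()
    do.call(saveRDS, c(list(object = value, file = path), saveRDS_args))
    value
}

cache_dir <- tempfile()
dir.create(cache_dir)

calls <- 0L
slow_square <- function() {
    calls <<- calls + 1L  # count how often the real work is done
    4^2
}

first  <- cache_or_compute("sq", slow_square, cache_dir)  # computed and cached
second <- cache_or_compute("sq", slow_square, cache_dir)  # read from the cache
third  <- cache_or_compute("sq", slow_square, cache_dir, force = "sq",
                           saveRDS_args = list(compress = "xz"))  # recomputed

unlink(cache_dir, recursive = TRUE)
```

In this sketch the work is done twice in total: once on the first call and once more when `force = "sq"` invalidates the cached file.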
    # generate a demo pipeline with a single scenario
    dir <- pipeline_skeleton(tempfile(), single = TRUE)

    # Note the configuration file and the folder of R functions
    list.files(dir, all.files = TRUE, recursive = TRUE, no.. = TRUE)

    # Run the pipeline
    out <- pipeline_run(
        {
            raw <- load_dat(CONFIG$in_csv)
            clean <- wrangle_dat(raw, CONFIG$rows)
            plot <- plot_dat(clean, CONFIG$out_plot)
        },
        pipeline_dir = dir
    )

    # output in a list
    str(out$default$raw)
    str(out$default$clean)
    out$default$plot

    # recursive = TRUE is needed for unlink() to remove a directory
    unlink(dir, recursive = TRUE)
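The braced block passed as `x` in the example is a single R expression made up of several assignments. As a rough illustration of how such a block could be inspected without evaluating it, the sketch below captures it with `substitute()` and lists which earlier targets each step uses. The helper `plan_steps()` is hypothetical and is not the package's planning logic.

```r
# Hypothetical helper: NOT the package's implementation.
plan_steps <- function(x) {
    expr <- substitute(x)  # capture the code unevaluated

    # A braced block is a call to `{`; the remaining elements are its statements.
    stopifnot(is.call(expr), identical(expr[[1L]], as.name("{")))
    steps <- as.list(expr)[-1L]

    # Every statement must be an assignment of the form `target <- call`.
    is_assign <- vapply(
        steps,
        function(s) is.call(s) && identical(s[[1L]], as.name("<-")),
        logical(1)
    )
    stopifnot(all(is_assign))

    targets <- vapply(steps, function(s) as.character(s[[2L]]), character(1))

    # For each step, keep only the variables that are themselves targets.
    deps <- lapply(steps, function(s) intersect(all.vars(s[[3L]]), targets))
    names(deps) <- targets
    deps
}

# The functions below are never called, so they do not need to exist.
plan <- plan_steps({
    raw <- load_dat(CONFIG$in_csv)
    clean <- wrangle_dat(raw, CONFIG$rows)
    plot <- plot_dat(clean, CONFIG$out_plot)
})
```

Here `plan$clean` is `"raw"` and `plan$plot` is `"clean"`, while `plan$raw` is empty. A dependency listing of this kind is one way a runner could decide which steps need rerunning when a cached object is removed.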
Sets up an example pipeline which can be used as the basis for your own.
    pipeline_skeleton(dir = "DemoPipeline", single = FALSE)
| Argument | Description |
|---|---|
| dir | Directory you wish to create the pipeline in. |
| single | Whether the demo should use a single scenario or multiple scenarios. |
Absolute path of the created pipeline directory (invisibly).
    dir <- pipeline_skeleton(tempfile())

    # Note the configuration file and the folder of R functions
    list.files(dir, all.files = TRUE, recursive = TRUE, no.. = TRUE)

    # recursive = TRUE is needed for unlink() to remove a directory
    unlink(dir, recursive = TRUE)