Data Processing Pipelines
Introduction
Most computational studies acquire and process data using a data processing pipeline. Such pipelines are composed of multiple steps, where each step may be a script, binary executable, or a specific data transformation. These steps often depend on the completion and outcome of previous steps, so the entire workflow can be naturally represented as a Directed Acyclic Graph (DAG): nodes represent workflow steps, and edges indicate their dependencies.
This DAG-based representation allows for complex pipeline topologies, including parallelization and multiple dependencies. However, describing such logic in procedural programming languages quickly becomes unwieldy and hard to maintain. To address this challenge, domain-specific workflow description languages (DSLs) have been developed.
Workflow Description Languages
For reproducibility and repeatability, pipelines are commonly specified using descriptive workflow DSLs. In bioinformatics and other scientific domains, three such languages are widely used. The most prevalent is the Common Workflow Language (CWL), due to its extensive community support, number of published workflows, and broad platform compatibility. All pipelines published in this documentation use CWL.
Descriptive workflow languages separate the definition of pipeline structure (topology, inputs, outputs, requirements) from the implementation of processing steps. While these languages were pioneered in bioinformatics—often operating on well-defined inputs and known tools—they are also applicable to fields with more diverse and less standardized data, such as population health research. In these domains, many workflow steps may focus on complex data transformation and harmonization.
Some workflows require a database connection during execution. See the Managing database connections section for details.
Running Workflows
Tested runners
CWL is a “write once, run anywhere” language. A pipeline developed and tested in one environment (such as a laptop) will run on clusters and cloud platforms using any compliant runner. For the latest list of compatible runners, visit the CWL Implementations page.
We have successfully used cwltool, CWL-Airflow, and Toil:
- Toil: Feature-rich; supports AWS Batch and resuming pipelines after failures. Output can be quite verbose. We recommend it for both production and development use.
- CWL-Airflow: Provides a graphical interface for managing and visualizing workflows.
- cwltool: Lightweight reference implementation; a good choice for development if you prefer not to use Toil.
See the Toil documentation for additional details on using Toil for running CWL workflows.
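All of these runners accept the same CWL workflow and job files, so switching between them is mostly a matter of changing the command. A minimal sketch, using the workflow and job file names from the sample command in the next section:

# Reference implementation
cwltool pm25_yearly_download.cwl test_exposure_job.yml

# Toil, single-machine mode
toil-cwl-runner pm25_yearly_download.cwl test_exposure_job.yml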
Providing parameters to the pipelines
Pipeline parameters are supplied on the command line
(as double-dash -- options) or via YAML or JSON files.
When using YAML to specify files or directories as inputs, use this structure:
my_file:
  path: /path/to/data.nc
  class: File
my_directory:
  path: /path/to/data/downloads
  class: Directory
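The same File and Directory inputs can also be passed directly on the command line: CWL runners such as cwltool and toil-cwl-runner generate double-dash options from the workflow's input names, and File or Directory inputs accept plain paths. A minimal sketch, assuming a hypothetical workflow file my_workflow.cwl that declares the inputs above:

cwltool my_workflow.cwl \
    --my_file /path/to/data.nc \
    --my_directory /path/to/data/downloads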
Using Toil
For hands-on examples, refer to the Dorieh Examples Folder on GitHub.
Quick Start with Toil
Install Toil with AWS and CWL support:
pip install toil[aws,cwl]
Enable pipeline resumption: use the --jobStore option when running the pipeline. To resume after a failure, use the same command plus --restart.
Sample command:
toil-cwl-runner --retryCount 1 --cleanWorkDir never \
--outdir /scratch/work/exposures/outputs \
--workDir /scratch/work/exposures \
--jobStore /scratch/work/someDir123 \
pm25_yearly_download.cwl test_exposure_job.yml
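If the run fails, it can be resumed from the saved job store by re-issuing the same command with --restart added, for example:

toil-cwl-runner --retryCount 1 --cleanWorkDir never \
    --outdir /scratch/work/exposures/outputs \
    --workDir /scratch/work/exposures \
    --jobStore /scratch/work/someDir123 \
    --restart \
    pm25_yearly_download.cwl test_exposure_job.yml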
Most Dorieh workflows consist of multiple steps, each producing two log
files: a progress log (*.log) and an error log (*.err). On success,
error logs are usually empty; all logs are collected under the
--outdir directory.
A successful pipeline run typically emits a JSON object to standard output or a log file, for example:
{
  "qc_ev_create_log": {
    "location": "file:///shared/dorieh-logs/data_loader-2025-03-05-09-48-27.log_2",
    "basename": "data_loader-2025-03-05-09-48-27.log",
    "nameroot": "data_loader-2025-03-05-09-48-27",
    "nameext": ".log",
    "class": "File",
    "checksum": "sha1$fa01481303c9030c8095387661a3bdc6851fc1ed",
    "size": 12060,
    "path": "/shared/dorieh-logs/data_loader-2025-03-05-09-48-27.log_2"
  },
  "registry": {
    "location": "file:///shared/dorieh-logs/data.yaml",
    "basename": "data.yaml",
    "nameroot": "data",
    "nameext": ".yaml",
    "class": "File",
    "checksum": "sha1$73700ade239b3a0c5f755ef694f05aebb4442c68",
    "size": 140457,
    "path": "/shared/dorieh-logs/ingestion/2015/outputs/cms.yaml"
  }
}
Alternatively, you can confirm successful completion by searching the logs for either Finished toil run successfully or CWL run complete!:
grep 'Finished toil run successfully' 1_2015prod.log
grep 'CWL run complete!' 1_2015prod.log
A failed run usually ends with a PermanentFail message, although
abrupt
system failures may not log this. If a log file remains unmodified for
several hours, the workflow is likely not running.
Some workflow steps, especially those involving database transformations, may run for extended periods but produce little log output. Check individual step logs for progress if you suspect issues.
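One way to spot a stalled run is to list log files that have not been modified recently, for example (the four-hour threshold here is arbitrary):

find /shared/dorieh-logs/ -type f -name "*.log" -mmin +240 -exec ls -alF {} \;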
Troubleshooting Workflows run by Toil
To check for errors across all runs:
find /shared/dorieh-logs/ -type f -name "*.err" -size +0c -exec ls -alF {} \;
No output indicates no errors. Non-empty .err files should be examined.
For logs from a specific run (e.g., subdirectory per run):
find /shared/dorieh-logs/toilwf-c36b795b68935d99be01ed1556c85b1e/ -type f -name "*.err" -size +0c -exec ls -alF {} \;
To see all error logs, regardless of whether they are empty, omit the -size +0c filter:
find /shared/dorieh-logs/toilwf-c36b795b68935d99be01ed1556c85b1e/ -type f -name "*.err" -exec ls -alF {} \;
Testing workflows
Pipelines can be tested using the included DBT Pipeline Testing Framework. For a more detailed description of testing, see Testing bundled workflows.
Published and tested workflows
- Pipeline to aggregate data from Climatology Lab
- Pipeline to ingest Monthly Pollution data downloaded from WashU Box
- Pipeline to aggregate data in NetCDF format over given geographies
- Full EPA AQS Processing Pipeline
- Full EPA AirNow Processing Pipeline (including downloading shapefiles)
- Medicare data ingestion and processing pipeline
- Full Medicaid Processing Pipeline
- census_workflow.cwl
Developing New Workflows
Combining included CWL tools into a new workflow
Dorieh provides multiple pre-packaged CWL tools, which you can mix and match into custom workflows. Use the CWL output collection utility to help generate CWL code snippets for new workflows.
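For orientation, a new workflow is simply a CWL file that wires the outputs of one packaged tool into the inputs of the next; the step dependencies define the DAG described in the introduction. A minimal sketch is shown below; the tool files and parameter names are placeholders, not actual Dorieh tools:

cwlVersion: v1.2
class: Workflow

inputs:
  raw_data: File                  # hypothetical top-level input

outputs:
  aggregated:
    type: File
    outputSource: aggregate/result

steps:
  download:
    run: download_tool.cwl        # placeholder for a packaged tool
    in:
      input: raw_data
    out: [downloaded]
  aggregate:
    run: aggregate_tool.cwl       # placeholder for a packaged tool
    in:
      data: download/downloaded   # depends on the previous step
    out: [result]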
Wrapping python modules as CWL tools
Consider using cwl2argparse or browse other CWL development tools for converting Python modules into CWL tools.
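If you prefer to write a wrapper by hand, a Python module that exposes a command-line interface can be described with a short CommandLineTool. A minimal sketch, assuming a hypothetical module mypackage.transform that accepts --input and --output arguments and writes its result to the named output file:

cwlVersion: v1.2
class: CommandLineTool

baseCommand: ["python", "-m", "mypackage.transform"]   # hypothetical module

inputs:
  input_file:
    type: File
    inputBinding:
      prefix: "--input"
  output_name:
    type: string
    inputBinding:
      prefix: "--output"

outputs:
  result:
    type: File
    outputBinding:
      glob: $(inputs.output_name)   # the file the module is expected to write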