Data Processing Pipelines

Introduction

Most computational studies acquire and process data using a data processing pipeline. Such pipelines are composed of multiple steps, where each step may be a script, binary executable, or a specific data transformation. These steps often depend on the completion and outcome of previous steps, so the entire workflow can be naturally represented as a Directed Acyclic Graph (DAG): nodes represent workflow steps, and edges indicate their dependencies.

This DAG-based representation allows for complex pipeline topologies, including parallelization and multiple dependencies. However, describing such logic in procedural programming languages quickly becomes unwieldy and hard to maintain. To address this challenge, domain-specific workflow description languages (DSLs) have been developed.
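
To make the DAG structure concrete, here is a minimal sketch of a two-step CWL workflow; clean.cwl and summarize.cwl are hypothetical tool descriptions, and the reference from the summarize step to the clean step's output is what forms the dependency edge:

cwlVersion: v1.2
class: Workflow

inputs:
  raw_data: File

outputs:
  report:
    type: File
    outputSource: summarize/report

steps:
  clean:
    run: clean.cwl          # hypothetical tool that cleans the raw input
    in:
      input: raw_data
    out: [cleaned]
  summarize:
    run: summarize.cwl      # hypothetical tool that summarizes cleaned data
    in:
      input: clean/cleaned  # this reference creates the DAG edge
    out: [report]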

Workflow Description Languages

For reproducibility and repeatability, pipelines are commonly specified using descriptive workflow DSLs. In bioinformatics and other scientific domains, three such languages are widely used. The most prevalent is the Common Workflow Language (CWL), thanks to its extensive community support, large body of published workflows, and broad platform compatibility. All pipelines published in this documentation use CWL.

Descriptive workflow languages separate the definition of pipeline structure (topology, inputs, outputs, requirements) from the implementation of processing steps. While these languages were pioneered in bioinformatics—often operating on well-defined inputs and known tools—they are also applicable to fields with more diverse and less standardized data, such as population health research. In these domains, many workflow steps may focus on complex data transformation and harmonization.

Some workflows require a database connection during execution; see the Managing database connections section for details.

Running Workflows

Tested runners

CWL is a “write once, run anywhere” language: a pipeline developed and tested in one environment (such as a laptop) should run unchanged on clusters and cloud platforms under any compliant runner. For the latest list of compatible runners, see the CWL Implementations page.

We have successfully used cwltool, CWL-Airflow, and Toil:

  • Toil: Feature-rich; supports AWS Batch and resuming pipelines after failures. Its output can be quite verbose. We recommend it for both production and development use.

  • CWL-Airflow: Provides a graphical interface for managing and visualizing workflows.

  • cwltool: The lightweight reference implementation; ideal for development if you prefer a simpler alternative to Toil.

See the Toil documentation for additional details on running CWL workflows with Toil.
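
For quick local development with the reference implementation, cwltool can be installed from PyPI and invoked directly on a workflow and its job file (a sketch, using the workflow and job file names from the Toil example below):

pip install cwltool
cwltool pm25_yearly_download.cwl test_exposure_job.yml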

Providing parameters to the pipelines

Pipeline parameters are supplied on the command line (as double-dash -- options) or via YAML or JSON files.

When using YAML to specify files or directories as inputs, use this structure:

my_file:
  path: /path/to/data.nc
  class: File
my_directory:
  path: /path/to/data/downloads
  class: Directory
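
The same inputs can also be passed directly on the command line, since CWL runners generate double-dash options from the workflow's input names. A sketch, assuming a hypothetical workflow pipeline.cwl that declares the two inputs above:

cwltool pipeline.cwl --my_file /path/to/data.nc --my_directory /path/to/data/downloads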

Using Toil

For hands-on examples, refer to the Dorieh Examples Folder on GitHub.

Quick Start with Toil

  1. Install Toil with AWS and CWL support:

     pip install toil[aws,cwl]
    
  2. Enable pipeline resumption: Use the --jobStore option when running the pipeline. To resume after a failure, rerun the same command with --restart added (see the sketch after the sample command below).

Sample command:

toil-cwl-runner --retryCount 1 --cleanWorkDir never \
  --outdir /scratch/work/exposures/outputs \
  --workDir /scratch/work/exposures \
  --jobStore /scratch/work/someDir123 \
  pm25_yearly_download.cwl test_exposure_job.yml
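
To resume this run after a failure (step 2 above), repeat the same command with --restart added; the --jobStore path must point at the job store left by the failed run:

toil-cwl-runner --restart --retryCount 1 --cleanWorkDir never \
  --outdir /scratch/work/exposures/outputs \
  --workDir /scratch/work/exposures \
  --jobStore /scratch/work/someDir123 \
  pm25_yearly_download.cwl test_exposure_job.yml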

Most Dorieh workflows consist of multiple steps, each producing two log files: a progress log (*.log) and an error log (*.err). On success, error logs are usually empty; all logs are collected under the --outdir directory.

A successful pipeline run typically emits a JSON object describing the collected outputs to standard output or a log file, for example:

{
  "qc_ev_create_log": {
    "location": "file:///shared/dorieh-logs/data_loader-2025-03-05-09-48-27.log_2",
    "basename": "data_loader-2025-03-05-09-48-27.log",
    "nameroot": "data_loader-2025-03-05-09-48-27",
    "nameext": ".log",
    "class": "File",
    "checksum": "sha1$fa01481303c9030c8095387661a3bdc6851fc1ed",
    "size": 12060,
    "path": "/shared/dorieh-logs/data_loader-2025-03-05-09-48-27.log_2"
  },
  "registry": {
    "location": "file:///shared/dorieh-logs/data.yaml",
    "basename": "data.yaml",
    "nameroot": "data",
    "nameext": ".yaml",
    "class": "File",
    "checksum": "sha1$73700ade239b3a0c5f755ef694f05aebb4442c68",
    "size": 140457,
    "path": "/shared/dorieh-logs/ingestion/2015/outputs/cms.yaml"
  }
}
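
Because this object lists every collected output with its path, it can be queried mechanically. For example, a sketch assuming the JSON was saved to a hypothetical run_output.json and jq is available:

jq -r '.[].path' run_output.json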

Alternatively, you can confirm successful completion by searching the logs for either of the messages Finished toil run successfully or CWL run complete!:

grep 'Finished toil run successfully' 1_2015prod.log
grep 'CWL run complete!' 1_2015prod.log

A failed run usually ends with a PermanentFail message, although abrupt system failures may not log this. If a log file remains unmodified for several hours, the workflow is most likely no longer running.

Some workflow steps, especially those involving database transformations, may run for extended periods but produce little log output. Check individual step logs for progress if you suspect issues.
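
As a quick liveness check, you can look for log files modified within a recent window, reusing the find pattern from the troubleshooting section below (a sketch; adjust the path and the time window to your setup):

find /shared/dorieh-logs/ -type f -name "*.log" -mmin -120 -exec ls -alF {} \;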

Troubleshooting Workflows run by Toil

To check for errors across all runs:

find /shared/dorieh-logs/ -type f -name "*.err" -size +0c -exec ls -alF {} \;

No output indicates no errors. Non-empty .err files should be examined.

For logs from a specific run (each run writes to its own subdirectory):

find /shared/dorieh-logs/toilwf-c36b795b68935d99be01ed1556c85b1e/ -type f -name "*.err" -size +0c -exec ls -alF {} \;

To see all error logs, regardless of whether they are empty, omit the -size +0c filter:

find /shared/dorieh-logs/toilwf-c36b795b68935d99be01ed1556c85b1e/ -type f -name "*.err" -exec ls -alF {} \;

Testing workflows

Pipelines can be tested using the included DBT Pipeline Testing Framework.

For a more detailed description of testing, see Testing bundled workflows.

Published and tested workflows

Developing New Workflows

Combining included CWL tools into a new workflow

Dorieh provides multiple pre-packaged CWL tools, which you can mix and match into custom workflows. Use the CWL output collection utility to help generate CWL code snippets for new workflows.

Wrapping python modules as CWL tools

Consider using cwl2argparse, or browse other CWL development tools, when converting Python modules into CWL tools. A tool description can also be written by hand, as in the sketch below.
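
For orientation, a hand-written CWL tool wrapping a Python module might look like the following sketch; harmonize.py, its single input, and the output file name are all hypothetical:

cwlVersion: v1.2
class: CommandLineTool

# Hypothetical module being wrapped; in practice, stage the script with
# InitialWorkDirRequirement or reference it by an absolute path.
baseCommand: [python, harmonize.py]

inputs:
  input_table:
    type: File
    inputBinding:
      position: 1        # passed to the script as the first positional argument

outputs:
  harmonized:
    type: File
    outputBinding:
      glob: harmonized.csv   # hypothetical file the script is expected to write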