pm25_yearly_download.cwl

#!/usr/bin/env cwl-runner
### Pipeline to aggregate data in NetCDF format over given geographies
#  Copyright (c) 2021-2022. Harvard University
#
#  Developed by Research Software Engineering,
#  Faculty of Arts and Sciences, Research Computing (FAS RC)
#  Author: Michael A Bouzinier
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

cwlVersion: v1.2
class: Workflow

requirements:
  SubworkflowFeatureRequirement: {}
  StepInputExpressionRequirement: {}
  InlineJavascriptRequirement: {}
  ScatterFeatureRequirement: {}
  MultipleInputFeatureRequirement: {}
  NetworkAccess:
    networkAccess: True


doc: |
  Workflow to aggregate pollution data coming in NetCDF format
  over given geographies (zip codes or counties) and output the
  results as CSV files. This is a wrapper around the aggregation of
  a single file, allowing the aggregation to be scattered
  (parallelized) over years.

  The output of the workflow is a set of gzipped CSV files
  containing the aggregated data.

  Optionally, the aggregated data can be ingested into a database
  specified in the connection parameters:

  * `database.ini` file containing connection descriptions
    (see the example below)
  * `connection_name`  a string referring to a section in the `database.ini`
     file, identifying the specific connection to be used.

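  A minimal `database.ini` might look like the sketch below; the
  section name, host, and credentials shown here are hypothetical
  placeholders and should be replaced with the settings for your
  own database server:

      # hypothetical connection section; replace with your own settings
      [dorieh]
      host=localhost
      port=5432
      database=exposures
      user=postgres
      password=secret
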
  The workflow can be invoked either by providing command line options
  as in the following example:

      toil-cwl-runner --retryCount 1 --cleanWorkDir never \
          --outdir /scratch/work/exposures/outputs \
          --workDir /scratch/work/exposures \
          pm25_yearly_download.cwl \
          --database /opt/local/database.ini \
          --connection_name dorieh \
          --downloads s3://nsaph-public/data/exposures/wustl/ \
          --strategy default \
          --geography zcta \
          --shape_file_collection tiger \
          --table pm25_annual_components_mean

  Or, by providing a YaML file (see [example](../test_exposure_job))
  with similar options:

      toil-cwl-runner --retryCount 1 --cleanWorkDir never \
          --outdir /scratch/work/exposures/outputs \
          --workDir /scratch/work/exposures \
          pm25_yearly_download.cwl test_exposure_job.yml

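  Such a job file might look roughly like the sketch below; the paths
  and the connection name are placeholders, and `downloads` is passed
  as a CWL `Directory` object:

      # placeholder values; adjust paths and connection name
      database:
        class: File
        path: /opt/local/database.ini
      connection_name: dorieh
      downloads:
        class: Directory
        path: /scratch/data/exposures/wustl
      strategy: default
      geography: zcta
      shape_file_collection: tiger
      table: pm25_annual_components_mean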

inputs:
  proxy:
    type: string?
    default: ""
    doc: HTTP/HTTPS proxy, if required
  downloads:
    type: Directory
    doc: |
      Local or AWS bucket folder containing netCDF grid files, downloaded
      and unpacked from the Washington University in St. Louis (WUSTL) Box
      site. Annual and monthly data repositories are described by the
      [WUSTL Atmospheric Composition Analysis Group](https://sites.wustl.edu/acag/datasets/surface-pm2-5/).

      The annual data for PM2.5 is also available in
      a Harvard URC AWS Bucket: `s3://nsaph-public/data/exposures/wustl/`
  geography:
    type: string
    doc: |
      Type of geography: zip codes or counties.
      Supported values: "zip", "zcta" or "county"
  years:
    type: int[]
    default: [2000,2001,2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017]
  variable:
    type: string
    default: PM25
    doc: |
      The main variable that is being aggregated over shapes. We have tested
      the pipeline for PM25
  component:
    type: string[]
    default: [BC, NH4, NIT, OM, SO4, SOIL, SS]
    doc: |
      Optional components provided as percentages in a separate set
      of netCDF files
  strategy:
    type: string
    default: auto
    doc: |
      Rasterization strategy, see the
      [documentation](https://nsaph-data-platform.github.io/nsaph-platform-docs/common/gridmet/doc/strategy.html)
      for the list of supported values and explanations
  ram:
    type: string
    default: 2GB
    doc: Runtime memory available to the process

  shape_file_collection:
    type: string
    default: tiger
    doc: |
      [Collection of shapefiles](https://www2.census.gov/geo/tiger),
      either GENZ or TIGER
  database:
    type: File
    doc: |
      Path to the database connection file, usually database.ini.
      This argument is ignored if `connection_name` == `None`

  connection_name:
    type: string
    doc: |
      The name of the section in the database.ini file, or a literal
      `None` to skip the database ingestion step
  table:
    type: string
    doc: The name of the table in which to store the aggregated data
    default: pm25_aggregated


steps:
  initdb:
    run: initdb.cwl
    doc: Ensure that database utilities are at their latest version
    in:
      database: database
      connection_name: connection_name
    out:
      - log
      - err

  process:
    doc: Downloads raw data and aggregates it over shapes and time
    scatter:
      - year
    run: aggregate_one_file.cwl
    in:
      proxy: proxy
      downloads: downloads
      geography: geography
      shape_file_collection: shape_file_collection
      year: years
      variable: variable
      component: component
      strategy: strategy
      ram: ram
      table: table
      depends_on: initdb/log
    out:
      - shapes
      - aggregate_data
      - consolidated_data
      - aggregate_log
      - aggregate_err
      - data_dictionary

  extract_data_dictionary:
    run:
      class: ExpressionTool
      inputs:
        yaml_files:
          type: File[]
      outputs:
        data_dictionary:
          type: File
      expression: |
        ${
          return {data_dictionary: inputs.yaml_files[0]}
        }
    in:
      yaml_files: process/data_dictionary
    out:
      - data_dictionary

  ingest:
    run: ingest.cwl
    when: $(inputs.connection_name.toLowerCase() != 'none')
    doc: Uploads data into the database
    in:
      registry: extract_data_dictionary/data_dictionary
      domain:
        valueFrom: "exposures"
      table: table
      input: process/aggregate_data
      database: database
      connection_name: connection_name
    out: [log, errors]

  index:
    run: index.cwl
    when: $(inputs.connection_name.toLowerCase() != 'none')
    in:
      depends_on: ingest/log
      registry: extract_data_dictionary/data_dictionary
      domain:
        valueFrom: "exposures"
      table: table
      database: database
      connection_name: connection_name
    out: [log, errors]

  vacuum:
    run: vacuum.cwl
    when: $(inputs.connection_name.toLowerCase() != 'none')
    in:
      depends_on: index/log
      registry: extract_data_dictionary/data_dictionary
      domain:
        valueFrom: "exposures"
      table: table
      database: database
      connection_name: connection_name
    out: [log, errors]


outputs:
  aggregate_data:
    type: File[]
    outputSource: process/aggregate_data
  data_dictionary:
    type: File
    outputSource: extract_data_dictionary/data_dictionary
    doc: Data dictionary file, in YaML format, describing output variables
  consolidated_data:
    type: File[]
    outputSource: process/consolidated_data
  shapes:
    type:
      type: array
      items:
        type: array
        items: [File]
    outputSource: process/shapes

  aggregate_log:
    type:
      type: array
      items: Any
    outputSource: process/aggregate_log
  aggregate_err:
    type: File[]
    outputSource: process/aggregate_err

  ingest_log:
    type: File
    outputSource: ingest/log
  index_log:
    type: File
    outputSource: index/log
  vacuum_log:
    type: File
    outputSource: vacuum/log
  ingest_err:
    type: File
    outputSource: ingest/errors
  index_err:
    type: File
    outputSource: index/errors
  vacuum_err:
    type: File
    outputSource: vacuum/errors