#!/usr/bin/env cwl-runner
### Pipeline to aggregate data in NetCDF format over given geographies
# Copyright (c) 2021-2022. Harvard University
#
# Developed by Research Software Engineering,
# Faculty of Arts and Sciences, Research Computing (FAS RC)
# Author: Michael A Bouzinier
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

cwlVersion: v1.2
class: Workflow

requirements:
  SubworkflowFeatureRequirement: {}
  StepInputExpressionRequirement: {}
  InlineJavascriptRequirement: {}
  ScatterFeatureRequirement: {}
  MultipleInputFeatureRequirement: {}
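  # Network access is needed at runtime: the aggregation step downloads
  # raw grid files and shape files.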
  NetworkAccess:
    networkAccess: true


doc: |
  Workflow to aggregate pollution data provided in NetCDF format
  over given geographies (zip codes or counties) and output the
  results as CSV files. It is a wrapper around the aggregation of
  a single file, allowing the aggregation to be scattered
  (parallelized) over years.

  The outputs of the workflow are gzipped CSV files containing
  the aggregated data.

  Optionally, the aggregated data can be ingested into a database
  specified by the connection parameters:

  * `database.ini` file containing connection descriptions
  * `connection_name` a string referring to a section in the `database.ini`
    file, identifying the specific connection to be used.
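
  As a sketch, a minimal `database.ini` might look like the following
  (the section name and all values are illustrative; the parameters are
  the standard PostgreSQL connection options):

      [dorieh]
      host=localhost
      port=5432
      database=nsaph
      user=postgres
      password=secret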

  The workflow can be invoked either by providing command line options,
  as in the following example:

      toil-cwl-runner --retryCount 1 --cleanWorkDir never \
          --outdir /scratch/work/exposures/outputs \
          --workDir /scratch/work/exposures \
          pm25_yearly_download.cwl \
          --database /opt/local/database.ini \
          --connection_name dorieh \
          --downloads s3://nsaph-public/data/exposures/wustl/ \
          --strategy default \
          --geography zcta \
          --shape_file_collection tiger \
          --table pm25_annual_components_mean

  or by providing a YAML job file (see [example](../test_exposure_job))
  with similar options:

      toil-cwl-runner --retryCount 1 --cleanWorkDir never \
          --outdir /scratch/work/exposures/outputs \
          --workDir /scratch/work/exposures \
          pm25_yearly_download.cwl test_exposure_job.yml
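
  Such a job file, with inputs equivalent to the command line options
  above, might look like this sketch (paths and values are illustrative):

      database:
        class: File
        path: /opt/local/database.ini
      connection_name: dorieh
      downloads:
        class: Directory
        location: s3://nsaph-public/data/exposures/wustl/
      strategy: default
      geography: zcta
      shape_file_collection: tiger
      table: pm25_annual_components_mean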


inputs:
  proxy:
    type: string?
    default: ""
    doc: HTTP/HTTPS proxy, if required
  downloads:
    type: Directory
    doc: |
      Local folder or AWS bucket containing netCDF grid files, downloaded
      and unpacked from the Washington University in St. Louis (WUSTL)
      Box site. The annual and monthly data repositories are described by
      the [WUSTL Atmospheric Composition Analysis Group](https://sites.wustl.edu/acag/datasets/surface-pm2-5/).

      The annual data for PM2.5 is also available in
      a Harvard URC AWS bucket: `s3://nsaph-public/data/exposures/wustl/`
  geography:
    type: string
    doc: |
      Type of geography: zip codes or counties.
      Supported values: "zip", "zcta" or "county"
  years:
    type: int[]
    default: [2000,2001,2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017]
    doc: Calendar years to process; the aggregation is scattered over them
  variable:
    type: string
    default: PM25
    doc: |
      The main variable to aggregate over the shapes. We have tested
      the pipeline with PM25
  component:
    type: string[]
    default: [BC, NH4, NIT, OM, SO4, SOIL, SS]
    doc: |
      Optional components, provided as percentages in a separate set
      of netCDF files
  strategy:
    type: string
    default: auto
    doc: |
      Rasterization strategy; see the
      [documentation](https://nsaph-data-platform.github.io/nsaph-platform-docs/common/gridmet/doc/strategy.html)
      for the list of supported values and their explanations
  ram:
    type: string
    default: 2GB
    doc: Runtime memory available to the process

  shape_file_collection:
    type: string
    default: tiger
    doc: |
      [Collection of shapefiles](https://www2.census.gov/geo/tiger),
      either GENZ or TIGER
  database:
    type: File
    doc: |
      Path to the database connection file, usually `database.ini`.
      This argument is ignored if `connection_name` is `None`

  connection_name:
    type: string
    doc: |
      The name of a section in the `database.ini` file, or the literal
      `None` to skip the database ingestion steps
  table:
    type: string
    doc: The name of the table in which to store the aggregated data
    default: pm25_aggregated


steps:
  initdb:
    run: initdb.cwl
    doc: Ensure that database utilities are at their latest version
    in:
      database: database
      connection_name: connection_name
    out:
      - log
      - err

  process:
    doc: Downloads raw data and aggregates it over shapes and time
    scatter:
      - year
    run: aggregate_one_file.cwl
    in:
      proxy: proxy
      downloads: downloads
      geography: geography
      shape_file_collection: shape_file_collection
      year: years
      variable: variable
      component: component
      strategy: strategy
      ram: ram
      table: table
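      # depends_on is a synthetic input used only to enforce execution
      # order: consuming initdb's log makes this step wait until initdb
      # has completed.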
      depends_on: initdb/log
    out:
      - shapes
      - aggregate_data
      - consolidated_data
      - aggregate_log
      - aggregate_err
      - data_dictionary

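  # Every scattered per-year job emits a data dictionary; they are
  # expected to be identical, so the expression below picks the first
  # one to describe the whole dataset.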
  extract_data_dictionary:
    run:
      class: ExpressionTool
      inputs:
        yaml_files:
          type: File[]
      outputs:
        data_dictionary:
          type: File
      expression: |
        ${
          return {data_dictionary: inputs.yaml_files[0]}
        }
    in:
      yaml_files: process/data_dictionary
    out:
      - data_dictionary

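  # The three steps below are skipped when connection_name is the
  # literal string "None". They are chained via synthetic depends_on
  # inputs so that indexing starts only after ingestion completes,
  # and vacuum runs last.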
  ingest:
    run: ingest.cwl
    when: $(inputs.connection_name.toLowerCase() != 'none')
    doc: Uploads the aggregated data into the database
    in:
      registry: extract_data_dictionary/data_dictionary
      domain:
        valueFrom: "exposures"
      table: table
      input: process/aggregate_data
      database: database
      connection_name: connection_name
    out: [log, errors]

  index:
    run: index.cwl
    when: $(inputs.connection_name.toLowerCase() != 'none')
    doc: Builds indices on the table to speed up querying
    in:
      depends_on: ingest/log
      registry: extract_data_dictionary/data_dictionary
      domain:
        valueFrom: "exposures"
      table: table
      database: database
      connection_name: connection_name
    out: [log, errors]

  vacuum:
    run: vacuum.cwl
    when: $(inputs.connection_name.toLowerCase() != 'none')
    doc: Vacuums the table to optimize querying after bulk ingestion
    in:
      depends_on: index/log
      registry: extract_data_dictionary/data_dictionary
      domain:
        valueFrom: "exposures"
      table: table
      database: database
      connection_name: connection_name
    out: [log, errors]


outputs:
  aggregate_data:
    type: File[]
    outputSource: process/aggregate_data
  data_dictionary:
    type: File
    outputSource: extract_data_dictionary/data_dictionary
    doc: Data dictionary file, in YAML format, describing the output variables
  consolidated_data:
    type: File[]
    outputSource: process/consolidated_data
  shapes:
    type:
      type: array
      items:
        type: array
        items: [File]
    outputSource: process/shapes

  aggregate_log:
    type:
      type: array
      items: Any
    outputSource: process/aggregate_log
  aggregate_err:
    type: File[]
    outputSource: process/aggregate_err

  # Outputs of the conditional database steps are optional (File?):
  # they are null when those steps are skipped.
  ingest_log:
    type: File?
    outputSource: ingest/log
  index_log:
    type: File?
    outputSource: index/log
  vacuum_log:
    type: File?
    outputSource: vacuum/log
  ingest_err:
    type: File?
    outputSource: ingest/errors
  index_err:
    type: File?
    outputSource: index/errors
  vacuum_err:
    type: File?
    outputSource: vacuum/errors