parser.py
#!/usr/bin/python
import math
import datetime
import dateutil.parser
import dateutil.tz
import csv
import json

DEBUG = True


def void():
    pass


def log(x):
    print(x)


debug_log = log if DEBUG else void


def avg(items):
    return float(sum(items)) / max(len(items), 1)


ISO_8601_UTC_MEAN = dateutil.tz.tzoffset(None, 0)


# Convert the given ISO time string to a timestamp in seconds.
def ISOTimeString2TimeStamp(timeStr):
    time = dateutil.parser.parse(timeStr)
    isoStartTime = datetime.datetime(1970, 1, 1, 0, 0, 0, 0, ISO_8601_UTC_MEAN)
    return int((time - isoStartTime).total_seconds())
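
# Illustrative sanity check (not part of the original script): the epoch itself
# maps to 0 and one hour past the epoch maps to 3600.
#   ISOTimeString2TimeStamp('1970-01-01T00:00:00+00:00')  # -> 0
#   ISOTimeString2TimeStamp('1970-01-01T01:00:00+00:00')  # -> 3600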


def tempUnit2K(value, unit):
    if unit == 'Deg C':
        return value + 273.15
    elif unit == 'Deg F':
        return (value + 459.67) * 5 / 9
    elif unit == 'Deg K':
        return value
    else:
        raise ValueError('Unsupported unit "%s".' % unit)


def relHumidUnit2Percent(value, unit):
    if unit == '%':
        return value
    else:
        raise ValueError('Unsupported unit "%s".' % unit)


def speedUnit2MeterPerSecond(value, unit):
    if unit == 'meters/second':
        return value
    else:
        raise ValueError('Unsupported unit "%s".' % unit)
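
# Illustrative values (not part of the original script), assuming the CSV reports
# temperature in 'Deg C' or 'Deg F':
#   tempUnit2K(25.0, 'Deg C')  # -> 298.15
#   tempUnit2K(32.0, 'Deg F')  # -> ~273.15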


def extractXFactor(magnitude, degreeFromNorth):
    return magnitude * math.sin(math.radians(degreeFromNorth))


def extractYFactor(magnitude, degreeFromNorth):
    return magnitude * math.cos(math.radians(degreeFromNorth))
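
# Worked example (illustrative, not from the original script): a 2 m/s wind
# directed 90 degrees from north (i.e. due east) decomposes into an eastward
# component of ~2.0 and a northward component of ~0.0.
#   extractXFactor(2.0, 90.0)  # -> ~2.0 (eastward_wind)
#   extractYFactor(2.0, 90.0)  # -> ~0.0 (northward_wind)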


STATION_GEOMETRY = {
    'type': 'Point',
    'coordinates': [
        # SW Corner.
        # @see {@link https://github.com/terraref/extractors-metadata/blob/master/sensorposition/terra.sensorposition.py#L68}
        33.0745666667,
        -111.9750833333,
        0
    ]
}

# Raw column names and the output property names they map to:
# 'AirTC': 'air_temperature',
# 'RH': 'relative_humidity',
# 'Pyro': 'surface_downwelling_shortwave_flux_in_air',
# 'PAR_ref': 'surface_downwelling_photosynthetic_photon_flux_in_air',
# 'WindDir': 'wind_to_direction',
# 'WS_ms': 'wind_speed',
# 'Rain_mm_Tot': 'precipitation_rate'
# Each mapping function may return one or more (name, value) tuples, so the
# list structure is left up to each of them.
PROP_MAPPING = {
    'AirTC': lambda d: [(
        'air_temperature',
        tempUnit2K(float(d['value']), d['meta']['unit'])
    )],
    'RH': lambda d: [(
        'relative_humidity',
        relHumidUnit2Percent(float(d['value']), d['meta']['unit'])
    )],
    'Pyro': lambda d: [(
        'surface_downwelling_shortwave_flux_in_air',
        float(d['value'])
    )],
    'PAR_ref': lambda d: [(
        'surface_downwelling_photosynthetic_photon_flux_in_air',
        float(d['value'])
    )],
    # Wind direction is combined with the wind speed found in the same record
    # and split into eastward and northward components.
    'WindDir': lambda d: [
        ('eastward_wind', extractXFactor(float(d['record']['WS_ms']), float(d['value']))),
        ('northward_wind', extractYFactor(float(d['record']['WS_ms']), float(d['value'])))
    ],
    # Wind speed is also converted and reported on its own.
    'WS_ms': lambda d: [(
        'wind_speed',
        speedUnit2MeterPerSecond(float(d['value']), d['meta']['unit'])
    )],
    'Rain_mm_Tot': lambda d: [(
        'precipitation_rate',
        float(d['value'])
    )]
}
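
# Illustrative call (not part of the original script): each mapping receives a
# dict with the raw 'value', the column's 'meta' (unit etc.) and the whole
# 'record', so that related columns can be combined.
#   PROP_MAPPING['AirTC']({'value': '25', 'meta': {'unit': 'Deg C'}, 'record': {}})
#   # -> [('air_temperature', 298.15)]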

# Aggregation functions for each property.
PROP_AGGREGATE = {
    'air_temperature': avg,
    'relative_humidity': avg,
    'surface_downwelling_shortwave_flux_in_air': avg,
    'surface_downwelling_photosynthetic_photon_flux_in_air': avg,
    'eastward_wind': avg,
    'northward_wind': avg,
    'wind_speed': avg,
    'precipitation_rate': sum
}


def transformProps(propMetaDict, propValDict):
    newProps = []
    for propName in propValDict:
        if propName in PROP_MAPPING:
            newProps += PROP_MAPPING[propName]({
                'meta': propMetaDict[propName],
                'value': propValDict[propName],
                'record': propValDict
            })
    return dict(newProps)
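
# Illustrative call (not part of the original script): columns without a mapping
# (e.g. TIMESTAMP) are simply dropped.
#   transformProps(
#       {'AirTC': {'title': 'AirTC', 'unit': 'Deg C', 'sample_method': 'Avg'}},
#       {'AirTC': '25', 'TIMESTAMP': '2016-08-30 00:06:24'}
#   )
#   # -> {'air_temperature': 298.15}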

def parse_file_header_line(linestr):
    return [json.loads(x) for x in str(linestr).split(',')]
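
# Illustrative call (not part of the original script): TOA5 header fields are
# typically double-quoted, so json.loads() strips the quotes from each
# comma-separated token. Note that this simple split would break on a quoted
# field that itself contains a comma.
#   parse_file_header_line('"TOA5","WeatherStation","CR1000"')
#   # -> ['TOA5', 'WeatherStation', 'CR1000']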


# ----------------------------------------------------------------------
# Parse the CSV file and return a list of dictionaries.
def parse_file(filepath, utc_offset=ISO_8601_UTC_MEAN):
    results = []
    with open(filepath) as csvfile:
        # The first line is always the environment/header line.
        # @see {@link https://www.manualslib.com/manual/538296/Campbell-Cr9000.html?page=41#manual}
        header_lines = [
            csvfile.readline()
        ]
        file_format, station_name, logger_model, logger_serial, os_version, dld_file, dld_sig, table_name = parse_file_header_line(header_lines[0])
        if file_format != 'TOA5':
            raise ValueError('Unsupported format "%s".' % file_format)
        # A TOA5 file has 4 header lines in total.
        # @see {@link https://www.manualslib.com/manual/538296/Campbell-Cr9000.html?page=43#manual}
        while len(header_lines) < 4:
            header_lines.append(csvfile.readline())
        prop_names = parse_file_header_line(header_lines[1])
        prop_units = parse_file_header_line(header_lines[2])
        prop_sample_method = parse_file_header_line(header_lines[3])
        # Associate the above lists.
        props = dict()
        for x in range(0, len(prop_names)):
            props[prop_names[x]] = {
                'title': prop_names[x],
                'unit': prop_units[x],
                'sample_method': prop_sample_method[x]
            }
        # [DEBUG] Print the property details if needed.
        # print(json.dumps(props))
        reader = csv.DictReader(csvfile, fieldnames=prop_names)
        for row in reader:
            timestamp = datetime.datetime.strptime(row['TIMESTAMP'], '%Y-%m-%d %H:%M:%S').isoformat() + utc_offset.tzname(None)
            newResult = {
                # @type {string}
                'start_time': timestamp,
                # @type {string}
                'end_time': timestamp,
                'properties': transformProps(props, row),
                # @type {string}
                'type': 'Feature',
                'geometry': STATION_GEOMETRY
            }
            # Enable this if the raw data needs to be kept.
            # newResult['properties']['_raw'] = {
            #     'data': row,
            #     'units': prop_units,
            #     'sample_method': prop_sample_method
            # }
            results.append(newResult)
    return results
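
# Illustrative shape of one parsed entry (not taken from real data):
#   {
#       'start_time': '2016-08-30T00:06:24-07:00',
#       'end_time': '2016-08-30T00:06:24-07:00',
#       'properties': {'air_temperature': 298.15, ...},
#       'type': 'Feature',
#       'geometry': STATION_GEOMETRY
#   }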


# ----------------------------------------------------------------------
# Aggregate the list of parsed results.
# The aggregation starts with the input data and no state.
# This function returns a list of aggregated data packages plus a state package,
# which should be fed back into the function to continue or end the aggregation.
# When there is no more data to input, pass None and the aggregation will stop.
# When the aggregation has ended, the returned state package is None to indicate that.
# Note: data has to be sorted by time.
# Note: cutoffSize is in seconds.
def aggregate(cutoffSize, tz, inputData, state):
    # This function always returns this composite package no matter what happens.
    result = {
        'packages': [],
        # In case the input data contributes nothing, inherit the state first.
        'state': None if state is None else dict(state)
    }
    # The aggregation ends when no more data is available (inputData is None),
    # in which case any leftover data in the state package needs to be recovered.
    if inputData is None:
        debug_log('Ending aggregation...')
        # The aggregation is ending; try to recover leftover data from the state.
        if state is None:
            # There is nothing to do.
            pass
        else:
            # Recover leftover data from the state.
            data = state['leftover']
            if len(data) == 0:
                # There is nothing to recover.
                pass
            else:
                # Aggregate the leftover data.
                # Assume the leftover never contains more data than the cutoff allows.
                startTime = state['starttime']
                # Use the latest date among the data entries.
                # Since the data is always sorted, the last one is the latest.
                endTime = ISOTimeString2TimeStamp(data[-1]['end_time'])
                newPackage = aggregate_chunk(data, tz, startTime, endTime)
                if newPackage is not None:
                    result['packages'].append(newPackage)
            # Mark the state with None to indicate the aggregation is done.
            result['state'] = None
    else:
        debug_log('Aggregating...')
        data = inputData
        # More data is provided; continue the aggregation.
        if state is None:
            debug_log('Fresh start...')
            # There is no previous state; start afresh.
            # Use the earliest date among the input data entries.
            # Since the input data is always sorted, the first one is the earliest.
            startTime = ISOTimeString2TimeStamp(data[0]['start_time'])
        else:
            debug_log('Continuing...')
            # Resume aggregation from a previous state.
            startTime = state['starttime']
            # Leftover data should be part of the data being processed.
            data = state['leftover'] + inputData
        startIndex = 0
        # Keep aggregating until all the data is consumed.
        while startIndex < len(data):
            # Find the nearest cutoff point.
            endTimeCutoff = startTime - startTime % cutoffSize + cutoffSize
            # Scan the input data to find the portion that fits within the cutoff.
            endIndex = startIndex
            while endIndex < len(data) and ISOTimeString2TimeStamp(data[endIndex]['end_time']) < endTimeCutoff:
                endIndex += 1
            # If everything fits within the cutoff, more data may arrive in the next run.
            # Otherwise, this portion should be aggregated now.
            if endIndex >= len(data):
                # The end of the data was reached, but the cutoff was not.
                # Save everything into the state.
                result['state'] = {
                    'starttime': startTime,
                    'leftover': data[startIndex:]
                }
            else:
                # The cutoff was reached; aggregate this chunk.
                newPackage = aggregate_chunk(data[startIndex:endIndex], tz, startTime, endTimeCutoff)
                if newPackage is not None:
                    result['packages'].append(newPackage)
                # Update variables for the next iteration.
                startTime = endTimeCutoff
                startIndex = endIndex
    # The loop above ends with some chunks aggregated into result['packages'],
    # and the last, incomplete chunk saved in result['state']['leftover'].
    return result
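
# Typical streaming usage (illustrative sketch; 'paths' and the 300 s cutoff are
# assumptions, not from the original script): feed parsed files in one at a time,
# carry the state forward, then call once more with inputData=None to flush the
# leftover chunk.
#   state = None
#   for path in paths:
#       out = aggregate(cutoffSize=300, tz=tz, inputData=parse_file(path, tz), state=state)
#       state = out['state']
#   tail = aggregate(cutoffSize=300, tz=tz, inputData=None, state=state)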


# Helper function for aggregating a chunk of data.
# @param {timestamp} startTime
# @param {timestamp} endTime
def aggregate_chunk(dataChunk, tz, startTime, endTime):
    if len(dataChunk) == 0:
        # There is nothing to aggregate.
        return None
    else:
        # Prepare the list of properties for aggregation.
        propertiesList = [x['properties'] for x in dataChunk]
        return {
            'start_time': datetime.datetime.fromtimestamp(startTime, tz).isoformat(),
            'end_time': datetime.datetime.fromtimestamp(endTime, tz).isoformat(),
            'properties': aggregateProps(propertiesList),
            'type': 'Point',
            'geometry': STATION_GEOMETRY
        }


def aggregateProps(propertiesList):
    collection = {}
    for properties in propertiesList:
        for key in properties:
            # Properties starting with "_" shouldn't be processed.
            if key.startswith('_'):
                continue
            value = properties[key]
            # Collect property values and save them into the collection.
            if key not in collection:
                collection[key] = [value]
            else:
                collection[key].append(value)
    result = {}
    for key in collection:
        # If there is no aggregation function, ignore the property.
        if key not in PROP_AGGREGATE:
            continue
        func = PROP_AGGREGATE[key]
        result[key] = func(collection[key])
    return result
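
# Illustrative call (not part of the original script): averaged properties use
# avg(), while precipitation_rate is summed.
#   aggregateProps([
#       {'air_temperature': 300.0, 'precipitation_rate': 0.1},
#       {'air_temperature': 302.0, 'precipitation_rate': 0.2}
#   ])
#   # -> {'air_temperature': 301.0, 'precipitation_rate': ~0.3}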


if __name__ == "__main__":
    size = 5 * 60
    tz = dateutil.tz.tzoffset("-07:00", -7 * 60 * 60)
    packages = []

    file = './test-input-1.dat'
    parse = parse_file(file, tz)
    result = aggregate(
        cutoffSize=size,
        tz=tz,
        inputData=parse,
        state=None
    )
    packages += result['packages']
    print(json.dumps(result['state']))

    file = './test-input-2.dat'
    parse = parse_file(file, tz)
    result = aggregate(
        cutoffSize=size,
        tz=tz,
        inputData=parse,
        state=result['state']
    )
    packages += result['packages']
    print(json.dumps(result['state']))

    result = aggregate(
        cutoffSize=size,
        tz=tz,
        inputData=None,
        state=result['state']
    )
    packages += result['packages']
    print('Package Count: %s' % (len(packages)))

    packages = []
    file = './test-input-3.dat'
    parse = parse_file(file, tz)
    result = aggregate(
        cutoffSize=size,
        tz=tz,
        inputData=parse,
        state=result['state']
    )
    packages += result['packages']
    result = aggregate(
        cutoffSize=size,
        tz=tz,
        inputData=None,
        state=result['state']
    )
    packages += result['packages']
    print('Package Count: %s' % (len(packages)))
    print(json.dumps(packages))