4 Created on Mon Nov 2 15:09:44 2015
5 script to run analytic uncertainty analysis of rrs data
16 import multiprocessing
as mp
19 from datetime
import datetime
as DT
24 Class to get Rrs uncertainties for a given L2 granule. Includes methods to:
25 * get baseline from L2 granule
26 * calculate uncertainties (as rmse) from corresponding perturbed L2
28 * save uncertainty variables in original unperturbed granule
32 2- noisyDir -- directory where noisy files are located
37 self.
logger = logging.getLogger(
'%s.MakeUnc' % parent_logger_name)
47 self.
dimsDict = dict.fromkeys([
'rrs_unc_%s' % band
for band
in self.bands])
48 self.
dTypeDict = dict.fromkeys([
'rrs_unc_%s' % band
for band
in self.bands])
53 self.
dimsDict.
update(dict.fromkeys([
'lt_unc_%s' % band
for band
in self.bands]))
54 self.
dTypeDict.
update(dict.fromkeys([
'lt_unc_%s' % band
for band
in self.bands]))
60 self.
logger.info(
'chl_a_unc will be included')
61 otherProdkeys.append(
'chlor_a')
62 otherProdkeys.append(
'chlor_a_unc')
64 self.
logger.info(
'nflh_unc will be included')
65 otherProdkeys.append(
'nflh')
66 otherProdkeys.append(
'nflh_unc')
69 attrUncKeys = [x
for x
in otherProdkeys
if re.search(
'_unc', x)]
76 Method to copy backup of unprocessed silent L2
77 This is so as not to redo the entire processing if a problem arises.
78 If a copy already exists, it is assumed this is not the first
79 processing attempt and the silent L2 is now tainted. It is removed and
80 a clean copy is generated from the backup.
84 if os.path.exists(cpy):
86 self.
logger.info(
'%s already exists. Removing original %s' % (cpy,
89 shutil.copy2(cpy, orig)
90 self.
logger.info(
'Copying silent from %s' % cpy)
92 shutil.copy2(orig, cpy)
93 self.
logger.info(
'No copy for %s. Generating copy' % self.
silFile)
98 with nc.Dataset(self.
silFile,
'r+')
as dsSil:
99 geoGr = dsSil.groups[
'geophysical_data']
100 geoVar = geoGr.variables
101 for band
in self.bands:
102 rrsUncProdName =
'Rrs_unc_%s' % band
103 if rrsUncProdName
not in geoVar:
105 varRrsUnc = geoGr.createVariable(rrsUncProdName,
112 varRrsUnc = geoVar[rrsUncProdName]
115 ltUncProdName =
'Lt_unc_%s' % band
116 if ltUncProdName
not in geoVar:
117 varLtUnc = geoGr.createVariable(ltUncProdName,
122 varLtUnc = geoVar[ltUncProdName]
125 if 'chlor_a_unc' not in geoVar:
126 varChloraUnc = geoGr.createVariable(
'chlor_a_unc',
131 varChloraUnc = geoVar[
'chlor_a_unc']
135 if 'nflh_unc' not in geoVar:
136 self.
logger.info(
'nflh_unc there; creating variable...')
137 varNflhUnc = geoGr.createVariable(
'nflh_unc',
142 self.
logger.info(
'nflh_unc available, using existing variable...')
143 varNflhUnc = geoVar[
'nflh_unc']
150 Calculates rrs uncertainty as st.dev of rrs. Note that to save time
151 I use unperturbed rrs as the rrs baseline for the simulation
153 fBaseName = self.
silFile.split(
'/')[-1].split(
'.')[0].split(
'_')[0]
154 matchFilPatt = os.path.join(self.
noisyDir,
'%s*%s*' % (fBaseName, noisySfx))
155 self.
logger.info(
"searching for %s..." % matchFilPatt)
156 flis = glob.glob(matchFilPatt)
159 self.
logger.error(
'No files to process with pattern %s' % matchFilPatt)
162 self.
logger.info(
"%d files to be processed" % lflis)
163 rrsAggDataDict = {band: np.zeros_like(self.
rrsSilDict[band])
166 ltAggDataDict = {band: np.zeros_like(self.
ltSilDict[band])
173 for fcount, fname
in enumerate(flis):
174 prcDone = 100 * fcount/ (lflis - 1)
175 self.
logger.info(
"Loading and reading %s -- %.1f%%" %
177 with nc.Dataset(fname)
as nds:
178 nGeoGr = nds.groups[
'geophysical_data']
179 nGeoVar = nGeoGr.variables
180 for band
in self.bands:
182 noisyRrs = nGeoVar[
'Rrs_%s' % band][:]
183 rrsAggDataDict[band] += (noisyRrs -
185 except KeyError
as e:
188 noisyLt = nGeoVar[
'Lt_%s' % band][:]
189 ltAggDataDict[band] += (noisyLt -
192 noisyChl = nGeoVar[
'chlor_a'][:]
195 noisyNflh = nGeoVar[
'nflh'][:]
198 for band
in self.bands:
199 self.
logger.debug(
"computing deviation for band %s" % band)
201 self.
rrsUncArrDict[band] = np.ma.sqrt(rrsAggDataDict[band]/ lflis)
205 self.
ltUncArrDict[band] = np.sqrt(ltAggDataDict[band]/ lflis)
206 self.
logger.debug(
'running sanity check for band %s' % band)
210 self.
logger.debug(
'computing deviation for chlor a')
214 self.
logger.debug(
'computing deviation for nflh')
215 self.
logger.info(
"\nProcessed %d files " % lflis)
219 '''Reads Baseline file
220 Flags: l2bin default flags, namely ATMFAIL(1), LAND(2), HIGLINT(8),
221 HILT(16), HISATZEN(32), STRAYLIGHT(256), CLDICE(512),
222 COCCOLITH(1024), HISOLZEN(4096), LOWLW(16384), CHLFAIL(32768),
223 NAVWARN(65536), MAXAERITER(524288), CHLWARN(2097152),
224 ATMWARN(4194304), NAVFAIL(33554432), FILTER(67108864)
225 flagBits = 1 + 2 + 8 + 16 + 32 + 256 + 512 + 1024 + 4096 + 16384 +
226 32768 + 65536 + 524288 + 2097152 + 4194304 + 33554432 + 67108864
227 l2flags = geoVar['l2_flags'][:]
228 flagMaskArr = (l2flags & flagBits > 0)
230 self.
logger.debug(
'attemping to open silent file %s' % self.
silFile)
231 with nc.Dataset(self.
silFile,
'r')
as dsSil:
232 geoGr = dsSil.groups[
'geophysical_data']
233 geoVar = geoGr.variables
234 for band
in self.bands:
236 rrs = geoVar[
'Rrs_%s' % band]
238 self.
attrRrsUncDict[band] = {
'long_name':
'Uncertainty in %s' % rrs.long_name,
239 '_FillValue': rrs._FillValue,
'units': rrs.units,
240 'scale_factor': rrs.scale_factor,
241 'add_offset': rrs.add_offset}
242 self.
dimsDict[
'rrs_unc_%s' % band] = rrs.dimensions
243 self.
dTypeDict[
'rrs_unc_%s' % band] = rrs.dtype
244 except KeyError
as e:
245 self.
logger.info(
"Rrs_%s not available, skipping" % band)
248 self.
dimsDict.pop(
'rrs_unc_%s' % band)
251 self.
logger.debug(
'setting up to run sanity check for band %s' % band)
252 lt = geoVar[
'Lt_%s' % band]
254 self.
attrLtUncDict[band] = {
'long_name':
'Uncertainty in %s' % lt.long_name,
255 '_FillValue': lt._FillValue,
'units': lt.units}
256 self.
dimsDict[
'lt_unc_%s' % band] = lt.dimensions
257 self.
dTypeDict[
'lt_unc_%s' % band] = lt.dtype
259 self.
logger.debug(
'setting up to compute chla uncertainty')
260 chla = geoVar[
'chlor_a']
263 'Uncertainty in %s' % chla.long_name,
264 '_FillValue': chla._FillValue,
267 self.
dTypeDict[
'chlor_a_unc'] = chla.dtype
268 self.
dimsDict[
'chlor_a_unc'] = chla.dimensions
270 self.
logger.debug(
'setting up to compute nflh uncertainty')
271 nflh = geoVar[
'nflh']
274 'Uncertainty in %s' % nflh.long_name,
275 '_FillValue': nflh._FillValue,
277 'scale_factor': nflh.scale_factor,
278 'add_offset': nflh.add_offset}
279 self.
dimsDict[
'nflh_unc'] = nflh.dimensions
285 """Uncertainty subclass for SeaWiFS"""
289 [
'412',
'443',
'490',
'510',
'555',
'670',
291 self.
colDict = {
'412':
'#001166',
'443':
'#004488',
'490':
'#116688',
292 '510':
'#228844',
'555':
'#667722',
'670':
'#aa2211',
293 '765':
'#770500',
'865':
'#440000'}
294 super(MakeSwfUnc, self).
__init__(*args)
299 """Uncertainty engine for HMODISA"""
303 [
'412',
'443',
'488',
'531',
'547',
'555',
304 '645',
'667',
'678',
'748',
'859',
'869',
305 '1240',
'1640',
'2130'])
306 super(MakeHMA, self).
__init__(*args)
307 self.
colDict = {
'412':
'#001166',
'443':
'#004488',
'488':
'#1166FF',
308 '531':
'#337722',
'547':
'#557733',
'555':
'#669922',
309 '645':
'#883311',
'667':
'#aa2211',
'678':
'#dd3300'}
315 l2PathsGen = glob.iglob(matchPattern)
316 spatt = re.compile(
'(S[0-9]+)')
317 for l2path
in l2PathsGen:
318 if os.path.isdir(l2path):
319 basename = spatt.findall(l2path)[0]
320 l2Pa = os.path.join(l2MainPath, basename)
321 silFiPa = os.path.join(l2Pa, basename) +
'_silent.L2'
322 noiDiPa = os.path.join(l2Pa,
'Noisy/')
325 yield [silFiPa, noiDiPa]
330 Class to manage complete uncertainty generation; from processing of L1As to
331 creation of uncertainty from noisy L2 files, to the final packing of new
332 uncertainty products into the baseline L2.
337 Takes a directory containing L2 directories
341 if self.
pArgs.sensor ==
'SeaWiFS':
345 def _BatchRun(self, sArgs):
348 uncObj.ReadFromSilent()
349 uncObj.BuildUncs(self.
pArgs.nsfx)
350 uncObj.WriteToSilent()
351 return uncObj.silFile
356 with mp.Pool()
as pool:
357 results = pool.map(self.
_BatchRun, paramGen)
362 parser = argparse.ArgumentParser(prog=
"MakeUnc")
363 parser.add_argument(
'-i',
'--ifile', help=
'Initial L2 file path.',
365 parser.add_argument(
'-j',
'--ipath',
366 help=
'Main L2 path for batch processing.', type=str)
367 parser.add_argument(
'-n',
'--npath', help=
'Path to noisy data directory.',
369 parser.add_argument(
'-s',
'--nsfx',
370 help=
'Noisy file suffix for pattern matching.',
371 type=str, default=
'_noisy_')
372 parser.add_argument(
'-c',
'--dochl', help=
'Compute chloropyll uncertainty. Default is False',
373 action=
'store_true', default=
False)
374 parser.add_argument(
'-f',
'--doflh',
375 help=
'Compute normalized fluorescence line height. Default is False',
376 action=
'store_true', default=
False)
377 parser.add_argument(
'-p',
'--psafe', help=
'Back source file up. Default is False',
378 action=
'store_true', default=
False)
379 parser.add_argument(
'-a',
'--sanity', help=
'Do sanity check. Default is False',
380 action=
'store_true', default=
False)
381 parser.add_argument(
'-b',
'--batch', help=
'Batch processing option. Default is False',
382 action=
'store_true', default=
False)
383 parser.add_argument(
'-w',
'--workers',
384 help=
'Number of concurrent processes. Default is 1',
386 parser.add_argument(
'-z',
'--sensor',
387 help=
'Specify sensor data originates from. Default is SeaWiFS',
388 type=str, default=
'SeaWiFS')
389 parser.add_argument(
'-e',
'--debug', help=
'increase output verbosity',
390 action=
'store_true', default=
False)
391 parsedArgs = parser.parse_args(args)
393 if ((
not parsedArgs.ifile
and not parsedArgs.batch)
or
394 (parsedArgs.batch
and not parsedArgs.ipath)):
403 sets logger with more verbose format if dbg_lvl=True
405 logger_name =
'MakeUNC_%s_T_%s' % (DT.date(DT.now()), DT.time(DT.now()))
406 logfn =
'%s.log' % logger_name
407 logger = logging.getLogger(logger_name)
409 level = logging.DEBUG
410 formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s -'
411 +
' [%(module)s..%(funcName)s..%(lineno)d]'
415 formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
416 logger.setLevel(level)
417 fh = logging.FileHandler(logfn)
419 fh.setFormatter(formatter)
420 ch = logging.StreamHandler()
421 ch.setLevel(logging.WARNING)
422 ch.setFormatter(formatter)
423 logger.addHandler(ch)
424 logger.addHandler(fh)
425 logger.debug(
'logging')
436 mainLogger.info(
'Initializing batch processor')
438 res = bRunner.ProcessL2s()
439 pickle.dump(res, open(
'L2BatchList.pkl',
'wb'))
441 baseLineFile = pArgs.ifile
442 noisyDataDir = pArgs.npath
443 noisySfx = pArgs.nsfx
444 baseLineFname = baseLineFile.split(
'/')[-1]
445 if noisyDataDir[-1] !=
'/':
447 if baseLineFname[0] ==
'S':
448 mainLogger.info(
'processing SeaWiFS data')
450 elif baseLineFname[0] ==
'A':
451 mainLogger.info(
'processing MODISA data')
453 uncObj.ReadFromSilent()
454 uncObj.BuildUncs(noisySfx)
455 uncObj.WriteToSilent()
457 if __name__ ==
"__main__":