#!/usr/bin/env python3
import datetime
import os
import subprocess
from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool

import requests
import xarray as xr

import misc

BASE = 'https://opendata.dwd.de/weather/nwp'
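
# The open-data server organises files as
#   {BASE}/{model}/grib/{run}/{parameter}/<filename>.grib2.bz2
# where <filename> encodes grid, level type, run date and time, forecast
# step, (pressure level,) and parameter name; download_dwd_gribs below
# assembles exactly these URLs.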

def get_current_run():
    # DWD publishes a new run every 6 hours (00, 06, 12, 18 UTC); we allow
    # up to 3 h of slack for DWD to upload the latest run.
    now = datetime.datetime.now(datetime.timezone.utc)
    corrected = now - datetime.timedelta(hours=3)
    run = corrected.hour // 6 * 6
    return f'{run:02d}', corrected.strftime('%Y%m%d')
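
# Example: calling get_current_run() at 2024-01-15 14:30 UTC gives a
# corrected time of 11:30, which falls in the 06 UTC slot, so it
# returns ('06', '20240115').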

def download_url(args):
    url, dest = args
    try:
        r = requests.get(url)
        r.raise_for_status()
        with open(dest, 'wb') as f:
            f.write(r.content)
        print(f'Downloaded {dest}')
    except Exception as e:
        print(f'Failed to download {dest}:\n', e)

def unpack_bz2(dest):
    res = subprocess.run(['bzip2', '-df', dest], capture_output=True, text=True)
    if res.returncode != 0:
        print(f'There was an error unpacking {dest}:', res.stderr)
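
# bzip2 -d decompresses in place, replacing foo.grib2.bz2 with foo.grib2
# and removing the archive; -f overwrites leftovers from a previous run.
# download_dwd_gribs relies on this when it strips the .bz2 suffix to
# locate the unpacked files.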

def download_dwd_gribs(
    date, run, target, output, model, steps, model_long,
    pressure_level_parameters, parameter_caps_in_filename,
    single_level_parameters, pressure_levels
):
    misc.create_output_dir(output)
    to_download = []
    for step in steps:
        step_str = f'{step:03d}'
        for parameter in pressure_level_parameters:
            parameter2 = parameter.upper() if parameter_caps_in_filename else parameter
            for level in pressure_levels:
                filename = (f'{model_long}_regular-lat-lon_pressure-level_'
                            f'{date}{run}_{step_str}_{level}_{parameter2}.grib2.bz2')
                url = f'{BASE}/{model}/grib/{run}/{parameter}/{filename}'
                to_download.append((url, os.path.join(output, filename)))
        for parameter in single_level_parameters:
            parameter2 = parameter.upper() if parameter_caps_in_filename else parameter
            filename = (f'{model_long}_regular-lat-lon_single-level_'
                        f'{date}{run}_{step_str}_{parameter2}.grib2.bz2')
            url = f'{BASE}/{model}/grib/{run}/{parameter}/{filename}'
            to_download.append((url, os.path.join(output, filename)))
    # Fetch and decompress in parallel, one worker thread per CPU core.
    for _ in ThreadPool(cpu_count()).imap_unordered(download_url, to_download):
        pass
    print('Done downloading. Uncompressing...')
    for _ in ThreadPool(cpu_count()).imap_unordered(unpack_bz2, [dest for _, dest in to_download]):
        pass
    # bzip2 stripped the .bz2 suffix; merge the plain .grib2 files into one.
    downloaded_gribs = [dest.removesuffix('.bz2') for _, dest in to_download]
    res = subprocess.run(['grib_copy'] + downloaded_gribs + [target],
                         capture_output=True, text=True)
    if res.returncode != 0:
        print('grib_copy failed with:', res.stderr)
    res = subprocess.run(['rm', '-f'] + downloaded_gribs,
                         capture_output=True, text=True)
    if res.returncode != 0:
        print('rm failed with:', res.stderr)
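
# grib_copy and the cfgrib engine used in load_data both come from
# ECMWF's ecCodes; the merge step concatenates all downloaded GRIB
# messages into a single file that xarray can open in one pass.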

def load_data(name, output, description=None, **kwargs):
    run, date = get_current_run()
    target = os.path.join(output, f'{name}_{date}_{run}.grib2')
    if not os.path.exists(target):
        download_dwd_gribs(date, run, target, output, **kwargs)
    else:
        print(f'{target} already exists. Using the cached version.')
    ds = xr.load_dataset(target, engine='cfgrib')
    if description is not None:
        ds.attrs['_description'] = description
    return ds

debug_config = {
    'output': 'dwd_icon-eu',
    'model': 'icon-eu',
    'model_long': 'icon-eu_europe',
    'parameter_caps_in_filename': True,
    'pressure_level_parameters': [
        't',
        'relhum',
        'u',
        'v',
        'fi',
        'clc',
    ],
    'single_level_parameters': [
        'pmsl',
        't_2m',
        'relhum_2m',
    ],
    'pressure_levels': [1000, 950, 925, 900, 875, 850, 825, 800, 775,
                        700, 600, 500, 400, 300, 250, 200, 150, 100],
    'steps': [0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48],
}
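
# The short names above follow DWD's ICON-EU GRIB conventions: t
# (temperature), relhum (relative humidity), u/v (wind components),
# fi (geopotential), clc (cloud cover) on pressure levels, plus mean
# sea level pressure (pmsl) and the 2 m variants as single-level fields.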

if __name__ == '__main__':
    load_data('test_icon_eu', **debug_config)