aboutsummaryrefslogtreecommitdiff
path: root/aggregator/dwd_icon.py
blob: 6df905cb150d0e826222a30a75f5c5692e7ad18a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python3

import requests
import datetime
import pytz
import requests
import os

from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool

import subprocess

import xarray as xr

import misc

# Root URL that every GRIB download URL built in download_dwd_gribs() is
# prefixed with; the model name, run and parameter are appended as path parts.
BASE='https://opendata.dwd.de/weather/nwp'

def get_current_run(now=None):
    """Return the most recent model run that should already be published.

    Runs are bucketed every six hours (00, 06, 12, 18). Three hours of slack
    are subtracted from the reference time first, to allow DWD to finish
    uploading the latest run.

    Args:
        now: Optional reference ``datetime``. Timezone-aware values are
            converted to UTC; naive values are taken to already be in UTC.
            Defaults to the current UTC time.

    Returns:
        A ``(run, date)`` tuple of strings: the run hour zero-padded to two
        digits and the date formatted as ``YYYYMMDD``. Both refer to the
        slack-corrected time, so shortly after midnight the previous day's
        18 run is returned.
    """
    utc = datetime.timezone.utc
    if now is None:
        now = datetime.datetime.now(utc)
    elif now.tzinfo is not None:
        now = now.astimezone(utc)

    # we allow up to 3h of slack for DWD to upload the latest run
    corrected = now - datetime.timedelta(hours=3)

    # Integer floor to the start of the enclosing six-hour bucket.
    run = corrected.hour // 6 * 6

    return (f'{run:02d}', corrected.strftime('%Y%m%d'))

def download_url(args):
    """Download one URL to a local file, reporting success or failure.

    Takes a single tuple so it can be mapped directly over a pool.

    Args:
        args: A ``(url, dest)`` tuple: the URL to fetch and the path the
            response body is written to.

    Failures are printed rather than raised, so one bad file does not abort
    the rest of a batch. This covers network errors, HTTP error statuses and
    file-system errors. Nothing is written to ``dest`` when the request
    itself fails.
    """
    url, dest = args
    try:
        # The timeout keeps a stalled connection from blocking a worker forever.
        r = requests.get(url, timeout=60)
        # Without this, a 404/5xx error page would be saved as if it were data.
        r.raise_for_status()
        with open(dest, 'wb') as f:
            f.write(r.content)
    except Exception as e:
        # Deliberately broad: this is the best-effort boundary of a worker.
        print(f'Failed to download {dest}:\n', e)
    else:
        print(f'Downloaded {dest}')

def unpack_bz2(dest):
    """Decompress a ``.bz2`` file in place using the external ``bzip2`` tool.

    Runs ``bzip2 -df`` (decompress, force) on ``dest``. A non-zero exit code
    is reported on stdout together with bzip2's error output; nothing is
    raised for it.

    Args:
        dest: Path of the compressed file.
    """
    # stderr must be captured explicitly, otherwise res.stderr is always None
    # and the message below would carry no diagnostic at all.
    res = subprocess.run(
        ['bzip2', '-df', dest],
        stderr=subprocess.PIPE, text=True, errors='replace',
    )
    if res.returncode != 0:
        print(f'There was an error unpacking {dest}:', res.stderr)

def download_dwd_gribs(
        date, run, target, output, model, steps, model_long,
        pressure_level_parameters, parameter_caps_in_filename,
        single_level_parameters, pressure_levels
):
    """Download, unpack and merge all GRIB files of one model run.

    One file is fetched per (step, parameter, level) for the pressure-level
    parameters and one per (step, parameter) for the single-level parameters.
    The files are decompressed with ``bzip2``, concatenated into ``target``
    with ``grib_copy``, and the individual files are then removed.

    Args:
        date: Run date string as used in the remote filenames.
        run: Run hour string; used both in the URL path and the filenames.
        target: Path of the merged GRIB file to create.
        output: Directory the individual files are downloaded into.
        model: Model name as used in the URL path.
        steps: Iterable of integer steps, zero-padded to three digits in the
            filenames.
        model_long: Model prefix of the remote filenames.
        pressure_level_parameters: Parameter names fetched for every entry
            in ``pressure_levels``.
        parameter_caps_in_filename: If true, the parameter is upper-cased in
            the filename (the URL path always uses it as given).
        single_level_parameters: Parameter names fetched once per step.
        pressure_levels: Levels to fetch for each pressure-level parameter.

    Failures of the external tools are printed, not raised.
    """
    misc.create_output_dir(output)

    to_download = []

    for step in steps:
        step_str = f'{step:03d}'

        for parameter in pressure_level_parameters:
            parameter2 = parameter.upper() if parameter_caps_in_filename else parameter

            for level in pressure_levels:
                filename = f'{model_long}_regular-lat-lon_pressure-level_{date}{run}_{step_str}_{level}_{parameter2}.grib2.bz2'
                URL = f'{BASE}/{model}/grib/{run}/{parameter}/{filename}'

                to_download.append((URL, os.path.join(output, filename)))

        for parameter in single_level_parameters:
            parameter2 = parameter.upper() if parameter_caps_in_filename else parameter
            filename = f'{model_long}_regular-lat-lon_single-level_{date}{run}_{step_str}_{parameter2}.grib2.bz2'
            URL = f'{BASE}/{model}/grib/{run}/{parameter}/{filename}'

            to_download.append((URL, os.path.join(output, filename)))

    compressed = [dest for _, dest in to_download]

    # One pool serves both phases and is shut down by the context manager,
    # so no worker threads are leaked. Both iterators are drained inside the
    # block, so all work has finished before the pool is torn down.
    with ThreadPool(cpu_count()) as pool:
        for _ in pool.imap_unordered(download_url, to_download):
            pass

        print('Done Downloading. Uncompressing...')

        for _ in pool.imap_unordered(unpack_bz2, compressed):
            pass

    downloaded_gribs = [dest.removesuffix('.bz2') for dest in compressed]

    # stderr is captured explicitly; without it res.stderr is always None.
    res = subprocess.run(
        ['grib_copy'] + downloaded_gribs + [target],
        stderr=subprocess.PIPE, text=True, errors='replace',
    )
    if res.returncode != 0:
        print('grib_copy failed with: ', res.stderr)

    res = subprocess.run(
        ['rm', '-f'] + downloaded_gribs,
        stderr=subprocess.PIPE, text=True, errors='replace',
    )
    if res.returncode != 0:
        print('rm failed with: ', res.stderr)

def clean_output_dir(directory, target):
    """Delete every regular file in ``directory`` except ``target``.

    Args:
        directory: Directory to clean. Only its direct entries are looked
            at; sub-directories are left alone.
        target: Bare filename (not a path) of the one file to keep.
    """
    # Decide what to remove before deleting anything.
    doomed = []
    for entry in os.listdir(directory):
        path = os.path.join(directory, entry)
        if entry != target and os.path.isfile(path):
            doomed.append(path)

    for path in doomed:
        os.unlink(path)

def load_data(name, output, description = None, clean = False, force_filename = None, **kwargs):
    """Load the GRIB data of the current run as an xarray dataset.

    Unless ``force_filename`` is given, the current run is determined, its
    merged GRIB file is downloaded into ``output`` if it is not already
    there, and (optionally) all other files in ``output`` are removed.

    Args:
        name: Prefix of the merged file, named ``{name}_{date}_{run}.grib2``.
        output: Directory holding the merged file.
        description: Optional text stored in the dataset's ``_description``
            attribute.
        clean: If true, delete every other file in ``output``. Ignored when
            ``force_filename`` is given.
        force_filename: Path of an existing GRIB file to load instead;
            skips run detection, download and cleaning.
        **kwargs: Passed through to ``download_dwd_gribs``.

    Returns:
        The loaded dataset.
    """
    if force_filename is not None:
        grib_path = force_filename
    else:
        run, date = get_current_run()
        grib_name = f'{name}_{date}_{run}.grib2'
        grib_path = os.path.join(output, grib_name)

        if os.path.exists(grib_path):
            print(f'{grib_path} already exists. Using the cached version.')
        else:
            download_dwd_gribs(date, run, grib_path, output, **kwargs)

        if clean:
            clean_output_dir(output, grib_name)

    # heightAboveGround is dropped so that the 2m and 10m fields can be
    # merged into a single dataset.
    ds = xr.load_dataset(grib_path, engine='cfgrib', drop_variables='heightAboveGround')

    if description is not None:
        ds.attrs['_description'] = description

    return ds


# Example keyword arguments for load_data(), used when this module is run as
# a script. 'output' and 'clean' are consumed by load_data() itself; all other
# keys are passed through to download_dwd_gribs().
debug_config = {
    'output':'dwd_icon-eu',
    'model':'icon-eu',
    'model_long':'icon-eu_europe',
    'clean': True,
    'parameter_caps_in_filename':True,
    # Fetched once per entry in 'pressure_levels' for every step.
    'pressure_level_parameters': [
        't',
        'relhum',
        'u',
        'v',
        'fi',
        'clc'
    ],
    # Fetched once per step.
    'single_level_parameters': [
        'pmsl',
        't_2m',
        'relhum_2m'
    ],
    # NOTE(review): presumably pressure levels in hPa -- confirm against DWD docs.
    'pressure_levels':[ 1000, 950, 925, 900, 875, 850, 825, 800, 775, 700, 600, 500, 400, 300, 250, 200, 150, 100 ],
    # NOTE(review): presumably forecast lead times in hours -- confirm.
    'steps':[0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48]
}

# Manual smoke test: when run as a script, load the current run using
# debug_config (downloading it first if it is not already cached).
if __name__ == '__main__':
    load_data('test_icon_eu', **debug_config)