aboutsummaryrefslogtreecommitdiff
path: root/metchart/manager.py
diff options
context:
space:
mode:
authorGravatar Jonas Gunz <himself@jonasgunz.de> 2024-10-18 01:40:49 +0200
committerGravatar Jonas Gunz <himself@jonasgunz.de> 2024-10-18 01:40:49 +0200
commit178138d4412aba7379393872c070b3718c94d58f (patch)
tree5f5f9ba233cfb58ef4000a9bf21d8f51efc95816 /metchart/manager.py
parent244e7a27a9a6dca97e0a21bd29bd49e463e243e5 (diff)
downloadmeteo_toolbox-178138d4412aba7379393872c070b3718c94d58f.tar.gz
migrate logic to manager
Diffstat (limited to 'metchart/manager.py')
-rw-r--r--metchart/manager.py95
1 files changed, 95 insertions, 0 deletions
diff --git a/metchart/manager.py b/metchart/manager.py
new file mode 100644
index 0000000..a7e3eaa
--- /dev/null
+++ b/metchart/manager.py
@@ -0,0 +1,95 @@
+from typing import Callable
+
+import yaml
+import json
+import os
+
+from multiprocessing import cpu_count
+from multiprocessing.pool import ThreadPool
+
+
+def run_if_present(key, dct: dict, func: Callable, *args, **kwargs):
+ if key in dct:
+ func(dct[key], *args, **kwargs)
+
+
+class Manager:
+ def __init__(self, filename: str = 'metchart.yaml'):
+ self.aggregators={}
+ self._plotters=[]
+
+ self._filename = filename
+ self._output_dir = './web/data'
+ self._thread_count = max(cpu_count()-1, 1)
+
+ self._load()
+ self._parse()
+
+ def run_plotters(self):
+ index = ThreadPool(self._thread_count).map(lambda p: p['module'].run(**p['cfg']), self._plotters)
+
+ with open(os.path.join(self._output_dir, 'index.json'), 'w') as f:
+ f.write(json.dumps(index, indent=4))
+
+ def _load(self):
+ with open(self._filename, 'r') as f:
+ self._raw_config = yaml.safe_load(f)
+
+ def _parse(self):
+ run_if_present('output', self._raw_config, self._parse_output)
+ run_if_present('thread_count', self._raw_config, self._parse_thread_count)
+ run_if_present('aggregator', self._raw_config, self._parse_module, self._load_aggregator)
+ run_if_present('modifier', self._raw_config, self._parse_module, self._load_modifier)
+
+ run_if_present('plotter', self._raw_config, self._parse_module, self._prepare_plotter)
+
+ def _parse_module(self, data, then: Callable):
+ # TODO abstraction prbly off. anonymous reeks
+ anonymous = False
+ if type(data) is list:
+ anonymous = True
+
+ for key in data:
+ cfg = data[key] if not anonymous else key
+
+ if 'module' not in cfg:
+ print(f'ERROR: {key} is missing the "module" keyword.')
+ continue
+
+ classname = cfg['module']
+ del cfg['module']
+ module = __import__(classname, fromlist=[None])
+
+ then(key if not anonymous else None, module, cfg)
+
+ def _load_aggregator(self, name: str, module, cfg):
+ self.aggregators[name] = module.load_data(name=name, **cfg)
+
+ def _load_modifier(self, name: str, module, cfg):
+ if 'aggregator' in cfg:
+ if type(cfg['aggregator']) == list:
+ cfg['data'] = []
+ for ag in cfg['aggregator']:
+ cfg['data'].append(self.aggregators[ag])
+
+ del cfg['aggregator']
+ else:
+ cfg['data'] = self.aggregators[cfg['aggregator']]
+ del cfg['aggregator']
+
+ self.aggregators[name] = module.run(**cfg)
+
+ def _prepare_plotter(self, _name, module, cfg):
+ if 'aggregator' in cfg:
+ cfg['data'] = self.aggregators[cfg['aggregator']]
+ del cfg['aggregator']
+
+ self._plotters.append({
+ 'module': module,
+ 'cfg': cfg
+ })
+
+ def _parse_output(self, data: str):
+ self._output_dir = data
+ def _parse_thread_count(self, data: int):
+ self._thread_count = data