1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
from typing import Callable
import yaml
import json
import os
from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool
def run_if_present(key, dct: dict, func: Callable, *args, **kwargs):
if key in dct:
func(dct[key], *args, **kwargs)
class Manager:
def __init__(self, filename: str = 'metchart.yaml'):
self.aggregators={}
self._plotters=[]
self._filename = filename
self._output_dir = './web/data'
self._thread_count = max(cpu_count()-1, 1)
self._load()
self._parse()
def run_plotters(self):
index = ThreadPool(self._thread_count).map(lambda p: p['module'].run(**p['cfg']), self._plotters)
with open(os.path.join(self._output_dir, 'index.json'), 'w') as f:
f.write(json.dumps(index, indent=4))
def _load(self):
with open(self._filename, 'r') as f:
self._raw_config = yaml.safe_load(f)
def _parse(self):
run_if_present('output', self._raw_config, self._parse_output)
run_if_present('thread_count', self._raw_config, self._parse_thread_count)
run_if_present('aggregator', self._raw_config, self._parse_module, self._load_aggregator)
run_if_present('modifier', self._raw_config, self._parse_module, self._load_modifier)
run_if_present('plotter', self._raw_config, self._parse_module, self._prepare_plotter)
def _parse_module(self, data, then: Callable):
# TODO abstraction prbly off. anonymous reeks
anonymous = False
if type(data) is list:
anonymous = True
for key in data:
cfg = data[key] if not anonymous else key
if 'module' not in cfg:
print(f'ERROR: {key} is missing the "module" keyword.')
continue
classname = cfg['module']
del cfg['module']
module = __import__(classname, fromlist=[None])
then(key if not anonymous else None, module, cfg)
def _load_aggregator(self, name: str, module, cfg):
self.aggregators[name] = module.load_data(name=name, **cfg)
def _load_modifier(self, name: str, module, cfg):
if 'aggregator' in cfg:
if type(cfg['aggregator']) == list:
cfg['data'] = []
for ag in cfg['aggregator']:
cfg['data'].append(self.aggregators[ag])
del cfg['aggregator']
else:
cfg['data'] = self.aggregators[cfg['aggregator']]
del cfg['aggregator']
self.aggregators[name] = module.run(**cfg)
def _prepare_plotter(self, _name, module, cfg):
if 'aggregator' in cfg:
cfg['data'] = self.aggregators[cfg['aggregator']]
del cfg['aggregator']
self._plotters.append({
'module': module,
'cfg': cfg
})
def _parse_output(self, data: str):
self._output_dir = data
def _parse_thread_count(self, data: int):
self._thread_count = data
|