Skip to content

Commit

Permalink
historic param values
Browse files Browse the repository at this point in the history
  • Loading branch information
fenke committed Jun 18, 2024
1 parent 45ae9f3 commit 8aed56c
Show file tree
Hide file tree
Showing 5 changed files with 881 additions and 92 deletions.
11 changes: 9 additions & 2 deletions corebridge/_modidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@
'corebridge.aicorebridge.AICoreModule.get_callargs': ( 'aicorebridge.html#aicoremodule.get_callargs',
'corebridge/aicorebridge.py'),
'corebridge.aicorebridge.AICoreModule.infer': ( 'aicorebridge.html#aicoremodule.infer',
'corebridge/aicorebridge.py')},
'corebridge.core': { 'corebridge.core.set_time_index_zone': ('core.html#set_time_index_zone', 'corebridge/core.py'),
'corebridge/aicorebridge.py'),
'corebridge.aicorebridge.AICoreModule.init_annotated_param': ( 'aicorebridge.html#aicoremodule.init_annotated_param',
'corebridge/aicorebridge.py'),
'corebridge.aicorebridge.build_historic_args': ( 'aicorebridge.html#build_historic_args',
'corebridge/aicorebridge.py')},
'corebridge.core': { 'corebridge.core.pop_nan_values': ('core.html#pop_nan_values', 'corebridge/core.py'),
'corebridge.core.set_time_index_zone': ('core.html#set_time_index_zone', 'corebridge/core.py'),
'corebridge.core.timeseries_dataframe': ('core.html#timeseries_dataframe', 'corebridge/core.py'),
'corebridge.core.timeseries_dataframe_from_datadict': ( 'core.html#timeseries_dataframe_from_datadict',
'corebridge/core.py'),
'corebridge.core.timeseries_dataframe_resample': ( 'core.html#timeseries_dataframe_resample',
'corebridge/core.py'),
'corebridge.core.timeseries_dataframe_to_datadict': ( 'core.html#timeseries_dataframe_to_datadict',
'corebridge/core.py')}}}
124 changes: 100 additions & 24 deletions corebridge/aicorebridge.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/01_aicorebridge.ipynb.

# %% auto 0
__all__ = ['syslog', 'AICoreModule']
__all__ = ['syslog', 'annotated_arg_builders', 'build_historic_args', 'AICoreModule']

# %% ../nbs/01_aicorebridge.ipynb 4
import typing
Expand All @@ -10,9 +10,11 @@
import inspect
import datetime
import json
import pandas as pd
import numpy as np
import os
import pandas, pandas as pd
import numpy, numpy as np

from dateutil import parser
from fastcore.basics import patch_to, patch
from .core import *
from . import __version__
Expand All @@ -28,9 +30,42 @@
pass

# %% ../nbs/01_aicorebridge.ipynb 7
def build_historic_args(data, history) -> dict:
    """Build per-sample parameter arrays from time-stamped history records.

    Parameters
    ----------
    data : pd.DataFrame or np.ndarray
        Timeseries data whose time index (or first column) supplies the
        per-row timestamps, in seconds since the epoch, that historic
        parameter values are aligned against.
    history : dict, list or None
        Either a plain dict of parameter values (returned unchanged), or a
        list of dicts, each optionally carrying a 'startDate' ISO timestamp
        marking from when its remaining key/value pairs apply.

    Returns
    -------
    dict
        Maps parameter name -> float64 numpy array aligned with `data`,
        NaN where no historic value applies. Empty dict when `history`
        is falsy or neither a dict nor a list.
    """
    if not history:
        return {}

    if isinstance(history, dict):
        return history

    if not isinstance(history, list):
        return {}

    # Derive per-row timestamps (seconds since epoch) from the data.
    if isinstance(data, pd.DataFrame):
        dates = (data.index.astype(np.int64).astype(np.float64) / 1e9).to_numpy()
    elif data.dtype.names is not None:
        # structured array: first field is assumed to hold the timestamp — TODO confirm
        dates = data.view(dtype=np.float64).reshape(data.shape[0], len(data.dtype))[:, 0]
    else:
        dates = data[:, 0].astype(np.int64)

    columns = list({K for I in history for K in I.keys() if K != 'startDate'})
    column_data = {K: np.full(len(dates), np.nan, dtype=np.float64) for K in columns}

    for I in history:
        # Use get() instead of pop() so the caller's history dicts are not mutated.
        date = parser.parse(str(I.get('startDate', '2000-01-01T00:00:00+00:00'))).timestamp()
        mask = np.greater_equal(dates, date)
        for K, V in I.items():
            if K != 'startDate':
                column_data[K][mask] = V

    return column_data


# %% ../nbs/01_aicorebridge.ipynb 10
class AICoreModule(): pass

# %% ../nbs/01_aicorebridge.ipynb 8
# %% ../nbs/01_aicorebridge.ipynb 11
@patch
def __init__(self:AICoreModule,
processor:typing.Callable, # data processing function
Expand All @@ -50,7 +85,7 @@ def __init__(self:AICoreModule,



# %% ../nbs/01_aicorebridge.ipynb 9
# %% ../nbs/01_aicorebridge.ipynb 12
@patch
def _init_processor(
self:AICoreModule,
Expand All @@ -64,14 +99,14 @@ def _init_processor(
self.data_param, *self.call_params = list(self.processor_params.keys())


# %% ../nbs/01_aicorebridge.ipynb 10
# %% ../nbs/01_aicorebridge.ipynb 13
# can be overloaded
@patch
def call_processor(self:AICoreModule, calldata, **callargs):
    "Forward the prepared data and keyword arguments to the wrapped processor; overload to customize."
    return self.processor(calldata, **callargs)


# %% ../nbs/01_aicorebridge.ipynb 12
# %% ../nbs/01_aicorebridge.ipynb 15
@patch
def infer(self:AICoreModule, data:dict, *_, **kwargs):
try:
Expand All @@ -88,29 +123,43 @@ def infer(self:AICoreModule, data:dict, *_, **kwargs):
timezone = kwargs.get('timezone', 'UTC')
msg.append(f"lastSeen: {lastSeen}, recordformat: {recordformat}, timezone: {timezone}")

samplerPeriod = kwargs.pop('samplerPeriod', 'h')
samplerMethod = kwargs.pop('samplerMethod', None)

calldata = self.get_call_data(
data,
recordformat=recordformat,
timezone=timezone,
reversed=reversed)
timezone=timezone)

msg.append(f"calldata shape: {calldata.shape}")

callargs = self.get_callargs(**kwargs)
history = build_historic_args(calldata, kwargs.pop('history', {}))

callargs = self.get_callargs(kwargs, history)

for arg, val in callargs.items():
msg.append(f"{arg}: {val}")

result = self.call_processor(calldata, **callargs)
msg.append(f"result shape: {result.shape}")

result = timeseries_dataframe(result, timezone=timezone)
if reversed:
result = result[::-1]

if samplerMethod:
msg.append(f"Sampler: {samplerMethod}, period: {samplerPeriod}")
result = timeseries_dataframe_resample(result, samplerPeriod, samplerMethod)

msg.append(f"return-data shape: {result.shape}")

return {
'msg':msg,
'data': timeseries_dataframe_to_datadict(
result if not lastSeen else result[-1:],
recordformat=recordformat,
timezone=timezone,
reversed=reversed)
popNaN=True)
}
except Exception as err:
msg.append(''.join(traceback.format_exception(None, err, err.__traceback__)))
Expand All @@ -121,23 +170,52 @@ def infer(self:AICoreModule, data:dict, *_, **kwargs):
}


# %% ../nbs/01_aicorebridge.ipynb 14
# %% ../nbs/01_aicorebridge.ipynb 17
def _build_ndarray(value):
    "Recreate *value* as a numpy array, preserving its dtype."
    return numpy.array(value, dtype=value.dtype)

# Registry used by init_annotated_param: maps str(annotation type) to a
# builder callable that converts an incoming value to that type.
annotated_arg_builders = {str(numpy.ndarray): _build_ndarray}

# %% ../nbs/01_aicorebridge.ipynb 18
@patch
def init_annotated_param(self:AICoreModule, K, value):
    """Convert *value* to the annotated type of processor parameter *K*.

    For typing.Union/Optional annotations, each member type is tried in
    declared order, using a registered builder from `annotated_arg_builders`
    when one exists for that type. Falls back to calling the annotation
    itself as a constructor; returns None when no conversion succeeds.
    """

    annotation = self.processor_signature.parameters[K].annotation
    # Debug print replaced with the module logger so library code stays quiet.
    syslog.debug("init_annotated_param %s %s %s", K, annotation, value)

    # Union/Optional annotations: try each member type until one converts.
    for T in typing.get_args(annotation):
        try:
            builder = annotated_arg_builders.get(str(T), T)
            return builder(value)
        except TypeError:
            # This member type did not accept the value; try the next one.
            continue

    # Plain (non-generic) annotation: call it directly as a constructor.
    try:
        return self.processor_signature.parameters[K].annotation(value)
    except TypeError as err:
        syslog.exception(f"Exception {str(err)} in fallback conversion to {self.processor_signature.parameters[K].annotation} of {type(value)}")

    return None


# %% ../nbs/01_aicorebridge.ipynb 19
@patch
def get_callargs(self:AICoreModule, **kwargs):
def get_callargs(self:AICoreModule, kwargs, history):
"Get arguments for the processor call"

# Remove null / None values
kwargs = {k:v for k,v in kwargs.items() if v is not None}

metadata = kwargs.pop('metadata', {}) # TODO: historic metadata

return {
K:self.processor_signature.parameters[K].annotation(
self.init_kwargs.get(
K:self.init_annotated_param(
K,
history.get(
K,
kwargs.get(
self.init_kwargs.get(
K,
metadata.get(
kwargs.get(
K,
self.processor_signature.parameters[K].default
)
Expand All @@ -148,22 +226,20 @@ def get_callargs(self:AICoreModule, **kwargs):
}


# %% ../nbs/01_aicorebridge.ipynb 15
# %% ../nbs/01_aicorebridge.ipynb 22
@patch
def get_call_data(
self:AICoreModule,
data:dict,
recordformat='records',
timezone='UTC',
reversed=False):
timezone='UTC'):

"Convert data to the processor signature"

df = set_time_index_zone(timeseries_dataframe_from_datadict(
data, ['datetimeMeasure', 'time'], recordformat), timezone)

if reversed:
df = df[::-1]
df.sort_index(inplace=True)

if self.processor_params[self.data_param].annotation == pd.DataFrame:
return df
Expand Down
64 changes: 53 additions & 11 deletions corebridge/core.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/00_core.ipynb.

# %% auto 0
__all__ = ['set_time_index_zone', 'timeseries_dataframe', 'timeseries_dataframe_from_datadict',
'timeseries_dataframe_to_datadict']
__all__ = ['ResamplerMethods', 'ReSamplerPeriods', 'set_time_index_zone', 'timeseries_dataframe',
'timeseries_dataframe_from_datadict', 'pop_nan_values', 'timeseries_dataframe_to_datadict',
'timeseries_dataframe_resample']

# %% ../nbs/00_core.ipynb 3
import typing
Expand Down Expand Up @@ -79,13 +80,13 @@ def timeseries_dataframe(
# %% ../nbs/00_core.ipynb 11
def timeseries_dataframe_from_datadict(
data:dict,
timecolumns,
timecolumns=None,
recordformat='records'):

"Convert data dict to dataframe"

orient = recordformat.lower()
assert orient in ['records', 'table']
assert orient in ['records', 'table', 'split', 'index', 'tight']

if orient == 'records':
df = pd.DataFrame.from_records(data)
Expand All @@ -95,6 +96,9 @@ def timeseries_dataframe_from_datadict(
time_column = data['schema']['primaryKey'][0]
df = pd.DataFrame.from_dict(data['data']).set_index(data['schema']['primaryKey'])
df.index.name = 'time'
else:
df = pd.DataFrame.from_dict(data, orient=orient)
time_column = df.index.name


df.columns = list(df.columns)
Expand All @@ -107,32 +111,70 @@ def timeseries_dataframe_from_datadict(
return df


# %% ../nbs/00_core.ipynb 14
def pop_nan_values(data):
    """Recursively drop null (NaN/None) entries from nested lists and dicts.

    An element is kept only when pandas considers at least part of it
    non-null; scalars and unrecognized containers pass through unchanged.
    """
    def _keep(value):
        # Wrap in a list so scalars and sequences are handled uniformly.
        return pd.notnull([value]).any()

    if isinstance(data, dict):
        return {key: pop_nan_values(item) for key, item in data.items() if _keep(item)}
    if isinstance(data, list):
        return [pop_nan_values(item) for item in data if _keep(item)]
    return data

# %% ../nbs/00_core.ipynb 15
def timeseries_dataframe_to_datadict(
        data:typing.Union[pd.DataFrame, pd.Series, dict],
        recordformat:str='records',
        timezone:str='UTC',
        popNaN:bool=False):
    """Convert timeseries data to a JSON-serializable data structure.

    Parameters
    ----------
    data : DataFrame, Series or dict
        Timeseries data; normalized through `timeseries_dataframe`.
    recordformat : str
        pandas `to_dict` orient ('records', 'split', 'index', 'tight', ...).
    timezone : str
        Timezone applied to the datetime index before serializing.
    popNaN : bool
        When True, strip NaN/None values from the result.

    Returns
    -------
    A list of row dicts for orient 'records', otherwise the dict structure
    produced by `DataFrame.to_dict`; datetime index values are rendered as
    ISO-8601 strings.
    """

    orient = recordformat.lower()

    normalized_data = timeseries_dataframe(data, timezone=timezone)
    if isinstance(normalized_data.index, pd.DatetimeIndex):
        normalized_data.index = normalized_data.index.map(lambda x: x.isoformat())

    if orient == 'records':
        records = normalized_data.reset_index().to_dict(orient='records')
    else:
        records = normalized_data.to_dict(orient=orient)

    if popNaN and normalized_data.isna().any(axis=None):
        if orient == 'records':
            # Flat list of row dicts: drop each row's null values directly.
            return [{k: v for k, v in m.items() if pd.notnull(v)} for m in records]
        # Other orients yield nested dict/list structures; the flat row-dict
        # filter above would crash on them, so strip nulls recursively instead.
        return pop_nan_values(records)

    return records




# %% ../nbs/00_core.ipynb 22
ResamplerMethods = dict(
count=lambda R: R.count(),
median=lambda R: R.median(),
mean=lambda R: R.mean(),
min=lambda R: R.min(),
max=lambda R: R.max(),
sum=lambda R: R.sum(),
std=lambda R: R.std(),
var=lambda R: R.var(),

)

ReSamplerPeriods = dict(
H='h', T='min', S='sec', L='ms', U='us', N='ns'
)

def timeseries_dataframe_resample(df:pd.DataFrame, period:str, method:str):
    """Resample *df* and append aggregate columns for each requested method.

    Parameters
    ----------
    df : pd.DataFrame
        Timeseries dataframe with a datetime-like index.
    period : str
        Resampling period; deprecated uppercase aliases ('H', 'T', ...) are
        translated through `ReSamplerPeriods`, anything else is passed to
        pandas unchanged.
    method : str
        One or more `ResamplerMethods` keys separated by ';' (e.g. 'mean;max').

    Returns
    -------
    pd.DataFrame
        The original columns outer-joined with one '<column>_<method>'
        column per requested aggregation.

    Raises
    ------
    ValueError
        If a requested method is not a `ResamplerMethods` key.
    """

    sampler = df.resample(ReSamplerPeriods.get(period, str(period)))

    dataframes = [df]
    for M in str(method).split(';'):
        aggregate = ResamplerMethods.get(M)
        if aggregate is None:
            # Fail with a clear message instead of "'NoneType' is not callable".
            raise ValueError(f"Unknown resampler method {M!r}; expected one of {sorted(ResamplerMethods)}")
        sdf = aggregate(sampler)
        sdf.columns = [f"{C}_{M}" for C in df.columns]
        dataframes.append(sdf)

    return pd.concat(dataframes, axis=1, join='outer')


Loading

0 comments on commit 8aed56c

Please sign in to comment.