Skip to content

Commit

Permalink
add aicorebridge
Browse files Browse the repository at this point in the history
move code to core
  • Loading branch information
fenke committed Apr 16, 2024
1 parent e0be4f5 commit 79c807e
Show file tree
Hide file tree
Showing 9 changed files with 1,095 additions and 10 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# corebridge


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

This file will become your README and also the index of your
Expand All @@ -20,3 +21,15 @@ Fill me in please! Don’t forget code examples:
```

2

## nbdev cycle

- edit
- nbdev_export
- pip install -e ‘.\[dev\]
- nbdev_test
- nbdev_clean
- nbdev_readme
- nbdev_prepare
- git add .
-
2 changes: 1 addition & 1 deletion corebridge/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.0.1"
__version__ = "0.0.5"
22 changes: 21 additions & 1 deletion corebridge/_modidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,24 @@
'doc_host': 'https://fenke.github.io',
'git_url': 'https://github.com/fenke/corebridge',
'lib_path': 'corebridge'},
'syms': {'corebridge.core': {'corebridge.core.foo': ('core.html#foo', 'corebridge/core.py')}}}
'syms': { 'corebridge.aicorebridge': { 'corebridge.aicorebridge.AICoreModule': ( 'aicorebridge.html#aicoremodule',
'corebridge/aicorebridge.py'),
'corebridge.aicorebridge.AICoreModule.__init__': ( 'aicorebridge.html#aicoremodule.__init__',
'corebridge/aicorebridge.py'),
'corebridge.aicorebridge.AICoreModule._init_processor': ( 'aicorebridge.html#aicoremodule._init_processor',
'corebridge/aicorebridge.py'),
'corebridge.aicorebridge.AICoreModule.convert_to_dataframe': ( 'aicorebridge.html#aicoremodule.convert_to_dataframe',
'corebridge/aicorebridge.py'),
'corebridge.aicorebridge.AICoreModule.get_call_data': ( 'aicorebridge.html#aicoremodule.get_call_data',
'corebridge/aicorebridge.py'),
'corebridge.aicorebridge.AICoreModule.get_callargs': ( 'aicorebridge.html#aicoremodule.get_callargs',
'corebridge/aicorebridge.py'),
'corebridge.aicorebridge.AICoreModule.infer': ( 'aicorebridge.html#aicoremodule.infer',
'corebridge/aicorebridge.py'),
'corebridge.aicorebridge.AICoreModule.rewrite_data': ( 'aicorebridge.html#aicoremodule.rewrite_data',
'corebridge/aicorebridge.py')},
'corebridge.core': { 'corebridge.core.timeseries_dataframe': ('core.html#timeseries_dataframe', 'corebridge/core.py'),
'corebridge.core.timeseries_dataframe_from_datadict': ( 'core.html#timeseries_dataframe_from_datadict',
'corebridge/core.py'),
'corebridge.core.timeseries_dataframe_to_datadict': ( 'core.html#timeseries_dataframe_to_datadict',
'corebridge/core.py')}}}
247 changes: 247 additions & 0 deletions corebridge/aicorebridge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/01_aicorebridge.ipynb.

# %% auto 0
__all__ = ['syslog', 'AICoreModule']

# %% ../nbs/01_aicorebridge.ipynb 2
import typing
import logging
import inspect
import datetime
import json
import pandas as pd
import numpy as np

from fastcore.basics import patch_to, patch

# %% ../nbs/01_aicorebridge.ipynb 4
syslog = logging.getLogger(__name__)

# %% ../nbs/01_aicorebridge.ipynb 6
class AICoreModule(): pass

# %% ../nbs/01_aicorebridge.ipynb 7
@patch
def __init__(self:AICoreModule,
save_dir:str, # path where the module can keep files
processor:typing.Callable, # data processing function
*args, **kwargs):

self.save_dir = save_dir
self._init_processor(processor)

self.init_args = args
self.init_kwargs = kwargs



# %% ../nbs/01_aicorebridge.ipynb 8
@patch
def _init_processor(
self:AICoreModule,
processor:typing.Callable):
"""Initializes processor related variables on self"""

self.processor = processor
self.processor_signature = inspect.signature(self.processor)
self.processor_params = dict(self.processor_signature.parameters)
self.return_param = self.processor_params.pop('return', None)
self.data_param, *self.call_params = list(self.processor_params.keys())


# %% ../nbs/01_aicorebridge.ipynb 9
@patch
def infer(self:AICoreModule, data:dict, *_, **kwargs):
try:

msg=[
f"{self.processor.__name__}({self.processor_signature})",
f"init_args: {self.init_args}, init_kwargs: {self.init_kwargs}",
]

lastSeen = kwargs.pop('lastSeen', False)
recordformat = kwargs.pop('format', "records").lower()
reversed = kwargs.pop('reversed', False)
timezone = kwargs.get('timezone', 'UTC')
msg.append(f"lastSeen: {lastSeen}, recordformat: {recordformat}, timezone: {timezone}")

calldata = self.get_call_data(
data,
recordformat=recordformat,
timezone=timezone,
reversed=reversed)

msg.append(f"calldata shape: {calldata.shape}")

callargs = self.get_callargs(**kwargs)

for arg, val in callargs.items():
msg.append(f"{arg}: {val}")

result = self.processor(calldata, **callargs)
msg.append(f"result shape: {result.shape}")

return {
'msg':msg,
'data': self.rewrite_data(
result if not lastSeen else result[-1:],
recordformat=recordformat,
timezone=timezone,
reversed=reversed)
}
except Exception as err:
return {
'msg': f"Unexpected {err=}, {type(err)=}",
'data': []
}


# %% ../nbs/01_aicorebridge.ipynb 10
@patch
def get_callargs(self:AICoreModule, **kwargs):
"Get arguments for the processor call"

metadata = kwargs.pop('metadata', {}) # TODO: historic metadata

return {
K:self.processor_signature.parameters[K].annotation(kwargs.get(K,metadata.get(K, self.init_kwargs.get(K, self.processor_signature.parameters[K].default))))
for K in self.call_params
}


# %% ../nbs/01_aicorebridge.ipynb 11
@patch
def get_call_data(
self:AICoreModule,
data:dict,
recordformat='records',
timezone='UTC',
reversed=False):

"Convert data to the processor signature"

orient = recordformat.lower()
assert orient in ['records', 'table']
#assert False, "started work on format"

if orient == 'records':
df = pd.DataFrame.from_records(data)
time_column = [C for C in df.columns if C.lower() in ['datetimemeasure', 'time']][0]

elif orient == 'table':
time_column = data['schema']['primaryKey'][0]
df = pd.DataFrame.from_dict(data['data']).set_index(data['schema']['primaryKey'])
df.index.name = 'time'

df.columns = [C.lower() for C in df.columns]
time_column = [C for C in df.columns if C in ['datetimemeasure', 'time']][0]
df[time_column] = pd.to_datetime(df[time_column],utc=True,format='ISO8601')
df.set_index(time_column, inplace=True)
#df.index = pd.DatetimeIndex(df.index).round('ms')

df.index.name = 'time'

if reversed:
df = df[::-1]

if not df.index.tz:
df.index = df.index.tz_localize('UTC').tz_convert(timezone)
elif str(df.index.tz) != timezone:
df.index = df.index.tz_convert(timezone)

if self.processor_params[self.data_param].annotation == pd.DataFrame:
return df
elif len(df.columns) > 1:
df.index = (df.index - datetime.datetime(1970,1,1, tzinfo=datetime.timezone.utc)) / datetime.timedelta(seconds=1)
return df.to_records(index=True)
else:
df.index = (df.index - datetime.datetime(1970,1,1, tzinfo=datetime.timezone.utc)) / datetime.timedelta(seconds=1)
return df.reset_index().to_numpy()


# %% ../nbs/01_aicorebridge.ipynb 12
@patch
def rewrite_data(
self:AICoreModule,
data:typing.Union[pd.DataFrame, pd.Series, dict],
recordformat:str='split',
timezone:str='UTC',
reversed:bool=False):

"Rewrite data to dictionary matching the requested recordformat"

orient = recordformat.lower()

if orient == 'split-index':
orient = 'split'

normalized_data = self.convert_to_dataframe(data, timezone=timezone)
normalized_data.index = normalized_data.index.map(lambda x: x.isoformat())

if reversed:
normalized_data = normalized_data[::-1]

if orient == 'records':
records = normalized_data.reset_index().to_dict(orient='records')
else:
records = normalized_data.to_dict(orient=orient)


if normalized_data.isna().any(axis=None):
return [ {k:v for k,v in m.items() if pd.notnull(v)} for m in records]
else:
return records



# %% ../nbs/01_aicorebridge.ipynb 13
@patch
def convert_to_dataframe(
self:AICoreModule,
adata:typing.Union[pd.DataFrame, pd.Series, dict, np.ndarray, np.recarray],
timezone='UTC',
columnnames=None):
"""Convert various data formats to standardized timeseries DataFrame"""

if isinstance(adata, pd.DataFrame):
df = adata
elif isinstance(adata, pd.Series):
df = pd.DataFrame(adata)

elif isinstance(adata, dict):
# dict/mapping of individual timeseries
df = pd.DataFrame({
C:pd.Series(data=A[:,1], index=pd.DatetimeIndex(A[:,0]*1e9)) if isinstance(A, np.ndarray) else A
for C,A in adata.items()
})

elif adata.dtype.names is not None:
# structured or recarray
df = pd.DataFrame(
data=adata.view(dtype=np.float64).reshape(adata.shape[0],len(adata.dtype))[:,range(1,len(adata.dtype))],
index=pd.DatetimeIndex(adata.view(dtype=np.float64).reshape(adata.shape[0],len(adata.dtype))[:,0] * 1e9),
columns=adata.dtype.names[1:]
)

else:
if adata.shape[0] > 0:
if adata.shape[1]>2:
columns=[f"value_{str(i+1)}" for i in range(adata.shape[1]-1)] if not columnnames else [f"{str(i)}" for i in columnnames[1:]]
else:
columns=['value']

df = pd.DataFrame(
data=adata[:, 1:],
index=pd.DatetimeIndex(adata[:,0]*1e9),
columns=columns
)
else:
return pd.DataFrame()

df.index.name = 'time'
if not df.index.tz:
df.index = df.index.tz_localize('UTC').tz_convert(timezone)
elif str(df.index.tz) != timezone:
df.index = df.index.tz_convert(timezone)

return df
Loading

0 comments on commit 79c807e

Please sign in to comment.