diff --git a/corebridge/_modidx.py b/corebridge/_modidx.py index 0da1299..44ce077 100644 --- a/corebridge/_modidx.py +++ b/corebridge/_modidx.py @@ -18,10 +18,17 @@ 'corebridge.aicorebridge.AICoreModule.get_callargs': ( 'aicorebridge.html#aicoremodule.get_callargs', 'corebridge/aicorebridge.py'), 'corebridge.aicorebridge.AICoreModule.infer': ( 'aicorebridge.html#aicoremodule.infer', - 'corebridge/aicorebridge.py')}, - 'corebridge.core': { 'corebridge.core.set_time_index_zone': ('core.html#set_time_index_zone', 'corebridge/core.py'), + 'corebridge/aicorebridge.py'), + 'corebridge.aicorebridge.AICoreModule.init_annotated_param': ( 'aicorebridge.html#aicoremodule.init_annotated_param', + 'corebridge/aicorebridge.py'), + 'corebridge.aicorebridge.build_historic_args': ( 'aicorebridge.html#build_historic_args', + 'corebridge/aicorebridge.py')}, + 'corebridge.core': { 'corebridge.core.pop_nan_values': ('core.html#pop_nan_values', 'corebridge/core.py'), + 'corebridge.core.set_time_index_zone': ('core.html#set_time_index_zone', 'corebridge/core.py'), 'corebridge.core.timeseries_dataframe': ('core.html#timeseries_dataframe', 'corebridge/core.py'), 'corebridge.core.timeseries_dataframe_from_datadict': ( 'core.html#timeseries_dataframe_from_datadict', 'corebridge/core.py'), + 'corebridge.core.timeseries_dataframe_resample': ( 'core.html#timeseries_dataframe_resample', + 'corebridge/core.py'), 'corebridge.core.timeseries_dataframe_to_datadict': ( 'core.html#timeseries_dataframe_to_datadict', 'corebridge/core.py')}}} diff --git a/corebridge/aicorebridge.py b/corebridge/aicorebridge.py index 316970d..fd966ca 100644 --- a/corebridge/aicorebridge.py +++ b/corebridge/aicorebridge.py @@ -1,7 +1,7 @@ # AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/01_aicorebridge.ipynb. # %% auto 0 -__all__ = ['syslog', 'AICoreModule'] +__all__ = ['syslog', 'annotated_arg_builders', 'build_historic_args', 'AICoreModule'] # %% ../nbs/01_aicorebridge.ipynb 4 import typing @@ -10,9 +10,11 @@ import inspect import datetime import json -import pandas as pd -import numpy as np +import os +import pandas, pandas as pd +import numpy, numpy as np +from dateutil import parser from fastcore.basics import patch_to, patch from .core import * from . import __version__ @@ -28,9 +30,42 @@ pass # %% ../nbs/01_aicorebridge.ipynb 7 +def build_historic_args(data, history): + if not history: + return {} + + if isinstance(history, dict): + return history + + if not isinstance(history, list): + return {} + + if isinstance(data, pd.DataFrame): + dates = data.index.astype(np.int64).astype(np.float64) / 1e9 + dates = dates.to_numpy() + elif data.dtype.names is not None: + dates = data.view(dtype=np.float64).reshape(data.shape[0],len(data.dtype))[:,0] + else: + dates = data[:,0] + dates = dates.astype(np.int64) + + columns = list(set([K for I in history for K in I.keys() if K != 'startDate'])) + column_data = {K:np.full(len(dates), np.nan, dtype=np.float64) for K in columns} + + for I in history: + date = parser.parse(str((I.pop('startDate','2000-01-01T00:00:00+00:00')))).timestamp() + mask = np.greater_equal(dates, date) + for K,V in I.items(): + column_data[K][mask] = V + + return column_data + #return pd.DataFrame(column_data, index=data.index) + + +# %% ../nbs/01_aicorebridge.ipynb 10 class AICoreModule(): pass -# %% ../nbs/01_aicorebridge.ipynb 8 +# %% ../nbs/01_aicorebridge.ipynb 11 @patch def __init__(self:AICoreModule, processor:typing.Callable, # data processing function @@ -50,7 +85,7 @@ def __init__(self:AICoreModule, -# %% ../nbs/01_aicorebridge.ipynb 9 +# %% ../nbs/01_aicorebridge.ipynb 12 @patch def _init_processor( self:AICoreModule, @@ -64,14 +99,14 @@ def _init_processor( self.data_param, *self.call_params = list(self.processor_params.keys()) -# %% ../nbs/01_aicorebridge.ipynb 10 +# %% ../nbs/01_aicorebridge.ipynb 13 # can be overloaded @patch def call_processor(self:AICoreModule, calldata, **callargs): return self.processor(calldata, **callargs) -# %% ../nbs/01_aicorebridge.ipynb 12 +# %% ../nbs/01_aicorebridge.ipynb 15 @patch def infer(self:AICoreModule, data:dict, *_, **kwargs): try: @@ -88,15 +123,19 @@ def infer(self:AICoreModule, data:dict, *_, **kwargs): timezone = kwargs.get('timezone', 'UTC') msg.append(f"lastSeen: {lastSeen}, recordformat: {recordformat}, timezone: {timezone}") + samplerPeriod = kwargs.pop('samplerPeriod', 'h') + samplerMethod = kwargs.pop('samplerMethod', None) + calldata = self.get_call_data( data, recordformat=recordformat, - timezone=timezone, - reversed=reversed) + timezone=timezone) msg.append(f"calldata shape: {calldata.shape}") - callargs = self.get_callargs(**kwargs) + history = build_historic_args(calldata, kwargs.pop('history', {})) + + callargs = self.get_callargs(kwargs, history) for arg, val in callargs.items(): msg.append(f"{arg}: {val}") @@ -104,13 +143,23 @@ def infer(self:AICoreModule, data:dict, *_, **kwargs): result = self.call_processor(calldata, **callargs) msg.append(f"result shape: {result.shape}") + result = timeseries_dataframe(result, timezone=timezone) + if reversed: + result = result[::-1] + + if samplerMethod: + msg.append(f"Sampler: {samplerMethod}, period: {samplerPeriod}") + result = timeseries_dataframe_resample(result, samplerPeriod, samplerMethod) + + msg.append(f"return-data shape: {result.shape}") + return { 'msg':msg, 'data': timeseries_dataframe_to_datadict( result if not lastSeen else result[-1:], recordformat=recordformat, timezone=timezone, - reversed=reversed) + popNaN=True) } except Exception as err: msg.append(''.join(traceback.format_exception(None, err, err.__traceback__))) @@ -121,23 +170,52 @@ def infer(self:AICoreModule, data:dict, *_, **kwargs): } -# %% ../nbs/01_aicorebridge.ipynb 14 +# %% ../nbs/01_aicorebridge.ipynb 17 +annotated_arg_builders = { + str(B[0]):B[1] for B in [ + (numpy.ndarray, lambda X: numpy.array(X, dtype=X.dtype)) + ] +} + +# %% ../nbs/01_aicorebridge.ipynb 18 +@patch +def init_annotated_param(self:AICoreModule, K, value): + "Get arguments for the processor call" + + annotation = self.processor_signature.parameters[K].annotation + print(K, annotation, value) + + for T in typing.get_args(annotation): + try: + builder = annotated_arg_builders.get(str(T), T) + return builder(value) + except TypeError as err: + #syslog.exception(f"Exception {str(err)} in conversion to {T} of {type(value)}") + continue + try: + return self.processor_signature.parameters[K].annotation(value) + except TypeError as err: + syslog.exception(f"Exception {str(err)} in fallback conversion to {self.processor_signature.parameters[K].annotation} of {type(value)}") + + return None + + +# %% ../nbs/01_aicorebridge.ipynb 19 @patch -def get_callargs(self:AICoreModule, **kwargs): +def get_callargs(self:AICoreModule, kwargs, history): "Get arguments for the processor call" # Remove null / None values kwargs = {k:v for k,v in kwargs.items() if v is not None} - metadata = kwargs.pop('metadata', {}) # TODO: historic metadata - return { - K:self.processor_signature.parameters[K].annotation( - self.init_kwargs.get( + K:self.init_annotated_param( + K, + history.get( K, - kwargs.get( + self.init_kwargs.get( K, - metadata.get( + kwargs.get( K, self.processor_signature.parameters[K].default ) @@ -148,22 +226,20 @@ def get_callargs(self:AICoreModule, **kwargs): } -# %% ../nbs/01_aicorebridge.ipynb 15 +# %% ../nbs/01_aicorebridge.ipynb 22 @patch def get_call_data( self:AICoreModule, data:dict, recordformat='records', - timezone='UTC', - reversed=False): + timezone='UTC'): "Convert data to the processor signature" df = set_time_index_zone(timeseries_dataframe_from_datadict( data, ['datetimeMeasure', 'time'], recordformat), timezone) - if reversed: - df = df[::-1] + df.sort_index(inplace=True) if self.processor_params[self.data_param].annotation == pd.DataFrame: return df diff --git a/corebridge/core.py b/corebridge/core.py index 8fdf481..a61e0da 100644 --- a/corebridge/core.py +++ b/corebridge/core.py @@ -1,8 +1,9 @@ # AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/00_core.ipynb. # %% auto 0 -__all__ = ['set_time_index_zone', 'timeseries_dataframe', 'timeseries_dataframe_from_datadict', - 'timeseries_dataframe_to_datadict'] +__all__ = ['ResamplerMethods', 'ReSamplerPeriods', 'set_time_index_zone', 'timeseries_dataframe', + 'timeseries_dataframe_from_datadict', 'pop_nan_values', 'timeseries_dataframe_to_datadict', + 'timeseries_dataframe_resample'] # %% ../nbs/00_core.ipynb 3 import typing @@ -79,13 +80,13 @@ def timeseries_dataframe( # %% ../nbs/00_core.ipynb 11 def timeseries_dataframe_from_datadict( data:dict, - timecolumns, + timecolumns=None, recordformat='records'): "Convert data dict to dataframe" orient = recordformat.lower() - assert orient in ['records', 'table'] + assert orient in ['records', 'table', 'split', 'index', 'tight'] if orient == 'records': df = pd.DataFrame.from_records(data) @@ -95,6 +96,9 @@ def timeseries_dataframe_from_datadict( time_column = data['schema']['primaryKey'][0] df = pd.DataFrame.from_dict(data['data']).set_index(data['schema']['primaryKey']) df.index.name = 'time' + else: + df = pd.DataFrame.from_dict(data, orient=orient) + time_column = df.index.name df.columns = list(df.columns) @@ -107,12 +111,21 @@ def timeseries_dataframe_from_datadict( return df +# %% ../nbs/00_core.ipynb 14 +def pop_nan_values(data): + if isinstance(data, list): + return [pop_nan_values(v) for v in data if pd.notnull([v]).any()] + elif isinstance(data, dict): + return {k:pop_nan_values(v) for k, v in data.items() if pd.notnull([v]).any()} + else: + return data + # %% ../nbs/00_core.ipynb 15 def timeseries_dataframe_to_datadict( data:typing.Union[pd.DataFrame, pd.Series, dict], - recordformat:str='split', - timezone:str='UTC', - reversed:bool=False): + recordformat:str='records', + timezone:str='UTC', + popNaN:bool=False): orient = recordformat.lower() @@ -120,19 +133,48 @@ def timeseries_dataframe_to_datadict( if isinstance(normalized_data.index, pd.DatetimeIndex): normalized_data.index = normalized_data.index.map(lambda x: x.isoformat()) - if reversed: - normalized_data = normalized_data[::-1] - if orient == 'records': records = normalized_data.reset_index().to_dict(orient='records') else: records = normalized_data.to_dict(orient=orient) - if normalized_data.isna().any(axis=None): + if popNaN and normalized_data.isna().any(axis=None): + #return pop_nan_values(records) return [ {k:v for k,v in m.items() if pd.notnull(v)} for m in records] else: return records + +# %% ../nbs/00_core.ipynb 22 +ResamplerMethods = dict( + count=lambda R: R.count(), + median=lambda R: R.median(), + mean=lambda R: R.mean(), + min=lambda R: R.min(), + max=lambda R: R.max(), + sum=lambda R: R.sum(), + std=lambda R: R.std(), + var=lambda R: R.var(), + +) + +ReSamplerPeriods = dict( + H='h', T='min', S='sec', L='ms', U='us', N='ns' +) + +def timeseries_dataframe_resample(df:pd.DataFrame, period:str, method:str): + + sampler = df.resample(ReSamplerPeriods.get(period, str(period))) + + dataframes = [df] + for M in str(method).split(';'): + sdf = ResamplerMethods.get(M)(sampler) + sdf.columns = [f"{C}_{M}" for C in df.columns] + dataframes.append(sdf) + + return pd.concat(dataframes, axis=1, join='outer') + + diff --git a/nbs/00_core.ipynb b/nbs/00_core.ipynb index 866733c..065e2e7 100644 --- a/nbs/00_core.ipynb +++ b/nbs/00_core.ipynb @@ -176,13 +176,13 @@ "#| export\n", "def timeseries_dataframe_from_datadict(\n", " data:dict, \n", - " timecolumns,\n", + " timecolumns=None,\n", " recordformat='records'):\n", " \n", " \"Convert data dict to dataframe\"\n", "\n", " orient = recordformat.lower()\n", - " assert orient in ['records', 'table']\n", + " assert orient in ['records', 'table', 'split', 'index', 'tight']\n", " \n", " if orient == 'records':\n", " df = pd.DataFrame.from_records(data)\n", @@ -192,6 +192,9 @@ " time_column = data['schema']['primaryKey'][0]\n", " df = pd.DataFrame.from_dict(data['data']).set_index(data['schema']['primaryKey'])\n", " df.index.name = 'time'\n", + " else:\n", + " df = pd.DataFrame.from_dict(data, orient=orient)\n", + " time_column = df.index.name\n", "\n", "\n", " df.columns = list(df.columns)\n", @@ -369,7 +372,17 @@ "execution_count": null, "metadata": {}, "outputs": [], - "source": [] + "source": [ + "#| export\n", + "\n", + "def pop_nan_values(data):\n", + " if isinstance(data, list):\n", + " return [pop_nan_values(v) for v in data if pd.notnull([v]).any()]\n", + " elif isinstance(data, dict):\n", + " return {k:pop_nan_values(v) for k, v in data.items() if pd.notnull([v]).any()}\n", + " else:\n", + " return data" + ] }, { "cell_type": "code", @@ -380,9 +393,9 @@ "#| export\n", "def timeseries_dataframe_to_datadict(\n", " data:typing.Union[pd.DataFrame, pd.Series, dict], \n", - " recordformat:str='split', \n", - " timezone:str='UTC', \n", - " reversed:bool=False):\n", + " recordformat:str='records', \n", + " timezone:str='UTC',\n", + " popNaN:bool=False):\n", " \n", " orient = recordformat.lower()\n", "\n", @@ -390,16 +403,14 @@ " if isinstance(normalized_data.index, pd.DatetimeIndex):\n", " normalized_data.index = normalized_data.index.map(lambda x: x.isoformat())\n", " \n", - " if reversed:\n", - " normalized_data = normalized_data[::-1]\n", - "\n", " if orient == 'records':\n", " records = normalized_data.reset_index().to_dict(orient='records')\n", " else:\n", " records = normalized_data.to_dict(orient=orient)\n", " \n", "\n", - " if normalized_data.isna().any(axis=None):\n", + " if popNaN and normalized_data.isna().any(axis=None):\n", + " #return pop_nan_values(records)\n", " return [ {k:v for k,v in m.items() if pd.notnull(v)} for m in records]\n", " else:\n", " return records\n", @@ -407,6 +418,181 @@ "\n" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df = timeseries_dataframe_from_datadict([\n", + " {\n", + " \"time\":\"2023-05-04T10:04:49.000Z\",\n", + " \"value\":16.72\n", + " },\n", + " {\n", + " \"time\":\"2023-05-04T10:24:51.000Z\",\n", + " \"value\":16.65\n", + " },\n", + " {\n", + " \"time\":\"2023-05-04T10:44:53.000Z\",\n", + " \"value\":16.55\n", + " },\n", + " {\n", + " \"time\":\"2023-05-04T10:44:53.000Z\",\n", + " \"value\":np.nan\n", + " }\n", + " ], timecolumns=['time'])\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[{'time': '2023-05-04T10:04:49+00:00', 'value': 16.72},\n", + " {'time': '2023-05-04T10:24:51+00:00', 'value': 16.65},\n", + " {'time': '2023-05-04T10:44:53+00:00', 'value': 16.55},\n", + " {'time': '2023-05-04T10:44:53+00:00'}]" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "timeseries_dataframe_to_datadict(df, recordformat='records', popNaN=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'index': ['2023-05-04T10:04:49+00:00',\n", + " '2023-05-04T10:24:51+00:00',\n", + " '2023-05-04T10:44:53+00:00',\n", + " '2023-05-04T10:44:53+00:00'],\n", + " 'columns': ['value'],\n", + " 'data': [[16.72], [16.65], [16.55]],\n", + " 'index_names': ['time']}" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "timeseries_dataframe_to_datadict(df, recordformat='tight', popNaN=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "test_data = {'index': ['2023-05-04T10:04:49+00:00',\n", + " '2023-05-04T10:24:51+00:00',\n", + " '2023-05-04T10:44:53+00:00',\n", + " '2023-05-04T10:44:53+00:00'],\n", + " 'columns': ['value'],\n", + " 'data': [[16.72], [16.65], [16.55], [np.nan]],\n", + " 'index_names': ['time'],\n", + " 'column_names': [None]}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'index': ['2023-05-04T10:04:49+00:00',\n", + " '2023-05-04T10:24:51+00:00',\n", + " '2023-05-04T10:44:53+00:00',\n", + " '2023-05-04T10:44:53+00:00'],\n", + " 'columns': ['value'],\n", + " 'data': [[16.72], [16.65], [16.55]],\n", + " 'index_names': ['time']}" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pop_nan_values(test_data)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pd.notnull([[np.nan, 2]]).any()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#| export\n", + "ResamplerMethods = dict(\n", + " count=lambda R: R.count(),\n", + " median=lambda R: R.median(),\n", + " mean=lambda R: R.mean(),\n", + " min=lambda R: R.min(),\n", + " max=lambda R: R.max(),\n", + " sum=lambda R: R.sum(),\n", + " std=lambda R: R.std(),\n", + " var=lambda R: R.var(),\n", + "\n", + ")\n", + "\n", + "ReSamplerPeriods = dict(\n", + " H='h', T='min', S='sec', L='ms', U='us', N='ns'\n", + ")\n", + "\n", + "def timeseries_dataframe_resample(df:pd.DataFrame, period:str, method:str):\n", + "\n", + " sampler = df.resample(ReSamplerPeriods.get(period, str(period)))\n", + "\n", + " dataframes = [df]\n", + " for M in str(method).split(';'):\n", + " sdf = ResamplerMethods.get(M)(sampler)\n", + " sdf.columns = [f\"{C}_{M}\" for C in df.columns]\n", + " dataframes.append(sdf)\n", + "\n", + " return pd.concat(dataframes, axis=1, join='outer')\n", + "\n" + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/nbs/01_aicorebridge.ipynb b/nbs/01_aicorebridge.ipynb index 1208ce0..9eb99ad 100644 --- a/nbs/01_aicorebridge.ipynb +++ b/nbs/01_aicorebridge.ipynb @@ -62,9 +62,11 @@ "import inspect\n", "import datetime\n", "import json\n", - "import pandas as pd\n", - "import numpy as np\n", + "import os\n", + "import pandas, pandas as pd\n", + "import numpy, numpy as np\n", "\n", + "from dateutil import parser\n", "from fastcore.basics import patch_to, patch\n", "from corebridge.core import *\n", "from corebridge import __version__\n" @@ -93,6 +95,92 @@ " pass" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#| export\n", + "def build_historic_args(data, history):\n", + " if not history:\n", + " return {}\n", + " \n", + " if isinstance(history, dict):\n", + " return history\n", + " \n", + " if not isinstance(history, list):\n", + " return {}\n", + " \n", + " if isinstance(data, pd.DataFrame):\n", + " dates = data.index.astype(np.int64).astype(np.float64) / 1e9\n", + " dates = dates.to_numpy()\n", + " elif data.dtype.names is not None:\n", + " dates = data.view(dtype=np.float64).reshape(data.shape[0],len(data.dtype))[:,0]\n", + " else:\n", + " dates = data[:,0]\n", + " dates = dates.astype(np.int64)\n", + " \n", + " columns = list(set([K for I in history for K in I.keys() if K != 'startDate']))\n", + " column_data = {K:np.full(len(dates), np.nan, dtype=np.float64) for K in columns}\n", + "\n", + " for I in history:\n", + " date = parser.parse(str((I.pop('startDate','2000-01-01T00:00:00+00:00')))).timestamp()\n", + " mask = np.greater_equal(dates, date)\n", + " for K,V in I.items():\n", + " column_data[K][mask] = V\n", + " \n", + " return column_data\n", + " #return pd.DataFrame(column_data, index=data.index)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "test_data=set_time_index_zone(timeseries_dataframe_from_datadict(\n", + " [\n", + " {\n", + " \"time\":\"2023-05-04T10:04:49\",\n", + " \"value\":16.72\n", + " },\n", + " {\n", + " \"time\":\"2023-05-04T10:44:53\",\n", + " \"value\":16.55\n", + " },\n", + " {\n", + " \"time\":\"2023-05-04T10:24:51\",\n", + " \"value\":16.65\n", + " }\n", + " ], ['datetimeMeasure', 'time'], 'records'), 'UTC').sort_index()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'anumber': array([1., 1., 2.])}" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "history_arg = [\n", + " dict(anumber=1.0),\n", + " dict(startDate=\"2023-05-04T10:25:00+00:00\", anumber=2.0)\n", + " ]\n", + "build_historic_args(test_data,history_arg)" + ] + }, { "cell_type": "code", "execution_count": null, @@ -196,15 +284,19 @@ " timezone = kwargs.get('timezone', 'UTC')\n", " msg.append(f\"lastSeen: {lastSeen}, recordformat: {recordformat}, timezone: {timezone}\")\n", "\n", + " samplerPeriod = kwargs.pop('samplerPeriod', 'h')\n", + " samplerMethod = kwargs.pop('samplerMethod', None)\n", + "\n", " calldata = self.get_call_data(\n", " data, \n", " recordformat=recordformat,\n", - " timezone=timezone,\n", - " reversed=reversed)\n", + " timezone=timezone)\n", " \n", " msg.append(f\"calldata shape: {calldata.shape}\")\n", "\n", - " callargs = self.get_callargs(**kwargs)\n", + " history = build_historic_args(calldata, kwargs.pop('history', {}))\n", + "\n", + " callargs = self.get_callargs(kwargs, history)\n", "\n", " for arg, val in callargs.items():\n", " msg.append(f\"{arg}: {val}\")\n", @@ -212,13 +304,23 @@ " result = self.call_processor(calldata, **callargs)\n", " msg.append(f\"result shape: {result.shape}\")\n", "\n", + " result = timeseries_dataframe(result, timezone=timezone)\n", + " if reversed:\n", + " result = result[::-1]\n", + "\n", + " if samplerMethod:\n", + " msg.append(f\"Sampler: {samplerMethod}, period: {samplerPeriod}\")\n", + " result = timeseries_dataframe_resample(result, samplerPeriod, samplerMethod)\n", + "\n", + " msg.append(f\"return-data shape: {result.shape}\")\n", + "\n", " return {\n", " 'msg':msg,\n", " 'data': timeseries_dataframe_to_datadict(\n", " result if not lastSeen else result[-1:],\n", " recordformat=recordformat,\n", " timezone=timezone,\n", - " reversed=reversed)\n", + " popNaN=True)\n", " }\n", " except Exception as err:\n", " msg.append(''.join(traceback.format_exception(None, err, err.__traceback__)))\n", @@ -236,6 +338,20 @@ "### `get_callargs`\n" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#| export\n", + "annotated_arg_builders = {\n", + " str(B[0]):B[1] for B in [\n", + " (numpy.ndarray, lambda X: numpy.array(X, dtype=X.dtype))\n", + " ]\n", + "}" + ] + }, { "cell_type": "code", "execution_count": null, @@ -244,21 +360,50 @@ "source": [ "#| export\n", "@patch\n", - "def get_callargs(self:AICoreModule, **kwargs):\n", + "def init_annotated_param(self:AICoreModule, K, value):\n", + " \"Get arguments for the processor call\"\n", + "\n", + " annotation = self.processor_signature.parameters[K].annotation\n", + " print(K, annotation, value)\n", + "\n", + " for T in typing.get_args(annotation):\n", + " try:\n", + " builder = annotated_arg_builders.get(str(T), T)\n", + " return builder(value)\n", + " except TypeError as err:\n", + " #syslog.exception(f\"Exception {str(err)} in conversion to {T} of {type(value)}\")\n", + " continue\n", + " try:\n", + " return self.processor_signature.parameters[K].annotation(value)\n", + " except TypeError as err:\n", + " syslog.exception(f\"Exception {str(err)} in fallback conversion to {self.processor_signature.parameters[K].annotation} of {type(value)}\")\n", + "\n", + " return None\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#| export\n", + "@patch\n", + "def get_callargs(self:AICoreModule, kwargs, history):\n", " \"Get arguments for the processor call\"\n", "\n", " # Remove null / None values\n", " kwargs = {k:v for k,v in kwargs.items() if v is not None}\n", " \n", - " metadata = kwargs.pop('metadata', {}) # TODO: historic metadata\n", - "\n", " return {\n", - " K:self.processor_signature.parameters[K].annotation(\n", - " self.init_kwargs.get(\n", + " K:self.init_annotated_param(\n", + " K,\n", + " history.get(\n", " K,\n", - " kwargs.get(\n", + " self.init_kwargs.get(\n", " K,\n", - " metadata.get(\n", + " kwargs.get(\n", " K, \n", " self.processor_signature.parameters[K].default\n", " )\n", @@ -269,6 +414,25 @@ " }\n" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def processor_function(data:pd.DataFrame, anumber:float|np.ndarray):\n", + " return anumber * data\n", + "\n", + "test_module = AICoreModule(processor_function, os.path.join(os.getcwd(), 'cache'), os.path.join(os.getcwd(), 'cache'))\n" + ] + }, { "cell_type": "code", "execution_count": null, @@ -281,16 +445,14 @@ " self:AICoreModule, \n", " data:dict, \n", " recordformat='records', \n", - " timezone='UTC', \n", - " reversed=False):\n", + " timezone='UTC'):\n", " \n", " \"Convert data to the processor signature\"\n", "\n", " df = set_time_index_zone(timeseries_dataframe_from_datadict(\n", " data, ['datetimeMeasure', 'time'], recordformat), timezone)\n", "\n", - " if reversed:\n", - " df = df[::-1]\n", + " df.sort_index(inplace=True)\n", "\n", " if self.processor_params[self.data_param].annotation == pd.DataFrame:\n", " return df\n", @@ -303,6 +465,258 @@ " " ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + " | index | \n", + "value | \n", + "
---|---|---|
time | \n", + "\n", + " | \n", + " |
2023-05-04 10:04:49+00:00 | \n", + "0 | \n", + "16.72 | \n", + "
2023-05-04 10:24:51+00:00 | \n", + "1 | \n", + "16.65 | \n", + "
2023-05-04 10:44:53+00:00 | \n", + "2 | \n", + "16.55 | \n", + "