From 0df1b2cdfe34e23eaaa959ef2906969de7686d5e Mon Sep 17 00:00:00 2001 From: Garrett Bischof Date: Wed, 27 Nov 2019 17:16:04 -0500 Subject: [PATCH 1/2] replaced documents_to_xarray --- databroker/core.py | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/databroker/core.py b/databroker/core.py index 82940b229..92e8e1931 100644 --- a/databroker/core.py +++ b/databroker/core.py @@ -1339,19 +1339,26 @@ def __repr__(self): out = f"" return out + def _to_xarray(self, fill='yes'): + + def stream_gen(): + for i in itertools.count(): + partition = self.read_partition({'index': i, 'fill': fill, + 'partition_size': 'auto'}) + if not partition: + break + yield from partition + + stream = stream_gen() + + arraypages = [eventpage_to_arraypage(doc) for name, doc + in stream if name == 'event_page'] + arraypage = concat_arraypages(array_pages) + datasetpage = arraypage_to_datasetpage(array_page) + return datasetpage + def _open_dataset(self): - self._load_header() - self._ds = documents_to_xarray( - start_doc=self._run_start_doc, - stop_doc=self._run_stop_doc, - descriptor_docs=self._descriptors, - get_event_pages=self._get_event_pages, - filler=self.fillers['delayed'], - get_resource=self._get_resource, - lookup_resource_for_datum=self._lookup_resource_for_datum, - get_datum_pages=self._get_datum_pages, - include=self.include, - exclude=self.exclude) + self._ds = _to_xarray()['data'] def read(self): """ @@ -1756,7 +1763,7 @@ def parse_transforms(transforms): intake.container.container_map['bluesky-event-stream'] = RemoteBlueskyEventStream -def concat_dataarray_pages(dataarray_pages): +def concat_arraypages(dataarray_pages): """ Combines a iterable of dataarray_pages to a single dataarray_page. @@ -1792,7 +1799,7 @@ def concat_dataarray_pages(dataarray_pages): for key in data_keys}} -def event_page_to_dataarray_page(event_page, dims=None, coords=None): +def eventpage_to_arraypage(event_page, dims=None, coords=None): """ Converts the event_page's data, timestamps, and filled to xarray.DataArray. @@ -1831,7 +1838,7 @@ def event_page_to_dataarray_page(event_page, dims=None, coords=None): for key in data_keys}} -def dataarray_page_to_dataset_page(dataarray_page): +def arraypage_to_datasetpage(dataarray_page): """ Converts the dataarray_page's data, timestamps, and filled to xarray.DataSet. From 9597e55be6515d00157801538fcfae0bab181b51 Mon Sep 17 00:00:00 2001 From: Garrett Bischof Date: Mon, 3 Feb 2020 16:19:09 -0500 Subject: [PATCH 2/2] fixed some tests --- databroker/core.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/databroker/core.py b/databroker/core.py index 92e8e1931..3d08f9786 100644 --- a/databroker/core.py +++ b/databroker/core.py @@ -1351,14 +1351,14 @@ def stream_gen(): stream = stream_gen() - arraypages = [eventpage_to_arraypage(doc) for name, doc + array_pages = [eventpage_to_arraypage(doc) for name, doc in stream if name == 'event_page'] - arraypage = concat_arraypages(array_pages) - datasetpage = arraypage_to_datasetpage(array_page) - return datasetpage + array_page = concat_arraypages(array_pages) + dataset_page = arraypage_to_datasetpage(array_page) + return dataset_page def _open_dataset(self): - self._ds = _to_xarray()['data'] + self._ds = self._to_xarray()['data'] def read(self): """ @@ -1784,6 +1784,7 @@ def concat_arraypages(dataarray_pages): array_keys = ['seq_num', 'time', 'uid'] data_keys = dataarray_pages[0]['data'].keys() + filled_keys = dataarray_pages[0]['filled'].keys() return {'descriptor': pages[0]['descriptor'], **{key: list(itertools.chain.from_iterable( @@ -1796,7 +1797,7 @@ def concat_arraypages(dataarray_pages): for key in data_keys}, 'filled': {key: xarray.concat([page['filled'][key] for page in pages], dim='concat_dim') - for key in data_keys}} + for key in filled_keys}} def eventpage_to_arraypage(event_page, dims=None, coords=None): @@ -1824,18 +1825,19 @@ def eventpage_to_arraypage(event_page, dims=None, coords=None): array_keys = ['seq_num', 'time', 'uid'] data_keys = event_page['data'].keys() + filled_keys = event_page['filled'].keys() return {'descriptor': event_page['descriptor'], **{key: event_page[key] for key in array_keys}, - 'data': {key: xarray.DataArray( - event_page['data'][key], dims=dims, coords=coords, name=key) + 'data': {key: xarray.DataArray(event_page['data'][key], dims=dims, + coords=coords, name=key) for key in data_keys}, - 'timestamps': {key: xarray.DataArray( - event_page['timestamps'][key], dims=dims, coords=coords, name=key) + 'timestamps': {key: xarray.DataArray(event_page['timestamps'][key], + dims=dims, coords=coords, name=key) for key in data_keys}, - 'filled': {key: xarray.DataArray( - event_page['filled'][key], dims=dims, coords=coords, name=key) - for key in data_keys}} + 'filled': {key: xarray.DataArray(event_page['filled'][key], + dims=dims, coords=coords, name=key) + for key in filled_keys}} def arraypage_to_datasetpage(dataarray_page):