Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to documents_to_xarray #471

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 32 additions & 23 deletions databroker/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1339,19 +1339,26 @@ def __repr__(self):
out = f"<Intake catalog: Stream *REPR_RENDERING_FAILURE* {exc!r}>"
return out

def _to_xarray(self, fill='yes'):

def stream_gen():
for i in itertools.count():
partition = self.read_partition({'index': i, 'fill': fill,
'partition_size': 'auto'})
if not partition:
break
yield from partition

stream = stream_gen()

array_pages = [eventpage_to_arraypage(doc) for name, doc
in stream if name == 'event_page']
array_page = concat_arraypages(array_pages)
dataset_page = arraypage_to_datasetpage(array_page)
return dataset_page

def _open_dataset(self):
self._load_header()
self._ds = documents_to_xarray(
start_doc=self._run_start_doc,
stop_doc=self._run_stop_doc,
descriptor_docs=self._descriptors,
get_event_pages=self._get_event_pages,
filler=self.fillers['delayed'],
get_resource=self._get_resource,
lookup_resource_for_datum=self._lookup_resource_for_datum,
get_datum_pages=self._get_datum_pages,
include=self.include,
exclude=self.exclude)
self._ds = self._to_xarray()['data']

def read(self):
"""
Expand Down Expand Up @@ -1756,7 +1763,7 @@ def parse_transforms(transforms):
intake.container.container_map['bluesky-event-stream'] = RemoteBlueskyEventStream


def concat_dataarray_pages(dataarray_pages):
def concat_arraypages(dataarray_pages):
"""
Combines a iterable of dataarray_pages to a single dataarray_page.

Expand All @@ -1777,6 +1784,7 @@ def concat_dataarray_pages(dataarray_pages):

array_keys = ['seq_num', 'time', 'uid']
data_keys = dataarray_pages[0]['data'].keys()
filled_keys = dataarray_pages[0]['filled'].keys()

return {'descriptor': pages[0]['descriptor'],
**{key: list(itertools.chain.from_iterable(
Expand All @@ -1789,10 +1797,10 @@ def concat_dataarray_pages(dataarray_pages):
for key in data_keys},
'filled': {key: xarray.concat([page['filled'][key]
for page in pages], dim='concat_dim')
for key in data_keys}}
for key in filled_keys}}


def event_page_to_dataarray_page(event_page, dims=None, coords=None):
def eventpage_to_arraypage(event_page, dims=None, coords=None):
"""
Converts the event_page's data, timestamps, and filled to xarray.DataArray.

Expand All @@ -1817,21 +1825,22 @@ def event_page_to_dataarray_page(event_page, dims=None, coords=None):

array_keys = ['seq_num', 'time', 'uid']
data_keys = event_page['data'].keys()
filled_keys = event_page['filled'].keys()

return {'descriptor': event_page['descriptor'],
**{key: event_page[key] for key in array_keys},
'data': {key: xarray.DataArray(
event_page['data'][key], dims=dims, coords=coords, name=key)
'data': {key: xarray.DataArray(event_page['data'][key], dims=dims,
coords=coords, name=key)
for key in data_keys},
'timestamps': {key: xarray.DataArray(
event_page['timestamps'][key], dims=dims, coords=coords, name=key)
'timestamps': {key: xarray.DataArray(event_page['timestamps'][key],
dims=dims, coords=coords, name=key)
for key in data_keys},
'filled': {key: xarray.DataArray(
event_page['filled'][key], dims=dims, coords=coords, name=key)
for key in data_keys}}
'filled': {key: xarray.DataArray(event_page['filled'][key],
dims=dims, coords=coords, name=key)
for key in filled_keys}}


def dataarray_page_to_dataset_page(dataarray_page):
def arraypage_to_datasetpage(dataarray_page):

"""
Converts the dataarray_page's data, timestamps, and filled to xarray.DataSet.
Expand Down