
Commit 83ff4a2

CO2Leakage: Statistics plot, bug fixes, read new file name formats (#1319)
1 parent 51acd9c commit 83ff4a2

21 files changed: +3360 -1286 lines
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
from .ensemble_polygon_provider import EnsemblePolygonProvider, SimulatedPolygonsAddress
from .ensemble_polygon_provider_factory import EnsemblePolygonProviderFactory
from .polygon_server import PolygonsAddress, PolygonServer
Lines changed: 97 additions & 0 deletions
@@ -0,0 +1,97 @@
import glob
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional

# The fmu.ensemble dependency resdata is only available for Linux;
# hence we ignore any import exception here so that the plugin can
# still be used on machines with other OSes.
#
# NOTE: Functions in this file cannot be used on non-Linux OSes.
try:
    from fmu.ensemble import ScratchEnsemble
except ImportError:
    pass

@dataclass(frozen=True)
class PolygonsFileInfo:
    path: str
    real: int
    name: str
    attribute: str

def _discover_ensemble_realizations_fmu(ens_path: str) -> Dict[int, str]:
    """Returns dict indexed by realization number and with runpath as value"""
    scratch_ensemble = ScratchEnsemble("dummyEnsembleName", paths=ens_path).filter("OK")
    real_dict = {i: r.runpath() for i, r in scratch_ensemble.realizations.items()}
    return real_dict

def _discover_ensemble_realizations(ens_path: str) -> Dict[int, str]:
    # Much faster than FMU impl above, but is it risky?
    # Do we need to check for OK-file?
    real_dict: Dict[int, str] = {}

    realidxregexp = re.compile(r"realization-(\d+)")
    globbed_real_dirs = sorted(glob.glob(str(ens_path)))
    for real_dir in globbed_real_dirs:
        realnum: Optional[int] = None
        for path_comp in reversed(real_dir.split(os.path.sep)):
            realmatch = re.match(realidxregexp, path_comp)
            if realmatch:
                realnum = int(realmatch.group(1))
                break

        if realnum is not None:
            real_dict[realnum] = real_dir

    return real_dict

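For orientation, the glob-and-regex variant above resolves realization numbers purely from directory names. A minimal self-contained sketch of that matching step, using made-up paths (not taken from any real ensemble):

import os
import re
from typing import Dict, Optional

realidxregexp = re.compile(r"realization-(\d+)")

# Hypothetical runpaths mirroring the layout the discovery code expects
example_dirs = [
    "/scratch/some_case/realization-0/iter-0",
    "/scratch/some_case/realization-12/iter-0",
]

real_dict: Dict[int, str] = {}
for real_dir in example_dirs:
    realnum: Optional[int] = None
    # Walk the path components from the right, as _discover_ensemble_realizations does
    for path_comp in reversed(real_dir.split(os.path.sep)):
        realmatch = re.match(realidxregexp, path_comp)
        if realmatch:
            realnum = int(realmatch.group(1))
            break
    if realnum is not None:
        real_dict[realnum] = real_dir

print(real_dict)
# {0: '/scratch/some_case/realization-0/iter-0', 12: '/scratch/some_case/realization-12/iter-0'}
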
@dataclass(frozen=True)
class PolygonsIdent:
    name: str
    attribute: str

def _polygons_ident_from_filename(filename: str) -> Optional[PolygonsIdent]:
    """Split the stem part of the fault polygons filename into fault polygons name and attribute"""
    delimiter: str = "--"
    parts = Path(filename).stem.split(delimiter)
    if len(parts) != 2:
        return None

    return PolygonsIdent(name=parts[0], attribute=parts[1])

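As a quick illustration of the "<name>--<attribute>" stem convention the splitter relies on (file names below are invented):

from pathlib import Path

# "<name>--<attribute>.<ext>" -> PolygonsIdent(name="boundary", attribute="containment")
print(Path("share/results/polygons/boundary--containment.csv").stem.split("--"))
# ['boundary', 'containment']

# A stem without exactly one "--" delimiter is rejected (the function returns None)
print(Path("boundary.csv").stem.split("--"))
# ['boundary']
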
def discover_per_realization_polygons_files(
    ens_path: str,
    polygons_pattern: str,
) -> List[PolygonsFileInfo]:
    polygons_files: List[PolygonsFileInfo] = []

    real_dict = _discover_ensemble_realizations_fmu(ens_path)
    for realnum, runpath in sorted(real_dict.items()):
        if Path(polygons_pattern).is_absolute():
            filenames = [polygons_pattern]
        else:
            filenames = glob.glob(str(Path(runpath) / polygons_pattern))
        for polygons_filename in sorted(filenames):
            polygons_ident = _polygons_ident_from_filename(polygons_filename)
            if polygons_ident:
                polygons_files.append(
                    PolygonsFileInfo(
                        path=polygons_filename,
                        real=realnum,
                        name=polygons_ident.name,
                        attribute=polygons_ident.attribute,
                    )
                )

    return polygons_files

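A hedged usage sketch of the discovery entry point. The ensemble path and glob pattern are placeholders, and the call depends on fmu.ensemble (Linux only) because it goes through _discover_ensemble_realizations_fmu:

# Hypothetical FMU case layout; adjust to the actual ensemble on disk
polygons_files = discover_per_realization_polygons_files(
    ens_path="/scratch/some_case/realization-*/iter-0",
    polygons_pattern="share/results/polygons/*--*.csv",
)
for info in polygons_files:
    print(info.real, info.name, info.attribute, info.path)
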
Lines changed: 226 additions & 0 deletions
@@ -0,0 +1,226 @@
import logging
import shutil
from pathlib import Path
from typing import List, Optional

import pandas as pd
import xtgeo

from webviz_subsurface._utils.enum_shim import StrEnum
from webviz_subsurface._utils.perf_timer import PerfTimer

from ._polygon_discovery import PolygonsFileInfo
from .ensemble_polygon_provider import (
    EnsemblePolygonProvider,
    PolygonsAddress,
    SimulatedPolygonsAddress,
)

LOGGER = logging.getLogger(__name__)

REL_SIM_DIR = "sim"

# pylint: disable=too-few-public-methods
class Col:
    TYPE = "type"
    REAL = "real"
    ATTRIBUTE = "attribute"
    NAME = "name"
    ORIGINAL_PATH = "original_path"
    REL_PATH = "rel_path"

class PolygonType(StrEnum):
    SIMULATED = "simulated"
    HAZARDUOUS_BOUNDARY = "hazarduous_boundary"
    CONTAINMENT_BOUNDARY = "containment_boundary"

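Since PolygonType derives from the project's StrEnum shim, its members are expected to compare equal to their plain string values (assuming the shim mirrors Python 3.11's enum.StrEnum). That is what allows the inventory, which round-trips through parquet as plain strings, to be filtered with df[Col.TYPE] == PolygonType.SIMULATED further down:

# String data read back from the parquet inventory still matches the enum member
assert PolygonType.SIMULATED == "simulated"
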
class ProviderImplFile(EnsemblePolygonProvider):
    def __init__(
        self,
        provider_id: str,
        provider_dir: Path,
        polygon_inventory_df: pd.DataFrame,
    ) -> None:
        self._provider_id = provider_id
        self._provider_dir = provider_dir
        self._inventory_df = polygon_inventory_df

    @staticmethod
    # pylint: disable=too-many-locals
    def write_backing_store(
        storage_dir: Path,
        storage_key: str,
        sim_polygons: List[PolygonsFileInfo],
    ) -> None:
        timer = PerfTimer()

        # All data for this provider will be stored inside a sub-directory
        # given by the storage key
        provider_dir = storage_dir / storage_key
        LOGGER.debug(f"Writing polygon backing store to: {provider_dir}")
        provider_dir.mkdir(parents=True, exist_ok=True)
        (provider_dir / REL_SIM_DIR).mkdir(parents=True, exist_ok=True)

        type_arr: List[PolygonType] = []
        real_arr: List[int] = []
        attribute_arr: List[str] = []
        name_arr: List[str] = []
        rel_path_arr: List[str] = []
        original_path_arr: List[str] = []

        for polygon_info in sim_polygons:
            rel_path_in_store = _compose_rel_sim_polygons_path(
                real=polygon_info.real,
                attribute=polygon_info.attribute,
                name=polygon_info.name,
                extension=Path(polygon_info.path).suffix,
            )
            type_arr.append(PolygonType.SIMULATED)
            real_arr.append(polygon_info.real)
            attribute_arr.append(polygon_info.attribute)
            name_arr.append(polygon_info.name)
            rel_path_arr.append(str(rel_path_in_store))
            original_path_arr.append(polygon_info.path)

        LOGGER.debug(f"Copying {len(original_path_arr)} polygons into backing store...")
        timer.lap_s()
        _copy_polygons_into_provider_dir(original_path_arr, rel_path_arr, provider_dir)
        et_copy_s = timer.lap_s()

        polygons_inventory_df = pd.DataFrame(
            {
                Col.TYPE: type_arr,
                Col.REAL: real_arr,
                Col.ATTRIBUTE: attribute_arr,
                Col.NAME: name_arr,
                Col.REL_PATH: rel_path_arr,
                Col.ORIGINAL_PATH: original_path_arr,
            }
        )

        parquet_file_name = provider_dir / "polygons_inventory.parquet"
        polygons_inventory_df.to_parquet(path=parquet_file_name)

        LOGGER.debug(
            f"Wrote polygon backing store in: {timer.elapsed_s():.2f}s ("
            f"copy={et_copy_s:.2f}s)"
        )

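The resulting backing store then looks roughly like the sketch below; the storage key and file names are illustrative only, with the per-file names produced by _compose_rel_sim_polygons_path:

# <storage_dir>/<storage_key>/
#     polygons_inventory.parquet   # columns: type, real, attribute, name, rel_path, original_path
#     sim/
#         0--boundary--containment.csv   # "<real>--<name>--<attribute><ext>"
#         1--boundary--containment.csv
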
    @staticmethod
    def from_backing_store(
        storage_dir: Path,
        storage_key: str,
    ) -> Optional["ProviderImplFile"]:
        provider_dir = storage_dir / storage_key
        parquet_file_name = provider_dir / "polygons_inventory.parquet"

        try:
            polygons_inventory_df = pd.read_parquet(path=parquet_file_name)
            return ProviderImplFile(storage_key, provider_dir, polygons_inventory_df)
        except FileNotFoundError:
            return None

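A minimal write/read round-trip sketch, assuming the hypothetical source CSV actually exists on disk so that the copy step succeeds (paths and the storage key are placeholders):

from pathlib import Path

storage_dir = Path("/tmp/webviz_storage")  # hypothetical storage location
storage_key = "polygons_demo"              # hypothetical storage key

sim_polygons = [
    PolygonsFileInfo(
        path="/scratch/some_case/realization-0/iter-0/share/results/polygons/boundary--containment.csv",
        real=0,
        name="boundary",
        attribute="containment",
    )
]

ProviderImplFile.write_backing_store(storage_dir, storage_key, sim_polygons)

provider = ProviderImplFile.from_backing_store(storage_dir, storage_key)
if provider is not None:
    print(provider.realizations())  # [0]
    print(provider.attributes())    # ['containment']
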
    def provider_id(self) -> str:
        return self._provider_id

    def attributes(self) -> List[str]:
        return sorted(list(self._inventory_df[Col.ATTRIBUTE].unique()))

    def fault_polygons_names_for_attribute(self, polygons_attribute: str) -> List[str]:
        return sorted(
            list(
                self._inventory_df.loc[
                    self._inventory_df[Col.ATTRIBUTE] == polygons_attribute
                ][Col.NAME].unique()
            )
        )

    def realizations(self) -> List[int]:
        unique_reals = self._inventory_df[Col.REAL].unique()

        # Sort and strip out any entries with real == -1
        return sorted([r for r in unique_reals if r >= 0])

    def get_polygons(
        self,
        address: PolygonsAddress,
    ) -> Optional[xtgeo.Polygons]:
        if isinstance(address, SimulatedPolygonsAddress):
            return self._get_simulated_polygons(address)

        raise TypeError("Unknown type of fault polygons address")

    def _get_simulated_polygons(
        self, address: SimulatedPolygonsAddress
    ) -> Optional[xtgeo.Polygons]:
        """Returns an xtgeo Polygons instance for a single realization's fault polygons"""

        timer = PerfTimer()

        polygons_fns: List[Path] = self._locate_simulated_polygons(
            attribute=address.attribute,
            name=address.name,
            realizations=[address.realization],
        )

        if len(polygons_fns) == 0:
            LOGGER.warning(f"No simulated polygons found for {address}")
            return None
        if len(polygons_fns) > 1:
            LOGGER.warning(
                f"Multiple simulated polygons found for: {address}. "
                "Returning first fault polygons."
            )

        if polygons_fns[0].suffix == ".csv":
            polygons = xtgeo.Polygons(pd.read_csv(polygons_fns[0]))
        else:
            polygons = xtgeo.polygons_from_file(polygons_fns[0])

        LOGGER.debug(f"Loaded simulated fault polygons in: {timer.elapsed_s():.2f}s")

        return polygons

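Reading a specific polygon set back out goes through a SimulatedPolygonsAddress; a short sketch, where `provider` is a ProviderImplFile as in the round-trip example above and the address fields are placeholders:

address = SimulatedPolygonsAddress(
    attribute="containment", name="boundary", realization=0
)
polygons = provider.get_polygons(address)
if polygons is None:
    print("No polygons stored for this address")
else:
    print(f"Loaded {type(polygons).__name__} for realization {address.realization}")
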
    def _locate_simulated_polygons(
        self, attribute: str, name: str, realizations: List[int]
    ) -> List[Path]:
        """Returns list of file names matching the specified filter criteria"""
        df = self._inventory_df.loc[
            self._inventory_df[Col.TYPE] == PolygonType.SIMULATED
        ]

        df = df.loc[
            (df[Col.ATTRIBUTE] == attribute)
            & (df[Col.NAME] == name)
            & (df[Col.REAL].isin(realizations))
        ]

        return [self._provider_dir / rel_path for rel_path in df[Col.REL_PATH]]

def _copy_polygons_into_provider_dir(
    original_path_arr: List[str],
    rel_path_arr: List[str],
    provider_dir: Path,
) -> None:
    for src_path, dst_rel_path in zip(original_path_arr, rel_path_arr):
        # LOGGER.debug(f"copying fault polygons from: {src_path}")
        shutil.copyfile(src_path, provider_dir / dst_rel_path)

    # full_dst_path_arr = [storage_dir / dst_rel_path for dst_rel_path in store_path_arr]
    # with ProcessPoolExecutor() as executor:
    #     executor.map(shutil.copyfile, original_path_arr, full_dst_path_arr)

def _compose_rel_sim_polygons_path(
    real: int,
    attribute: str,
    name: str,
    extension: str,
) -> Path:
    """Compose path to simulated fault polygons file, relative to provider's directory"""
    fname = f"{real}--{name}--{attribute}{extension}"
    return Path(REL_SIM_DIR) / fname

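For example (values hypothetical), the composed relative path looks like this:

print(_compose_rel_sim_polygons_path(real=0, attribute="containment", name="boundary", extension=".csv"))
# sim/0--boundary--containment.csv
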
Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
import abc
from dataclasses import dataclass
from typing import List, Optional

import xtgeo


@dataclass(frozen=True)
class SimulatedPolygonsAddress:
    """Specifies a unique simulated polygon set for a given ensemble realization"""

    attribute: str
    name: str
    realization: int


# Type aliases used for signature readability
PolygonsAddress = SimulatedPolygonsAddress

# Class that provides fault polygons data for an ensemble
class EnsemblePolygonProvider(abc.ABC):
    @abc.abstractmethod
    def provider_id(self) -> str:
        """Returns string ID of the provider."""

    @abc.abstractmethod
    def attributes(self) -> List[str]:
        """Returns list of all available attributes."""

    @abc.abstractmethod
    def realizations(self) -> List[int]:
        """Returns list of all available realizations."""

    @abc.abstractmethod
    def get_polygons(
        self,
        address: PolygonsAddress,
    ) -> Optional[xtgeo.Polygons]:
        """Returns fault polygons for a given fault polygons address"""

    # @abc.abstractmethod
    # def get_surface_bounds(self, surface: EnsembleSurfaceContext) -> List[float]:
    #     """Returns the bounds for a surface [xmin, ymin, xmax, ymax]"""

    # @abc.abstractmethod
    # def get_surface_value_range(self, surface: EnsembleSurfaceContext) -> List[float]:
    #     """Returns the value range for a given surface context [zmin, zmax]"""

    # @abc.abstractmethod
    # def get_surface_as_rgba(self, surface: EnsembleSurfaceContext) -> io.BytesIO:
    #     """Returns surface as a greyscale png RGBA with encoded elevation values
    #     in a bytestream"""

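To make the contract concrete, here is a minimal in-memory stub that satisfies the abstract interface. It is purely illustrative and not part of this commit:

from typing import Dict, List, Optional, Tuple

import xtgeo


class InMemoryPolygonProvider(EnsemblePolygonProvider):
    """Toy provider keeping xtgeo.Polygons in a dict keyed by (attribute, name, realization)."""

    def __init__(
        self, provider_id: str, data: Dict[Tuple[str, str, int], xtgeo.Polygons]
    ) -> None:
        self._id = provider_id
        self._data = data

    def provider_id(self) -> str:
        return self._id

    def attributes(self) -> List[str]:
        return sorted({attr for attr, _, _ in self._data})

    def realizations(self) -> List[int]:
        return sorted({real for _, _, real in self._data})

    def get_polygons(self, address: PolygonsAddress) -> Optional[xtgeo.Polygons]:
        # Look up by the same (attribute, name, realization) triple that
        # SimulatedPolygonsAddress carries; return None when nothing is stored.
        return self._data.get((address.attribute, address.name, address.realization))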
