API Reference
Timeseries Objects
This object is at the core of interacting with timeseries data in pandss.
RegularTimeseries
dataclass
A regular timeseries within a DSS file.
Source code in src\pandss\timeseries\regular_timeseries.py
@dataclass(
kw_only=True,
eq=True,
slots=True,
)
class RegularTimeseries:
"""A regular timeseries within a DSS file.
Attributes
-------
path: DatasetPath | str
The A-F path that the data had in the originating DSS file
values: NDArray
The timeseries data
dates: NDArray[datetime64]
Rendered dates; their alignment depends on the interval and on the first
date in the timeseries. Different Engines align dates differently.
period_type: str
The DSS period type the data had in the originating DSS file.
units: str
The units of the timeseries data
interval: Interval
The time interval in seconds between data in the DSS file.
"""
path: DatasetPath | str
values: NDArray
dates: NDArray[datetime64]
period_type: str
units: str
interval: Interval
def __post_init__(self):
if not isinstance(self.path, DatasetPath):
self.path = DatasetPath.from_str(self.path)
if not isinstance(self.dates, ndarray):
self.dates = array(self.dates, dtype=datetime64)
if not isinstance(self.values, ndarray):
self.values = array(self.values)
if not isinstance(self.interval, Interval):
self.interval = Interval(self.interval)
def __str__(self) -> str:
return f"{self.__class__.__name__}(path={str(self.path)}, len={len(self)})"
def __len__(self) -> int:
"""The size of the data in the RegularTimeseries.
Returns
-------
int
The length of `self.values`
"""
return len(self.values)
def __eq__(self, __other: object) -> bool:
"""Compare whether or not two `RegularTimeseries` are equal.
Compares all fields in the dataclass, and fails equality if any are not exactly
equal.
Parameters
----------
__other : object
The other object to compare to.
Returns
-------
bool
Whether or not the two objects are equal.
"""
if not isinstance(__other, self.__class__):
return False
for f in fields(self):
if not hasattr(__other, f.name):
return False
elif hasattr(getattr(self, f.name), "__iter__"):
for left, right in zip(getattr(self, f.name), getattr(__other, f.name)):
if left != right:
return False
elif getattr(self, f.name) != getattr(__other, f.name):
return False
return True
def __add__(self, __other: Self) -> Self:
kwargs = self._do_arithmetic(__other, "__add__")
return RegularTimeseries(**kwargs)
def _do_arithmetic(self, __other: Self, method_name: str) -> dict:
"""Perform the arithmetic on two `RegularTimeseries` objects.
The operations are performed accordingly:
- interval: No change, must be identical left and right
- period_type: No change, must be identical left and right
- units: No change, must be identical left and right
- path: Combined part by part, where identical parts are not changed, and
differing parts are concatenated
- dates: Intersected with __other.dates
- values: The arithmetic is done on the subset of values selected using the same
intersection used for dates
Parameters
----------
__other : Self
The other object to use when doing arithmetic.
method_name : str
One of `__add__`, `__sub__`, or other numeric dunders
Returns
-------
dict
The kwargs to pass to `__init__` of the object's class
Raises
------
ValueError
Raised if the two objects are not the same type
ValueError
Raised if certain attributes do not match as required
"""
CONCAT_KEY = {"__add__": "+", "__sub__": "-"}
concat_char = CONCAT_KEY[method_name]
# Validate action
if not isinstance(__other, self.__class__):
raise ValueError(
f"Cannot perform arithmetic {self.__class__.__name__} "
+ f"with {type(__other)}"
)
for attr in ("interval", "period_type", "units"):
s = getattr(self, attr)
o = getattr(__other, attr)
if s != o:
raise ValueError(f"Cannot add differing {attr}: {s}, {o}")
# Get kwargs for new instance
# path
new_path_kwargs = dict()
for part in ("a", "b", "c", "d", "e", "f"):
part_self = getattr(self.path, part)
part_other = getattr(__other.path, part)
if part_self == part_other:
new_path_kwargs[part] = part_self
else:
new_path_kwargs[part] = f"{part_self}{concat_char}{part_other}"
if self.path == __other.path: # Rare case of adding identical paths
new_path_kwargs["b"] = f"{self.path.b}{concat_char}{__other.path.b}"
new_path = DatasetPath(**new_path_kwargs)
# dates
new_dates = intersect1d(self.dates, __other.dates)
# values
mask_left = [date in new_dates for date in self.dates]
values_left = self.values[mask_left]
mask_right = [date in new_dates for date in __other.dates]
values_right = __other.values[mask_right]
method = getattr(values_left, method_name)
new_values = method(values_right)
kwargs = dict(
path=new_path,
values=new_values,
dates=new_dates,
units=self.units,
period_type=self.period_type,
interval=self.interval,
)
return kwargs
def update(self, **kwargs) -> Self:
"""Update an attribute on the object, creating a new one in the process
Returns
-------
Self
A RegularTimeseries object
Raises
------
ValueError
Raised if the length of the values and dates arrays don't match
after updating
"""
values = kwargs.get("values", None)
dates = kwargs.get("dates", None)
if values or dates:
if values is None:
values = self.values
if dates is None:
dates = self.dates
if len(values) != len(dates):
raise ValueError(
"new values/dates must match length:\n"
+ f"\t{len(values)=}\n"
+ f"\t{len(dates)=}"
)
new_obj_kwargs = {f.name: deepcopy(getattr(self, f.name)) for f in fields(self)}
new_obj_kwargs.update(**kwargs)
return self.__class__(**new_obj_kwargs)
def to_frame(self) -> DataFrame:
"""Create a `pandas.DataFrame` from the `RegularTimeseries`
Returns
-------
DataFrame
The `DataFrame`, indexed by dates, with column names of: `A`-`F`, `UNITS`,
`PERIOD_TYPE`, `INTERVAL`
"""
header = dict(self.path.items())
header["UNITS"] = self.units
header["PERIOD_TYPE"] = self.period_type
header["INTERVAL"] = str(self.interval)
header = {k.upper(): (v,) for k, v in header.items()}
columns = MultiIndex.from_arrays(
tuple(header.values()), names=tuple(header.keys())
)
df = DataFrame(
index=self.dates,
data=self.values,
columns=columns,
)
return df
def to_json(self) -> dict:
"""Create a JSON-compliant dictionary with the RegularTimeseries data
Returns
-------
dict
The JSON-compliant dictionary
Raises
------
AttributeError
Raised if unrecognized fields are present in the object, protects against
converting subclasses naively.
"""
json_obj = dict()
str_encode = ("path", "period_type", "units", "interval")
tuple_encode = {"values": float, "dates": datetime_as_string}
for f in fields(self):
if f.name in str_encode:
json_obj[f.name] = str(getattr(self, f.name))
elif f.name in tuple_encode:
encoder = tuple_encode[f.name]
json_obj[f.name] = tuple(encoder(i) for i in getattr(self, f.name))
else:
raise AttributeError(
f"unrecognized field `{f}`, cannot encode {self.__class__} to JSON."
)
return json_obj
@classmethod
def from_json(cls, obj: dict) -> Self:
"""Create a RegularTimeseries from a JSON-compliant dictionary
Extra data in the dictionary is ignored
Parameters
----------
obj : dict
A JSON-compliant dictionary
Returns
-------
RegularTimeseries
The object with data corresponding to the info in the dictionary
Raises
------
ValueError
Raised if attributes are missing in the dictionary
"""
missing = list()
for f in fields(cls):
if f.name not in obj:
missing.append(f.name)
if missing:
raise ValueError(f"missing the following attributes in JSON obj: {missing}")
decoders = {
"path": DatasetPath.from_str,
"values": array,
"dates": decode_json_date_array,
}
kwargs = dict()
for f in fields(cls):
decoder = decoders.get(f.name, str)
kwargs[f.name] = decoder(obj.get(f.name))
return cls(**kwargs)
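The example below constructs a small `RegularTimeseries` by hand and inspects it. This is a minimal sketch: it assumes `RegularTimeseries` is importable from the package root and that `Interval` accepts a DSS-style interval string such as `"1MON"`; the path, dates, and values are placeholders.
```
import numpy as np
import pandss as pdss

# Hedged sketch: placeholder path, dates, and values; Interval("1MON") is an assumption.
rts = pdss.RegularTimeseries(
    path="/EXAMPLE/LOCATION/FLOW//1MON/SKETCH/",  # parsed by DatasetPath.from_str in __post_init__
    values=np.array([1.0, 2.0, 3.0]),
    dates=np.array(["2000-01-31", "2000-02-29", "2000-03-31"], dtype="datetime64[D]"),
    period_type="PER-AVER",
    units="CFS",
    interval="1MON",
)
print(len(rts))  # 3, via __len__
print(rts)       # RegularTimeseries(path=..., len=3), via __str__
```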
__len__()
The size of the data in the RegularTimeseries.
Source code in src\pandss\timeseries\regular_timeseries.py
def __len__(self) -> int:
"""The size of the data in the RegularTimeseries.
Returns
-------
int
The length of `self.values`
"""
return len(self.values)
__eq__(__other)
Compare whether or not two RegularTimeseries are equal.
Compares all fields in the dataclass, and fails equality if any are not exactly equal.
Source code in src\pandss\timeseries\regular_timeseries.py
def __eq__(self, __other: object) -> bool:
"""Compare whether or not two `RegularTimeseries` are equal.
Compares all fields in the dataclass, and fails equality if any are not exactly
equal.
Parameters
----------
__other : object
The other object to compare to.
Returns
-------
bool
Whether or not the two objects are equal.
"""
if not isinstance(__other, self.__class__):
return False
for f in fields(self):
if not hasattr(__other, f.name):
return False
elif hasattr(getattr(self, f.name), "__iter__"):
for left, right in zip(getattr(self, f.name), getattr(__other, f.name)):
if left != right:
return False
elif getattr(self, f.name) != getattr(__other, f.name):
return False
return True
_do_arithmetic(__other, method_name)
Perform the arithmetic on two RegularTimeseries objects.
The operations are performed accordingly:
- interval: No change, must be identical left and right
- period_type: No change, must be identical left and right
- units: No change, must be identical left and right
- path: Combined part by part, where identical parts are not changed, and differing parts are concatenated
- dates: Intersected with __other.dates
- values: The arithmetic is done on the subset of values selected using the same intersection used for dates
Source code in src\pandss\timeseries\regular_timeseries.py
def _do_arithmetic(self, __other: Self, method_name: str) -> dict:
"""Perform the arithmetic on two `RegularTimeseries` objects.
The operations are performed accordingly:
- interval: No change, must be identical left and right
- period_type: No change, must be identical left and right
- units: No change, must be identical left and right
- path: Combined part by part, where identical parts are not changed, and
differing parts are concatenated
- dates: Intersected with __other.dates
- values: The arithmetic is done on the subset of values selected using the same
intersection used for dates
Parameters
----------
__other : Self
The other object to use when doing arithmetic.
method_name : str
One of `__add__`, `__sub__`, or other numeric dunders
Returns
-------
dict
The kwargs to pass to `__init__` of the object's class
Raises
------
ValueError
Raised if the two objects are not the same type
ValueError
Raised if certain attributes do not match as required
"""
CONCAT_KEY = {"__add__": "+", "__sub__": "-"}
concat_char = CONCAT_KEY[method_name]
# Validate action
if not isinstance(__other, self.__class__):
raise ValueError(
f"Cannot perform arithmetic {self.__class__.__name__} "
+ f"with {type(__other)}"
)
for attr in ("interval", "period_type", "units"):
s = getattr(self, attr)
o = getattr(__other, attr)
if s != o:
raise ValueError(f"Cannot add differing {attr}: {s}, {o}")
# Get kwargs for new instance
# path
new_path_kwargs = dict()
for part in ("a", "b", "c", "d", "e", "f"):
part_self = getattr(self.path, part)
part_other = getattr(__other.path, part)
if part_self == part_other:
new_path_kwargs[part] = part_self
else:
new_path_kwargs[part] = f"{part_self}{concat_char}{part_other}"
if self.path == __other.path: # Rare case of adding identical paths
new_path_kwargs["b"] = f"{self.path.b}{concat_char}{__other.path.b}"
new_path = DatasetPath(**new_path_kwargs)
# dates
new_dates = intersect1d(self.dates, __other.dates)
# values
mask_left = [date in new_dates for date in self.dates]
values_left = self.values[mask_left]
mask_right = [date in new_dates for date in __other.dates]
values_right = __other.values[mask_right]
method = getattr(values_left, method_name)
new_values = method(values_right)
kwargs = dict(
path=new_path,
values=new_values,
dates=new_dates,
units=self.units,
period_type=self.period_type,
interval=self.interval,
)
return kwargs
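The sketch below shows the intended effect of `__add__` on two series that share units, period_type, and interval. The file name and A-F paths are placeholders, not records that ship with pandss.
```
import pandss as pdss

# Hedged sketch: placeholder file and path names.
with pdss.DSS("example.dss") as dss:
    rts_a = dss.read_rts("/EXAMPLE/UPSTREAM/FLOW//1MON/SKETCH/")
    rts_b = dss.read_rts("/EXAMPLE/DOWNSTREAM/FLOW//1MON/SKETCH/")

total = rts_a + rts_b  # dispatches to _do_arithmetic(rts_b, "__add__")
print(total.path)      # differing parts are concatenated, e.g. B part becomes UPSTREAM+DOWNSTREAM
print(len(total))      # only dates present in both inputs survive the intersection
```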
update(**kwargs)
Update an attribute on the object, creating a new one in the process
Source code in src\pandss\timeseries\regular_timeseries.py
def update(self, **kwargs) -> Self:
"""Update an attribute on the object, creating a new one in the process
Returns
-------
Self
A RegularTimeseries object
Raises
------
ValueError
Raised if the length of the values and dates arrays don't match
after updating
"""
values = kwargs.get("values", None)
dates = kwargs.get("dates", None)
if values or dates:
if values is None:
values = self.values
if dates is None:
dates = self.dates
if len(values) != len(dates):
raise ValueError(
"new values/dates must match length:\n"
+ f"\t{len(values)=}\n"
+ f"\t{len(dates)=}"
)
new_obj_kwargs = {f.name: deepcopy(getattr(self, f.name)) for f in fields(self)}
new_obj_kwargs.update(**kwargs)
return self.__class__(**new_obj_kwargs)
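A short sketch of `update()` creating a modified copy; the file name, path, and unit label are placeholders.
```
import pandss as pdss

# Hedged sketch: placeholder file and path names.
with pdss.DSS("example.dss") as dss:
    rts = dss.read_rts("/EXAMPLE/LOCATION/FLOW//1MON/SKETCH/")

relabeled = rts.update(units="TAF")  # hypothetical unit label
assert relabeled is not rts          # the original object is left untouched
print(relabeled.units)
```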
to_frame()
Create a pandas.DataFrame from the RegularTimeseries
Source code in src\pandss\timeseries\regular_timeseries.py
def to_frame(self) -> DataFrame:
"""Create a `pandas.DataFrame` from the `RegularTimeseries`
Returns
-------
DataFrame
The `DataFrame`, indexed by dates, with column names of: `A`-`F`, `UNITS`,
`PERIOD_TYPE`, `INTERVAL`
"""
header = dict(self.path.items())
header["UNITS"] = self.units
header["PERIOD_TYPE"] = self.period_type
header["INTERVAL"] = str(self.interval)
header = {k.upper(): (v,) for k, v in header.items()}
columns = MultiIndex.from_arrays(
tuple(header.values()), names=tuple(header.keys())
)
df = DataFrame(
index=self.dates,
data=self.values,
columns=columns,
)
return df
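A minimal sketch of `to_frame()`; the file name and path are placeholders, and the column-level order shown in the comment reflects the header construction above.
```
import pandss as pdss

# Hedged sketch: placeholder file and path names.
with pdss.DSS("example.dss") as dss:
    rts = dss.read_rts("/EXAMPLE/LOCATION/FLOW//1MON/SKETCH/")

df = rts.to_frame()
print(df.index[:3])      # the dates become the index
print(df.columns.names)  # typically ('A', 'B', 'C', 'D', 'E', 'F', 'UNITS', 'PERIOD_TYPE', 'INTERVAL')
```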
to_json()
Create a JSON-compliant dictionary with the RegularTimeseries data
Source code in src\pandss\timeseries\regular_timeseries.py
def to_json(self) -> dict:
"""Create a JSON-compliant dictionary with the RegularTimeseries data
Returns
-------
dict
The JSON-compliant dictionary
Raises
------
AttributeError
Raised if unrecognized fields are present in the object, protects against
converting subclasses naively.
"""
json_obj = dict()
str_encode = ("path", "period_type", "units", "interval")
tuple_encode = {"values": float, "dates": datetime_as_string}
for f in fields(self):
if f.name in str_encode:
json_obj[f.name] = str(getattr(self, f.name))
elif f.name in tuple_encode:
encoder = tuple_encode[f.name]
json_obj[f.name] = tuple(encoder(i) for i in getattr(self, f.name))
else:
raise AttributeError(
f"unrecognized field `{f}`, cannot encode {self.__class__} to JSON."
)
return json_obj
from_json(obj)
classmethod
Create a RegularTimeseries from a JSON-compliant dictionary
Extra data in the dictionary is ignored
Source code in src\pandss\timeseries\regular_timeseries.py
@classmethod
def from_json(cls, obj: dict) -> Self:
"""Create a RegularTimeseries from a JSON-compliant dictionary
Extra data in the dictionary is ignored
Parameters
----------
obj : dict
A JSON-compliant dictionary
Returns
-------
RegularTimeseries
The object with data corresponding to the info in the dictionary
Raises
------
ValueError
Raised if attributes are missing in the dictionary
"""
missing = list()
for f in fields(cls):
if f.name not in obj:
missing.append(f.name)
if missing:
raise ValueError(f"missing the following attributes in JSON obj: {missing}")
decoders = {
"path": DatasetPath.from_str,
"values": array,
"dates": decode_json_date_array,
}
kwargs = dict()
for f in fields(cls):
decoder = decoders.get(f.name, str)
kwargs[f.name] = decoder(obj.get(f.name))
return cls(**kwargs)
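The two methods above are designed to round-trip through the standard library `json` module, as in the hedged sketch below (placeholder file and path names).
```
import json

import pandss as pdss

# Hedged sketch: placeholder file and path names.
with pdss.DSS("example.dss") as dss:
    rts = dss.read_rts("/EXAMPLE/LOCATION/FLOW//1MON/SKETCH/")

payload = json.dumps(rts.to_json())  # to_json() emits plain str/tuple/float types
restored = pdss.RegularTimeseries.from_json(json.loads(payload))
print(restored == rts)               # True when every field round-trips exactly
```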
DSS, and Catalog Objects
This object represents a *.dss file on disk, and handles the IO for pandss.
DSS
Class representing an open DSS file. Binds to various other Python-based HEC-DSS file readers through an "engine". The Engine classes wrap the other libraries, creating one API that this class uses.
Source code in src\pandss\dss.py
class DSS:
"""Class representing an open DSS file. Binds to various other python based
HEC-DSS file readers through an "engine". The Engine classes wrap the other
libraries, creating one API that this class uses.
Parameters
----------
src: pathlib.Path
The path to the DSS file on disk.
engine: str | EngineABC, default "pyhecdss"
The engine object that handles the DSS interactions. Available engines:
- pyhecdss
- pydsstools
"""
__slots__ = ["src", "engine", "_opened"]
def __init__(self, src: str | Path, engine: str | EngineABC = None):
if engine is None:
engine = module_engine.selected
self.src: Path = Path(src).resolve()
if isinstance(engine, str):
engine = get_engine(engine)
elif not isinstance(engine, EngineABC):
raise ValueError(f"engine type not recognized: {type(engine)=}")
logging.debug(f"using engine {engine}")
self.engine: EngineABC = engine(self.src)
self._opened = 0
def __str__(self) -> str:
return f"{self.__class__.__name__}({self.src})"
@silent
def __enter__(self):
"""Wraps Engine class `open` and enables the use of engine classes in
Python's context manager pattern. Correspondingly, `DSS.__exit__()`
wraps Engine class `close`.
```
with DSS(path_to_dss_file) as DSS_File:
    # read/write data in DSS file
    cat = DSS_File.read_catalog()
# Engine.close() automatically called.
```
Returns
-------
self
The open DSS file.
"""
if self._opened <= 0:
logging.debug(f"opening dss file {self.src}")
self.engine.open()
self._opened += 1
return self
@silent
def __exit__(self, exc_type, exc_inst, traceback):
self._opened += -1
if self._opened <= 0:
logging.debug(f"closing dss file {self.src}")
self.engine.close()
self._opened = 0
def read_catalog(self, drop_date: bool = False) -> Catalog:
"""Read the Catalog of the open DSS file.
The Catalog will contain all the DatasetPath objects present in the DSS
file.
Parameters
----------
drop_date : bool, optional
If True, treat all paths as if they did not have a D part, by default False
Returns
-------
Catalog
A pandss.Catalog object for the DSS file
"""
logging.debug(f"reading catalog, {self.src=}")
with suppress_stdout_stderr():
catalog = self.engine.read_catalog()
if drop_date:
catalog = catalog.collapse_dates()
logging.debug(f"catalog read, size is {len(catalog)}")
return catalog
def read_rts(
self,
path: DatasetPath | str,
expect_single: bool = True,
drop_date: bool = True,
) -> RegularTimeseries:
"""Read a RegularTimeseries from a DSS file.
Parameters
----------
path : DatasetPath | str
The A-F path of the data in the DSS, may contain wildcards
expect_single : bool, optional
Whether or not to expect a single entry and error on unexpected result, by
default True
drop_date : bool, optional
If True, treat all paths as if they did not have a D part, by default True
Returns
-------
RegularTimeseries
The RegularTimeseries data stored in the DSS file.
Raises
------
UnexpectedDSSReturn
Raised if `expect_single` is True, and multiple paths were matched.
WildcardError
Raised if `expect_single` is False, and the path given contains wildcards.
"""
logging.debug(f"reading regular time series, {path}")
if isinstance(path, str):
path = DatasetPath.from_str(path)
if path.has_wildcard:
if expect_single:
rtss = tuple(self.read_multiple_rts(path, drop_date))
if len(rtss) != 1:
raise UnexpectedDSSReturn(
f"expected {path} to resolve to single path, "
+ f"DSS returned {len(rtss)} items."
)
else:
return rtss[0]
else:
raise WildcardError(
f"path has wildcard, use `read_multiple_rts` method, {path=}"
)
with suppress_stdout_stderr():
return self.engine.read_rts(path)
def read_multiple_rts(
self,
paths: DatasetPath | DatasetPathCollection,
drop_date: bool = True,
) -> Iterator[RegularTimeseries]:
"""Iteratively read multiple RegularTimeseries.
Parameters
----------
paths : DatasetPath | DatasetPathCollection
The A-F path of the data in the DSS, may contain wildcards
drop_date : bool, optional
If True, treat all paths as if they did not have a D part, by default True
Yields
------
Iterator[RegularTimeseries]
An iterator that yields the found RegularTimeseries objects
Raises
------
ValueError
Raised if the `paths` argument isn't the correct type.
"""
if hasattr(self.engine, "read_multiple_rts"):
yield from self.engine.read_multiple_rts(paths, drop_date)
else: # If the engine doesn't optimize this, we can just iterate one at a time
# If passed a single path, check for wildcard that might expand it
if isinstance(paths, DatasetPath):
if paths.has_wildcard:
paths = self.resolve_wildcard(paths)
else:
logging.debug(
"`read_multiple_rts` called with only one path,"
+ " path contains no wildcards to expand"
)
paths = DatasetPathCollection(paths={paths})
elif isinstance(paths, DatasetPathCollection):
# If passed multiple paths, expand any of them with wildcards
if any(p.has_wildcard for p in paths):
resolved = set()
for p in paths:
resolved = resolved | self.resolve_wildcard(p)
paths = resolved
elif hasattr(paths, "__iter__"):
try:
paths = DatasetPathCollection(paths={p for p in paths})
except Exception:
raise ValueError(
"paths must be given as DatasetPath or DatasetPathCollection"
+ " so wildcards can be correctly resolved, "
+ f"paths given as {type(paths)}"
)
else:
raise ValueError(
"paths must be given as DatasetPath or DatasetPathCollection"
+ " so wildcards can be correctly resolved, "
+ f"paths given as {type(paths)}"
)
# When expanding wildcards, paths might be specific to a single chunk,
# use the special method here to re-combine the paths (combine D-parts)
if drop_date is True:
paths = paths.collapse_dates()
# Read each individually
for p in paths:
yield self.read_rts(p)
def write_rts(self, path: DatasetPath | str, rts: RegularTimeseries):
"""Write a RegularTimeseries to a DSS file.
Parameters
----------
path : DatasetPath | str
The A-F path to write into the DSS file
rts : RegularTimeseries
The RegularTimeseries object containing the data to be written
Raises
------
WildcardError
Raised if the `path` argument contains wildcards.
"""
if isinstance(path, str):
path = DatasetPath.from_str(path)
logging.debug(f"writing regular time series, {path}")
if path.has_wildcard:
raise WildcardError(f"cannot write to path with non-date wildcard, {path=}")
with suppress_stdout_stderr():
return self.engine.write_rts(path, rts)
def resolve_wildcard(
self,
path: DatasetPath | str,
drop_date: bool = False,
) -> DatasetPathCollection:
"""Search the DSS for DatasetPaths that match the `path` argument.
Parameters
----------
path : DatasetPath | str
The path with wildcards to match in the DSS file
drop_date : bool, optional
If True, treat paths as if the D part does not exist, by default False
Returns
-------
DatasetPathCollection
The collection of paths that were matched
"""
if isinstance(path, str):
path = DatasetPath.from_str(path)
logging.debug("resolving wildcards")
if not path.has_wildcard:
return DatasetPathCollection(paths={path})
if self.engine.catalog is None:
self.engine.read_catalog()
collection = self.engine.catalog.resolve_wildcard(path)
if drop_date:
collection = collection.collapse_dates()
return collection
@property
def is_open(self) -> bool:
"""Whether or not the DSS is currently open."""
return self.engine.is_open
@property
def catalog(self) -> Catalog:
"""The `Catalog` of the DSS file."""
return self.engine.catalog
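A minimal usage sketch of the class, assuming a placeholder file name; the engine argument is optional and defaults to "pyhecdss".
```
import pandss as pdss

# Hedged sketch: "example.dss" is a placeholder file name.
with pdss.DSS("example.dss", engine="pyhecdss") as dss:
    print(dss.is_open)            # True inside the context
    catalog = dss.read_catalog()
print(len(catalog))               # the Catalog remains usable after the file is closed
```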
is_open: bool
property
Whether or not the DSS is currently open.
catalog: Catalog
property
The Catalog of the DSS file.
__enter__()
Wraps Engine class open and enables the use of engine classes in Python's context manager pattern. Correspondingly, DSS.__exit__() wraps Engine class close.
with DSS(path_to_dss_file) as DSS_File:
    # read/write data in DSS file
    cat = DSS_File.read_catalog()
# Engine.close() automatically called.
Source code in src\pandss\dss.py
@silent
def __enter__(self):
"""Wraps Engine class `open` and enables the use of engine classes in
Python's context manager pattern. Correspondingly, `DSS.__exit__()`
wraps Engine class `close`.
```
with DSS(path_to_dss_file) as DSS_File:
    # read/write data in DSS file
    cat = DSS_File.read_catalog()
# Engine.close() automatically called.
```
Returns
-------
self
The open DSS file.
"""
if self._opened <= 0:
logging.debug(f"opening dss file {self.src}")
self.engine.open()
self._opened += 1
return self
read_catalog(drop_date=False)
Read the Catalog of the open DSS file.
The Catalog will contain all the DatasetPath objects present in the DSS file.
Source code in src\pandss\dss.py
def read_catalog(self, drop_date: bool = False) -> Catalog:
"""Read the Catalog of the open DSS file.
The Catalog will contain all the DatasetPath objects present in the DSS
file.
Parameters
----------
drop_date : bool, optional
If True, treat all paths as if they did not have a D part, by default False
Returns
-------
Catalog
A pandss.Catalog object for the DSS file
"""
logging.debug(f"reading catalog, {self.src=}")
with suppress_stdout_stderr():
catalog = self.engine.read_catalog()
if drop_date:
catalog = catalog.collapse_dates()
logging.debug(f"catalog read, size is {len(catalog)}")
return catalog
read_rts(path, expect_single=True, drop_date=True)
Read a RegularTimeseries from a DSS file.
Source code in src\pandss\dss.py
def read_rts(
self,
path: DatasetPath | str,
expect_single: bool = True,
drop_date: bool = True,
) -> RegularTimeseries:
"""Read a RegularTimeseries from a DSS file.
Parameters
----------
path : DatasetPath | str
The A-F path of the data in the DSS, may contain wildcards
expect_single : bool, optional
Whether or not to expect a single entry and error on unexpected result, by
default True
drop_date : bool, optional
If True, treat all paths as if they did not have a D part, by default True
Returns
-------
RegularTimeseries
The RegularTimeseries data stored in the DSS file.
Raises
------
UnexpectedDSSReturn
Raised if `expect_single` is True, and multiple paths were matched.
WildcardError
Raised if `expect_single` is False, and the path given contains wildcards.
"""
logging.debug(f"reading regular time series, {path}")
if isinstance(path, str):
path = DatasetPath.from_str(path)
if path.has_wildcard:
if expect_single:
rtss = tuple(self.read_multiple_rts(path, drop_date))
if len(rtss) != 1:
raise UnexpectedDSSReturn(
f"expected {path} to resolve to single path, "
+ f"DSS returned {len(rtss)} items."
)
else:
return rtss[0]
else:
raise WildcardError(
f"path has wildcard, use `read_multiple_rts` method, {path=}"
)
with suppress_stdout_stderr():
return self.engine.read_rts(path)
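A short sketch of reading a single record; the file name and A-F path are placeholders.
```
import pandss as pdss

# Hedged sketch: the path string is a placeholder for a record in the file.
with pdss.DSS("example.dss") as dss:
    rts = dss.read_rts("/EXAMPLE/LOCATION/FLOW//1MON/SKETCH/")
print(rts.units, rts.period_type, len(rts))
```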
read_multiple_rts(paths, drop_date=True)
Iteratively read multiple RegularTimeseries.
Source code in src\pandss\dss.py
def read_multiple_rts(
self,
paths: DatasetPath | DatasetPathCollection,
drop_date: bool = True,
) -> Iterator[RegularTimeseries]:
"""Iteratively read multiple RegularTimeseries.
Parameters
----------
paths : DatasetPath | DatasetPathCollection
The A-F path of the data in the DSS, may contain wildcards
drop_date : bool, optional
If True, treat all paths as if they did not have a D part, by default True
Yields
------
Iterator[RegularTimeseries]
An iterator that yields the found RegularTimeseries objects
Raises
------
ValueError
Raised if the `paths` argument isn't the correct type.
"""
if hasattr(self.engine, "read_multiple_rts"):
yield from self.engine.read_multiple_rts(paths, drop_date)
else: # If the engine doesn't optimize this, we can just iterate one at a time
# If passed a single path, check for wildcard that might expand it
if isinstance(paths, DatasetPath):
if paths.has_wildcard:
paths = self.resolve_wildcard(paths)
else:
logging.debug(
"`read_multiple_rts` called with only one path,"
+ " path contains no wildcards to expand"
)
paths = DatasetPathCollection(paths={paths})
elif isinstance(paths, DatasetPathCollection):
# If passed multiple paths, expand any of them with wildcards
if any(p.has_wildcard for p in paths):
resolved = set()
for p in paths:
resolved = resolved | self.resolve_wildcard(p)
paths = resolved
elif hasattr(paths, "__iter__"):
try:
paths = DatasetPathCollection(paths={p for p in paths})
except Exception:
raise ValueError(
"paths must be given as DatasetPath or DatasetPathCollection"
+ " so wildcards can be correctly resolved, "
+ f"paths given as {type(paths)}"
)
else:
raise ValueError(
"paths must be given as DatasetPath or DatasetPathCollection"
+ " so wildcards can be correctly resolved, "
+ f"paths given as {type(paths)}"
)
# When expanding wildcards, paths might be specific to a single chunk,
# use the special method here to re-combine the paths (combine D-parts)
if drop_date is True:
paths = paths.collapse_dates()
# Read each individually
for p in paths:
yield self.read_rts(p)
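A hedged sketch of iterating over a wildcard match; the regex-style ".*" wildcard syntax and the file name are assumptions for illustration.
```
import pandss as pdss

# Hedged sketch: wildcard syntax and file name are assumed, not confirmed.
pattern = pdss.DatasetPath.from_str("/.*/.*/FLOW/.*/.*/.*/")
with pdss.DSS("example.dss") as dss:
    for rts in dss.read_multiple_rts(pattern):
        print(rts.path, len(rts))
```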
write_rts(path, rts)
Write a RegularTimeseries to a DSS file.
Source code in src\pandss\dss.py
def write_rts(self, path: DatasetPath | str, rts: RegularTimeseries):
"""Write a RegularTimeseries to a DSS file.
Parameters
----------
path : DatasetPath | str
The A-F path to write into the DSS file
rts : RegularTimeseries
The RegularTimeseries object containing the data to be written
Raises
------
WildcardError
Raised if the `path` argument contains wildcards.
"""
if isinstance(path, str):
path = DatasetPath.from_str(path)
logging.debug(f"writing regular time series, {path}")
if path.has_wildcard:
raise WildcardError(f"cannot write to path with non-date wildcard, {path=}")
with suppress_stdout_stderr():
return self.engine.write_rts(path, rts)
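A sketch of copying one record between files; the file names and path are placeholders, and it is assumed the engine can create or open the destination file for writing.
```
import pandss as pdss

# Hedged sketch: placeholder source/destination files and path.
with pdss.DSS("source.dss") as src, pdss.DSS("copy.dss") as dst:
    rts = src.read_rts("/EXAMPLE/LOCATION/FLOW//1MON/SKETCH/")
    dst.write_rts(rts.path, rts)
```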
resolve_wildcard(path, drop_date=False)
Search the DSS for DatasetPaths that match the path argument.
Source code in src\pandss\dss.py
def resolve_wildcard(
self,
path: DatasetPath | str,
drop_date: bool = False,
) -> DatasetPathCollection:
"""Search the DSS for DatasetPaths that match the `path` argument.
Parameters
----------
path : DatasetPath | str
The path with wildcards to match in the DSS file
drop_date : bool, optional
If True, treat paths as if the D part does not exist, by default False
Returns
-------
DatasetPathCollection
The collection of paths that were matched
"""
if isinstance(path, str):
path = DatasetPath.from_str(path)
logging.debug("resolving wildcards")
if not path.has_wildcard:
return DatasetPathCollection(paths={path})
if self.engine.catalog is None:
self.engine.read_catalog()
collection = self.engine.catalog.resolve_wildcard(path)
if drop_date:
collection = collection.collapse_dates()
return collection
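A sketch of expanding a wildcard path into the concrete paths present in the file; the wildcard syntax and file name are placeholders.
```
import pandss as pdss

# Hedged sketch: wildcard syntax and file name are assumed, not confirmed.
with pdss.DSS("example.dss") as dss:
    matched = dss.resolve_wildcard("/.*/.*/FLOW/.*/.*/.*/", drop_date=True)
for path in matched:
    print(path)
```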
Catalog
dataclass
Bases: DatasetPathCollection
An unordered collection of pandss.DatasetPath objects.
A Catalog contains all of the A-F paths present in a DSS file. A Catalog isn't usually initialized by the user, but is typically created by a pandss.DSS object using the pandss.DSS.read_catalog() method.
Source code in src\pandss\catalog.py
@dataclass(
kw_only=True,
frozen=True,
slots=True,
eq=True,
)
class Catalog(DatasetPathCollection):
"""An unordered collection of `pandss.DatasetPath` objects.
A `Catalog` contains all of the A-F paths present in a DSS file. A `Catalog` isn't
usually initialized by the user, but is typically created by a `pandss.DSS` object
using the `pandss.DSS.read_catalog()` method.
Parameters
----------
paths : set[DatasetPath]
The paths present in the DSS Catalog.
src: pathlib.Path
The path to the DSS file on disk.
"""
src: Path
@classmethod
def from_strs(cls, paths: list[str], src: Path) -> Self:
"""Create a `Catalog` from an iterable of strings"""
paths = set(DatasetPath.from_str(p) for p in paths)
if any(p.has_wildcard for p in paths):
raise ValueError(f"{cls.__name__} cannot be created with wildcard paths")
return cls(
paths=paths,
src=src,
)
@classmethod
def from_frame(cls, df: DataFrame, src: Path) -> Self:
"""Create a `Catalog` from a `DataFrame`.
Parameters
----------
df : DataFrame
The frame containing the paths to collect
src : Path
The path of the DSS file
Returns
-------
Catalog
The created object
Raises
------
ValueError
Raised if the `DataFrame` is missing required columns
WildcardError
Raised if the paths in the `DataFrame` contain wildcards
"""
df.columns = df.columns.str.lower()
missing = [c for c in ("a", "b", "c", "d", "e", "f") if c not in df.columns]
if missing:
raise ValueError(
f"DataFrame is misssing required columns: {missing}\n\t{df.sample(2)}"
)
df = df[["a", "b", "c", "d", "e", "f"]]
paths = set(DatasetPath(*row) for row in df.itertuples(index=False))
wild = [str(p) for p in paths if p.has_wildcard]
if wild:
wild_str = "\n\t".join(wild)
raise WildcardError(
f"{cls.__name__} cannot be created with wildcard paths:\n\t{wild_str}"
)
return cls(paths=paths, src=src)
def resolve_wildcard(self, path: DatasetPath) -> DatasetPathCollection:
return super(Catalog, self).resolve_wildcard(path)
def find(self, path: DatasetPath) -> DatasetPathCollection:
return self.resolve_wildcard(path)
from_strs(paths, src)
classmethod
Create a Catalog from an iterable of strings
Source code in src\pandss\catalog.py
@classmethod
def from_strs(cls, paths: list[str], src: Path) -> Self:
"""Create a `Catalog` from an iterable of strings"""
paths = set(DatasetPath.from_str(p) for p in paths)
if any(p.has_wildcard for p in paths):
raise ValueError(f"{cls.__name__} cannot be created with wildcard paths")
return cls(
paths=paths,
src=src,
)
from_frame(df, src)
classmethod
Create a Catalog from a DataFrame.
Source code in src\pandss\catalog.py
@classmethod
def from_frame(cls, df: DataFrame, src: Path) -> Self:
"""Create a `Catalog` from a `DataFrame`.
Parameters
----------
df : DataFrame
The frame containing the paths to collect
src : Path
The path of the DSS file
Returns
-------
Catalog
The created object
Raises
------
ValueError
Raised if the `DataFrame` is missing required columns
WildcardError
Raised if the paths in the `DataFrame` contain wildcards
"""
df.columns = df.columns.str.lower()
missing = [c for c in ("a", "b", "c", "d", "e", "f") if c not in df.columns]
if missing:
raise ValueError(
f"DataFrame is misssing required columns: {missing}\n\t{df.sample(2)}"
)
df = df[["a", "b", "c", "d", "e", "f"]]
paths = set(DatasetPath(*row) for row in df.itertuples(index=False))
wild = [str(p) for p in paths if p.has_wildcard]
if wild:
wild_str = "\n\t".join(wild)
raise WildcardError(
f"{cls.__name__} cannot be created with wildcard paths:\n\t{wild_str}"
)
return cls(paths=paths, src=src)
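A short sketch of working with a Catalog returned by pandss.DSS.read_catalog(); the file name and wildcard pattern are placeholders.
```
import pandss as pdss

# Hedged sketch: placeholder file name; wildcard syntax is assumed.
with pdss.DSS("example.dss") as dss:
    cat = dss.read_catalog(drop_date=True)

print(len(cat), cat.src)
flows = cat.find(pdss.DatasetPath.from_str("/.*/.*/FLOW/.*/.*/.*/"))  # alias for resolve_wildcard
print(len(flows))
```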
Path, and Path Collection Objects
Errors
WildcardError
Operation on path with wildcard is invalid.
FileVersionError
Version of DSS file is invalid for this operation.
DatasetNotFound
Dataset is not present in the DSS file.
DatasetPathParseError
DatasetPath could not be constructed from the given information.
ClosedDSSError
Operation attempted to access a closed DSS file.
UnexpectedDSSReturn
Operation returned an unexpected result from a DSS read operation.
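The sketch below shows one of these errors being raised and handled; it assumes the exception classes are importable from the package root, and the file and path names are placeholders.
```
import pandss as pdss

# Hedged sketch: read_rts refuses wildcard paths when expect_single is False.
try:
    with pdss.DSS("example.dss") as dss:
        rts = dss.read_rts("/EXAMPLE/.*/FLOW/.*/.*/.*/", expect_single=False)
except pdss.WildcardError as error:
    print(f"use read_multiple_rts for wildcard paths: {error}")
```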