"""SDMX Information Model (SDMX-IM).
This module implements many of the classes described in the SDMX-IM specification
('spec'), which is available from:
- https://sdmx.org/?page_id=5008
- https://sdmx.org/wp-content/uploads/
SDMX_2-1-1_SECTION_2_InformationModel_201108.pdf
Details of the implementation:
- Python typing and pydantic are used to enforce the types of attributes that reference
instances of other classes.
- Some classes have convenience attributes not mentioned in the spec, to ease navigation
between related objects. These are marked “:mod:`sdmx` extension not in the IM.”
- Class definitions are grouped by section of the spec, but these sections appear out
of order so that dependent classes are defined first.
"""
# TODO for complete implementation of the IM, enforce TimeKeyValue (instead of KeyValue)
# for {Generic,StructureSpecific} TimeSeriesDataSet.
import logging
from collections import ChainMap
from collections.abc import Collection
from collections.abc import Iterable as IterableABC
from copy import copy
from datetime import date, datetime, timedelta
from enum import Enum
from functools import lru_cache
from inspect import isclass
from itertools import product
from operator import attrgetter, itemgetter
from typing import (
Any,
Dict,
Generator,
Generic,
Iterable,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Type,
TypeVar,
Union,
)
from warnings import warn
from sdmx.rest import Resource
from sdmx.util import (
BaseModel,
DictLike,
compare,
dictlike_field,
only,
validate_dictlike,
validator,
)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# TODO read this from the environment, or use any value set in the SDMX-ML spec.
# Currently set to 'en' because test_dsd.py expects it.
#: Locale applied when a bare string is given to :class:`InternationalString`, and
#: the locale preferred by :meth:`InternationalString.__str__`.
DEFAULT_LOCALE = "en"
# §3.2: Base structures
class InternationalString:
    """SDMX-IM InternationalString.

    SDMX-IM LocalisedString is not implemented. Instead, the 'localizations' is a
    mapping where:

    - keys correspond to the 'locale' property of LocalisedString.
    - values correspond to the 'label' property of LocalisedString.

    When used as a type hint with pydantic, InternationalString fields can be assigned
    to in one of four ways::

        class Foo(BaseModel):
            name: InternationalString = InternationalString()

        # Equivalent: no localizations
        f = Foo()
        f = Foo(name={})

        # Using an explicit locale
        f.name['en'] = "Foo's name in English"

        # Using a (locale, label) tuple
        f.name = ('fr', "Foo's name in French")

        # Using a dict
        f.name = {'en': "Replacement English name",
                  'fr': "Replacement French name"}

        # Using a bare string, implicitly for the DEFAULT_LOCALE
        f.name = "Name in DEFAULT_LOCALE language"

    Only the first method preserves existing localizations; the latter three replace
    them.

    Parameters
    ----------
    value : str or dict or 2-sequence or iterable of 2-tuples, optional
        Initial localizations; see above.
    kwargs
        Used as ``locale=label`` pairs, only when `value` is :obj:`None`.

    Raises
    ------
    ValueError
        If `value` is of a type that cannot be interpreted.
    """

    localizations: Dict[str, str] = {}

    def __init__(self, value=None, **kwargs):
        super().__init__()

        # Handle initial values according to type
        if isinstance(value, str):
            # Bare string
            value = {DEFAULT_LOCALE: value}
        elif isinstance(value, dict):
            # dict; use directly. This is tested *before* the 2-sequence case: a dict
            # with exactly two entries also has len() == 2, and indexing it with [0]
            # would raise KeyError.
            pass
        elif (
            isinstance(value, Sequence)
            and len(value) == 2
            and isinstance(value[0], str)
        ):
            # 2-sequence starting with a str is (locale, label). Only indexable
            # sequences are checked here, so e.g. a set of two pairs falls through to
            # the generic iterable case instead of failing on value[0].
            value = {value[0]: value[1]}
        elif isinstance(value, IterableABC):
            # Iterable of 2-tuples
            value = {locale: label for (locale, label) in value}
        elif value is None:
            # Keyword arguments → dict, possibly empty
            value = dict(kwargs)
        else:
            raise ValueError(value, kwargs)

        self.localizations = value

    # Convenience access
    def __getitem__(self, locale):
        return self.localizations[locale]

    def __setitem__(self, locale, label):
        self.localizations[locale] = label

    # Duplicate of __getitem__, to pass existing tests in test_dsd.py
    def __getattr__(self, name):
        # Look in the instance __dict__ directly so that a missing 'localizations'
        # attribute results in AttributeError rather than infinite recursion.
        try:
            return self.__dict__["localizations"][name]
        except KeyError:
            raise AttributeError(name) from None

    def __add__(self, other):
        """Return a new object with the localizations of `self` and `other`.

        Labels in `other` take precedence. Neither operand is modified.
        """
        result = copy(self)
        # copy() is shallow: give the result its own dict so that `self` is not
        # mutated through the shared reference.
        result.localizations = {**self.localizations, **other.localizations}
        return result

    def localized_default(self, locale=None):
        """Return the string in *locale*, or else the first defined."""
        try:
            return self.localizations[locale]
        except KeyError:
            if self.localizations:
                # No label in the requested locale; use the first stored value
                return next(iter(self.localizations.values()))
            else:
                return ""

    def __str__(self):
        return self.localized_default(DEFAULT_LOCALE)

    def __repr__(self):
        # One "locale: label" line per localization, sorted by locale
        return "\n".join(
            ["{}: {}".format(*kv) for kv in sorted(self.localizations.items())]
        )

    def __eq__(self, other):
        try:
            return self.localizations == other.localizations
        except AttributeError:
            return NotImplemented

    @classmethod
    def __get_validators__(cls):
        yield cls.__validate

    @classmethod
    def __validate(cls, value, values, config, field):
        # Any value that the constructor can handle can be assigned
        if not isinstance(value, InternationalString):
            value = InternationalString(value)

        try:
            # Update existing value
            existing = values[field.name]
            existing.localizations.update(value.localizations)
            return existing
        except KeyError:
            # No existing value/None; return the assigned value
            return value
class Annotation(BaseModel):
    """SDMX-IM Annotation, attachable to an :class:`AnnotableArtefact`."""

    #: Can be used to disambiguate multiple annotations for one AnnotableArtefact.
    id: Optional[str] = None

    #: Title, used to identify an annotation.
    title: Optional[str] = None

    #: Specifies how the annotation is processed.
    type: Optional[str] = None

    #: A link to external descriptive text.
    url: Optional[str] = None

    #: Content of the annotation.
    text: InternationalString = InternationalString()
class AnnotableArtefact(BaseModel):
    """Base class for objects that carry a list of :class:`Annotation`."""

    #: :class:`Annotations <.Annotation>` of the object.
    #:
    #: :mod:`pandaSDMX` implementation: The IM does not specify the name of this
    #: feature.
    annotations: List[Annotation] = []

    def get_annotation(self, **attrib):
        """Return a :class:`Annotation` with given `attrib`, e.g. 'id'.

        If more than one `attrib` is given, all must match a particular annotation.
        The first matching annotation, in list order, is returned.

        Raises
        ------
        KeyError
            If there is no matching annotation.
        """
        for candidate in self.annotations:
            for key, expected in attrib.items():
                # A missing attribute is treated as None
                if not getattr(candidate, key, None) == expected:
                    break
            else:
                # Loop finished without a mismatch
                return candidate

        raise KeyError(attrib)

    def pop_annotation(self, **attrib):
        """Remove and return a :class:`Annotation` with given `attrib`, e.g. 'id'.

        If more than one `attrib` is given, all must match a particular annotation.
        The first matching annotation, in list order, is removed.

        Raises
        ------
        KeyError
            If there is no matching annotation.
        """
        position = 0
        for candidate in self.annotations:
            for key, expected in attrib.items():
                if not getattr(candidate, key, None) == expected:
                    break
            else:
                return self.annotations.pop(position)
            position += 1

        raise KeyError(attrib)
class _MissingID(str):
    """Sentinel :class:`str` used as the default ID of an IdentifiableArtefact.

    Instances compare equal only to other instances of the same class; in
    particular, they do *not* compare equal to the empty string, although the
    underlying :class:`str` value is empty.
    """

    def __str__(self):
        return "(missing id)"

    def __eq__(self, other):
        return isinstance(other, self.__class__)

    def __ne__(self, other):
        # str defines its own __ne__ that compares contents; override it so that
        # != is always the negation of ==.
        return not self.__eq__(other)

    def __hash__(self):
        # Defining __eq__ sets __hash__ to None unless it is also defined. All
        # instances are equal, so they share a single hash value.
        return hash(self.__class__)


MissingID = _MissingID()
class IdentifiableArtefact(AnnotableArtefact):
    """SDMX-IM IdentifiableArtefact: an object with an ID and optional URI/URN."""

    #: Unique identifier of the object.
    id: str = MissingID

    #: Universal resource identifier that may or may not be resolvable.
    uri: Optional[str] = None

    #: Universal resource name. For use in SDMX registries; all registered
    #: objects have a URN.
    urn: Optional[str] = None

    #: Result of :func:`sdmx.urn.match` applied to :attr:`urn`; empty if no URN is
    #: given.
    urn_group: Dict = dict()

    def __init__(self, *args, **kwargs):
        """Construct the object and check :attr:`id` against :attr:`urn`.

        Raises
        ------
        ValueError
            If a URN is given and the ID does not match it.
        """
        super().__init__(*args, **kwargs)

        if self.urn:
            # Imported here rather than at module level
            import sdmx.urn

            self.urn_group = sdmx.urn.match(self.urn)

        try:
            # NOTE(review): on str, `not in` is a substring test, so an ID that is
            # only part of the URN's ID passes, as does the empty MissingID
            # sentinel. Confirm whether an exact match is intended.
            if self.id not in (self.urn_group["item_id"] or self.urn_group["id"]):
                raise ValueError(f"ID {self.id} does not match URN {self.urn}")
        except KeyError:
            # No URN, or the URN has no such groups: nothing to check
            pass

    def __eq__(self, other):
        """Equality comparison.

        IdentifiableArtefacts can be compared to other instances. For convenience, a
        string containing the object's ID is also equal to the object. Objects of
        any other type are not equal.
        """
        if isinstance(other, self.__class__):
            return self.id == other.id
        elif isinstance(other, str):
            return self.id == other
        # Explicit result instead of implicitly returning None
        return False

    def compare(self, other, strict=True):
        """Return :obj:`True` if `self` is the same as `other`.

        Two IdentifiableArtefacts are the same if they have the same :attr:`id`,
        :attr:`uri`, and :attr:`urn`.

        Parameters
        ----------
        strict : bool, optional
            Passed to :func:`.compare`.
        """
        return (
            compare("id", self, other, strict)
            and compare("uri", self, other, strict)
            and compare("urn", self, other, strict)
        )

    def __hash__(self):
        # Objects without an ID hash by identity; others by their ID, consistent
        # with __eq__()
        return id(self) if self.id == MissingID else hash(self.id)

    def __lt__(self, other):
        # Order by ID; only defined against instances of the same class
        return (
            self.id < other.id if isinstance(other, self.__class__) else NotImplemented
        )

    def __str__(self):
        return self.id

    def __repr__(self):
        return f"<{self.__class__.__name__} {self.id}>"
class NameableArtefact(IdentifiableArtefact):
    """SDMX-IM NameableArtefact: adds a name and description."""

    #: Multi-lingual name of the object.
    name: InternationalString = InternationalString()

    #: Multi-lingual description of the object.
    description: InternationalString = InternationalString()

    def compare(self, other, strict=True):
        """Return :obj:`True` if `self` is the same as `other`.

        Two NameableArtefacts are the same if:

        - :meth:`.IdentifiableArtefact.compare` is :obj:`True`, and
        - they have the same :attr:`name` and :attr:`description`.

        A debug message is logged when the name or description differs.

        Parameters
        ----------
        strict : bool, optional
            Passed to :func:`.compare` and :meth:`.IdentifiableArtefact.compare`.
        """
        if not super().compare(other, strict):
            return False

        if self.name != other.name:
            log.debug(
                f"Not identical: name <{repr(self.name)}> != <{repr(other.name)}>"
            )
            return False

        if self.description != other.description:
            log.debug(
                f"Not identical: description <{repr(self.description)}> != "
                f"<{repr(other.description)}>"
            )
            return False

        return True

    def _repr_kw(self):
        """Return the fields used by :meth:`__repr__`."""
        if len(self.name.localizations):
            name_part = f": {self.name}"
        else:
            name_part = ""

        return dict(cls=self.__class__.__name__, id=self.id, name=name_part)

    def __repr__(self):
        fields = self._repr_kw()
        return "<{cls} {id}{name}>".format(**fields)
class VersionableArtefact(NameableArtefact):
    """SDMX-IM VersionableArtefact: adds a version and validity dates."""

    #: A version string following an agreed convention.
    version: Optional[str] = None

    #: Date from which the version is valid.
    valid_from: Optional[str] = None

    #: Date from which the version is superseded.
    valid_to: Optional[str] = None

    def __init__(self, **kwargs):
        """Construct the object and reconcile :attr:`version` with the URN.

        Raises
        ------
        ValueError
            If both a version and a URN are given, and they do not match.
        """
        super().__init__(**kwargs)

        try:
            urn_version = self.urn_group["version"]

            if self.version and self.version != urn_version:
                raise ValueError(
                    f"Version {self.version} does not match URN {self.urn}"
                )

            # Either no version was given, or it is the same as in the URN
            self.version = urn_version
        except KeyError:
            # No version information available from a URN
            pass

    def compare(self, other, strict=True):
        """Return :obj:`True` if `self` is the same as `other`.

        Two VersionableArtefacts are the same if:

        - :meth:`.NameableArtefact.compare` is :obj:`True`, and
        - they have the same :attr:`version`.

        Parameters
        ----------
        strict : bool, optional
            Passed to :func:`.compare` and :meth:`.NameableArtefact.compare`.
        """
        if not super().compare(other, strict):
            return False
        return compare("version", self, other, strict)

    def _repr_kw(self) -> Mapping:
        """Extend the inherited :meth:`__repr__` fields with 'version'."""
        own = dict(version=f"({self.version})" if self.version else "")
        return ChainMap(super()._repr_kw(), own)
class MaintainableArtefact(VersionableArtefact):
    """SDMX-IM MaintainableArtefact: adds a maintainer and related properties."""

    #: True if the object is final; otherwise it is in a draft state.
    is_final: Optional[bool] = None

    #: :obj:`True` if the content of the object is held externally; i.e., not
    #: the current :class:`Message`.
    is_external_reference: Optional[bool] = None

    #: URL of an SDMX-compliant web service from which the object can be
    #: retrieved.
    service_url: Optional[str] = None

    #: URL of an SDMX-ML document containing the object.
    structure_url: Optional[str] = None

    #: Association to the Agency responsible for maintaining the object.
    maintainer: Optional["Agency"] = None

    def __init__(self, **kwargs):
        """Construct the object and reconcile :attr:`maintainer` with the URN.

        If no maintainer is given, one is created from the agency ID in the URN.
        A maintainer that is given and matches the URN is kept as-is.

        Raises
        ------
        ValueError
            If both a maintainer and a URN are given, and they do not match.
        """
        super().__init__(**kwargs)

        try:
            if self.maintainer and self.maintainer.id != self.urn_group["agency"]:
                raise ValueError(
                    f"Maintainer {self.maintainer} does not match URN {self.urn}"
                )
            elif not self.maintainer:
                # Only fill in a missing maintainer. Replacing a matching one with
                # a bare Agency(id=…) would discard its other attributes.
                self.maintainer = Agency(id=self.urn_group["agency"])
        except KeyError:
            # No agency information available from a URN
            pass

    def compare(self, other, strict=True):
        """Return :obj:`True` if `self` is the same as `other`.

        Two MaintainableArtefacts are the same if:

        - :meth:`.VersionableArtefact.compare` is :obj:`True`, and
        - they have the same :attr:`maintainer`.

        Parameters
        ----------
        strict : bool, optional
            Passed to :func:`.compare` and :meth:`.VersionableArtefact.compare`.
        """
        return super().compare(other, strict) and compare(
            "maintainer", self, other, strict
        )

    def _repr_kw(self):
        """Extend the inherited :meth:`__repr__` fields with 'maint'."""
        return ChainMap(
            super()._repr_kw(),
            dict(maint=f"{self.maintainer}:" if self.maintainer else ""),
        )

    def __repr__(self):
        return "<{cls} {maint}{id}{version}{name}>".format(**self._repr_kw())
# §3.4: Data Types
ActionType = Enum("ActionType", ["delete", "replace", "append", "information"])

UsageStatus = Enum("UsageStatus", ["mandatory", "conditional"])

# NB the spec is inconsistent about this enumeration. Three diagrams show it with
# 'gregorianYearMonth' but without 'gregorianYear' or 'gregorianMonth'; the table in
# §3.6.3.3 Representation Constructs does the opposite. One ESTAT query (via SGR)
# shows a real-world usage of 'gregorianYear', while one query shows usage of
# 'gregorianYearMonth'; so all three are included.
FacetValueType = Enum(
    "FacetValueType",
    [
        "string",
        "bigInteger",
        "integer",
        "long",
        "short",
        "decimal",
        "float",
        "double",
        "boolean",
        "uri",
        "count",
        "inclusiveValueRange",
        "alpha",
        "alphaNumeric",
        "numeric",
        "exclusiveValueRange",
        "incremental",
        "observationalTimePeriod",
        "standardTimePeriod",
        "basicTimePeriod",
        "gregorianTimePeriod",
        "gregorianYear",
        "gregorianMonth",
        "gregorianYearMonth",
        "gregorianDay",
        "reportingTimePeriod",
        "reportingYear",
        "reportingSemester",
        "reportingTrimester",
        "reportingQuarter",
        "reportingMonth",
        "reportingWeek",
        "reportingDay",
        "dateTime",
        "timesRange",
        "month",
        "monthDay",
        "day",
        "time",
        "duration",
        "keyValues",
        "identifiableReference",
        "dataSetReference",
    ],
)

ConstraintRoleType = Enum("ConstraintRoleType", ["allowable", "actual"])
# §3.5: Item Scheme
IT = TypeVar("IT", bound="Item")


class Item(NameableArtefact, Generic[IT]):
    """SDMX-IM Item: a member of an :class:`ItemScheme`, possibly hierarchical."""

    #: Parent of the Item: either another Item, or the containing ItemScheme.
    parent: Optional[Union[IT, "ItemScheme"]] = None

    #: Children of the Item.
    child: List[IT] = []

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Register this Item as a child of the parent given by the caller, if any
        given_parent = kwargs.get("parent")
        if given_parent:
            given_parent.append_child(self)

        # Register this Item as the parent of each child given by the caller
        for given_child in kwargs.get("child", []):
            self.append_child(given_child)

    def __contains__(self, item):
        """Recursive containment."""
        for kid in self.child:
            if item == kid:
                return True
            if item in kid:
                return True

    def __iter__(self, recurse=True):
        # Yield this Item, then every descendant, depth-first
        yield self
        for kid in self.child:
            yield from kid

    @property
    def hierarchical_id(self):
        """Construct the ID of an Item in a hierarchical ItemScheme.

        Returns, for example, 'A.B.C' for an Item with id 'C' that is the child of an
        item with id 'B', which is the child of a root Item with id 'A'.

        See also
        --------
        .ItemScheme.get_hierarchical
        """
        if isinstance(self.parent, self.__class__):
            return f"{self.parent.hierarchical_id}.{self.id}"
        # No parent, or the parent is not an Item of the same class
        return self.id

    def append_child(self, other: IT):
        """Add `other` to :attr:`child` and set its :attr:`parent` to this Item."""
        already_child = other in self.child
        if not already_child:
            self.child.append(other)
        other.parent = self

    def get_child(self, id) -> IT:
        """Return the child with the given *id*.

        Raises
        ------
        ValueError
            If there is no such child.
        """
        matches = [kid for kid in self.child if kid.id == id]
        if matches:
            return matches[0]
        raise ValueError(id)

    def get_scheme(self):
        """Return the :class:`ItemScheme` to which the Item belongs, if any."""
        try:
            # Recurse upwards through parent Items
            return self.parent.get_scheme()
        except AttributeError:
            # Either this Item is a top-level Item whose .parent refers to the
            # ItemScheme, or it has no parent
            return self.parent
class ItemScheme(MaintainableArtefact, Generic[IT]):
    """SDMX-IM Item Scheme.

    The IM states that ItemScheme “defines a *set* of :class:`Items <.Item>`…” To
    simplify indexing/retrieval, this implementation uses a :class:`dict` for the
    :attr:`items` attribute, in which the keys are the :attr:`~.IdentifiableArtefact.id`
    of the Item.

    Because this may change in future versions of pandaSDMX, user code should not access
    :attr:`items` directly. Instead, use the :func:`getattr` and indexing features of
    ItemScheme, or the public methods, to access and manipulate Items:

    >>> foo = ItemScheme(id='foo')
    >>> bar = Item(id='bar')
    >>> foo.append(bar)
    >>> foo
    <ItemScheme foo (1 items)>
    >>> (foo.bar is bar) and (foo['bar'] is bar) and (bar in foo)
    True
    """

    # TODO add delete()
    # TODO add sorting capability; perhaps sort when new items are inserted

    is_partial: Optional[bool]

    #: Members of the ItemScheme. Both ItemScheme and Item are abstract classes.
    #: Concrete classes are paired: for example, a :class:`.Codelist` contains
    #: :class:`Codes <.Code>`.
    items: Dict[str, IT] = {}

    # The type of the Items in the ItemScheme. This is necessary because the type hint
    # in the class declaration is static; not meant to be available at runtime.
    _Item: Type = Item

    @validator("items", pre=True)
    def convert_to_dict(cls, v):
        """Convert an iterable of Items to a :class:`dict` keyed by Item ID."""
        if isinstance(v, dict):
            return v
        return {i.id: i for i in v}

    # Convenience access to items
    def __getattr__(self, name: str) -> IT:
        # Provided to pass test_dsd.py
        try:
            return self.__getitem__(name)
        except KeyError:
            raise AttributeError(name)

    def __getitem__(self, name: str) -> IT:
        return self.items[name]

    def get_hierarchical(self, id: str) -> IT:
        """Get an Item by its :attr:`~.Item.hierarchical_id`.

        Raises
        ------
        KeyError
            If there is no matching Item.
        """
        if "." not in id:
            # Not hierarchical: direct lookup
            return self.items[id]

        for item in self.items.values():
            if item.hierarchical_id == id:
                return item

        raise KeyError(id)

    def __contains__(self, item: Union[str, IT]) -> bool:
        """Check containment.

        No recursive search on children is performed as these are assumed to be included
        in :attr:`items`. Allow searching by Item or its id attribute.
        """
        if isinstance(item, str):
            return item in self.items
        return item in self.items.values()

    def __iter__(self):
        return iter(self.items.values())

    def extend(self, items: Iterable[IT]):
        """Extend the ItemScheme with members of `items`.

        Parameters
        ----------
        items : iterable of :class:`.Item`
            Elements must be of the same class as :attr:`items`.
        """
        for i in items:
            self.append(i)

    def __len__(self):
        return len(self.items)

    def append(self, item: IT):
        """Add *item* to the ItemScheme.

        Parameters
        ----------
        item : same class as :attr:`items`
            Item to add.

        Raises
        ------
        ValueError
            If an Item with the same ID is already present.
        """
        if item.id in self.items:
            raise ValueError(f"Item with id {repr(item.id)} already exists")

        self.items[item.id] = item

        # A top-level Item refers to the ItemScheme as its parent
        if item.parent is None:
            item.parent = self

    def compare(self, other, strict=True):
        """Return :obj:`True` if `self` is the same as `other`.

        Two ItemSchemes are the same if:

        - :meth:`.MaintainableArtefact.compare` is :obj:`True`, and
        - their :attr:`items` have the same keys, and corresponding
          :class:`Items <Item>` compare equal.

        Parameters
        ----------
        strict : bool, optional
            Passed to :func:`.compare` and :meth:`.MaintainableArtefact.compare`.
        """
        if not super().compare(other, strict):
            return False

        if set(self.items) != set(other.items):
            log.debug(
                f"ItemScheme contents differ: {repr(set(self.items))} != "
                + repr(set(other.items))
            )
            return False

        for id, item in self.items.items():
            if not item.compare(other.items[id], strict):
                log.debug(f"…for items with id={repr(id)}")
                return False

        return True

    def __repr__(self):
        return "<{cls} {maint}{id}{version} ({N} items){name}>".format(
            **self._repr_kw(), N=len(self.items)
        )

    def setdefault(self, obj=None, **kwargs) -> IT:
        """Retrieve the item with the same ID as `obj`, or add and return it.

        The returned object is a reference to an object in the ItemScheme, and is of the
        appropriate class.

        Parameters
        ----------
        obj : Item, optional
            Object to add, if no Item with the same ID is already present.
        kwargs
            If `obj` is not given, passed to the constructor of the Item class of the
            ItemScheme. A 'parent' given as :class:`str` is replaced with the Item
            that has that ID.

        Raises
        ------
        ValueError
            If both `obj` and `kwargs` are given.
        """
        if obj and len(kwargs):
            raise ValueError(
                "cannot give both *obj* and keyword arguments to setdefault()"
            )

        if not obj:
            # Replace a string 'parent' ID with a reference to the object
            # NOTE(review): a 'parent' that is not a str is popped and not passed
            # on to the constructor; confirm whether this is intended.
            parent = kwargs.pop("parent", None)
            if isinstance(parent, str):
                kwargs["parent"] = self[parent]

            # Instantiate an object of the correct class
            obj = self._Item(**kwargs)

        try:
            # Add the object to the ItemScheme
            self.append(obj)
        except ValueError:
            # Already present: return the stored Item rather than the candidate,
            # so that the result always refers to an object in the ItemScheme
            return self.items[obj.id]

        return obj
# Item.parent is annotated with the string "ItemScheme"; now that ItemScheme is
# defined, have the model resolve its string annotations.
Item.update_forward_refs()
# §3.6: Structure
class FacetType(BaseModel):
    """SDMX-IM FacetType. All attributes are optional; unknown ones are rejected."""

    class Config:
        # Reject attributes other than the ones declared below
        extra = "forbid"

    #:
    is_sequence: Optional[bool] = None
    #:
    min_length: Optional[int] = None
    #:
    max_length: Optional[int] = None
    #:
    min_value: Optional[float] = None
    #:
    max_value: Optional[float] = None
    #:
    start_value: Optional[float] = None
    #:
    end_value: Optional[str] = None
    #:
    interval: Optional[float] = None
    #:
    time_interval: Optional[timedelta] = None
    #:
    decimals: Optional[int] = None
    #:
    pattern: Optional[str] = None
    #:
    start_time: Optional[datetime] = None
    #:
    end_time: Optional[datetime] = None
class Facet(BaseModel):
    """SDMX-IM Facet: a :class:`FacetType` with an optional value and value type."""

    class Config:
        # Reject attributes other than the ones declared below
        extra = "forbid"

    #:
    type: FacetType = FacetType()
    #:
    value: Optional[str] = None
    #:
    value_type: Optional[FacetValueType] = None
class Representation(BaseModel):
    """SDMX-IM Representation: an :class:`ItemScheme` and/or a list of Facets."""

    class Config:
        # Reject attributes other than the ones declared below
        extra = "forbid"

    #:
    enumerated: Optional[ItemScheme] = None
    #:
    non_enumerated: List[Facet] = []

    def __repr__(self):
        cls_name = self.__class__.__name__
        return f"<{cls_name}: {self.enumerated}, {self.non_enumerated}>"
# §4.4: Concept Scheme
class ISOConceptReference(BaseModel):
    """SDMX-IM ISOConceptReference. All three attributes are required."""

    class Config:
        # Reject attributes other than the ones declared below
        extra = "forbid"

    #:
    agency: str
    #:
    id: str
    #:
    scheme_id: str
class Concept(Item["Concept"]):
    """SDMX-IM Concept: the :class:`Item` class of a :class:`ConceptScheme`."""

    #:
    core_representation: Optional[Representation] = None
    #:
    iso_concept: Optional[ISOConceptReference] = None
class ConceptScheme(ItemScheme[Concept]):
    """SDMX-IM ConceptScheme: an :class:`ItemScheme` of :class:`Concept`."""

    # Runtime class of the Items; see the comment on ItemScheme._Item
    _Item = Concept
# §3.3: Basic Inheritance
class Component(IdentifiableArtefact):
    """SDMX-IM Component: links a :class:`Concept` with a :class:`Representation`."""

    #:
    concept_identity: Optional[Concept] = None
    #:
    local_representation: Optional[Representation] = None

    def __contains__(self, value):
        """Return :obj:`True` if `value` is in the enumeration of the Component.

        The core representation of :attr:`concept_identity` is checked first, then
        :attr:`local_representation`; the first that is enumerated is used.

        Raises
        ------
        TypeError
            If neither representation is enumerated.
        """
        for representation in (
            # concept_identity is optional: tolerate None instead of raising
            # AttributeError
            getattr(self.concept_identity, "core_representation", None),
            self.local_representation,
        ):
            enum = getattr(representation, "enumerated", None)
            if enum is not None:
                return value in enum

        raise TypeError("membership not defined for non-enumerated representations")
CT = TypeVar("CT", bound=Component)


class ComponentList(IdentifiableArtefact, Generic[CT]):
    """SDMX-IM ComponentList: an ordered list of :class:`Component`."""

    #:
    components: List[CT] = []
    #: Value given to the 'order' attribute of the next component that is created
    #: automatically by :meth:`getdefault`.
    auto_order = 1

    # The default type of the Components in the ComponentList. See comment on
    # ItemScheme._Item
    _Component: Type = Component

    # Convenience access to the components

    def append(self, value: CT):
        """Append *value* to :attr:`components`."""
        self.components.append(value)

    def get(self, id) -> CT:
        """Return the component with the given *id*.

        Raises
        ------
        KeyError
            If there is no such component.
        """
        # Search for an existing Component
        for c in self.components:
            if c.id == id:
                return c
        raise KeyError(id)

    def getdefault(self, id, cls=None, **kwargs) -> CT:
        """Return or create the component with the given *id*.

        If the component is automatically created, its :attr:`.Dimension.order`
        attribute is set to the value of :attr:`auto_order`, which is then incremented.

        Parameters
        ----------
        id : str
            Component ID.
        cls : type, optional
            Hint for the class of a new object.
        kwargs
            Passed to the constructor of `cls`, or else of the class given by the
            :attr:`_Component` attribute of the ComponentList (sub)class.
        """
        try:
            return self.get(id)
        except KeyError:
            # No match
            pass

        # Create a new object of a class:
        # 1. Given by the cls argument, or
        # 2. Specified by the _Component attribute, possibly of a subclass.
        cls = cls or self._Component
        component = cls(id=id, **kwargs)

        if "order" not in kwargs:
            # For automatically created dimensions, give a serial value to the
            # order property
            try:
                component.order = self.auto_order
                self.auto_order += 1
            except ValueError:
                # NOTE(review): presumably raised when the component class has no
                # 'order' field; verify against BaseModel.
                pass

        self.components.append(component)
        return component

    # Properties of components

    def __getitem__(self, key) -> CT:
        """Convenience access to components."""
        return self.components[key]

    def __len__(self):
        return len(self.components)

    def __iter__(self):
        return iter(self.components)

    def __repr__(self):
        return "<{}: {}>".format(
            self.__class__.__name__, "; ".join(map(repr, self.components))
        )

    def __eq__(self, other):
        """ID equal and same components occur in same order.

        As for :class:`IdentifiableArtefact`, a string containing the ID is also
        equal to the object.
        """
        if not super().__eq__(other):
            return False
        if isinstance(other, str):
            # A str has no components to compare
            return True
        # zip() stops at the shorter list, so the lengths are checked explicitly
        return len(self.components) == len(other.components) and all(
            s == o for s, o in zip(self.components, other.components)
        )

    # Must be reset because __eq__ is defined
    def __hash__(self):
        return super().__hash__()

    def compare(self, other, strict=True):
        """Return :obj:`True` if `self` is the same as `other`.

        Two ComponentLists are the same if:

        - :meth:`.IdentifiableArtefact.compare` is :obj:`True`, and
        - corresponding :attr:`components` compare equal.

        Parameters
        ----------
        strict : bool, optional
            Passed to :func:`.compare` and :meth:`.IdentifiableArtefact.compare`.
        """
        if not super().compare(other, strict):
            return False

        for c in self.components:
            try:
                counterpart = other.get(c.id)
            except KeyError:
                # `other` lacks this component: not the same, rather than an error
                log.debug(f"Not identical: no component with id={repr(c.id)}")
                return False
            if not c.compare(counterpart, strict):
                return False

        return True
# §4.3: Codelist
class Code(Item["Code"]):
    """SDMX-IM Code: the :class:`Item` class of a :class:`Codelist`."""
class Codelist(ItemScheme[Code]):
    """SDMX-IM Codelist: an :class:`ItemScheme` of :class:`Code`."""

    # Runtime class of the Items; see the comment on ItemScheme._Item
    _Item = Code
# §4.5: Category Scheme
class Category(Item["Category"]):
    """SDMX-IM Category: the :class:`Item` class of a :class:`CategoryScheme`."""
class CategoryScheme(ItemScheme[Category]):
    """SDMX-IM CategoryScheme: an :class:`ItemScheme` of :class:`Category`."""

    # Runtime class of the Items; see the comment on ItemScheme._Item
    _Item = Category
class Categorisation(MaintainableArtefact):
    """SDMX-IM Categorisation: associates a :class:`Category` with an artefact."""

    #:
    category: Optional[Category] = None
    #:
    artefact: Optional[IdentifiableArtefact] = None
# §4.6: Organisations
class Organisation(Item["Organisation"]):
    """SDMX-IM Organisation: an :class:`Item` with a list of contacts."""

    #:
    contact: List[Contact] = []
class Agency(Organisation):
    """SDMX-IM Agency: the :class:`Item` class of an :class:`AgencyScheme`."""
# DataProvider delayed until after ConstrainableArtefact, below
# Update forward references to 'Agency'
# Iterate over a snapshot of the namespace, since the loop variable itself is added
# to it while iterating.
for cls in list(locals().values()):
    if not isclass(cls):
        continue
    if issubclass(cls, MaintainableArtefact):
        cls.update_forward_refs()
class OrganisationScheme:
    """SDMX-IM abstract OrganisationScheme.

    Used as a second base class for the ItemSchemes of :class:`Organisation`
    subclasses.
    """
class AgencyScheme(ItemScheme[Agency], OrganisationScheme):
    """SDMX-IM AgencyScheme: an :class:`ItemScheme` of :class:`Agency`."""

    # Runtime class of the Items; see the comment on ItemScheme._Item
    _Item = Agency
# DataProviderScheme delayed until after DataProvider, below
# §10.2: Constraint inheritance
class ConstrainableArtefact(BaseModel):
    """SDMX-IM ConstrainableArtefact.

    Base class for objects that can be referenced by a :class:`Constraint`
    subclass.
    """
class DataConsumer(Organisation, ConstrainableArtefact):
    """SDMX-IM DataConsumer: a constrainable :class:`Organisation`."""
class DataProvider(Organisation, ConstrainableArtefact):
    """SDMX-IM DataProvider: a constrainable :class:`Organisation`."""
class DataConsumerScheme(ItemScheme[DataConsumer], OrganisationScheme):
    """SDMX-IM DataConsumerScheme: an :class:`ItemScheme` of DataConsumers."""

    # Runtime class of the Items; see the comment on ItemScheme._Item
    _Item = DataConsumer
class DataProviderScheme(ItemScheme[DataProvider], OrganisationScheme):
    """SDMX-IM DataProviderScheme: an :class:`ItemScheme` of DataProviders."""

    # Runtime class of the Items; see the comment on ItemScheme._Item
    _Item = DataProvider
# §10.3: Constraints
class ConstraintRole(BaseModel):
    """SDMX-IM ConstraintRole: wraps a required :class:`ConstraintRoleType`."""

    #:
    role: ConstraintRoleType
class ComponentValue(BaseModel):
    """SDMX-IM ComponentValue: a value for one :class:`Component`."""

    #:
    value_for: Component
    #:
    value: Any
class DataKey(BaseModel):
    """SDMX-IM DataKey: an included or excluded key in a :class:`DataKeySet`."""

    #: :obj:`True` if the :attr:`key_value` are included in the
    #: :class:`.Constraint`; :obj:`False` if they are excluded.
    included: bool

    #: Mapping from :class:`.Component` to :class:`.ComponentValue` comprising the key.
    key_value: Dict[Component, ComponentValue]
class DataKeySet(BaseModel):
    """SDMX-IM DataKeySet: a collection of :class:`DataKey`."""

    #: :obj:`True` if the :attr:`keys` are included in the :class:`.Constraint`;
    #: :obj:`False` if they are excluded.
    included: bool

    #: :class:`DataKeys <.DataKey>` appearing in the set.
    keys: List[DataKey] = []

    def __len__(self):
        """:func:`len` of the DataKeySet = :func:`len` of its :attr:`keys`."""
        return len(self.keys)

    def __contains__(self, item):
        """Return :obj:`True` if `item` equals any of the :attr:`keys`."""
        for data_key in self.keys:
            if item == data_key:
                return True
        return False
class Constraint(MaintainableArtefact):
    """SDMX-IM Constraint: base class with an optional :class:`DataKeySet`."""

    #: :class:`.DataKeySet` included in the Constraint.
    data_content_keys: Optional[DataKeySet] = None
    # metadata_content_keys: MetadataKeySet = None

    # NB the spec gives 1..* for this attribute, but this implementation allows only 1
    role: ConstraintRole

    # NB this is required to prevent “unhashable type: 'dict'” in pydantic
    class Config:
        validate_assignment = False

    def __contains__(self, value):
        """Return :obj:`True` if `value` is in :attr:`data_content_keys`.

        Raises
        ------
        NotImplementedError
            If the Constraint has no :attr:`data_content_keys`.
        """
        key_set = self.data_content_keys
        if key_set is not None:
            return value in key_set

        raise NotImplementedError("Constraint does not contain a DataKeySet")
class SelectionValue(BaseModel):
    """SDMX-IM SelectionValue: base class for values in a :class:`MemberSelection`."""
class MemberValue(SelectionValue):
    """SDMX-IM MemberValue: a single string value, hashed and compared by value."""

    #:
    value: str
    #:
    cascade_values: Optional[bool] = None

    def __hash__(self):
        return hash(self.value)

    def __eq__(self, other):
        # A KeyValue is compared through its value; anything else directly
        if isinstance(other, KeyValue):
            other = other.value
        return self.value == other

    def __repr__(self):
        suffix = " + children" if self.cascade_values else ""
        return repr(self.value) + suffix
class TimeRangeValue(SelectionValue):
    """SDMX-IM TimeRangeValue: base class for time-range selection values."""
class Period(BaseModel):
    """A :class:`~datetime.datetime` with a flag for whether it is inclusive."""

    #:
    is_inclusive: bool
    #:
    period: datetime
class RangePeriod(TimeRangeValue):
    """A time range given by a start and an end :class:`Period`."""

    #:
    start: Period
    #:
    end: Period
class MemberSelection(BaseModel):
    """SDMX-IM MemberSelection: values in- or excluded for one :class:`Component`."""

    #:
    included: bool = True
    #:
    values_for: Component
    #: Value(s) included in the selection. Note that the name of this attribute is not
    #: stated in the IM, so 'values' is chosen for the implementation in this package.
    values: List[SelectionValue] = []

    def __contains__(self, value):
        """Compare KeyValue to MemberValue."""
        matched = any(member == value for member in self.values)
        # Invert the sense of the test if the listed values are excluded
        return matched is self.included

    def __len__(self):
        return len(self.values)

    def __repr__(self):
        negation = "" if self.included else "not "
        listed = ", ".join(repr(member) for member in self.values)
        return (
            f"<{self.__class__.__name__} {self.values_for.id} "
            f"{negation}in {{{listed}}}>"
        )
# NB CubeRegion and ContentConstraint are moved below, after Dimension, since CubeRegion
# references that class.
class AttachmentConstraint(Constraint):
    """SDMX-IM AttachmentConstraint: a :class:`Constraint` with attachments."""

    #:
    attachment: Set[ConstrainableArtefact] = set()
# §5.2: Data Structure Definition
class DimensionComponent(Component):
    """SDMX-IM DimensionComponent: a :class:`Component` with an optional order."""

    #:
    order: Optional[int] = None
class Dimension(DimensionComponent):
    """SDMX-IM Dimension: a plain :class:`DimensionComponent`."""
# (continued from §10.3)
class CubeRegion(BaseModel):
    """SDMX-IM CubeRegion: :class:`MemberSelection` for one or more Dimensions."""

    #:
    included: bool = True
    #:
    member: Dict[Dimension, MemberSelection] = {}

    def __contains__(self, other: Union["Key", "KeyValue"]) -> bool:
        """Membership test.

        `other` may be either:

        - :class:`.Key` —all its :class:`.KeyValue` are checked.
        - :class:`.KeyValue` —only the one :class:`.Dimension` for which `other` is a
          value is checked

        Returns
        -------
        bool
            :obj:`True` if:

            - :attr:`.included` *and* `other` is in the CubeRegion;
            - if :attr:`.included` is :obj:`False` *and* `other` is outside the
              CubeRegion; or
            - the `other` is KeyValue referencing a Dimension that is not included in
              :attr:`.member`.
        """
        if isinstance(other, Key):
            inside = all(
                other[selection.values_for.id] in selection
                for selection in self.member.values()
            )
        else:
            dim = other.value_for
            if dim is None:
                # No Dimension reference to use
                inside = False
            elif dim not in self.member or len(self.member) > 1:
                # This CubeRegion doesn't have a MemberSelection for the KeyValue's
                # Component; or it concerns additional Components, so inclusion can't
                # be determined
                return True
            else:
                # Check whether the KeyValue is in the indicated dimension
                inside = other.value in self.member[dim]

        # Return the correct sense
        return inside is self.included

    def to_query_string(self, structure):
        """Return the selected values as a string: '+' within, '.' between dimensions.

        Dimensions are taken in the order of `structure.dimensions`; instances of
        :class:`TimeDimension` are skipped.
        """

        def _segment(dim):
            # Sorted values for one dimension; empty if nothing is selected for it
            selection = self.member.get(dim, None)
            if not selection:
                return ""
            return "+".join(sorted(mv.value for mv in selection.values))

        return ".".join(
            _segment(dim)
            for dim in structure.dimensions
            # TimeDimensions handled by query parameters
            if not isinstance(dim, TimeDimension)
        )

    def __repr__(self):
        verb = "include" if self.included else "exclude"
        selections = " ".join(repr(selection) for selection in self.member.values())
        return f"<{self.__class__.__name__} {verb} {selections}>"
# (continued from §10.3)
class ContentConstraint(Constraint):
    """SDMX-IM ContentConstraint: a :class:`Constraint` with CubeRegions."""

    #: :class:`CubeRegions <.CubeRegion>` included in the ContentConstraint.
    data_content_region: List[CubeRegion] = []
    #:
    content: Set[ConstrainableArtefact] = set()
    # metadata_content_region: MetadataTargetRegion = None

    def __contains__(self, value):
        """Return :obj:`True` if `value` is in every :attr:`data_content_region`.

        Raises
        ------
        NotImplementedError
            If there are no CubeRegions.
        """
        if self.data_content_region:
            return all(value in cr for cr in self.data_content_region)
        else:
            raise NotImplementedError(
                "ContentConstraint does not contain a CubeRegion."
            )

    def to_query_string(self, structure):
        """Return the query string for the first :attr:`data_content_region`.

        A warning is emitted if there is more than one CubeRegion.

        Raises
        ------
        RuntimeError
            If there are no CubeRegions.
        """
        cr_count = len(self.data_content_region)
        try:
            if cr_count > 1:
                warn(f"to_query_string() using first of {cr_count} " "CubeRegions.")

            return self.data_content_region[0].to_query_string(structure)
        except IndexError:
            raise RuntimeError("ContentConstraint does not contain a CubeRegion.")

    def iter_keys(
        self,
        obj: Union["DataStructureDefinition", "DataflowDefinition"],
        dims: Optional[List[str]] = None,
    ) -> Generator["Key", None, None]:
        """Iterate over keys.

        A warning is logged if `obj` is not already explicitly associated to this
        ContentConstraint, i.e. present in :attr:`.content`.

        Parameters
        ----------
        obj
            Object whose :meth:`iter_keys` method is called with this constraint.
        dims : list of str, optional
            Passed to :meth:`obj.iter_keys`; an empty list if omitted.

        See also
        --------
        .DataStructureDefinition.iter_keys
        """
        if obj not in self.content:
            log.warning(f"{repr(obj)} is not in {repr(self)}.content")

        # A new list per call, in place of a mutable default argument shared
        # between calls
        yield from obj.iter_keys(constraint=self, dims=[] if dims is None else dims)
class TimeDimension(DimensionComponent):
    """SDMX-IM TimeDimension.

    Skipped by :meth:`CubeRegion.to_query_string`.
    """
class MeasureDimension(DimensionComponent):
    """SDMX-IM MeasureDimension: a kind of :class:`DimensionComponent`."""
class PrimaryMeasure(Component):
    """SDMX-IM PrimaryMeasure: the Component class of a MeasureDescriptor."""
class MeasureDescriptor(ComponentList[PrimaryMeasure]):
    """SDMX-IM MeasureDescriptor: a ComponentList of :class:`PrimaryMeasure`."""

    # Default class for new components; see ComponentList.getdefault()
    _Component = PrimaryMeasure
class AttributeRelationship(BaseModel):
    """SDMX-IM AttributeRelationship: base class for attribute relationships."""
class _NoSpecifiedRelationship(AttributeRelationship):
    """Class of the :data:`NoSpecifiedRelationship` object."""


#: Module-level instance, for use as a value of :attr:`DataAttribute.related_to`.
NoSpecifiedRelationship = _NoSpecifiedRelationship()
class _PrimaryMeasureRelationship(AttributeRelationship):
    """Class of :data:`PrimaryMeasureRelationship`; declares no fields."""


# Module-level instance of the class above.
PrimaryMeasureRelationship = _PrimaryMeasureRelationship()
class DimensionRelationship(AttributeRelationship):
    """Attribute relationship referring to dimensions and, optionally, a group."""

    #:
    dimensions: List[DimensionComponent] = []
    #: NB the IM says "0..*" here in a diagram, but the text does not match.
    group_key: Optional["GroupDimensionDescriptor"] = None
class GroupRelationship(AttributeRelationship):
    """Attribute relationship referring to a group.

    'Retained for compatibility reasons' in SDMX 2.1; not used by pandaSDMX.
    """

    #:
    group_key: Optional["GroupDimensionDescriptor"] = None
class DataAttribute(Component):
    """:class:`Component` with an optional relationship and usage status."""

    #:
    related_to: Optional[AttributeRelationship] = None
    #:
    usage_status: Optional[UsageStatus] = None
class ReportingYearStartDay(DataAttribute):
    """Subclass of :class:`DataAttribute` that declares no additional fields."""
class AttributeDescriptor(ComponentList[DataAttribute]):
    """:class:`ComponentList` parametrized with :class:`DataAttribute`."""

    # Component class associated with this list.
    _Component = DataAttribute
class Structure(MaintainableArtefact):
    """:class:`MaintainableArtefact` with an optional :attr:`grouping`."""

    #:
    grouping: Optional[ComponentList] = None
class StructureUsage(MaintainableArtefact):
    """:class:`MaintainableArtefact` that optionally refers to a :class:`Structure`."""

    #:
    structure: Optional[Structure] = None
class DimensionDescriptor(ComponentList[DimensionComponent]):
    """Describes a set of dimensions.

    IM: “An ordered set of metadata concepts that, combined, classify a statistical
    series, and whose values, when combined (the key) in an instance such as a data set,
    uniquely identify a specific observation.”

    :attr:`.components` is a :class:`list` (ordered) of :class:`Dimension`,
    :class:`MeasureDimension`, and/or :class:`TimeDimension`.
    """

    _Component = Dimension

    def assign_order(self):
        """Assign the :attr:`.DimensionComponent.order` attribute.

        The Dimensions in :attr:`components` are numbered, starting from 1.
        """
        for position, component in enumerate(self.components, start=1):
            component.order = position

    def order_key(self, key):
        """Return a key ordered according to the DSD.

        A new, empty instance of the class of `key` is filled with the values of
        `key`, visiting :attr:`components` in increasing ``order``. Components for
        which `key` has no value are skipped.
        """
        ordered = key.__class__()

        for dim in sorted(self.components, key=lambda component: component.order):
            try:
                ordered[dim.id] = key[dim.id]
            except KeyError:
                # No value for this dimension
                continue

        return ordered

    @classmethod
    def from_key(cls, key):
        """Create a new DimensionDescriptor from a *key*.

        For each :class:`KeyValue` in the *key*:

        - A new :class:`Dimension` is created.
        - A new :class:`Codelist` is created, containing the
          :attr:`KeyValue.value`.

        Parameters
        ----------
        key : :class:`Key` or :class:`GroupKey` or :class:`SeriesKey`
        """
        dd = cls()

        # NB `position` starts at 0 here, whereas assign_order() numbers from 1
        for position, (dim_id, kv) in enumerate(key.values.items()):
            # Codelist with a single Code for the observed value
            codelist = Codelist(id=dim_id)
            codelist.append(Code(id=kv.value))

            dimension = Dimension(
                id=dim_id,
                local_representation=Representation(enumerated=codelist),
                order=position,
            )
            dd.components.append(dimension)

        return dd
class GroupDimensionDescriptor(DimensionDescriptor):
    """:class:`DimensionDescriptor` for a group; never re-numbers its components."""

    #:
    attachment_constraint: Optional[bool] = None
    #:
    constraint: Optional[AttachmentConstraint] = None

    def assign_order(self):
        """:meth:`assign_order` has no effect for GroupDimensionDescriptor."""
        return None
# Both classes annotate `group_key` with the string "GroupDimensionDescriptor",
# because that class is defined after them; update the forward references now
# that it exists.
DimensionRelationship.update_forward_refs()
GroupRelationship.update_forward_refs()
class _NullConstraintClass:
    """Constraint that allows anything."""

    def __contains__(self, value):
        """Return :obj:`True` for any `value`."""
        return True


# Module-level instance of the class above.
_NullConstraint = _NullConstraintClass()
@validate_dictlike
class DataStructureDefinition(Structure, ConstrainableArtefact):
    """SDMX-IM DataStructureDefinition (‘DSD’)."""

    #: A :class:`AttributeDescriptor` that describes the attributes of the data
    #: structure.
    attributes: AttributeDescriptor = AttributeDescriptor()
    #: A :class:`DimensionDescriptor` that describes the dimensions of the data
    #: structure.
    dimensions: DimensionDescriptor = DimensionDescriptor()
    #: A :class:`.MeasureDescriptor`.
    measures: MeasureDescriptor = MeasureDescriptor()
    #: Mapping from :attr:`.GroupDimensionDescriptor.id` to
    #: :class:`.GroupDimensionDescriptor`.
    group_dimensions: DictLike[str, GroupDimensionDescriptor] = dictlike_field()

    # Convenience methods
    def iter_keys(
        self, constraint: Constraint = None, dims: List[str] = []
    ) -> Generator["Key", None, None]:
        """Iterate over keys.

        Parameters
        ----------
        constraint : Constraint, optional
            If given, only yield Keys that are within the constraint.
        dims : list of str, optional
            If given, only iterate over allowable values for the Dimensions with these
            IDs. Other dimensions have only a single value like "(DIM_ID)", where
            DIM_ID is the ID of the dimension.
        """
        # NB for performance, the implementation uses lazy iterators and avoids
        # building full-length tuples/lists at any point
        _constraint = constraint or _NullConstraint
        dims = dims or [dim.id for dim in self.dimensions.components]

        # Return a function that produces KeyValues. `id` and `value_for` are frozen
        # as arguments of the enclosing call, so each returned function keeps the
        # values from the loop iteration in which it was created.
        def make_factory(id=None, value_for=None):
            def factory(value):
                return KeyValue.construct(id=id, value=value, value_for=value_for)

            return factory

        # One iterable of (dim.id, KeyValue) per dimension
        all_kvs: List[Iterable[Tuple[str, KeyValue]]] = []

        for dim in self.dimensions.components:
            representation = dim.local_representation

            if (
                dim.id in dims
                and representation is not None
                and representation.enumerated is not None
            ):
                # One KeyValue per Item in the ItemScheme, filtered through the
                # constraint
                candidates = map(
                    make_factory(id=dim.id, value_for=dim), representation.enumerated
                )
                allowed = filter(_constraint.__contains__, candidates)
                all_kvs.append(map(lambda kv: (kv.id, kv), allowed))
            else:
                # `dim` is not enumerated by an ItemScheme, or is not among `dims`;
                # use a single placeholder value
                placeholder = KeyValue(id=dim.id, value=f"({dim.id})", value_for=dim)
                all_kvs.append([(dim.id, placeholder)])

        # Create Key objects from Cartesian product of KeyValues along each dimension
        # NB this does not work with DataKeySet
        # TODO improve to work with DataKeySet
        keys = map(Key._fast, product(*all_kvs))
        yield from filter(_constraint.__contains__, keys)

    def make_constraint(self, key):
        """Return a constraint for `key`.

        `key` is a :class:`dict` wherein:

        - keys are :class:`str` ids of Dimensions appearing in this DSD's
          :attr:`dimensions`, and
        - values are '+'-delimited :class:`str` containing allowable values, *or*
          iterables of :class:`str`, each an allowable value.

        For example::

            cc2 = dsd.make_constraint({'foo': 'bar+baz', 'qux': 'q1+q2+q3'})

        ``cc2`` includes any key where the 'foo' dimension is 'bar' *or* 'baz', *and*
        the 'qux' dimension is one of 'q1', 'q2', or 'q3'.

        Returns
        -------
        ContentConstraint
            A constraint with one :class:`CubeRegion` in its
            :attr:`data_content_region <ContentConstraint.data_content_region>` ,
            including only the values appearing in `key`.

        Raises
        ------
        ValueError
            if `key` contains a dimension IDs not appearing in :attr:`dimensions`.
        """
        # Work on a copy, so entries are not popped from the caller's object
        remaining = key.copy()

        region = CubeRegion()

        for dim in self.dimensions:
            try:
                values = remaining.pop(dim.id)
            except KeyError:
                # No values given for this dimension
                continue

            if isinstance(values, str):
                values = values.split("+")

            # TODO validate values
            member_values = {MemberValue(value=value) for value in values}

            region.member[dim] = MemberSelection(
                included=True, values_for=dim, values=member_values
            )

        # Anything left over refers to a dimension that is not in this DSD
        if len(remaining):
            raise ValueError(
                "Dimensions {!r} not in {!r}".format(
                    list(remaining.keys()), self.dimensions
                )
            )

        return ContentConstraint(
            data_content_region=[region],
            role=ConstraintRole(role=ConstraintRoleType.allowable),
        )

    @classmethod
    def from_keys(cls, keys):
        """Return a new DSD given some *keys*.

        The DSD's :attr:`dimensions` refers to a set of new :class:`Concepts <Concept>`
        and :class:`Codelists <Codelist>`, created to represent all the values observed
        across *keys* for each dimension.

        Parameters
        ----------
        keys : iterable of :class:`Key`
            or of subclasses such as :class:`SeriesKey` or :class:`GroupKey`.
        """
        iter_keys = iter(keys)

        # The first key determines the dimensions
        dd = DimensionDescriptor.from_key(next(iter_keys))

        # Values from the remaining keys are added to the existing code lists, matched
        # by position
        for k in iter_keys:
            for i, kv in enumerate(k.values.values()):
                try:
                    dd[i].local_representation.enumerated.append(Code(id=kv.value))
                except ValueError:
                    continue  # Item already exists

        return cls(dimensions=dd)

    def make_key(self, key_cls, values: Mapping, extend=False, group_id=None):
        """Make a :class:`.Key` or subclass.

        Parameters
        ----------
        key_cls : Key or SeriesKey or GroupKey
            Class of Key to create.
        values : dict
            Used to construct :attr:`.Key.values`.
        extend : bool, optional
            If :obj:`True`, make_key will not raise :class:`KeyError` on missing
            dimensions. Instead :attr:`dimensions` (`key_cls` is Key or SeriesKey) or
            :attr:`group_dimensions` (`key_cls` is GroupKey) will be extended by
            creating new Dimension objects.
        group_id : str, optional
            When `key_cls` is :class:`.GroupKey`, the ID of the
            :class:`.GroupDimensionDescriptor` that structures the key.

        Returns
        -------
        Key
            An instance of `key_cls`.

        Raises
        ------
        KeyError
            If any of the keys of `values` is not a Dimension or Attribute in the DSD.
        """
        # Methods to get dimensions and attributes
        get_method = "getdefault" if extend else "get"
        dim = getattr(self.dimensions, get_method)
        attr = getattr(self.attributes, get_method)

        # Arguments for creating the Key
        args: Dict[str, Any] = dict(described_by=self.dimensions)

        if key_cls is GroupKey:
            # Get the GroupDimensionDescriptor, if indicated by group_id
            gdd = self.group_dimensions.get(group_id, None)

            if group_id and not gdd and not extend:
                # Cannot create
                raise KeyError(group_id)
            elif group_id and extend:
                # Create the GDD
                gdd = GroupDimensionDescriptor(id=group_id)
                self.group_dimensions[gdd.id] = gdd

                # GroupKey will have same ID and be described by the GDD
                args = dict(id=group_id, described_by=gdd)

                # Dimensions to be retrieved from the GDD
                def dim(id):
                    # Get from the DimensionDescriptor
                    new_dim = self.dimensions.getdefault(id)
                    # Add to the GDD
                    gdd.components.append(new_dim)
                    return gdd.get(id)

            else:
                # Not described by anything
                args = dict()

        key = key_cls(**args)

        # Convert `values` to either KeyValue or AttributeValue
        keyvalues = []
        for id, value in values.items():
            kv_args = dict(id=id, value=value)

            if id in self.attributes:
                # Reference a DataAttribute from the AttributeDescriptor
                da = attr(id)
                # Store the attribute value, referencing the DataAttribute
                key.attrib[da.id] = AttributeValue(**kv_args, value_for=da)
                continue

            # Reference a Dimension from the DimensionDescriptor. If extend=False and
            # the Dimension does not exist, this will raise KeyError
            dimension = dim(id)
            kv_args["value_for"] = dimension

            # Store a KeyValue together with the Dimension's order, to be sorted later
            keyvalues.append((dimension.order, KeyValue(**kv_args)))

        # Sort the values according to *order*
        key.values.update({kv.id: kv for _, kv in sorted(keyvalues)})

        return key

    def compare(self, other, strict=True):
        """Return :obj:`True` if `self` is the same as `other`.

        Two DataStructureDefinitions are the same if each of :attr:`attributes`,
        :attr:`dimensions`, :attr:`measures`, and :attr:`group_dimensions` compares
        equal.

        Parameters
        ----------
        strict : bool, optional
            Passed to :meth:`.ComponentList.compare`.
        """
        for name in ("attributes", "dimensions", "measures", "group_dimensions"):
            if not getattr(self, name).compare(getattr(other, name), strict):
                return False
        return True
class DataflowDefinition(StructureUsage, ConstrainableArtefact):
    """:class:`StructureUsage` whose structure is a :class:`DataStructureDefinition`."""

    #:
    structure: DataStructureDefinition = DataStructureDefinition()

    def iter_keys(
        self, constraint: Constraint = None, dims: List[str] = []
    ) -> Generator["Key", None, None]:
        """Iterate over keys.

        Both arguments are passed unchanged to the ``iter_keys()`` method of
        :attr:`structure`.

        See also
        --------
        .DataStructureDefinition.iter_keys
        """
        dsd = self.structure
        yield from dsd.iter_keys(constraint=constraint, dims=dims)
# §5.4: Data Set
def value_for_dsd_ref(kind, args, kwargs):
    """Maybe replace a string 'value_for' in *kwargs* with a DSD reference.

    If *kwargs* contains "dsd", that entry is removed and its attribute named
    ``kind + "s"`` is used to look up ``kwargs["value_for"]`` via ``.get()``; the
    result replaces ``kwargs["value_for"]``. If "dsd" or "value_for" is missing,
    or the lookup raises :class:`KeyError`, "value_for" is left untouched.

    Returns
    -------
    tuple
        `args` (unchanged) and the (possibly modified) `kwargs`.
    """
    if "dsd" in kwargs:
        try:
            descriptor = getattr(kwargs.pop("dsd"), kind + "s")
            kwargs["value_for"] = descriptor.get(kwargs["value_for"])
        except KeyError:
            # No 'value_for' given, or the descriptor lookup failed
            pass

    return args, kwargs
class KeyValue(BaseModel):
    """One value in a multi-dimensional :class:`Key`."""

    #:
    id: str
    #: The actual value.
    value: Any
    #:
    value_for: Optional[Dimension] = None

    def __init__(self, *args, **kwargs):
        # Maybe convert a 'dsd' keyword and string 'value_for' to a Dimension
        args, kwargs = value_for_dsd_ref("dimension", args, kwargs)
        super().__init__(*args, **kwargs)

    def __eq__(self, other):
        """Compare the value to a simple Python built-in type or other key-like.

        `other` may be :class:`.KeyValue` or :class:`.ComponentValue`; if so, and both
        `self` and `other` have :attr:`.value_for`, these must refer to the same object.
        """
        if isinstance(other, (KeyValue, ComponentValue)):
            return (self.value == other.value) and (
                self.value_for in (None, other.value_for) or other.value_for is None
            )

        if isinstance(other, MemberValue):
            return self.value == other.value

        # Anything else is compared directly with the value
        return self.value == other

    def __str__(self):
        return "{0.id}={0.value}".format(self)

    def __repr__(self):
        return "<{0.__class__.__name__}: {0.id}={0.value}>".format(self)

    def __hash__(self):
        # KeyValue instances with the same id & value hash identically
        text = self.id + str(self.value)
        return hash(text)


# TimeKeyValue is an alias for KeyValue, not a separate class
TimeKeyValue = KeyValue
class AttributeValue(BaseModel):
    """SDMX-IM AttributeValue.

    In the spec, AttributeValue is an abstract class. Here, it serves as both the
    concrete subclasses CodedAttributeValue and UncodedAttributeValue.
    """

    # TODO separate and enforce properties of Coded- and UncodedAttributeValue
    #:
    value: Union[str, Code]
    #:
    value_for: Optional[DataAttribute] = None
    #:
    start_date: Optional[date] = None

    def __init__(self, *args, **kwargs):
        # Maybe convert a 'dsd' keyword and string 'value_for' to a DataAttribute
        args, kwargs = value_for_dsd_ref("attribute", args, kwargs)
        super().__init__(*args, **kwargs)

    def __eq__(self, other):
        """Compare the value to a Python built-in type, e.g. str."""
        return self.value == other

    def __str__(self):
        if isinstance(self.value, str):
            # self.value directly for UncodedAttributeValue
            return self.value
        return self.value.id

    def __repr__(self):
        return "<{}: {}={}>".format(self.__class__.__name__, self.value_for, self.value)

    def compare(self, other, strict=True):
        """Return :obj:`True` if `self` is the same as `other`.

        Two AttributeValues are equal if their properties are equal.

        Parameters
        ----------
        strict : bool, optional
            Passed to :func:`.compare`.
        """
        for name in ["start_date", "value", "value_for"]:
            if not compare(name, self, other, strict):
                return False
        return True
@validate_dictlike
class Key(BaseModel):
    """SDMX Key class.

    The constructor takes an optional list of keyword arguments; the keywords are used
    as Dimension or Attribute IDs, and the values as KeyValues.

    For convenience, the values of the key may be accessed directly:

    >>> k = Key(foo=1, bar=2)
    >>> k.values['foo']
    1
    >>> k['foo']
    1

    Parameters
    ----------
    dsd : DataStructureDefinition
        If supplied, the :attr:`~.DataStructureDefinition.dimensions` and
        :attr:`~.DataStructureDefinition.attributes` are used to separate the `kwargs`
        into :class:`KeyValues <.KeyValue>` and
        :class:`AttributeValues <.AttributeValue>`. The `kwargs` for
        :attr:`described_by`, if any, must be
        :attr:`~.DataStructureDefinition.dimensions` or appear in
        :attr:`~.DataStructureDefinition.group_dimensions`.
    kwargs
        Dimension and Attribute IDs, and/or the class properties.
    """

    #:
    attrib: DictLike[str, AttributeValue] = dictlike_field()
    #:
    described_by: Optional[DimensionDescriptor] = None
    #: Individual KeyValues that describe the key.
    values: DictLike[str, KeyValue] = dictlike_field()

    def __init__(self, arg: Union[Mapping, Sequence[KeyValue]] = None, **kwargs):
        # DimensionDescriptor
        dd = kwargs.pop("described_by", None)

        super().__init__(
            attrib=kwargs.pop("attrib", DictLike()), described_by=dd, values=DictLike()
        )

        if arg and isinstance(arg, Mapping):
            if len(kwargs):
                raise ValueError(
                    "Key() accepts either a single argument, or keyword arguments; not "
                    "both."
                )
            kwargs.update(arg)

        if isinstance(arg, Sequence):
            # Sequence of already-prepared KeyValues; assume already sorted
            pairs: Iterable[Tuple] = ((kv.id, kv) for kv in arg)
        else:
            # Convert bare keyword arguments to KeyValue
            unsorted = []
            for order, (id, value) in enumerate(kwargs.items()):
                kv_args = dict(id=id, value=value)

                if dd:
                    # Reference the Dimension
                    kv_args["value_for"] = dd.get(id)
                    # Use the existing Dimension's order attribute
                    order = kv_args["value_for"].order

                # Store a KeyValue, to be sorted later
                unsorted.append((order, (id, KeyValue(**kv_args))))

            # Sort the values according to *order*, then unwrap
            pairs = [pair for _, pair in sorted(unsorted)]

        for id, kv in pairs:
            self.values[id] = kv

    @classmethod
    def _fast(cls, kvs):
        """Use :meth:`pydantic.BaseModel.construct` for faster construction."""
        values = DictLike(kvs)
        return cls.construct(values=values)

    def __len__(self):
        """The length of the Key is the number of KeyValues it contains."""
        return len(self.values)

    def __contains__(self, other):
        """A Key contains another if it is a superset."""
        try:
            matches = [self.values[k] == v for k, v in other.values.items()]
        except KeyError:
            # 'k' in other does not appear in this Key()
            return False
        return all(matches)

    def __iter__(self):
        yield from self.values.values()

    # Convenience access to values by name
    def __getitem__(self, name):
        return self.values[name]

    def __setitem__(self, name, value):
        # Convert a bare string or other Python object to a KeyValue instance
        if isinstance(value, KeyValue):
            self.values[name] = value
        else:
            self.values[name] = KeyValue(id=name, value=value)

    # Convenience access to values by attribute
    def __getattr__(self, name):
        try:
            return self.__getitem__(name)
        except KeyError as e:
            raise AttributeError(e)

    # Copying
    def __copy__(self):
        # NB the result is always a plain Key and carries only `described_by` and
        # the values of `self`
        duplicate = Key()
        if self.described_by:
            duplicate.described_by = self.described_by
        for kv in self.values.values():
            duplicate[kv.id] = kv
        return duplicate

    def copy(self, arg=None, **kwargs):
        """Return a copy of the Key, with values from `kwargs` set on the copy.

        `arg` is accepted but not used.
        """
        duplicate = copy(self)
        for id, value in kwargs.items():
            duplicate[id] = value
        return duplicate

    def __add__(self, other):
        if not isinstance(other, Key):
            raise NotImplementedError

        # Values of `other` are added to, or replace, those of a copy of `self`
        combined = copy(self)
        for id, value in other.values.items():
            combined[id] = value
        return combined

    def __radd__(self, other):
        # Only None + Key is supported
        if other is not None:
            raise NotImplementedError
        return copy(self)

    def __eq__(self, other):
        own = self.values.values()

        if hasattr(other, "values"):
            # Key
            return all([a == b for a, b in zip(own, other.values.values())])
        elif hasattr(other, "key_value"):
            # DataKey
            return all([a == b for a, b in zip(own, other.key_value.values())])
        elif isinstance(other, str) and len(self.values) == 1:
            return self.values[0] == other
        else:
            raise ValueError(other)

    def __hash__(self):
        # Hash of the individual KeyValues, in order
        return hash(tuple(map(hash, self.values.values())))

    # Representations
    def __str__(self):
        return "({})".format(", ".join(map(str, self.values.values())))

    def __repr__(self):
        return "<{}: {}>".format(
            self.__class__.__name__, ", ".join(map(str, self.values.values()))
        )

    def order(self, value=None):
        """Return `value` (default: this Key) ordered by :attr:`described_by`.

        If ordering raises :class:`AttributeError`, e.g. because
        :attr:`described_by` is :obj:`None`, `value` is returned as-is.
        """
        target = self if value is None else value
        try:
            return self.described_by.order_key(target)
        except AttributeError:
            return target

    def get_values(self):
        """Return a tuple of the bare values of the KeyValues, in order."""
        return tuple(kv.value for kv in self.values.values())
class GroupKey(Key):
    """:class:`Key` with an optional :attr:`id`."""

    #:
    id: Optional[str] = None
    #:
    described_by: Optional[GroupDimensionDescriptor] = None

    def __init__(self, arg: Mapping = None, **kwargs):
        # Take out the 'id' keyword argument, so that Key.__init__() does not treat
        # it as a key value; set it once the instance is initialized
        group_id = kwargs.pop("id", None)
        super().__init__(arg, **kwargs)
        self.id = group_id
class SeriesKey(Key):
    """:class:`Key` that also records associated :class:`GroupKeys <GroupKey>`."""

    #: :mod:`sdmx` extension not in the IM.
    group_keys: Set[GroupKey] = set()

    @property
    def group_attrib(self):
        """Return a view of combined group attributes."""
        # Needed to pass existing tests
        combined = DictLike()
        for group_key in self.group_keys:
            combined.update(group_key.attrib)
        return combined
@validate_dictlike
class Observation(BaseModel):
    """SDMX-IM Observation.

    This class also implements the spec classes ObservationValue,
    UncodedObservationValue, and CodedObservation.
    """

    #:
    attached_attribute: DictLike[str, AttributeValue] = dictlike_field()
    #:
    series_key: Optional[SeriesKey] = None
    #: Key for dimension(s) varying at the observation level.
    dimension: Optional[Key] = None
    #: Data value.
    value: Optional[Union[Any, Code]] = None
    #:
    value_for: Optional[PrimaryMeasure] = None
    #: :mod:`sdmx` extension not in the IM.
    group_keys: Set[GroupKey] = set()

    @property
    def attrib(self):
        """Return a view of combined observation, series & group attributes."""
        # Start from a copy, so :attr:`attached_attribute` itself is not modified
        view = self.attached_attribute.copy()
        # series_key may be None, in which case there is nothing to add
        view.update(getattr(self.series_key, "attrib", {}))
        for gk in self.group_keys:
            view.update(gk.attrib)
        return view

    @property
    def dim(self):
        """Alias for :attr:`dimension`."""
        return self.dimension

    @property
    def key(self):
        """Return the entire key, including KeyValues at the series level."""
        return self.series_key + self.dimension

    def __len__(self):
        # FIXME this is unintuitive; maybe deprecate/remove?
        return len(self.key)

    def __str__(self):
        return "{0.key}: {0.value}".format(self)

    def compare(self, other, strict=True):
        """Return :obj:`True` if `self` is the same as `other`.

        Two Observations are equal if:

        - their :attr:`dimension`, :attr:`value`, :attr:`series_key`, and
          :attr:`value_for` are all equal,
        - their corresponding :attr:`attached_attribute` and :attr:`group_keys` are all
          equal.

        Parameters
        ----------
        strict : bool, optional
            Passed to :func:`.compare`, and to the comparison of
            :attr:`attached_attribute`.
        """
        return (
            all(
                compare(attr, self, other, strict)
                for attr in ["dimension", "series_key", "value", "value_for"]
            )
            # Pass `strict` on, as DataSet.compare() does for its own attributes;
            # previously the argument was silently ignored for attached attributes
            and self.attached_attribute.compare(other.attached_attribute, strict)
            and self.group_keys == other.group_keys
        )
@validate_dictlike
class DataSet(AnnotableArtefact):
    """Collection of :class:`Observations <Observation>`, with series and groups."""

    # SDMX-IM features

    #:
    action: Optional[ActionType] = None
    #:
    attrib: DictLike[str, AttributeValue] = dictlike_field()
    #:
    valid_from: Optional[str] = None
    #:
    described_by: Optional[DataflowDefinition] = None
    #:
    structured_by: Optional[DataStructureDefinition] = None
    #: All observations in the DataSet.
    obs: List[Observation] = []
    #: Map of series key → list of observations.
    #: :mod:`sdmx` extension not in the IM.
    series: DictLike[SeriesKey, List[Observation]] = dictlike_field()
    #: Map of group key → list of observations.
    #: :mod:`sdmx` extension not in the IM.
    group: DictLike[GroupKey, List[Observation]] = dictlike_field()

    def __len__(self):
        """The length of the DataSet is its number of observations."""
        return len(self.obs)

    def _add_group_refs(self, target):
        """Associate *target* with groups in this dataset.

        *target* may be an instance of SeriesKey or Observation.
        """
        for group_key in self.group:
            # A SeriesKey is tested directly; an Observation through its full key
            if group_key in (target if isinstance(target, SeriesKey) else target.key):
                target.group_keys.add(group_key)
                if isinstance(target, Observation):
                    self.group[group_key].append(target)

    def add_obs(self, observations, series_key=None):
        """Add *observations* to a series with *series_key*.

        Checks consistency and adds group associations.

        Parameters
        ----------
        observations : iterable of Observation
            Each is appended to :attr:`obs`.
        series_key : SeriesKey, optional
            If given, each observation is also appended to the matching entry of
            :attr:`series`, and observations without a series key are assigned this
            one.
        """
        if series_key is not None:
            # Associate series_key with any GroupKeys that apply to it
            self._add_group_refs(series_key)
            # Maybe initialize empty series
            self.series.setdefault(series_key, [])

        for obs in observations:
            # Associate the observation with any GroupKeys that contain it
            self._add_group_refs(obs)

            # Store a reference to the observation
            self.obs.append(obs)

            if series_key is not None:
                if obs.series_key is None:
                    # Assign the observation to the SeriesKey
                    obs.series_key = series_key
                else:
                    # Check that the Observation is not associated with a different
                    # SeriesKey
                    assert obs.series_key is series_key

                # Store a reference to the observation
                self.series[series_key].append(obs)

    @validator("action")
    def _validate_action(cls, value):
        """Return `value` as a member of :class:`ActionType`.

        Members are returned unchanged; anything else is looked up by name.
        """
        # Use isinstance() rather than `value in ActionType`: for a non-member
        # operand the containment test raises TypeError on Python 3.8–3.11 and
        # matches raw member *values* on 3.12+, so the lookup by name below was
        # either unreachable or bypassed.
        if isinstance(value, ActionType):
            return value
        else:
            return ActionType[value]

    def compare(self, other, strict=True):
        """Return :obj:`True` if `self` is the same as `other`.

        Two DataSets are the same if:

        - their :attr:`action`, :attr:`valid_from` compare equal.
        - all dataset-level attached attributes compare equal.
        - they have the same number of observations, series, and groups.
        - their observations, paired by position, compare equal.

        Parameters
        ----------
        strict : bool, optional
            Passed to :func:`.compare`.
        """
        return (
            compare("action", self, other, strict)
            and compare("valid_from", self, other, strict)
            and self.attrib.compare(other.attrib, strict)
            and len(self.obs) == len(other.obs)
            and len(self.series) == len(other.series)
            and len(self.group) == len(other.group)
            and all(o[0].compare(o[1], strict) for o in zip(self.obs, other.obs))
        )
class StructureSpecificDataSet(DataSet):
    """SDMX-IM StructureSpecificDataSet.

    Declares no fields or methods beyond those of :class:`DataSet`.
    """
class GenericDataSet(DataSet):
    """SDMX-IM GenericDataSet.

    Declares no fields or methods beyond those of :class:`DataSet`.
    """
class GenericTimeSeriesDataSet(DataSet):
    """SDMX-IM GenericTimeSeriesDataSet.

    Declares no fields or methods beyond those of :class:`DataSet`.
    """
class StructureSpecificTimeSeriesDataSet(DataSet):
    """SDMX-IM StructureSpecificTimeSeriesDataSet.

    Declares no fields or methods beyond those of :class:`DataSet`.
    """
class _AllDimensions:
    """Class of :data:`AllDimensions`; has no attributes or methods of its own."""


# Module-level instance of the class above.
AllDimensions = _AllDimensions()
# §11: Data Provisioning
class Datasource(BaseModel):
    """Model with a single required field, :attr:`url`."""

    #:
    url: str
class SimpleDatasource(Datasource):
    """Subclass of :class:`Datasource` that declares no additional fields."""
class QueryDatasource(Datasource):
    """Subclass of :class:`Datasource` that declares no additional fields.

    Abstract.

    NB the SDMX-IM inconsistently uses this name and 'WebServicesDatasource'.
    """
class RESTDatasource(QueryDatasource):
    """Subclass of :class:`QueryDatasource` that declares no additional fields."""
class ProvisionAgreement(MaintainableArtefact, ConstrainableArtefact):
    """Links an optional :class:`StructureUsage` and an optional :class:`DataProvider`."""

    #:
    structure_usage: Optional[StructureUsage] = None
    #:
    data_provider: Optional[DataProvider] = None
#: The SDMX-IM defines 'packages'; these are used in URNs.
#: Mapping from class to package name, filled in by the loop below.
PACKAGE = dict()
# Mapping from package name to the set of classes in that package.
_PACKAGE_CLASS: Dict[str, set] = {
    "base": {Agency, AgencyScheme, DataProvider, DataProviderScheme},
    "categoryscheme": {Category, Categorisation, CategoryScheme},
    "codelist": {Code, Codelist},
    "conceptscheme": {Concept, ConceptScheme},
    "datastructure": {DataflowDefinition, DataStructureDefinition, StructureUsage},
    "registry": {ContentConstraint, ProvisionAgreement},
}
# Invert _PACKAGE_CLASS, so that PACKAGE maps each class to its package name.
for package, classes in _PACKAGE_CLASS.items():
    PACKAGE.update({cls: package for cls in classes})
# NOTE(review): in Python 3 the comprehension variable above does not leak into
# module scope, so this presumably deletes a module-level `cls` left by code
# outside this section — confirm.
del cls
@lru_cache()
def get_class(name: Union[str, Resource], package=None) -> Optional[Type]:
    """Return a class for `name` and (optional) `package` names.

    Parameters
    ----------
    name : str or Resource
        Name of a class in this module, or a :class:`.Resource` value, which is
        matched against the module's global names without regard to case.
    package : str, optional
        If given, must equal the :data:`PACKAGE` entry for the class.

    Returns
    -------
    type or None
        :obj:`None` if no global with the (converted) name exists.

    Raises
    ------
    ValueError
        if `package` is given and differs from the package of the class.
    """
    namespace = globals()

    if isinstance(name, Resource):
        # Convert a Resource enumeration value to a string
        # Expected class name in lower case; maybe just the enumeration value
        match = Resource.class_name(name).lower()

        # Match class names in lower case. Per the original note, only() returns
        # None unless there is exactly one match; the lookup below then fails and
        # None is returned.
        name = only(g for g in namespace if g.lower() == match)

    # "Dataflow" is an alias for the DataflowDefinition class
    name = {"Dataflow": "DataflowDefinition"}.get(name, name)

    if name not in namespace:
        return None
    cls = namespace[name]

    if package and package != PACKAGE[cls]:
        raise ValueError(f"Package {repr(package)} invalid for {name}")

    return cls
def parent_class(cls):
    """Return the class that contains objects of type `cls`.

    E.g. if `cls` is :class:`.PrimaryMeasure`, returns :class:`.MeasureDescriptor`.

    Raises
    ------
    KeyError
        if `cls` is not one of the classes listed in the mapping below.
    """
    # Mapping from contained class to containing class
    containers = {
        Agency: AgencyScheme,
        Category: CategoryScheme,
        Code: Codelist,
        Concept: ConceptScheme,
        Dimension: DimensionDescriptor,
        DataProvider: DataProviderScheme,
        GroupDimensionDescriptor: DataStructureDefinition,
        PrimaryMeasure: MeasureDescriptor,
    }
    return containers[cls]