import copy
from collections import defaultdict
from typing import List, Dict, Any, Union, Generator

from paramtools.sorted_key_list import SortedKeyList
from paramtools.typing import ValueObject


def default_cmp_func(x):
    """Identity key function: return ``x`` unchanged."""
    return x


class ValueItem:
    """
    Handles index-based look-ups on the Values class.
    """

    def __init__(self, values: "Values", index: List[int] = None):
        """
        Parameters
        ----------
        values:
            The ``Values`` object whose ``values`` mapping is read.
        index:
            Optional ordered index labels. Position ``i`` of a look-up is
            translated to ``index[i]`` before reading ``values.values``.
            When ``None`` the position itself is used as the key.
        """
        self.values = values
        # Copy so that later mutation of the caller's index cannot change
        # which items this object resolves to.
        self.index = list(index) if index is not None else index

    def __getitem__(self, item):
        """
        Return a ``dict`` copy of the value object at position ``item``, or
        a list of such copies when ``item`` is a slice.
        """
        store = self.values.values
        if self.index is None:
            # No index: positions are used directly as keys.
            if isinstance(item, slice):
                positions = range(*item.indices(len(self.values)))
                return [dict(store[ix]) for ix in positions]
            return dict(store[item])

        if isinstance(item, slice):
            positions = range(*item.indices(len(self.index)))
            return [dict(store[self.index[ix]]) for ix in positions]
        return dict(store[self.index[item]])


class ValueBase:
    """
    Shared comparison interface.

    Every operation is forwarded to ``self.cmp_attr``. The single-value
    comparisons (operators and the ``eq``/``ne``/... methods defined here)
    additionally read ``self.label`` to build the keyword argument, so
    subclasses relying on them must set that attribute.
    """

    @property
    def cmp_attr(self):
        """Object that comparisons are delegated to; subclasses override."""
        raise NotImplementedError()

    # NOTE: ``labels`` is accepted but not used by this implementation.
    def __eq__(self, value=None, **labels):
        return self.cmp_attr.eq(**{self.label: value})

    def __ne__(self, value):
        return self.cmp_attr.ne(**{self.label: value})

    def __gt__(self, value):
        return self.cmp_attr.gt(**{self.label: value})

    def __ge__(self, value):
        return self.cmp_attr.gte(**{self.label: value})

    def __lt__(self, value):
        return self.cmp_attr.lt(**{self.label: value})

    def __le__(self, value):
        return self.cmp_attr.lte(**{self.label: value})

    def __len__(self):
        # Count lazily instead of materialising a list just to measure it.
        return sum(1 for _ in self.cmp_attr)

    def __iter__(self):
        return iter(self.cmp_attr)

    def __getitem__(self, item):
        return self.cmp_attr[item]

    def eq(self, value, strict=True):
        return self.cmp_attr.eq(strict, **{self.label: value})

    def ne(self, value, strict=True):
        return self.cmp_attr.ne(strict, **{self.label: value})

    def gt(self, value, strict=True):
        return self.cmp_attr.gt(strict, **{self.label: value})

    def gte(self, value, strict=True):
        return self.cmp_attr.gte(strict, **{self.label: value})

    def lt(self, value, strict=True):
        return self.cmp_attr.lt(strict, **{self.label: value})

    def lte(self, value, strict=True):
        return self.cmp_attr.lte(strict, **{self.label: value})

    def isin(self, value, strict=True):
        return self.cmp_attr.isin(strict, **{self.label: value})


class QueryResult(ValueBase):
    """
    The outcome of a query: a reference to the queried ``Values`` object
    plus the index labels of the matching items.
    """

    def __init__(self, values: "Values", index: List[Any]):
        self.values = values
        self.index = index

    def __and__(self, queryresult: "QueryResult"):
        """Items present in both results. The new index is a ``set``."""
        res = set(self.index) & set(queryresult.index)
        return QueryResult(self.values, res)

    def __or__(self, queryresult: "QueryResult"):
        """Items present in either result. The new index is a ``set``."""
        res = set(self.index) | set(queryresult.index)
        return QueryResult(self.values, res)

    def __repr__(self):
        vo_repr = "\n ".join(
            str(dict(self.values.values[i])) for i in (self.index or [])
        )
        return f"QueryResult([\n{vo_repr}\n])"

    def __iter__(self):
        for i in self.index:
            yield self.values.values[i]

    def __getitem__(self, item):
        raise NotImplementedError(
            "Use .isel to do index-based look ups or as_values to chain queries."
        )

    @property
    def isel(self):
        """Positional access to the matched items."""
        return ValueItem(self.values, self.index)

    def tolist(self):
        """Return the matched value objects (not copies) as a list."""
        return [self.values.values[i] for i in self.index]

    # Label-keyword comparisons are forwarded to the underlying Values
    # object (``cmp_attr``), i.e. they query the full set of values.
    def eq(self, strict=True, **labels):
        return self.cmp_attr.eq(strict, **labels)

    def ne(self, strict=True, **labels):
        return self.cmp_attr.ne(strict, **labels)

    def gt(self, strict=True, **labels):
        return self.cmp_attr.gt(strict, **labels)

    def gte(self, strict=True, **labels):
        return self.cmp_attr.gte(strict, **labels)

    def lt(self, strict=True, **labels):
        return self.cmp_attr.lt(strict, **labels)

    def lte(self, strict=True, **labels):
        return self.cmp_attr.lte(strict, **labels)

    def isin(self, strict=True, **labels):
        return self.cmp_attr.isin(strict, **labels)

    # Operator comparisons are deliberately unsupported on a query result.
    def __eq__(self, *args, **kwargs):
        raise NotImplementedError()

    def __ne__(self, *args, **kwargs):
        raise NotImplementedError()

    def __gt__(self, *args, **kwargs):
        raise NotImplementedError()

    def __ge__(self, *args, **kwargs):
        raise NotImplementedError()

    def __lt__(self, *args, **kwargs):
        raise NotImplementedError()

    def __le__(self, *args, **kwargs):
        raise NotImplementedError()

    def as_values(self):
        """Build a new ``Values`` object holding only the matched items."""
        return Values(
            values=list(self),
            index=self.index,
            keyfuncs=self.values.keyfuncs,
        )

    def delete(self):
        """
        Remove the matched items, in place, from the underlying ``Values``.

        An empty result removes nothing.
        """
        # ``Values.delete`` interprets "no index arguments" as "delete
        # everything", so an empty match must not be forwarded to it.
        if not self.index:
            return
        self.values.delete(*self.index, inplace=True)

    @property
    def cmp_attr(self):
        return self.values


class Slice(ValueBase):
    """
    View of a single label across all items of a ``Values`` object.
    """

    def __init__(self, values: "Values", label: str):
        self.values = values
        self.label = label

    @property
    def cmp_attr(self):
        return self.values

    def __getitem__(self, item):
        """
        Return this label's value for the item at position ``item``.

        A slice returns a list, with ``None`` for items that lack the
        label. An integer returns the value itself, raising ``KeyError``
        if the item lacks the label and ``IndexError`` if the position is
        out of range.
        """
        # ``values.values`` is keyed by index label, not by position, so
        # translate positions through the index (the two differ as soon as
        # items have been deleted or a custom index is in use).
        keys = list(self.values.index)
        store = self.values.values
        if isinstance(item, slice):
            positions = range(*item.indices(len(keys)))
            return [store[keys[ix]].get(self.label, None) for ix in positions]
        return store[keys[item]][self.label]

    @property
    def isel(self):
        raise NotImplementedError(
            "Access values of a Slice object directly: parameters['label'][1]"
        )

    def __repr__(self):
        vo_repr = "\n ".join(
            str(dict(self.values.values[i])) for i in self.values.values
        )
        return f"Slice([\n{vo_repr}\n], \nlabel={self.label})"
class Values(ValueBase):
    """
    The Values class is used to query and update parameter values.

    For more information, checkout the
    `Viewing Data <https://paramtools.dev/api/viewing-data.html>`_ docs.
    """

    def __init__(
        self,
        values: List[ValueObject],
        keyfuncs: Dict[str, Any] = None,
        skls: Dict[str, SortedKeyList] = None,
        index: List[Any] = None,
    ):
        """
        Parameters
        ----------
        values:
            Value objects (mappings of label -> value).
        keyfuncs:
            Optional mapping of label -> key function handed to the
            ``SortedKeyList`` built for that label. Labels without an
            entry use ``default_cmp_func``.
        skls:
            Optional pre-built label -> ``SortedKeyList`` mapping. When
            omitted it is built from ``values``.
        index:
            Optional index labels, paired with ``values`` in order. When
            omitted or empty, ``0..len(values) - 1`` is used.
        """
        # Always store a private list: the index is mutated in place by
        # ``add``/``delete`` and callers may hand in a set or a list they
        # still own.
        self.index = list(index) if index else list(range(len(values)))
        self.values = dict(zip(self.index, values))
        self.keyfuncs = keyfuncs
        self.label = "value"
        if skls is not None:
            self.skls = skls
        else:
            self.skls = self.build_skls(self.values, keyfuncs or {})

    def build_skls(self, values, keyfuncs):
        """
        Build one ``SortedKeyList`` per label found in ``values``.

        ``values`` maps index label -> value object. Each list receives
        the label's values together with the index labels they came from.
        """
        label_values = defaultdict(list)
        label_index = defaultdict(list)
        for ix, vo in values.items():
            for label, value in vo.items():
                label_values[label].append(value)
                label_index[label].append(ix)

        skls = {}
        for label in label_values:
            keyfunc = self.get_keyfunc(label, keyfuncs)
            skls[label] = SortedKeyList(
                label_values[label], keyfunc, label_index[label]
            )
        return skls

    def update_skls(self, values):
        """
        Add the items of ``values`` (index label -> value object) to the
        existing sorted key lists, creating a list for any new label.
        """
        # TODO: remove existing values with clashing index
        for ix, vo in values.items():
            for label, value in vo.items():
                if self.skls.get(label, None) is not None:
                    self.skls[label].add(value, index=ix)
                else:
                    self.skls[label] = SortedKeyList(
                        [value],
                        keyfunc=self.get_keyfunc(label, self.keyfuncs),
                        index=[ix],
                    )

    def get_keyfunc(self, label, keyfuncs):
        """
        Return the key function registered for ``label`` in ``keyfuncs``,
        falling back to ``default_cmp_func``.

        ``keyfuncs`` may be ``None`` (the constructor default), which is
        treated as an empty mapping.
        """
        keyfunc = (keyfuncs or {}).get(label)
        return keyfunc or default_cmp_func

    def _cmp(self, op, strict, **labels):
        """
        Run comparison ``op`` (the name of a ``SortedKeyList`` method)
        against the first label/value pair in ``labels``.

        With ``strict`` true an unknown label raises ``KeyError``. With
        ``strict`` false an unknown label matches every item, and items
        that do not define the label are added to the matches.
        """
        # Only the first keyword argument is consulted.
        label, value = list(labels.items())[0]
        skl = self.skls.get(label, None)
        if skl is None:
            if strict:
                raise KeyError(f"Unknown label: {label}.")
            return QueryResult(self, list(self.index))

        skl_result = getattr(skl, op)(value)
        if not strict:
            match_index = skl_result.index if skl_result else []
            missing = self.missing(label)
            match_index = set(match_index + missing.index)
        elif skl_result is None:
            match_index = []
        else:
            match_index = skl_result.index
        return QueryResult(self, match_index)

    def __getitem__(self, label):
        """Return a ``Slice`` for ``label``; ``KeyError`` if unknown."""
        if label not in self.skls:
            raise KeyError(f"Unknown label: {label}")
        return Slice(self, label)

    def missing(self, label: str):
        """Return the items that have no entry for ``label``."""
        index = list(set(self.index) - self.skls[label].index)
        return QueryResult(self, index)

    def eq(self, strict=True, **labels):
        """
        Returns values that match the given label:

        .. code-block:: Python

            params.sel["my_param"].eq(my_label=5)
            params.sel["my_param"]["my_label"] == 5
        """
        return self._cmp("eq", strict, **labels)

    def ne(self, strict=True, **labels):
        """
        Returns values that do not match the given label:

        .. code-block:: Python

            params.sel["my_param"].ne(my_label=5)
            params.sel["my_param"]["my_label"] != 5
        """
        return self._cmp("ne", strict, **labels)

    def gt(self, strict=True, **labels):
        """
        Returns values that have label values greater than the label value:

        .. code-block:: Python

            params.sel["my_param"].gt(my_label=5)
            params.sel["my_param"]["my_label"] > 5
        """
        return self._cmp("gt", strict, **labels)

    def gte(self, strict=True, **labels):
        """
        Returns values that have label values greater than or equal to the
        label value:

        .. code-block:: Python

            params.sel["my_param"].gte(my_label=5)
            params.sel["my_param"]["my_label"] >= 5
        """
        return self._cmp("gte", strict, **labels)

    def lt(self, strict=True, **labels):
        """
        Returns values that have label values less than the label value:

        .. code-block:: Python

            params.sel["my_param"].lt(my_label=5)
            params.sel["my_param"]["my_label"] < 5
        """
        return self._cmp("lt", strict, **labels)

    def lte(self, strict=True, **labels):
        """
        Returns values that have label values less than or equal to the
        label value:

        .. code-block:: Python

            params.sel["my_param"].lte(my_label=5)
            params.sel["my_param"]["my_label"] <= 5
        """
        return self._cmp("lte", strict, **labels)

    def isin(self, strict=True, **labels):
        """
        Returns values whose label value equals any of the given values:

        .. code-block:: Python

            params.sel["my_param"].isin(my_label=[5, 6])
        """
        # Only the first keyword argument is consulted. One ``eq`` query
        # is run per candidate and the results are combined by the
        # module-level ``union`` helper.
        label, values = list(labels.items())[0]
        return union(
            self.eq(strict=strict, **{label: value}) for value in values
        )

    def add(
        self, values: List[ValueObject], index: List[Any] = None, inplace=False
    ):
        """
        Add value objects.

        Parameters
        ----------
        values:
            Value objects to add.
        index:
            Index labels for the new items; must be the same length as
            ``values``. When omitted, labels continue from the largest
            existing label plus one (an empty object starts at 1).
        inplace:
            When true, modify this object and return ``None``. Otherwise
            leave it untouched and return a new ``Values``.
        """
        if index is not None:
            assert len(index) == len(values), (
                "index and values must have the same length"
            )
            new_index = index
        else:
            max_index = max(self.index) if self.index else 0
            new_index = [max_index + i + 1 for i in range(len(values))]

        new_values = dict(zip(new_index, values))
        if inplace:
            self.update_skls(new_values)
            self.values.update(new_values)
            self.index += new_index
            return None

        updated_values = dict(self.values)
        updated_values.update(new_values)
        return Values(
            list(updated_values.values()),
            # Carry the key functions over so the result keeps sorting new
            # labels the same way this object does.
            keyfuncs=self.keyfuncs,
            skls=self.build_skls(updated_values, self.keyfuncs),
            index=list(self.index) + list(new_index),
        )

    def delete(self, *index, inplace=False):
        """
        Remove the items with the given index labels.

        With no index arguments, every item is removed. When ``inplace``
        is true this object is modified and ``None`` is returned;
        otherwise a new ``Values`` built from a deep copy is returned.
        Unknown labels raise ``KeyError``.
        """
        if not index:
            # Snapshot: ``self.index`` is mutated while looping below.
            index = list(self.index)

        if inplace:
            for ix in index:
                self.values.pop(ix)
                self.index.remove(ix)
            self.skls = self.build_skls(self.values, self.keyfuncs)
            return None

        new_index = list(self.index)
        new_values = copy.deepcopy(self.values)
        for ix in index:
            new_values.pop(ix)
            new_index.remove(ix)
        return Values(
            list(new_values.values()),
            keyfuncs=self.keyfuncs,
            index=new_index,
        )

    @property
    def cmp_attr(self):
        return self

    @property
    def isel(self):
        """
        Select values by their index:

        .. code-block:: Python

            params.sel["my_param"].isel[0]
            params.sel["my_param"].isel[:5]
        """
        return ValueItem(self, self.index)

    @property
    def labels(self):
        """Names of all labels that have a sorted key list."""
        return list(self.skls.keys())

    def __eq__(self, other):
        """
        Compare the contained value objects, in order, against another
        ``ValueBase`` or a plain list. Any other type raises ``TypeError``.
        """
        if isinstance(other, ValueBase):
            return list(self) == list(other)
        elif isinstance(other, list):
            return list(self) == other
        else:
            raise TypeError(f"Unable to compare Values against {type(other)}")