## Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.#fromfunctoolsimportpartial,reducefromtypingimportAny,Callable,Iterator,List,Optional,Tuple,Union,cast,no_type_checkimportpandasaspdfrompandas.api.typesimportis_hashable,is_list_like# type: ignore[attr-defined]frompyspark.sqlimportfunctionsasF,ColumnasPySparkColumn,Windowfrompyspark.sql.typesimportDataTypefrompyspark.sql.utilsimportget_column_classfrompysparkimportpandasaspsfrompyspark.pandas._typingimportLabel,Name,Scalarfrompyspark.pandas.exceptionsimportPandasNotImplementedErrorfrompyspark.pandas.frameimportDataFramefrompyspark.pandas.indexes.baseimportIndexfrompyspark.pandas.missing.indexesimportMissingPandasLikeMultiIndexfrompyspark.pandas.seriesimportSeries,first_seriesfrompyspark.pandas.utilsimport(compare_disallow_null,is_name_like_tuple,name_like_string,scol_for,verify_temp_column_name,validate_index_loc,xor,)frompyspark.pandas.internalimport(InternalField,InternalFrame,NATURAL_ORDER_COLUMN_NAME,SPARK_INDEX_NAME_FORMAT,)
[docs]classMultiIndex(Index):""" pandas-on-Spark MultiIndex that corresponds to pandas MultiIndex logically. This might hold Spark Column internally. Parameters ---------- levels : sequence of arrays The unique labels for each level. codes : sequence of arrays Integers for each level designating which label at each location. sortorder : optional int Level of sortedness (must be lexicographically sorted by that level). names : optional sequence of objects Names for each of the index levels. (name is accepted for compat). copy : bool, default False Copy the meta-data. verify_integrity : bool, default True Check that the levels/codes are consistent and valid. See Also -------- MultiIndex.from_arrays : Convert list of arrays to MultiIndex. MultiIndex.from_product : Create a MultiIndex from the cartesian product of iterables. MultiIndex.from_tuples : Convert list of tuples to a MultiIndex. MultiIndex.from_frame : Make a MultiIndex from a DataFrame. Index : A single-level Index. Examples -------- >>> ps.DataFrame({'a': ['a', 'b', 'c']}, index=[[1, 2, 3], [4, 5, 6]]).index # doctest: +SKIP MultiIndex([(1, 4), (2, 5), (3, 6)], ) >>> ps.DataFrame({'a': [1, 2, 3]}, index=[list('abc'), list('def')]).index # doctest: +SKIP MultiIndex([('a', 'd'), ('b', 'e'), ('c', 'f')], ) """@no_type_checkdef__new__(cls,levels=None,codes=None,sortorder=None,names=None,dtype=None,copy=False,name=None,verify_integrity:bool=True,)->"MultiIndex":pidx=pd.MultiIndex(levels=levels,codes=codes,sortorder=sortorder,names=names,dtype=dtype,copy=copy,name=name,verify_integrity=verify_integrity,)returnps.from_pandas(pidx)@propertydef_internal(self)->InternalFrame:internal=self._psdf._internalscol=F.struct(*internal.index_spark_columns)returninternal.copy(column_labels=[None],data_spark_columns=[scol],data_fields=[None],column_label_names=None,)@propertydef_column_label(self)->Optional[Label]:returnNonedef__abs__(self)->"MultiIndex":raiseTypeError("TypeError: cannot perform __abs__ with this index type: MultiIndex")def_with_new_scol(self,scol:PySparkColumn,*,field:Optional[InternalField]=None)->"MultiIndex":raiseNotImplementedError("Not supported for type MultiIndex")@no_type_checkdefany(self,*args,**kwargs)->None:raiseTypeError("cannot perform any with this index type: MultiIndex")@no_type_checkdefall(self,*args,**kwargs)->None:raiseTypeError("cannot perform all with this index type: MultiIndex")
[docs]@staticmethoddeffrom_tuples(tuples:List[Tuple],sortorder:Optional[int]=None,names:Optional[List[Name]]=None,)->"MultiIndex":""" Convert list of tuples to MultiIndex. Parameters ---------- tuples : list / sequence of tuple-likes Each tuple is the index of one row/column. sortorder : int or None Level of sortedness (must be lexicographically sorted by that level). names : list / sequence of str, optional Names for the levels in the index. Returns ------- index : MultiIndex Examples -------- >>> tuples = [(1, 'red'), (1, 'blue'), ... (2, 'red'), (2, 'blue')] >>> ps.MultiIndex.from_tuples(tuples, names=('number', 'color')) # doctest: +SKIP MultiIndex([(1, 'red'), (1, 'blue'), (2, 'red'), (2, 'blue')], names=['number', 'color']) """returncast(MultiIndex,ps.from_pandas(pd.MultiIndex.from_tuples(tuples=tuples,sortorder=sortorder,names=names)),)
[docs]@staticmethoddeffrom_arrays(arrays:List[List],sortorder:Optional[int]=None,names:Optional[List[Name]]=None,)->"MultiIndex":""" Convert arrays to MultiIndex. Parameters ---------- arrays: list / sequence of array-likes Each array-like gives one level’s value for each data point. len(arrays) is the number of levels. sortorder: int or None Level of sortedness (must be lexicographically sorted by that level). names: list / sequence of str, optional Names for the levels in the index. Returns ------- index: MultiIndex Examples -------- >>> arrays = [[1, 1, 2, 2], ['red', 'blue', 'red', 'blue']] >>> ps.MultiIndex.from_arrays(arrays, names=('number', 'color')) # doctest: +SKIP MultiIndex([(1, 'red'), (1, 'blue'), (2, 'red'), (2, 'blue')], names=['number', 'color']) """returncast(MultiIndex,ps.from_pandas(pd.MultiIndex.from_arrays(arrays=arrays,sortorder=sortorder,names=names)),)
[docs]@staticmethoddeffrom_product(iterables:List[List],sortorder:Optional[int]=None,names:Optional[List[Name]]=None,)->"MultiIndex":""" Make a MultiIndex from the cartesian product of multiple iterables. Parameters ---------- iterables : list / sequence of iterables Each iterable has unique labels for each level of the index. sortorder : int or None Level of sortedness (must be lexicographically sorted by that level). names : list / sequence of str, optional Names for the levels in the index. Returns ------- index : MultiIndex See Also -------- MultiIndex.from_arrays : Convert list of arrays to MultiIndex. MultiIndex.from_tuples : Convert list of tuples to MultiIndex. Examples -------- >>> numbers = [0, 1, 2] >>> colors = ['green', 'purple'] >>> ps.MultiIndex.from_product([numbers, colors], ... names=['number', 'color']) # doctest: +SKIP MultiIndex([(0, 'green'), (0, 'purple'), (1, 'green'), (1, 'purple'), (2, 'green'), (2, 'purple')], names=['number', 'color']) """returncast(MultiIndex,ps.from_pandas(pd.MultiIndex.from_product(iterables=iterables,sortorder=sortorder,names=names)),)
[docs]@staticmethoddeffrom_frame(df:DataFrame,names:Optional[List[Name]]=None)->"MultiIndex":""" Make a MultiIndex from a DataFrame. Parameters ---------- df : DataFrame DataFrame to be converted to MultiIndex. names : list-like, optional If no names are provided, use the column names, or tuple of column names if the column is a MultiIndex. If a sequence, overwrite names with the given sequence. Returns ------- MultiIndex The MultiIndex representation of the given DataFrame. See Also -------- MultiIndex.from_arrays : Convert list of arrays to MultiIndex. MultiIndex.from_tuples : Convert list of tuples to MultiIndex. MultiIndex.from_product : Make a MultiIndex from cartesian product of iterables. Examples -------- >>> df = ps.DataFrame([['HI', 'Temp'], ['HI', 'Precip'], ... ['NJ', 'Temp'], ['NJ', 'Precip']], ... columns=['a', 'b']) >>> df # doctest: +SKIP a b 0 HI Temp 1 HI Precip 2 NJ Temp 3 NJ Precip >>> ps.MultiIndex.from_frame(df) # doctest: +SKIP MultiIndex([('HI', 'Temp'), ('HI', 'Precip'), ('NJ', 'Temp'), ('NJ', 'Precip')], names=['a', 'b']) Using explicit names, instead of the column names >>> ps.MultiIndex.from_frame(df, names=['state', 'observation']) # doctest: +SKIP MultiIndex([('HI', 'Temp'), ('HI', 'Precip'), ('NJ', 'Temp'), ('NJ', 'Precip')], names=['state', 'observation']) """ifnotisinstance(df,DataFrame):raiseTypeError("Input must be a DataFrame")sdf=df._to_spark()ifnamesisNone:names=df._internal.column_labelselifnotis_list_like(names):raiseTypeError("Names should be list-like for a MultiIndex")else:names=[nameifis_name_like_tuple(name)else(name,)fornameinnames]internal=InternalFrame(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolinsdf.columns],index_names=names,)returncast(MultiIndex,DataFrame(internal).index)
@propertydefname(self)->Name:raisePandasNotImplementedError(class_name="pd.MultiIndex",property_name="name")@name.setterdefname(self,name:Name)->None:raisePandasNotImplementedError(class_name="pd.MultiIndex",property_name="name")@propertydefdtypes(self)->pd.Series:"""Return the dtypes as a Series for the underlying MultiIndex. .. versionadded:: 3.3.0 Returns ------- pd.Series The data type of each level. Examples -------- >>> psmidx = ps.MultiIndex.from_arrays( ... [[0, 1, 2, 3, 4, 5, 6, 7, 8], [1, 2, 3, 4, 5, 6, 7, 8, 9]], ... names=("zero", "one"), ... ) >>> psmidx.dtypes zero int64 one int64 dtype: object """returnpd.Series([field.dtypeforfieldinself._internal.index_fields],index=pd.Index([nameiflen(name)>1elsename[0]fornameinself._internal.index_names]),)def_verify_for_rename(self,name:List[Name])->List[Label]:# type: ignore[override]ifis_list_like(name):ifself._internal.index_level!=len(name):raiseValueError("Length of new names must be {}, got {}".format(self._internal.index_level,len(name)))ifany(notis_hashable(n)forninname):raiseTypeError("MultiIndex.name must be a hashable type")return[nifis_name_like_tuple(n)else(n,)forninname]else:raiseTypeError("Must pass list-like as `names`.")
[docs]defswaplevel(self,i:int=-2,j:int=-1)->"MultiIndex":""" Swap level i with level j. Calling this method does not change the ordering of the values. Parameters ---------- i : int, str, default -2 First level of index to be swapped. Can pass level name as string. Parameter types can be mixed. j : int, str, default -1 Second level of index to be swapped. Can pass level name as string. Parameter types can be mixed. Returns ------- MultiIndex A new MultiIndex. Examples -------- >>> midx = ps.MultiIndex.from_arrays([['a', 'b'], [1, 2]], names = ['word', 'number']) >>> midx # doctest: +SKIP MultiIndex([('a', 1), ('b', 2)], names=['word', 'number']) >>> midx.swaplevel(0, 1) # doctest: +SKIP MultiIndex([(1, 'a'), (2, 'b')], names=['number', 'word']) >>> midx.swaplevel('number', 'word') # doctest: +SKIP MultiIndex([(1, 'a'), (2, 'b')], names=['number', 'word']) """forindexin(i,j):ifnotisinstance(index,int)andindexnotinself.names:raiseKeyError("Level %s not found"%index)i=iifisinstance(i,int)elseself.names.index(i)j=jifisinstance(j,int)elseself.names.index(j)forindexin(i,j):ifindex>=len(self.names)orindex<-len(self.names):raiseIndexError("Too many levels: Index has only %s levels, ""%s is not a valid level number"%(len(self.names),index))index_map=list(zip(self._internal.index_spark_columns,self._internal.index_names,self._internal.index_fields,))index_map[i],index_map[j]=index_map[j],index_map[i]index_spark_columns,index_names,index_fields=zip(*index_map)internal=self._internal.copy(index_spark_columns=list(index_spark_columns),index_names=list(index_names),index_fields=list(index_fields),column_labels=[],data_spark_columns=[],data_fields=[],)returncast(MultiIndex,DataFrame(internal).index)
@propertydeflevshape(self)->Tuple[int,...]:""" A tuple with the length of each level. Examples -------- >>> midx = ps.MultiIndex.from_tuples([('a', 'x'), ('b', 'y'), ('c', 'z')]) >>> midx # doctest: +SKIP MultiIndex([('a', 'x'), ('b', 'y'), ('c', 'z')], ) >>> midx.levshape (3, 3) """result=self._internal.spark_frame.agg(*(F.countDistinct(c)forcinself._internal.index_spark_columns)).collect()[0]returntuple(result)@staticmethoddef_comparator_for_monotonic_increasing(data_type:DataType,)->Callable[[PySparkColumn,PySparkColumn,Callable[[PySparkColumn,PySparkColumn],PySparkColumn]],PySparkColumn,]:returncompare_disallow_nulldef_is_monotonic(self,order:str)->bool:iforder=="increasing":returnself._is_monotonic_increasing().all()else:returnself._is_monotonic_decreasing().all()def_is_monotonic_increasing(self)->Series:window=Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(-1,-1)cond=F.lit(True)has_not_null=F.lit(True)Column=get_column_class()forscolinself._internal.index_spark_columns[::-1]:data_type=self._internal.spark_type_for(scol)prev=F.lag(scol,1).over(window)compare=MultiIndex._comparator_for_monotonic_increasing(data_type)# Since pandas 1.1.4, null value is not allowed at any levels of MultiIndex.# Therefore, we should check `has_not_null` over all levels.has_not_null=has_not_null&scol.isNotNull()cond=F.when(scol.eqNullSafe(prev),cond).otherwise(compare(scol,prev,Column.__gt__))cond=has_not_null&(prev.isNull()|cond)cond_name=verify_temp_column_name(self._internal.spark_frame.select(self._internal.index_spark_columns),"__is_monotonic_increasing_cond__",)sdf=self._internal.spark_frame.select(self._internal.index_spark_columns+[cond.alias(cond_name)])internal=InternalFrame(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolinself._internal.index_spark_column_names],index_names=self._internal.index_names,index_fields=self._internal.index_fields,)returnfirst_series(DataFrame(internal))@staticmethoddef_comparator_for_monotonic_decreasing(data_type:DataType,)->Callable[[PySparkColumn,PySparkColumn,Callable[[PySparkColumn,PySparkColumn],PySparkColumn]],PySparkColumn,]:returncompare_disallow_nulldef_is_monotonic_decreasing(self)->Series:window=Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(-1,-1)cond=F.lit(True)has_not_null=F.lit(True)Column=get_column_class()forscolinself._internal.index_spark_columns[::-1]:data_type=self._internal.spark_type_for(scol)prev=F.lag(scol,1).over(window)compare=MultiIndex._comparator_for_monotonic_increasing(data_type)# Since pandas 1.1.4, null value is not allowed at any levels of MultiIndex.# Therefore, we should check `has_not_null` over all levels.has_not_null=has_not_null&scol.isNotNull()cond=F.when(scol.eqNullSafe(prev),cond).otherwise(compare(scol,prev,Column.__lt__))cond=has_not_null&(prev.isNull()|cond)cond_name=verify_temp_column_name(self._internal.spark_frame.select(self._internal.index_spark_columns),"__is_monotonic_decreasing_cond__",)sdf=self._internal.spark_frame.select(self._internal.index_spark_columns+[cond.alias(cond_name)])internal=InternalFrame(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolinself._internal.index_spark_column_names],index_names=self._internal.index_names,index_fields=self._internal.index_fields,)returnfirst_series(DataFrame(internal))
[docs]defto_frame(# type: ignore[override]self,index:bool=True,name:Optional[List[Name]]=None)->DataFrame:""" Create a DataFrame with the levels of the MultiIndex as columns. Column ordering is determined by the DataFrame constructor with data as a dict. Parameters ---------- index : boolean, default True Set the index of the returned DataFrame as the original MultiIndex. name : list / sequence of strings, optional The passed names should substitute index level names. Returns ------- DataFrame : a DataFrame containing the original MultiIndex data. See Also -------- DataFrame Examples -------- >>> tuples = [(1, 'red'), (1, 'blue'), ... (2, 'red'), (2, 'blue')] >>> idx = ps.MultiIndex.from_tuples(tuples, names=('number', 'color')) >>> idx # doctest: +SKIP MultiIndex([(1, 'red'), (1, 'blue'), (2, 'red'), (2, 'blue')], names=['number', 'color']) >>> idx.to_frame() # doctest: +NORMALIZE_WHITESPACE number color number color 1 red 1 red blue 1 blue 2 red 2 red blue 2 blue By default, the original Index is reused. To enforce a new Index: >>> idx.to_frame(index=False) number color 0 1 red 1 1 blue 2 2 red 3 2 blue To override the name of the resulting column, specify `name`: >>> idx.to_frame(name=['n', 'c']) # doctest: +NORMALIZE_WHITESPACE n c number color 1 red 1 red blue 1 blue 2 red 2 red blue 2 blue """ifnameisNone:name=[nameifnameisnotNoneelse(i,)fori,nameinenumerate(self._internal.index_names)]elifis_list_like(name):iflen(name)!=self._internal.index_level:raiseValueError("'name' should have same length as number of levels on index.")name=[nifis_name_like_tuple(n)else(n,)forninname]else:raiseTypeError("'name' must be a list / sequence of column names.")returnself._to_frame(index=index,names=name)
defto_pandas(self)->pd.MultiIndex:""" Return a pandas MultiIndex. .. note:: This method should only be used if the resulting pandas object is expected to be small, as all the data is loaded into the driver's memory. Examples -------- >>> df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)], ... columns=['dogs', 'cats'], ... index=[list('abcd'), list('efgh')]) >>> df['dogs'].index.to_pandas() # doctest: +SKIP MultiIndex([('a', 'e'), ('b', 'f'), ('c', 'g'), ('d', 'h')], ) """# TODO: We might need to handle internal state change.# So far, we don't have any functions to change the internal state of MultiIndex except for# series-like operations. In that case, it creates a new Index object instead of MultiIndex.returncast(pd.MultiIndex,super().to_pandas())def_to_pandas(self)->pd.MultiIndex:""" Same as `to_pandas()`, without issuing the advice log for internal usage. """returncast(pd.MultiIndex,super()._to_pandas())defnunique(self,dropna:bool=True,approx:bool=False,rsd:float=0.05)->int:raiseNotImplementedError("nunique is not defined for MultiIndex")# TODO: add 'name' parameter after pd.MultiIndex.name is implemented
[docs]defcopy(self,deep:Optional[bool]=None)->"MultiIndex":# type: ignore[override]""" Make a copy of this object. Parameters ---------- deep : None this parameter is not supported but just dummy parameter to match pandas. Examples -------- >>> df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)], ... columns=['dogs', 'cats'], ... index=[list('abcd'), list('efgh')]) >>> df['dogs'].index # doctest: +SKIP MultiIndex([('a', 'e'), ('b', 'f'), ('c', 'g'), ('d', 'h')], ) Copy index >>> df.index.copy() # doctest: +SKIP MultiIndex([('a', 'e'), ('b', 'f'), ('c', 'g'), ('d', 'h')], ) """returncast(MultiIndex,super().copy(deep=deep))
[docs]defsymmetric_difference(# type: ignore[override]self,other:Index,result_name:Optional[List[Name]]=None,sort:Optional[bool]=None,)->"MultiIndex":""" Compute the symmetric difference of two MultiIndex objects. Parameters ---------- other : Index or array-like result_name : list sort : True or None, default None Whether to sort the resulting index. * True : Attempt to sort the result. * None : Do not sort the result. Returns ------- symmetric_difference : MultiIndex Notes ----- ``symmetric_difference`` contains elements that appear in either ``idx1`` or ``idx2`` but not both. Equivalent to the Index created by ``idx1.difference(idx2) | idx2.difference(idx1)`` with duplicates dropped. Examples -------- >>> midx1 = pd.MultiIndex([['lama', 'cow', 'falcon'], ... ['speed', 'weight', 'length']], ... [[0, 0, 0, 1, 1, 1, 2, 2, 2], ... [0, 0, 0, 0, 1, 2, 0, 1, 2]]) >>> midx2 = pd.MultiIndex([['pandas-on-Spark', 'cow', 'falcon'], ... ['speed', 'weight', 'length']], ... [[0, 0, 0, 1, 1, 1, 2, 2, 2], ... [0, 0, 0, 0, 1, 2, 0, 1, 2]]) >>> s1 = ps.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3], ... index=midx1) >>> s2 = ps.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3], ... index=midx2) >>> s1.index.symmetric_difference(s2.index) # doctest: +SKIP MultiIndex([('pandas-on-Spark', 'speed'), ( 'lama', 'speed')], ) You can set names of the result Index. >>> s1.index.symmetric_difference(s2.index, result_name=['a', 'b']) # doctest: +SKIP MultiIndex([('pandas-on-Spark', 'speed'), ( 'lama', 'speed')], names=['a', 'b']) You can set sort to `True`, if you want to sort the resulting index. >>> s1.index.symmetric_difference(s2.index, sort=True) # doctest: +SKIP MultiIndex([('pandas-on-Spark', 'speed'), ( 'lama', 'speed')], ) You can also use the ``^`` operator: >>> s1.index ^ s2.index # doctest: +SKIP MultiIndex([('pandas-on-Spark', 'speed'), ( 'lama', 'speed')], ) """iftype(self)!=type(other):raiseNotImplementedError("Doesn't support symmetric_difference between Index & MultiIndex for now")sdf_self=self._psdf._internal.spark_frame.select(self._internal.index_spark_columns)sdf_other=other._psdf._internal.spark_frame.select(other._internal.index_spark_columns)sdf_symdiff=xor(sdf_self,sdf_other)ifsort:sdf_symdiff=sdf_symdiff.sort(*self._internal.index_spark_column_names)internal=InternalFrame(spark_frame=sdf_symdiff,index_spark_columns=[scol_for(sdf_symdiff,col)forcolinself._internal.index_spark_column_names],index_names=self._internal.index_names,index_fields=self._internal.index_fields,)result=cast(MultiIndex,DataFrame(internal).index)ifresult_name:result.names=result_namereturnresult
# TODO: ADD error parameter
[docs]defdrop(self,codes:List[Any],level:Optional[Union[int,Name]]=None)->"MultiIndex":""" Make new MultiIndex with passed list of labels deleted Parameters ---------- codes : array-like Must be a list of tuples level : int or level name, default None Returns ------- dropped : MultiIndex Examples -------- >>> index = ps.MultiIndex.from_tuples([('a', 'x'), ('b', 'y'), ('c', 'z')]) >>> index # doctest: +SKIP MultiIndex([('a', 'x'), ('b', 'y'), ('c', 'z')], ) >>> index.drop(['a']) # doctest: +SKIP MultiIndex([('b', 'y'), ('c', 'z')], ) >>> index.drop(['x', 'y'], level=1) # doctest: +SKIP MultiIndex([('c', 'z')], ) """internal=self._internal.resolved_copysdf=internal.spark_frameindex_scols=internal.index_spark_columnsiflevelisNone:scol=index_scols[0]elifisinstance(level,int):scol=index_scols[level]else:scol=Noneforindex_spark_column,index_nameinzip(internal.index_spark_columns,internal.index_names):ifnotisinstance(level,tuple):level=(level,)iflevel==index_name:ifscolisnotNone:raiseValueError("The name {} occurs multiple times, use a level number".format(name_like_string(level)))scol=index_spark_columnifscolisNone:raiseKeyError("Level {} not found".format(name_like_string(level)))sdf=sdf[~scol.isin(codes)]internal=InternalFrame(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolininternal.index_spark_column_names],index_names=internal.index_names,index_fields=internal.index_fields,column_labels=[],data_spark_columns=[],data_fields=[],)returncast(MultiIndex,DataFrame(internal).index)
defdrop_duplicates(self,keep:Union[bool,str]="first")->"MultiIndex":""" Return MultiIndex with duplicate values removed. Parameters ---------- keep : {'first', 'last', ``False``}, default 'first' Method to handle dropping duplicates: - 'first' : Drop duplicates except for the first occurrence. - 'last' : Drop duplicates except for the last occurrence. - ``False`` : Drop all duplicates. Returns ------- deduplicated : MultiIndex See Also -------- Series.drop_duplicates : Equivalent method on Series. DataFrame.drop_duplicates : Equivalent method on DataFrame. Examples -------- Generate a MultiIndex with duplicate values. >>> arrays = [[1, 2, 3, 1, 2], ["red", "blue", "black", "red", "blue"]] >>> midx = ps.MultiIndex.from_arrays(arrays, names=("number", "color")) >>> midx MultiIndex([(1, 'red'), (2, 'blue'), (3, 'black'), (1, 'red'), (2, 'blue')], names=['number', 'color']) >>> midx.drop_duplicates() MultiIndex([(1, 'red'), (2, 'blue'), (3, 'black')], names=['number', 'color']) >>> midx.drop_duplicates(keep='first') MultiIndex([(1, 'red'), (2, 'blue'), (3, 'black')], names=['number', 'color']) >>> midx.drop_duplicates(keep='last') MultiIndex([(3, 'black'), (1, 'red'), (2, 'blue')], names=['number', 'color']) >>> midx.drop_duplicates(keep=False) MultiIndex([(3, 'black')], names=['number', 'color']) """withps.option_context("compute.default_index_type","distributed"):# The attached index caused by `reset_index` below is used for sorting only,# and it will be dropped soon,# so we enforce “distributed” default index typepsdf=self.to_frame().reset_index(drop=True)returnps.MultiIndex.from_frame(psdf.drop_duplicates(keep=keep).sort_index())defargmax(self)->None:raiseTypeError("reduction operation 'argmax' not allowed for this dtype")defargmin(self)->None:raiseTypeError("reduction operation 'argmin' not allowed for this dtype")defasof(self,label:Any)->None:raiseNotImplementedError("only the default get_loc method is currently supported for MultiIndex")def__getattr__(self,item:str)->Any:ifhasattr(MissingPandasLikeMultiIndex,item):property_or_func=getattr(MissingPandasLikeMultiIndex,item)ifisinstance(property_or_func,property):returnproperty_or_func.fget(self)else:returnpartial(property_or_func,self)raiseAttributeError("'MultiIndex' object has no attribute '{}'".format(item))def_get_level_number(self,level:Union[int,Name])->int:""" Return the level number if a valid level is given. """count=self.names.count(level)if(count>1)andnotisinstance(level,int):raiseValueError("The name %s occurs multiple times, use a level number"%level)iflevelinself.names:level=self.names.index(level)elifisinstance(level,int):nlevels=self.nlevelsiflevel>=nlevels:raiseIndexError("Too many levels: Index has only %d ""levels, %d is not a valid level number"%(nlevels,level))iflevel<0:if(level+nlevels)<0:raiseIndexError("Too many levels: Index has only %d levels, ""not %d"%(nlevels,level+1))level=level+nlevelselse:raiseKeyError("Level %s not found"%str(level))returnleveldefget_level_values(self,level:Union[int,Name])->Index:""" Return vector of label values for requested level, equal to the length of the index. Parameters ---------- level : int or str ``level`` is either the integer position of the level in the MultiIndex, or the name of the level. Returns ------- values : Index Values is a level of this MultiIndex converted to a single :class:`Index` (or subclass thereof). Examples -------- Create a MultiIndex: >>> mi = ps.MultiIndex.from_tuples([('x', 'a'), ('x', 'b'), ('y', 'a')]) >>> mi.names = ['level_1', 'level_2'] Get level values by supplying level as either integer or name: >>> mi.get_level_values(0) Index(['x', 'x', 'y'], dtype='object', name='level_1') >>> mi.get_level_values('level_2') Index(['a', 'b', 'a'], dtype='object', name='level_2') """level=self._get_level_number(level)index_scol=self._internal.index_spark_columns[level]index_name=self._internal.index_names[level]index_field=self._internal.index_fields[level]internal=self._internal.copy(index_spark_columns=[index_scol],index_names=[index_name],index_fields=[index_field],column_labels=[],data_spark_columns=[],data_fields=[],)returnDataFrame(internal).index
[docs]definsert(self,loc:int,item:Any)->Index:""" Make new MultiIndex inserting new item at location. Follows Python list.append semantics for negative values. .. versionchanged:: 3.4.0 Raise IndexError when loc is out of bounds to follow Pandas 1.4+ behavior Parameters ---------- loc : int item : object Returns ------- new_index : MultiIndex Examples -------- >>> psmidx = ps.MultiIndex.from_tuples([("a", "x"), ("b", "y"), ("c", "z")]) >>> psmidx.insert(3, ("h", "j")) # doctest: +SKIP MultiIndex([('a', 'x'), ('b', 'y'), ('c', 'z'), ('h', 'j')], ) For negative values >>> psmidx.insert(-2, ("h", "j")) # doctest: +SKIP MultiIndex([('a', 'x'), ('h', 'j'), ('b', 'y'), ('c', 'z')], ) """validate_index_loc(self,loc)loc=loc+len(self)ifloc<0elselocindex_name:List[Label]=[(name,)fornameinself._internal.index_spark_column_names]sdf_before=self.to_frame(name=index_name)[:loc]._to_spark()sdf_middle=Index([item]).to_frame(name=index_name)._to_spark()sdf_after=self.to_frame(name=index_name)[loc:]._to_spark()sdf=sdf_before.union(sdf_middle).union(sdf_after)internal=InternalFrame(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolinself._internal.index_spark_column_names],index_names=self._internal.index_names,index_fields=[InternalField(field.dtype)forfieldinself._internal.index_fields],)returnDataFrame(internal).index
[docs]defitem(self)->Tuple[Scalar,...]:""" Return the first element of the underlying data as a python tuple. Returns ------- tuple The first element of MultiIndex. Raises ------ ValueError If the data is not length-1. Examples -------- >>> psmidx = ps.MultiIndex.from_tuples([('a', 'x')]) >>> psmidx.item() ('a', 'x') """returnself._psdf.head(2)._to_internal_pandas().index.item()
[docs]defintersection(self,other:Union[DataFrame,Series,Index,List])->"MultiIndex":""" Form the intersection of two Index objects. This returns a new Index with elements common to the index and `other`. Parameters ---------- other : Index or array-like Returns ------- intersection : MultiIndex Examples -------- >>> midx1 = ps.MultiIndex.from_tuples([("a", "x"), ("b", "y"), ("c", "z")]) >>> midx2 = ps.MultiIndex.from_tuples([("c", "z"), ("d", "w")]) >>> midx1.intersection(midx2).sort_values() # doctest: +SKIP MultiIndex([('c', 'z')], ) """ifisinstance(other,Series)ornotis_list_like(other):raiseTypeError("other must be a MultiIndex or a list of tuples")elifisinstance(other,DataFrame):raiseValueError("Index data must be 1-dimensional")elifisinstance(other,MultiIndex):spark_frame_other=other.to_frame()._to_spark()keep_name=self.names==other.nameselifisinstance(other,Index):# Always returns an empty MultiIndex if `other` is Index.returncast(MultiIndex,self.to_frame().head(0).index)elifnotall(isinstance(item,tuple)foriteminother):raiseTypeError("other must be a MultiIndex or a list of tuples")else:other=MultiIndex.from_tuples(list(other))spark_frame_other=cast(MultiIndex,other).to_frame()._to_spark()keep_name=Trueindex_fields=self._index_fields_for_union_like(other,func_name="intersection")default_name:List[Name]=[SPARK_INDEX_NAME_FORMAT(i)foriinrange(self.nlevels)]spark_frame_self=self.to_frame(name=default_name)._to_spark()spark_frame_intersected=spark_frame_self.intersect(spark_frame_other)ifkeep_name:index_names=self._internal.index_nameselse:index_names=Noneinternal=InternalFrame(spark_frame=spark_frame_intersected,index_spark_columns=[scol_for(spark_frame_intersected,cast(str,col))forcolindefault_name],index_names=index_names,index_fields=index_fields,)returncast(MultiIndex,DataFrame(internal).index)
[docs]defequal_levels(self,other:"MultiIndex")->bool:""" Return True if the levels of both MultiIndex objects are the same .. versionadded:: 3.3.0 Examples -------- >>> psmidx1 = ps.MultiIndex.from_tuples([("a", "x"), ("b", "y"), ("c", "z")]) >>> psmidx2 = ps.MultiIndex.from_tuples([("b", "y"), ("a", "x"), ("c", "z")]) >>> psmidx1.equal_levels(psmidx2) True >>> psmidx2 = ps.MultiIndex.from_tuples([("a", "x"), ("b", "y"), ("c", "j")]) >>> psmidx1.equal_levels(psmidx2) False """nlevels=self.nlevelsifnlevels!=other.nlevels:returnFalseself_sdf=self._internal.spark_frameother_sdf=other._internal.spark_framesubtract_list=[]fornlevelinrange(nlevels):self_index_scol=self._internal.index_spark_columns[nlevel]other_index_scol=other._internal.index_spark_columns[nlevel]self_subtract_other=self_sdf.select(self_index_scol).subtract(other_sdf.select(other_index_scol))subtract_list.append(self_subtract_other)unioned_subtracts=reduce(lambdax,y:x.union(y),subtract_list)returnlen(unioned_subtracts.head(1))==0
@propertydefhasnans(self)->bool:raiseNotImplementedError("hasnans is not defined for MultiIndex")@propertydefinferred_type(self)->str:""" Return a string of the type inferred from the values. """# Always returns "mixed" for MultiIndexreturn"mixed"deffactorize(self,sort:bool=True,na_sentinel:Optional[int]=-1)->Tuple["MultiIndex",pd.Index]:returnMissingPandasLikeMultiIndex.factorize(self,sort=sort,na_sentinel=na_sentinel)def__iter__(self)->Iterator:returnMissingPandasLikeMultiIndex.__iter__(self)defmap(self,mapper:Union[dict,Callable[[Any],Any],pd.Series]=None,na_action:Optional[str]=None,)->"Index":returnMissingPandasLikeMultiIndex.map(self,mapper,na_action)