#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
A wrapper for GroupedData to behave similar to pandas GroupBy.
"""
from abc import ABCMeta, abstractmethod
import inspect
from collections import defaultdict, namedtuple
from distutils.version import LooseVersion
from functools import partial
from itertools import product
from typing import (
    Any,
    Callable,
    Dict,
    Generic,
    Iterator,
    Mapping,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
    cast,
    TYPE_CHECKING,
)
import warnings

import pandas as pd
from pandas.api.types import is_hashable, is_list_like  # type: ignore[attr-defined]

if LooseVersion(pd.__version__) >= LooseVersion("1.3.0"):
    from pandas.core.common import _builtin_table  # type: ignore[attr-defined]
else:
    from pandas.core.base import SelectionMixin

    _builtin_table = SelectionMixin._builtin_table  # type: ignore[attr-defined]

from pyspark.sql import Column, DataFrame as SparkDataFrame, Window, functions as F
from pyspark.sql.types import (
    NumericType,
    StructField,
    StructType,
    StringType,
)

from pyspark import pandas as ps  # For running doctests and reference resolution in PyCharm.
from pyspark.pandas._typing import Axis, FrameLike, Label, Name
from pyspark.pandas.typedef import infer_return_type, DataFrameType, ScalarType, SeriesType
from pyspark.pandas.frame import DataFrame
from pyspark.pandas.internal import (
    InternalField,
    InternalFrame,
    HIDDEN_COLUMNS,
    NATURAL_ORDER_COLUMN_NAME,
    SPARK_INDEX_NAME_FORMAT,
    SPARK_DEFAULT_SERIES_NAME,
    SPARK_INDEX_NAME_PATTERN,
)
from pyspark.pandas.missing.groupby import (
    MissingPandasLikeDataFrameGroupBy,
    MissingPandasLikeSeriesGroupBy,
)
from pyspark.pandas.series import Series, first_series
from pyspark.pandas.spark import functions as SF
from pyspark.pandas.config import get_option
from pyspark.pandas.utils import (
    align_diff_frames,
    is_name_like_tuple,
    is_name_like_value,
    name_like_string,
    same_anchor,
    scol_for,
    verify_temp_column_name,
    log_advice,
)
from pyspark.pandas.spark.utils import as_nullable_spark_type, force_decimal_precision_scale
from pyspark.pandas.exceptions import DataError

if TYPE_CHECKING:
    from pyspark.pandas.window import RollingGroupby, ExpandingGroupby


# to keep it the same as pandas
NamedAgg = namedtuple("NamedAgg", ["column", "aggfunc"])


class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
    """
    :ivar _psdf: The parent dataframe that is used to perform the groupby
    :type _psdf: DataFrame
    :ivar _groupkeys: The list of keys that will be used to perform the grouping
    :type _groupkeys: List[Series]
"""def__init__(self,psdf:DataFrame,groupkeys:List[Series],as_index:bool,dropna:bool,column_labels_to_exclude:Set[Label],agg_columns_selected:bool,agg_columns:List[Series],):self._psdf=psdfself._groupkeys=groupkeysself._as_index=as_indexself._dropna=dropnaself._column_labels_to_exclude=column_labels_to_excludeself._agg_columns_selected=agg_columns_selectedself._agg_columns=agg_columns@propertydef_groupkeys_scols(self)->List[Column]:return[s.spark.columnforsinself._groupkeys]@propertydef_agg_columns_scols(self)->List[Column]:return[s.spark.columnforsinself._agg_columns]@abstractmethoddef_apply_series_op(self,op:Callable[["SeriesGroupBy"],Series],should_resolve:bool=False,numeric_only:bool=False,)->FrameLike:pass@abstractmethoddef_cleanup_and_return(self,psdf:DataFrame)->FrameLike:pass# TODO: Series support is not implemented yet.# TODO: not all arguments are implemented comparing to pandas' for now.defaggregate(self,func_or_funcs:Optional[Union[str,List[str],Dict[Name,Union[str,List[str]]]]]=None,*args:Any,**kwargs:Any,)->DataFrame:"""Aggregate using one or more operations over the specified axis. Parameters ---------- func_or_funcs : dict, str or list a dict mapping from column name (string) to aggregate functions (string or list of strings). Returns ------- Series or DataFrame The return can be: * Series : when DataFrame.agg is called with a single function * DataFrame : when DataFrame.agg is called with several functions Return Series or DataFrame. Notes ----- `agg` is an alias for `aggregate`. Use the alias. See Also -------- pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby Examples -------- >>> df = ps.DataFrame({'A': [1, 1, 2, 2], ... 'B': [1, 2, 3, 4], ... 'C': [0.362, 0.227, 1.267, -0.562]}, ... columns=['A', 'B', 'C']) >>> df A B C 0 1 1 0.362 1 1 2 0.227 2 2 3 1.267 3 2 4 -0.562 Different aggregations per column >>> aggregated = df.groupby('A').agg({'B': 'min', 'C': 'sum'}) >>> aggregated[['B', 'C']].sort_index() # doctest: +NORMALIZE_WHITESPACE B C A 1 1 0.589 2 3 0.705 >>> aggregated = df.groupby('A').agg({'B': ['min', 'max']}) >>> aggregated.sort_index() # doctest: +NORMALIZE_WHITESPACE B min max A 1 1 2 2 3 4 >>> aggregated = df.groupby('A').agg('min') >>> aggregated.sort_index() # doctest: +NORMALIZE_WHITESPACE B C A 1 1 0.227 2 3 -0.562 >>> aggregated = df.groupby('A').agg(['min', 'max']) >>> aggregated.sort_index() # doctest: +NORMALIZE_WHITESPACE B C min max min max A 1 1 2 0.227 0.362 2 3 4 -0.562 1.267 To control the output names with different aggregations per column, pandas-on-Spark also supports 'named aggregation' or nested renaming in .agg. It can also be used when applying multiple aggregation functions to specific columns. 
>>> aggregated = df.groupby('A').agg(b_max=ps.NamedAgg(column='B', aggfunc='max')) >>> aggregated.sort_index() # doctest: +NORMALIZE_WHITESPACE b_max A 1 2 2 4 >>> aggregated = df.groupby('A').agg(b_max=('B', 'max'), b_min=('B', 'min')) >>> aggregated.sort_index() # doctest: +NORMALIZE_WHITESPACE b_max b_min A 1 2 1 2 4 3 >>> aggregated = df.groupby('A').agg(b_max=('B', 'max'), c_min=('C', 'min')) >>> aggregated.sort_index() # doctest: +NORMALIZE_WHITESPACE b_max c_min A 1 2 0.227 2 4 -0.562 """# I think current implementation of func and arguments in pandas-on-Spark for aggregate# is different than pandas, later once arguments are added, this could be removed.iffunc_or_funcsisNoneandkwargsisNone:raiseValueError("No aggregation argument or function specified.")relabeling=func_or_funcsisNoneandis_multi_agg_with_relabel(**kwargs)ifrelabeling:(func_or_funcs,columns,order,)=normalize_keyword_aggregation(# type: ignore[assignment]kwargs)ifnotisinstance(func_or_funcs,(str,list)):ifnotisinstance(func_or_funcs,dict)ornotall(is_name_like_value(key)and(isinstance(value,str)orisinstance(value,list)andall(isinstance(v,str)forvinvalue))forkey,valueinfunc_or_funcs.items()):raiseValueError("aggs must be a dict mapping from column name ""to aggregate functions (string or list of strings).")else:agg_cols=[col.nameforcolinself._agg_columns]func_or_funcs={col:func_or_funcsforcolinagg_cols}psdf:DataFrame=DataFrame(GroupBy._spark_groupby(self._psdf,func_or_funcs,self._groupkeys))ifself._dropna:psdf=DataFrame(psdf._internal.with_new_sdf(psdf._internal.spark_frame.dropna(subset=psdf._internal.index_spark_column_names)))ifnotself._as_index:should_drop_index=set(ifori,gkeyinenumerate(self._groupkeys)ifgkey._psdfisnotself._psdf)iflen(should_drop_index)>0:psdf=psdf.reset_index(level=should_drop_index,drop=True)iflen(should_drop_index)<len(self._groupkeys):psdf=psdf.reset_index()ifrelabeling:psdf=psdf[order]psdf.columns=columns# type: ignore[assignment]returnpsdfagg=aggregate@staticmethoddef_spark_groupby(psdf:DataFrame,func:Mapping[Name,Union[str,List[str]]],groupkeys:Sequence[Series]=(),)->InternalFrame:groupkey_names=[SPARK_INDEX_NAME_FORMAT(i)foriinrange(len(groupkeys))]groupkey_scols=[s.spark.column.alias(name)fors,nameinzip(groupkeys,groupkey_names)]multi_aggs=any(isinstance(v,list)forvinfunc.values())reordered=[]data_columns=[]column_labels=[]forkey,valueinfunc.items():label=keyifis_name_like_tuple(key)else(key,)iflen(label)!=psdf._internal.column_labels_level:raiseTypeError("The length of the key must be the same as the column label level.")foraggfuncin[value]ifisinstance(value,str)elsevalue:column_label=tuple(list(label)+[aggfunc])ifmulti_aggselselabelcolumn_labels.append(column_label)data_col=name_like_string(column_label)data_columns.append(data_col)col_name=psdf._internal.spark_column_name_for(label)ifaggfunc=="nunique":reordered.append(F.expr("count(DISTINCT `{0}`) as `{1}`".format(col_name,data_col)))# Implement "quartiles" aggregate function for ``describe``.elifaggfunc=="quartiles":reordered.append(F.expr("percentile_approx(`{0}`, array(0.25, 0.5, 0.75)) as `{1}`".format(col_name,data_col)))else:reordered.append(F.expr("{1}(`{0}`) as 
`{2}`".format(col_name,aggfunc,data_col)))sdf=psdf._internal.spark_frame.select(groupkey_scols+psdf._internal.data_spark_columns)sdf=sdf.groupby(*groupkey_names).agg(*reordered)returnInternalFrame(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolingroupkey_names],index_names=[psser._column_labelforpsseringroupkeys],index_fields=[psser._internal.data_fields[0].copy(name=name)forpsser,nameinzip(groupkeys,groupkey_names)],column_labels=column_labels,data_spark_columns=[scol_for(sdf,col)forcolindata_columns],)
    def count(self) -> FrameLike:
        """
        Compute count of group, excluding missing values.

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby

        Examples
        --------
        >>> df = ps.DataFrame({'A': [1, 1, 2, 1, 2],
        ...                    'B': [np.nan, 2, 3, 4, 5],
        ...                    'C': [1, 2, 1, 1, 2]}, columns=['A', 'B', 'C'])
        >>> df.groupby('A').count().sort_index()  # doctest: +NORMALIZE_WHITESPACE
           B  C
        A
        1  2  3
        2  2  2
        """
        return self._reduce_for_stat_function(F.count, only_numeric=False)
    # TODO: We should fix See Also when Series implementation is finished.
    def first(self) -> FrameLike:
        """
        Compute first of group values.

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby
        """
        return self._reduce_for_stat_function(F.first, only_numeric=False)
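    # Illustrative sketch (assumed toy data, not part of the module's own doctests):
    # ``first`` keeps the first observed value of each aggregation column per group.
    #
    #   >>> df = ps.DataFrame({'A': [1, 1, 2], 'B': [10, 20, 30]})
    #   >>> df.groupby('A').first().sort_index()
    #       B
    #   A
    #   1  10
    #   2  30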
    def last(self) -> FrameLike:
        """
        Compute last of group values.

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby
        """
        return self._reduce_for_stat_function(
            lambda col: F.last(col, ignorenulls=True), only_numeric=False
        )
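    # Illustrative sketch (assumed toy data, not part of the module's own doctests): because
    # the reduction uses ``F.last(col, ignorenulls=True)``, a trailing null in a group is
    # skipped and the last non-null value is returned, matching pandas' ``GroupBy.last``.
    #
    #   >>> df = ps.DataFrame({'A': [1, 1, 2], 'B': [3.0, None, 6.0]})
    #   >>> df.groupby('A').last().sort_index()
    #        B
    #   A
    #   1  3.0
    #   2  6.0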
    def max(self) -> FrameLike:
        """
        Compute max of group values.

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby
        """
        return self._reduce_for_stat_function(F.max, only_numeric=False)
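    # Illustrative sketch (assumed toy data): ``max`` is reduced with ``only_numeric=False``,
    # so non-numeric columns such as strings are kept and compared lexicographically.
    #
    #   >>> df = ps.DataFrame({'A': [1, 1, 2], 'B': ['x', 'z', 'y']})
    #   >>> df.groupby('A').max().sort_index()
    #      B
    #   A
    #   1  z
    #   2  y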
    # TODO: examples should be updated.
    def mean(self) -> FrameLike:
        """
        Compute mean of groups, excluding missing values.

        Returns
        -------
        pyspark.pandas.Series or pyspark.pandas.DataFrame

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby

        Examples
        --------
        >>> df = ps.DataFrame({'A': [1, 1, 2, 1, 2],
        ...                    'B': [np.nan, 2, 3, 4, 5],
        ...                    'C': [1, 2, 1, 1, 2]}, columns=['A', 'B', 'C'])

        Groupby one column and return the mean of the remaining columns in
        each group.

        >>> df.groupby('A').mean().sort_index()  # doctest: +NORMALIZE_WHITESPACE
             B         C
        A
        1  3.0  1.333333
        2  4.0  1.500000
        """
        return self._reduce_for_stat_function(F.mean, only_numeric=True)
    def min(self) -> FrameLike:
        """
        Compute min of group values.

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby
        """
        return self._reduce_for_stat_function(F.min, only_numeric=False)
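    # Illustrative sketch (assumed toy data): selecting a single column first yields a
    # SeriesGroupBy, so the reduction returns a Series indexed by the group key.
    #
    #   >>> df = ps.DataFrame({'A': [1, 1, 2], 'B': [5, 3, 4]})
    #   >>> df.groupby('A')['B'].min().sort_index()
    #   A
    #   1    3
    #   2    4
    #   Name: B, dtype: int64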
    # TODO: sync the doc.
    def std(self, ddof: int = 1) -> FrameLike:
        """
        Compute standard deviation of groups, excluding missing values.

        Parameters
        ----------
        ddof : int, default 1
            Delta Degrees of Freedom. The divisor used in calculations is N - ddof,
            where N represents the number of elements.

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby
        """
        assert ddof in (0, 1)

        return self._reduce_for_stat_function(
            F.stddev_pop if ddof == 0 else F.stddev_samp, only_numeric=True
        )
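    # Illustrative sketch (assumed toy data): ``ddof`` switches between the sample
    # (``stddev_samp``, divisor N - 1) and population (``stddev_pop``, divisor N) estimators.
    # For the group values [1, 3] the sample std is sqrt(2) and the population std is 1.0.
    #
    #   >>> df = ps.DataFrame({'A': [1, 1], 'B': [1, 3]})
    #   >>> df.groupby('A').std().sort_index()        # ddof=1 -> ~1.414214
    #   >>> df.groupby('A').std(ddof=0).sort_index()  # ddof=0 -> 1.0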
    def sum(self) -> FrameLike:
        """
        Compute sum of group values

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby
        """
        return self._reduce_for_stat_function(F.sum, only_numeric=True)
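    # Illustrative sketch (assumed toy data): ``sum`` is reduced with ``only_numeric=True``,
    # so non-numeric columns are dropped from the result.
    #
    #   >>> df = ps.DataFrame({'A': [1, 1, 2], 'B': [1, 2, 3], 'C': ['x', 'y', 'z']})
    #   >>> df.groupby('A').sum().sort_index()
    #      B
    #   A
    #   1  3
    #   2  3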
    # TODO: sync the doc.
    def var(self, ddof: int = 1) -> FrameLike:
        """
        Compute variance of groups, excluding missing values.

        Parameters
        ----------
        ddof : int, default 1
            Delta Degrees of Freedom. The divisor used in calculations is N - ddof,
            where N represents the number of elements.

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby
        """
        assert ddof in (0, 1)

        return self._reduce_for_stat_function(
            F.var_pop if ddof == 0 else F.var_samp, only_numeric=True
        )
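    # Illustrative sketch (assumed toy data): as with ``std``, ``ddof=0`` uses the population
    # estimator (``var_pop``) and the default ``ddof=1`` the sample estimator (``var_samp``);
    # for the group values [1, 3] these give 1.0 and 2.0 respectively.
    #
    #   >>> df = ps.DataFrame({'A': [1, 1], 'B': [1, 3]})
    #   >>> df.groupby('A')['B'].var()        # 2.0
    #   >>> df.groupby('A')['B'].var(ddof=0)  # 1.0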
    # TODO: skipna should be implemented.
    def all(self) -> FrameLike:
        """
        Returns True if all values in the group are truthful, else False.

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby

        Examples
        --------
        >>> df = ps.DataFrame({'A': [1, 1, 2, 2, 3, 3, 4, 4, 5, 5],
        ...                    'B': [True, True, True, False, False,
        ...                          False, None, True, None, False]},
        ...                   columns=['A', 'B'])
        >>> df
           A      B
        0  1   True
        1  1   True
        2  2   True
        3  2  False
        4  3  False
        5  3  False
        6  4   None
        7  4   True
        8  5   None
        9  5  False

        >>> df.groupby('A').all().sort_index()  # doctest: +NORMALIZE_WHITESPACE
               B
        A
        1   True
        2  False
        3  False
        4   True
        5  False
        """
        return self._reduce_for_stat_function(
            lambda col: F.min(F.coalesce(col.cast("boolean"), SF.lit(True))), only_numeric=False
        )
    # TODO: skipna should be implemented.
    def any(self) -> FrameLike:
        """
        Returns True if any value in the group is truthful, else False.

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby

        Examples
        --------
        >>> df = ps.DataFrame({'A': [1, 1, 2, 2, 3, 3, 4, 4, 5, 5],
        ...                    'B': [True, True, True, False, False,
        ...                          False, None, True, None, False]},
        ...                   columns=['A', 'B'])
        >>> df
           A      B
        0  1   True
        1  1   True
        2  2   True
        3  2  False
        4  3  False
        5  3  False
        6  4   None
        7  4   True
        8  5   None
        9  5  False

        >>> df.groupby('A').any().sort_index()  # doctest: +NORMALIZE_WHITESPACE
               B
        A
        1   True
        2   True
        3  False
        4   True
        5  False
        """
        return self._reduce_for_stat_function(
            lambda col: F.max(F.coalesce(col.cast("boolean"), SF.lit(False))), only_numeric=False
        )
    # TODO: groupby multiple columns should be implemented.
[docs]defdiff(self,periods:int=1)->FrameLike:""" First discrete difference of element. Calculates the difference of a DataFrame element compared with another element in the DataFrame group (default is the element in the same column of the previous row). Parameters ---------- periods : int, default 1 Periods to shift for calculating difference, accepts negative values. Returns ------- diffed : DataFrame or Series See Also -------- pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby Examples -------- >>> df = ps.DataFrame({'a': [1, 2, 3, 4, 5, 6], ... 'b': [1, 1, 2, 3, 5, 8], ... 'c': [1, 4, 9, 16, 25, 36]}, columns=['a', 'b', 'c']) >>> df a b c 0 1 1 1 1 2 1 4 2 3 2 9 3 4 3 16 4 5 5 25 5 6 8 36 >>> df.groupby(['b']).diff().sort_index() a c 0 NaN NaN 1 1.0 3.0 2 NaN NaN 3 NaN NaN 4 NaN NaN 5 NaN NaN Difference with previous column in a group. >>> df.groupby(['b'])['a'].diff().sort_index() 0 NaN 1 1.0 2 NaN 3 NaN 4 NaN 5 NaN Name: a, dtype: float64 """returnself._apply_series_op(lambdasg:sg._psser._diff(periods,part_cols=sg._groupkeys_scols),should_resolve=True)
[docs]defcumcount(self,ascending:bool=True)->Series:""" Number each item in each group from 0 to the length of that group - 1. Essentially this is equivalent to .. code-block:: python self.apply(lambda x: pd.Series(np.arange(len(x)), x.index)) Parameters ---------- ascending : bool, default True If False, number in reverse, from length of group - 1 to 0. Returns ------- Series Sequence number of each element within each group. Examples -------- >>> df = ps.DataFrame([['a'], ['a'], ['a'], ['b'], ['b'], ['a']], ... columns=['A']) >>> df A 0 a 1 a 2 a 3 b 4 b 5 a >>> df.groupby('A').cumcount().sort_index() 0 0 1 1 2 2 3 0 4 1 5 3 dtype: int64 >>> df.groupby('A').cumcount(ascending=False).sort_index() 0 3 1 2 2 1 3 1 4 0 5 0 dtype: int64 """ret=(self._groupkeys[0].rename().spark.transform(lambda_:SF.lit(0))._cum(F.count,True,part_cols=self._groupkeys_scols,ascending=ascending)-1)internal=ret._internal.resolved_copyreturnfirst_series(DataFrame(internal))
[docs]defcummax(self)->FrameLike:""" Cumulative max for each group. Returns ------- Series or DataFrame See Also -------- Series.cummax DataFrame.cummax Examples -------- >>> df = ps.DataFrame( ... [[1, None, 4], [1, 0.1, 3], [1, 20.0, 2], [4, 10.0, 1]], ... columns=list('ABC')) >>> df A B C 0 1 NaN 4 1 1 0.1 3 2 1 20.0 2 3 4 10.0 1 By default, iterates over rows and finds the sum in each column. >>> df.groupby("A").cummax().sort_index() B C 0 NaN 4 1 0.1 4 2 20.0 4 3 10.0 1 It works as below in Series. >>> df.C.groupby(df.A).cummax().sort_index() 0 4 1 4 2 4 3 1 Name: C, dtype: int64 """returnself._apply_series_op(lambdasg:sg._psser._cum(F.max,True,part_cols=sg._groupkeys_scols),should_resolve=True,numeric_only=True,)
[docs]defcummin(self)->FrameLike:""" Cumulative min for each group. Returns ------- Series or DataFrame See Also -------- Series.cummin DataFrame.cummin Examples -------- >>> df = ps.DataFrame( ... [[1, None, 4], [1, 0.1, 3], [1, 20.0, 2], [4, 10.0, 1]], ... columns=list('ABC')) >>> df A B C 0 1 NaN 4 1 1 0.1 3 2 1 20.0 2 3 4 10.0 1 By default, iterates over rows and finds the sum in each column. >>> df.groupby("A").cummin().sort_index() B C 0 NaN 4 1 0.1 3 2 0.1 2 3 10.0 1 It works as below in Series. >>> df.B.groupby(df.A).cummin().sort_index() 0 NaN 1 0.1 2 0.1 3 10.0 Name: B, dtype: float64 """returnself._apply_series_op(lambdasg:sg._psser._cum(F.min,True,part_cols=sg._groupkeys_scols),should_resolve=True,numeric_only=True,)
[docs]defcumprod(self)->FrameLike:""" Cumulative product for each group. Returns ------- Series or DataFrame See Also -------- Series.cumprod DataFrame.cumprod Examples -------- >>> df = ps.DataFrame( ... [[1, None, 4], [1, 0.1, 3], [1, 20.0, 2], [4, 10.0, 1]], ... columns=list('ABC')) >>> df A B C 0 1 NaN 4 1 1 0.1 3 2 1 20.0 2 3 4 10.0 1 By default, iterates over rows and finds the sum in each column. >>> df.groupby("A").cumprod().sort_index() B C 0 NaN 4 1 0.1 12 2 2.0 24 3 10.0 1 It works as below in Series. >>> df.B.groupby(df.A).cumprod().sort_index() 0 NaN 1 0.1 2 2.0 3 10.0 Name: B, dtype: float64 """returnself._apply_series_op(lambdasg:sg._psser._cumprod(True,part_cols=sg._groupkeys_scols),should_resolve=True,numeric_only=True,)
[docs]defcumsum(self)->FrameLike:""" Cumulative sum for each group. Returns ------- Series or DataFrame See Also -------- Series.cumsum DataFrame.cumsum Examples -------- >>> df = ps.DataFrame( ... [[1, None, 4], [1, 0.1, 3], [1, 20.0, 2], [4, 10.0, 1]], ... columns=list('ABC')) >>> df A B C 0 1 NaN 4 1 1 0.1 3 2 1 20.0 2 3 4 10.0 1 By default, iterates over rows and finds the sum in each column. >>> df.groupby("A").cumsum().sort_index() B C 0 NaN 4 1 0.1 7 2 20.1 9 3 10.0 1 It works as below in Series. >>> df.B.groupby(df.A).cumsum().sort_index() 0 NaN 1 0.1 2 20.1 3 10.0 Name: B, dtype: float64 """returnself._apply_series_op(lambdasg:sg._psser._cumsum(True,part_cols=sg._groupkeys_scols),should_resolve=True,numeric_only=True,)
[docs]defapply(self,func:Callable,*args:Any,**kwargs:Any)->Union[DataFrame,Series]:""" Apply function `func` group-wise and combine the results together. The function passed to `apply` must take a DataFrame as its first argument and return a DataFrame. `apply` will then take care of combining the results back together into a single dataframe. `apply` is therefore a highly flexible grouping method. While `apply` is a very flexible method, its downside is that using it can be quite a bit slower than using more specific methods like `agg` or `transform`. pandas-on-Spark offers a wide range of method that will be much faster than using `apply` for their specific purposes, so try to use them before reaching for `apply`. .. note:: this API executes the function once to infer the type which is potentially expensive, for instance, when the dataset is created after aggregations or sorting. To avoid this, specify return type in ``func``, for instance, as below: >>> def pandas_div(x) -> ps.DataFrame[int, [float, float]]: ... return x[['B', 'C']] / x[['B', 'C']] If the return type is specified, the output column names become `c0, c1, c2 ... cn`. These names are positionally mapped to the returned DataFrame in ``func``. To specify the column names, you can assign them in a NumPy compound type style as below: >>> def pandas_div(x) -> ps.DataFrame[("index", int), [("a", float), ("b", float)]]: ... return x[['B', 'C']] / x[['B', 'C']] >>> pdf = pd.DataFrame({'B': [1.], 'C': [3.]}) >>> def plus_one(x) -> ps.DataFrame[ ... (pdf.index.name, pdf.index.dtype), zip(pdf.columns, pdf.dtypes)]: ... return x[['B', 'C']] / x[['B', 'C']] .. note:: the dataframe within ``func`` is actually a pandas dataframe. Therefore, any pandas API within this function is allowed. Parameters ---------- func : callable A callable that takes a DataFrame as its first argument, and returns a dataframe. *args Positional arguments to pass to func. **kwargs Keyword arguments to pass to func. Returns ------- applied : DataFrame or Series See Also -------- aggregate : Apply aggregate function to the GroupBy object. DataFrame.apply : Apply a function to a DataFrame. Series.apply : Apply a function to a Series. Examples -------- >>> df = ps.DataFrame({'A': 'a a b'.split(), ... 'B': [1, 2, 3], ... 'C': [4, 6, 5]}, columns=['A', 'B', 'C']) >>> g = df.groupby('A') Notice that ``g`` has two groups, ``a`` and ``b``. Calling `apply` in various ways, we can get different grouping results: Below the functions passed to `apply` takes a DataFrame as its argument and returns a DataFrame. `apply` combines the result for each group together into a new DataFrame: >>> def plus_min(x): ... return x + x.min() >>> g.apply(plus_min).sort_index() # doctest: +NORMALIZE_WHITESPACE A B C 0 aa 2 8 1 aa 3 10 2 bb 6 10 >>> g.apply(sum).sort_index() # doctest: +NORMALIZE_WHITESPACE A B C A a aa 3 10 b b 3 5 >>> g.apply(len).sort_index() # doctest: +NORMALIZE_WHITESPACE A a 2 b 1 dtype: int64 You can specify the type hint and prevent schema inference for better performance. >>> def pandas_div(x) -> ps.DataFrame[int, [float, float]]: ... return x[['B', 'C']] / x[['B', 'C']] >>> g.apply(pandas_div).sort_index() # doctest: +NORMALIZE_WHITESPACE c0 c1 0 1.0 1.0 1 1.0 1.0 2 1.0 1.0 >>> def pandas_div(x) -> ps.DataFrame[("index", int), [("f1", float), ("f2", float)]]: ... return x[['B', 'C']] / x[['B', 'C']] >>> g.apply(pandas_div).sort_index() # doctest: +NORMALIZE_WHITESPACE f1 f2 index 0 1.0 1.0 1 1.0 1.0 2 1.0 1.0 In case of Series, it works as below. 
>>> def plus_max(x) -> ps.Series[int]: ... return x + x.max() >>> df.B.groupby(df.A).apply(plus_max).sort_index() # doctest: +SKIP 0 6 1 3 2 4 Name: B, dtype: int64 >>> def plus_min(x): ... return x + x.min() >>> df.B.groupby(df.A).apply(plus_min).sort_index() 0 2 1 3 2 6 Name: B, dtype: int64 You can also return a scalar value as a aggregated value of the group: >>> def plus_length(x) -> int: ... return len(x) >>> df.B.groupby(df.A).apply(plus_length).sort_index() # doctest: +SKIP 0 1 1 2 Name: B, dtype: int64 The extra arguments to the function can be passed as below. >>> def calculation(x, y, z) -> int: ... return len(x) + y * z >>> df.B.groupby(df.A).apply(calculation, 5, z=10).sort_index() # doctest: +SKIP 0 51 1 52 Name: B, dtype: int64 """ifnotcallable(func):raiseTypeError("%s object is not callable"%type(func).__name__)spec=inspect.getfullargspec(func)return_sig=spec.annotations.get("return",None)should_infer_schema=return_sigisNoneshould_retain_index=should_infer_schemais_series_groupby=isinstance(self,SeriesGroupBy)psdf=self._psdfifself._agg_columns_selected:agg_columns=self._agg_columnselse:agg_columns=[psdf._psser_for(label)forlabelinpsdf._internal.column_labelsiflabelnotinself._column_labels_to_exclude]psdf,groupkey_labels,groupkey_names=GroupBy._prepare_group_map_apply(psdf,self._groupkeys,agg_columns)ifis_series_groupby:name=psdf.columns[-1]pandas_apply=_builtin_table.get(func,func)else:f=_builtin_table.get(func,func)defpandas_apply(pdf:pd.DataFrame,*a:Any,**k:Any)->Any:returnf(pdf.drop(groupkey_names,axis=1),*a,**k)should_return_series=Falseifshould_infer_schema:# Here we execute with the first 1000 to get the return type.log_advice("If the type hints is not specified for `grouby.apply`, ""it is expensive to infer the data type internally.")limit=get_option("compute.shortcut_limit")pdf=psdf.head(limit+1)._to_internal_pandas()groupkeys=[pdf[groupkey_name].rename(psser.name)forgroupkey_name,psserinzip(groupkey_names,self._groupkeys)]grouped=pdf.groupby(groupkeys)ifis_series_groupby:pser_or_pdf=grouped[name].apply(pandas_apply,*args,**kwargs)else:pser_or_pdf=grouped.apply(pandas_apply,*args,**kwargs)psser_or_psdf=ps.from_pandas(pser_or_pdf)iflen(pdf)<=limit:ifisinstance(psser_or_psdf,ps.Series)andis_series_groupby:psser_or_psdf=psser_or_psdf.rename(cast(SeriesGroupBy,self)._psser.name)returncast(Union[Series,DataFrame],psser_or_psdf)iflen(grouped)<=1:withwarnings.catch_warnings():warnings.simplefilter("always")warnings.warn("The amount of data for return type inference might not be large enough. ""Consider increasing an option `compute.shortcut_limit`.")ifisinstance(psser_or_psdf,Series):should_return_series=Truepsdf_from_pandas=psser_or_psdf._psdfelse:psdf_from_pandas=cast(DataFrame,psser_or_psdf)index_fields=[field.normalize_spark_type()forfieldinpsdf_from_pandas._internal.index_fields]data_fields=[field.normalize_spark_type()forfieldinpsdf_from_pandas._internal.data_fields]return_schema=StructType([field.struct_fieldforfieldinindex_fields+data_fields])else:return_type=infer_return_type(func)ifnotis_series_groupbyandisinstance(return_type,SeriesType):raiseTypeError("Series as a return type hint at frame groupby is not supported ""currently; however got [%s]. 
Use DataFrame type hint instead."%return_sig)ifisinstance(return_type,DataFrameType):data_fields=return_type.data_fieldsreturn_schema=return_type.spark_typeindex_fields=return_type.index_fieldsshould_retain_index=len(index_fields)>0psdf_from_pandas=Noneelse:should_return_series=Truedtype=cast(Union[SeriesType,ScalarType],return_type).dtypespark_type=cast(Union[SeriesType,ScalarType],return_type).spark_typeifis_series_groupby:data_fields=[InternalField(dtype=dtype,struct_field=StructField(name=name,dataType=spark_type))]else:data_fields=[InternalField(dtype=dtype,struct_field=StructField(name=SPARK_DEFAULT_SERIES_NAME,dataType=spark_type),)]return_schema=StructType([field.struct_fieldforfieldindata_fields])defpandas_groupby_apply(pdf:pd.DataFrame)->pd.DataFrame:ifis_series_groupby:pdf_or_ser=pdf.groupby(groupkey_names)[name].apply(pandas_apply,*args,**kwargs)else:pdf_or_ser=pdf.groupby(groupkey_names).apply(pandas_apply,*args,**kwargs)ifshould_return_seriesandisinstance(pdf_or_ser,pd.DataFrame):pdf_or_ser=pdf_or_ser.stack()ifnotisinstance(pdf_or_ser,pd.DataFrame):returnpd.DataFrame(pdf_or_ser)else:returnpdf_or_sersdf=GroupBy._spark_group_map_apply(psdf,pandas_groupby_apply,[psdf._internal.spark_column_for(label)forlabelingroupkey_labels],return_schema,retain_index=should_retain_index,)ifshould_retain_index:# If schema is inferred, we can restore indexes too.ifpsdf_from_pandasisnotNone:internal=psdf_from_pandas._internal.with_new_sdf(spark_frame=sdf,index_fields=index_fields,data_fields=data_fields)else:index_names:Optional[List[Optional[Tuple[Any,...]]]]=Noneindex_spark_columns=[scol_for(sdf,index_field.struct_field.name)forindex_fieldinindex_fields]ifnotany([SPARK_INDEX_NAME_PATTERN.match(index_field.struct_field.name)forindex_fieldinindex_fields]):index_names=[(index_field.struct_field.name,)forindex_fieldinindex_fields]internal=InternalFrame(spark_frame=sdf,index_names=index_names,index_spark_columns=index_spark_columns,index_fields=index_fields,data_fields=data_fields,)else:# Otherwise, it loses index.internal=InternalFrame(spark_frame=sdf,index_spark_columns=None,data_fields=data_fields)ifshould_return_series:psser=first_series(DataFrame(internal))ifis_series_groupby:psser=psser.rename(cast(SeriesGroupBy,self)._psser.name)returnpsserelse:returnDataFrame(internal)
# TODO: implement 'dropna' parameter
[docs]deffilter(self,func:Callable[[FrameLike],FrameLike])->FrameLike:""" Return a copy of a DataFrame excluding elements from groups that do not satisfy the boolean criterion specified by func. Parameters ---------- f : function Function to apply to each subframe. Should return True or False. dropna : Drop groups that do not pass the filter. True by default; if False, groups that evaluate False are filled with NaNs. Returns ------- filtered : DataFrame or Series Notes ----- Each subframe is endowed the attribute 'name' in case you need to know which group you are working on. Examples -------- >>> df = ps.DataFrame({'A' : ['foo', 'bar', 'foo', 'bar', ... 'foo', 'bar'], ... 'B' : [1, 2, 3, 4, 5, 6], ... 'C' : [2.0, 5., 8., 1., 2., 9.]}, columns=['A', 'B', 'C']) >>> grouped = df.groupby('A') >>> grouped.filter(lambda x: x['B'].mean() > 3.) A B C 1 bar 2 5.0 3 bar 4 1.0 5 bar 6 9.0 >>> df.B.groupby(df.A).filter(lambda x: x.mean() > 3.) 1 2 3 4 5 6 Name: B, dtype: int64 """ifnotcallable(func):raiseTypeError("%s object is not callable"%type(func).__name__)is_series_groupby=isinstance(self,SeriesGroupBy)psdf=self._psdfifself._agg_columns_selected:agg_columns=self._agg_columnselse:agg_columns=[psdf._psser_for(label)forlabelinpsdf._internal.column_labelsiflabelnotinself._column_labels_to_exclude]data_schema=(psdf[agg_columns]._internal.resolved_copy.spark_frame.drop(*HIDDEN_COLUMNS).schema)psdf,groupkey_labels,groupkey_names=GroupBy._prepare_group_map_apply(psdf,self._groupkeys,agg_columns)ifis_series_groupby:defpandas_filter(pdf:pd.DataFrame)->pd.DataFrame:returnpd.DataFrame(pdf.groupby(groupkey_names)[pdf.columns[-1]].filter(func))else:f=_builtin_table.get(func,func)defwrapped_func(pdf:pd.DataFrame)->pd.DataFrame:returnf(pdf.drop(groupkey_names,axis=1))defpandas_filter(pdf:pd.DataFrame)->pd.DataFrame:returnpdf.groupby(groupkey_names).filter(wrapped_func).drop(groupkey_names,axis=1)sdf=GroupBy._spark_group_map_apply(psdf,pandas_filter,[psdf._internal.spark_column_for(label)forlabelingroupkey_labels],data_schema,retain_index=True,)psdf=DataFrame(self._psdf[agg_columns]._internal.with_new_sdf(sdf))ifis_series_groupby:returncast(FrameLike,first_series(psdf))else:returncast(FrameLike,psdf)
@staticmethoddef_prepare_group_map_apply(psdf:DataFrame,groupkeys:List[Series],agg_columns:List[Series])->Tuple[DataFrame,List[Label],List[str]]:groupkey_labels:List[Label]=[verify_temp_column_name(psdf,"__groupkey_{}__".format(i))foriinrange(len(groupkeys))]psdf=psdf[[s.rename(label)fors,labelinzip(groupkeys,groupkey_labels)]+agg_columns]groupkey_names=[labeliflen(label)>1elselabel[0]forlabelingroupkey_labels]returnDataFrame(psdf._internal.resolved_copy),groupkey_labels,groupkey_names@staticmethoddef_spark_group_map_apply(psdf:DataFrame,func:Callable[[pd.DataFrame],pd.DataFrame],groupkeys_scols:List[Column],return_schema:StructType,retain_index:bool,)->SparkDataFrame:output_func=GroupBy._make_pandas_df_builder_func(psdf,func,return_schema,retain_index)sdf=psdf._internal.spark_frame.drop(*HIDDEN_COLUMNS)returnsdf.groupby(*groupkeys_scols).applyInPandas(output_func,return_schema)@staticmethoddef_make_pandas_df_builder_func(psdf:DataFrame,func:Callable[[pd.DataFrame],pd.DataFrame],return_schema:StructType,retain_index:bool,)->Callable[[pd.DataFrame],pd.DataFrame]:""" Creates a function that can be used inside the pandas UDF. This function can construct the same pandas DataFrame as if the pandas-on-Spark DataFrame is collected to driver side. The index, column labels, etc. are re-constructed within the function. """frompyspark.sql.utilsimportis_timestamp_ntz_preferredarguments_for_restore_index=psdf._internal.arguments_for_restore_indexprefer_timestamp_ntz=is_timestamp_ntz_preferred()defrename_output(pdf:pd.DataFrame)->pd.DataFrame:pdf=InternalFrame.restore_index(pdf.copy(),**arguments_for_restore_index)pdf=func(pdf)# If schema should be inferred, we don't restore index. pandas seems restoring# the index in some cases.# When Spark output type is specified, without executing it, we don't know# if we should restore the index or not. For instance, see the example in# https://github.com/pyspark.pandas/issues/628.pdf,_,_,_,_=InternalFrame.prepare_pandas_frame(pdf,retain_index=retain_index,prefer_timestamp_ntz=prefer_timestamp_ntz)# Just positionally map the column names to given schema's.pdf.columns=return_schema.namesreturnpdfreturnrename_output
[docs]defrank(self,method:str="average",ascending:bool=True)->FrameLike:""" Provide the rank of values within each group. Parameters ---------- method : {'average', 'min', 'max', 'first', 'dense'}, default 'average' * average: average rank of group * min: lowest rank in group * max: highest rank in group * first: ranks assigned in order they appear in the array * dense: like 'min', but rank always increases by 1 between groups ascending : boolean, default True False for ranks by high (1) to low (N) Returns ------- DataFrame with ranking of values within each group Examples -------- >>> df = ps.DataFrame({ ... 'a': [1, 1, 1, 2, 2, 2, 3, 3, 3], ... 'b': [1, 2, 2, 2, 3, 3, 3, 4, 4]}, columns=['a', 'b']) >>> df a b 0 1 1 1 1 2 2 1 2 3 2 2 4 2 3 5 2 3 6 3 3 7 3 4 8 3 4 >>> df.groupby("a").rank().sort_index() b 0 1.0 1 2.5 2 2.5 3 1.0 4 2.5 5 2.5 6 1.0 7 2.5 8 2.5 >>> df.b.groupby(df.a).rank(method='max').sort_index() 0 1.0 1 3.0 2 3.0 3 1.0 4 3.0 5 3.0 6 1.0 7 3.0 8 3.0 Name: b, dtype: float64 """returnself._apply_series_op(lambdasg:sg._psser._rank(method,ascending,part_cols=sg._groupkeys_scols),should_resolve=True,)
# TODO: add axis parameter
[docs]defidxmax(self,skipna:bool=True)->FrameLike:""" Return index of first occurrence of maximum over requested axis in group. NA/null values are excluded. Parameters ---------- skipna : boolean, default True Exclude NA/null values. If an entire row/column is NA, the result will be NA. See Also -------- Series.idxmax DataFrame.idxmax pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby Examples -------- >>> df = ps.DataFrame({'a': [1, 1, 2, 2, 3], ... 'b': [1, 2, 3, 4, 5], ... 'c': [5, 4, 3, 2, 1]}, columns=['a', 'b', 'c']) >>> df.groupby(['a'])['b'].idxmax().sort_index() # doctest: +NORMALIZE_WHITESPACE a 1 1 2 3 3 4 Name: b, dtype: int64 >>> df.groupby(['a']).idxmax().sort_index() # doctest: +NORMALIZE_WHITESPACE b c a 1 1 0 2 3 2 3 4 4 """ifself._psdf._internal.index_level!=1:raiseValueError("idxmax only support one-level index now")groupkey_names=["__groupkey_{}__".format(i)foriinrange(len(self._groupkeys))]sdf=self._psdf._internal.spark_framefors,nameinzip(self._groupkeys,groupkey_names):sdf=sdf.withColumn(name,s.spark.column)index=self._psdf._internal.index_spark_column_names[0]stat_exprs=[]forpsser,scolinzip(self._agg_columns,self._agg_columns_scols):name=psser._internal.data_spark_column_names[0]ifskipna:order_column=scol.desc_nulls_last()else:order_column=scol.desc_nulls_first()window=Window.partitionBy(*groupkey_names).orderBy(order_column,NATURAL_ORDER_COLUMN_NAME)sdf=sdf.withColumn(name,F.when(F.row_number().over(window)==1,scol_for(sdf,index)).otherwise(None))stat_exprs.append(F.max(scol_for(sdf,name)).alias(name))sdf=sdf.groupby(*groupkey_names).agg(*stat_exprs)internal=InternalFrame(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolingroupkey_names],index_names=[psser._column_labelforpsserinself._groupkeys],index_fields=[psser._internal.data_fields[0].copy(name=name)forpsser,nameinzip(self._groupkeys,groupkey_names)],column_labels=[psser._column_labelforpsserinself._agg_columns],data_spark_columns=[scol_for(sdf,psser._internal.data_spark_column_names[0])forpsserinself._agg_columns],)returnself._cleanup_and_return(DataFrame(internal))
# TODO: add axis parameter
[docs]defidxmin(self,skipna:bool=True)->FrameLike:""" Return index of first occurrence of minimum over requested axis in group. NA/null values are excluded. Parameters ---------- skipna : boolean, default True Exclude NA/null values. If an entire row/column is NA, the result will be NA. See Also -------- Series.idxmin DataFrame.idxmin pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby Examples -------- >>> df = ps.DataFrame({'a': [1, 1, 2, 2, 3], ... 'b': [1, 2, 3, 4, 5], ... 'c': [5, 4, 3, 2, 1]}, columns=['a', 'b', 'c']) >>> df.groupby(['a'])['b'].idxmin().sort_index() # doctest: +NORMALIZE_WHITESPACE a 1 0 2 2 3 4 Name: b, dtype: int64 >>> df.groupby(['a']).idxmin().sort_index() # doctest: +NORMALIZE_WHITESPACE b c a 1 0 1 2 2 3 3 4 4 """ifself._psdf._internal.index_level!=1:raiseValueError("idxmin only support one-level index now")groupkey_names=["__groupkey_{}__".format(i)foriinrange(len(self._groupkeys))]sdf=self._psdf._internal.spark_framefors,nameinzip(self._groupkeys,groupkey_names):sdf=sdf.withColumn(name,s.spark.column)index=self._psdf._internal.index_spark_column_names[0]stat_exprs=[]forpsser,scolinzip(self._agg_columns,self._agg_columns_scols):name=psser._internal.data_spark_column_names[0]ifskipna:order_column=scol.asc_nulls_last()else:order_column=scol.asc_nulls_first()window=Window.partitionBy(*groupkey_names).orderBy(order_column,NATURAL_ORDER_COLUMN_NAME)sdf=sdf.withColumn(name,F.when(F.row_number().over(window)==1,scol_for(sdf,index)).otherwise(None))stat_exprs.append(F.max(scol_for(sdf,name)).alias(name))sdf=sdf.groupby(*groupkey_names).agg(*stat_exprs)internal=InternalFrame(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolingroupkey_names],index_names=[psser._column_labelforpsserinself._groupkeys],index_fields=[psser._internal.data_fields[0].copy(name=name)forpsser,nameinzip(self._groupkeys,groupkey_names)],column_labels=[psser._column_labelforpsserinself._agg_columns],data_spark_columns=[scol_for(sdf,psser._internal.data_spark_column_names[0])forpsserinself._agg_columns],)returnself._cleanup_and_return(DataFrame(internal))
[docs]deffillna(self,value:Optional[Any]=None,method:Optional[str]=None,axis:Optional[Axis]=None,inplace:bool=False,limit:Optional[int]=None,)->FrameLike:"""Fill NA/NaN values in group. Parameters ---------- value : scalar, dict, Series Value to use to fill holes. alternately a dict/Series of values specifying which value to use for each column. DataFrame is not supported. method : {'backfill', 'bfill', 'pad', 'ffill', None}, default None Method to use for filling holes in reindexed Series pad / ffill: propagate last valid observation forward to next valid backfill / bfill: use NEXT valid observation to fill gap axis : {0 or `index`} 1 and `columns` are not supported. inplace : boolean, default False Fill in place (do not create a new object) limit : int, default None If method is specified, this is the maximum number of consecutive NaN values to forward/backward fill. In other words, if there is a gap with more than this number of consecutive NaNs, it will only be partially filled. If method is not specified, this is the maximum number of entries along the entire axis where NaNs will be filled. Must be greater than 0 if not None Returns ------- DataFrame DataFrame with NA entries filled. Examples -------- >>> df = ps.DataFrame({ ... 'A': [1, 1, 2, 2], ... 'B': [2, 4, None, 3], ... 'C': [None, None, None, 1], ... 'D': [0, 1, 5, 4] ... }, ... columns=['A', 'B', 'C', 'D']) >>> df A B C D 0 1 2.0 NaN 0 1 1 4.0 NaN 1 2 2 NaN NaN 5 3 2 3.0 1.0 4 We can also propagate non-null values forward or backward in group. >>> df.groupby(['A'])['B'].fillna(method='ffill').sort_index() 0 2.0 1 4.0 2 NaN 3 3.0 Name: B, dtype: float64 >>> df.groupby(['A']).fillna(method='bfill').sort_index() B C D 0 2.0 NaN 0 1 4.0 NaN 1 2 3.0 1.0 5 3 3.0 1.0 4 """returnself._apply_series_op(lambdasg:sg._psser._fillna(value=value,method=method,axis=axis,limit=limit,part_cols=sg._groupkeys_scols),should_resolve=(methodisnotNone),)
[docs]defbfill(self,limit:Optional[int]=None)->FrameLike:""" Synonym for `DataFrame.fillna()` with ``method=`bfill```. Parameters ---------- axis : {0 or `index`} 1 and `columns` are not supported. inplace : boolean, default False Fill in place (do not create a new object) limit : int, default None If method is specified, this is the maximum number of consecutive NaN values to forward/backward fill. In other words, if there is a gap with more than this number of consecutive NaNs, it will only be partially filled. If method is not specified, this is the maximum number of entries along the entire axis where NaNs will be filled. Must be greater than 0 if not None Returns ------- DataFrame DataFrame with NA entries filled. Examples -------- >>> df = ps.DataFrame({ ... 'A': [1, 1, 2, 2], ... 'B': [2, 4, None, 3], ... 'C': [None, None, None, 1], ... 'D': [0, 1, 5, 4] ... }, ... columns=['A', 'B', 'C', 'D']) >>> df A B C D 0 1 2.0 NaN 0 1 1 4.0 NaN 1 2 2 NaN NaN 5 3 2 3.0 1.0 4 Propagate non-null values backward. >>> df.groupby(['A']).bfill().sort_index() B C D 0 2.0 NaN 0 1 4.0 NaN 1 2 3.0 1.0 5 3 3.0 1.0 4 """returnself.fillna(method="bfill",limit=limit)
    backfill = bfill
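    # Illustrative note: ``backfill`` is a plain class-level alias, so it is the very same
    # function object as ``bfill`` and accepts the same ``limit`` argument.
    #
    #   >>> GroupBy.backfill is GroupBy.bfill
    #   True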
[docs]defffill(self,limit:Optional[int]=None)->FrameLike:""" Synonym for `DataFrame.fillna()` with ``method=`ffill```. Parameters ---------- axis : {0 or `index`} 1 and `columns` are not supported. inplace : boolean, default False Fill in place (do not create a new object) limit : int, default None If method is specified, this is the maximum number of consecutive NaN values to forward/backward fill. In other words, if there is a gap with more than this number of consecutive NaNs, it will only be partially filled. If method is not specified, this is the maximum number of entries along the entire axis where NaNs will be filled. Must be greater than 0 if not None Returns ------- DataFrame DataFrame with NA entries filled. Examples -------- >>> df = ps.DataFrame({ ... 'A': [1, 1, 2, 2], ... 'B': [2, 4, None, 3], ... 'C': [None, None, None, 1], ... 'D': [0, 1, 5, 4] ... }, ... columns=['A', 'B', 'C', 'D']) >>> df A B C D 0 1 2.0 NaN 0 1 1 4.0 NaN 1 2 2 NaN NaN 5 3 2 3.0 1.0 4 Propagate non-null values forward. >>> df.groupby(['A']).ffill().sort_index() B C D 0 2.0 NaN 0 1 4.0 NaN 1 2 NaN NaN 5 3 3.0 1.0 4 """returnself.fillna(method="ffill",limit=limit)
pad=ffilldef_limit(self,n:int,asc:bool)->FrameLike:""" Private function for tail and head. """psdf=self._psdfifself._agg_columns_selected:agg_columns=self._agg_columnselse:agg_columns=[psdf._psser_for(label)forlabelinpsdf._internal.column_labelsiflabelnotinself._column_labels_to_exclude]psdf,groupkey_labels,_=GroupBy._prepare_group_map_apply(psdf,self._groupkeys,agg_columns,)groupkey_scols=[psdf._internal.spark_column_for(label)forlabelingroupkey_labels]sdf=psdf._internal.spark_frametmp_col=verify_temp_column_name(sdf,"__row_number__")# This part is handled differently depending on whether it is a tail or a head.window=(Window.partitionBy(*groupkey_scols).orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc())ifascelseWindow.partitionBy(*groupkey_scols).orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).desc()))sdf=(sdf.withColumn(tmp_col,F.row_number().over(window)).filter(F.col(tmp_col)<=n).drop(tmp_col))internal=psdf._internal.with_new_sdf(sdf)returnself._cleanup_and_return(DataFrame(internal).drop(groupkey_labels,axis=1))
[docs]deftail(self,n:int=5)->FrameLike:""" Return last n rows of each group. Similar to `.apply(lambda x: x.tail(n))`, but it returns a subset of rows from the original DataFrame with original index and order preserved (`as_index` flag is ignored). Does not work for negative values of n. Returns ------- DataFrame or Series Examples -------- >>> df = ps.DataFrame({'a': [1, 1, 1, 1, 2, 2, 2, 3, 3, 3], ... 'b': [2, 3, 1, 4, 6, 9, 8, 10, 7, 5], ... 'c': [3, 5, 2, 5, 1, 2, 6, 4, 3, 6]}, ... columns=['a', 'b', 'c'], ... index=[7, 2, 3, 1, 3, 4, 9, 10, 5, 6]) >>> df a b c 7 1 2 3 2 1 3 5 3 1 1 2 1 1 4 5 3 2 6 1 4 2 9 2 9 2 8 6 10 3 10 4 5 3 7 3 6 3 5 6 >>> df.groupby('a').tail(2).sort_index() a b c 1 1 4 5 3 1 1 2 4 2 9 2 5 3 7 3 6 3 5 6 9 2 8 6 >>> df.groupby('a')['b'].tail(2).sort_index() 1 4 3 1 4 9 5 7 6 5 9 8 Name: b, dtype: int64 """returnself._limit(n,asc=False)
[docs]defshift(self,periods:int=1,fill_value:Optional[Any]=None)->FrameLike:""" Shift each group by periods observations. Parameters ---------- periods : integer, default 1 number of periods to shift fill_value : optional Returns ------- Series or DataFrame Object shifted within each group. Examples -------- >>> df = ps.DataFrame({ ... 'a': [1, 1, 1, 2, 2, 2, 3, 3, 3], ... 'b': [1, 2, 2, 2, 3, 3, 3, 4, 4]}, columns=['a', 'b']) >>> df a b 0 1 1 1 1 2 2 1 2 3 2 2 4 2 3 5 2 3 6 3 3 7 3 4 8 3 4 >>> df.groupby('a').shift().sort_index() # doctest: +SKIP b 0 NaN 1 1.0 2 2.0 3 NaN 4 2.0 5 3.0 6 NaN 7 3.0 8 4.0 >>> df.groupby('a').shift(periods=-1, fill_value=0).sort_index() # doctest: +SKIP b 0 2 1 2 2 0 3 3 4 3 5 0 6 4 7 4 8 0 """returnself._apply_series_op(lambdasg:sg._psser._shift(periods,fill_value,part_cols=sg._groupkeys_scols),should_resolve=True,)
[docs]deftransform(self,func:Callable[...,pd.Series],*args:Any,**kwargs:Any)->FrameLike:""" Apply function column-by-column to the GroupBy object. The function passed to `transform` must take a Series as its first argument and return a Series. The given function is executed for each series in each grouped data. While `transform` is a very flexible method, its downside is that using it can be quite a bit slower than using more specific methods like `agg` or `transform`. pandas-on-Spark offers a wide range of method that will be much faster than using `transform` for their specific purposes, so try to use them before reaching for `transform`. .. note:: this API executes the function once to infer the type which is potentially expensive, for instance, when the dataset is created after aggregations or sorting. To avoid this, specify return type in ``func``, for instance, as below: >>> def convert_to_string(x) -> ps.Series[str]: ... return x.apply("a string {}".format) When the given function has the return type annotated, the original index of the GroupBy object will be lost and a default index will be attached to the result. Please be careful about configuring the default index. See also `Default Index Type <https://koalas.readthedocs.io/en/latest/user_guide/options.html#default-index-type>`_. .. note:: the series within ``func`` is actually a pandas series. Therefore, any pandas API within this function is allowed. Parameters ---------- func : callable A callable that takes a Series as its first argument, and returns a Series. *args Positional arguments to pass to func. **kwargs Keyword arguments to pass to func. Returns ------- applied : DataFrame See Also -------- aggregate : Apply aggregate function to the GroupBy object. Series.apply : Apply a function to a Series. Examples -------- >>> df = ps.DataFrame({'A': [0, 0, 1], ... 'B': [1, 2, 3], ... 'C': [4, 6, 5]}, columns=['A', 'B', 'C']) >>> g = df.groupby('A') Notice that ``g`` has two groups, ``0`` and ``1``. Calling `transform` in various ways, we can get different grouping results: Below the functions passed to `transform` takes a Series as its argument and returns a Series. `transform` applies the function on each series in each grouped data, and combine them into a new DataFrame: >>> def convert_to_string(x) -> ps.Series[str]: ... return x.apply("a string {}".format) >>> g.transform(convert_to_string) # doctest: +NORMALIZE_WHITESPACE B C 0 a string 1 a string 4 1 a string 2 a string 6 2 a string 3 a string 5 >>> def plus_max(x) -> ps.Series[int]: ... return x + x.max() >>> g.transform(plus_max) # doctest: +NORMALIZE_WHITESPACE B C 0 3 10 1 4 12 2 6 10 You can omit the type hint and let pandas-on-Spark infer its type. >>> def plus_min(x): ... return x + x.min() >>> g.transform(plus_min) # doctest: +NORMALIZE_WHITESPACE B C 0 2 8 1 3 10 2 6 10 In case of Series, it works as below. >>> df.B.groupby(df.A).transform(plus_max) 0 3 1 4 2 6 Name: B, dtype: int64 >>> (df * -1).B.groupby(df.A).transform(abs) 0 1 1 2 2 3 Name: B, dtype: int64 You can also specify extra arguments to pass to the function. >>> def calculation(x, y, z) -> ps.Series[int]: ... 
return x + x.min() + y + z >>> g.transform(calculation, 5, z=20) # doctest: +NORMALIZE_WHITESPACE B C 0 27 33 1 28 35 2 31 35 """ifnotcallable(func):raiseTypeError("%s object is not callable"%type(func).__name__)spec=inspect.getfullargspec(func)return_sig=spec.annotations.get("return",None)psdf,groupkey_labels,groupkey_names=GroupBy._prepare_group_map_apply(self._psdf,self._groupkeys,agg_columns=self._agg_columns)defpandas_transform(pdf:pd.DataFrame)->pd.DataFrame:returnpdf.groupby(groupkey_names).transform(func,*args,**kwargs)should_infer_schema=return_sigisNoneifshould_infer_schema:# Here we execute with the first 1000 to get the return type.# If the records were less than 1000, it uses pandas API directly for a shortcut.log_advice("If the type hints is not specified for `grouby.transform`, ""it is expensive to infer the data type internally.")limit=get_option("compute.shortcut_limit")pdf=psdf.head(limit+1)._to_internal_pandas()pdf=pdf.groupby(groupkey_names).transform(func,*args,**kwargs)psdf_from_pandas:DataFrame=DataFrame(pdf)return_schema=force_decimal_precision_scale(as_nullable_spark_type(psdf_from_pandas._internal.spark_frame.drop(*HIDDEN_COLUMNS).schema))iflen(pdf)<=limit:returnself._cleanup_and_return(psdf_from_pandas)sdf=GroupBy._spark_group_map_apply(psdf,pandas_transform,[psdf._internal.spark_column_for(label)forlabelingroupkey_labels],return_schema,retain_index=True,)# If schema is inferred, we can restore indexes too.internal=psdf_from_pandas._internal.with_new_sdf(sdf,index_fields=[field.copy(nullable=True)forfieldinpsdf_from_pandas._internal.index_fields],data_fields=[field.copy(nullable=True)forfieldinpsdf_from_pandas._internal.data_fields],)else:return_type=infer_return_type(func)ifnotisinstance(return_type,SeriesType):raiseTypeError("Expected the return type of this function to be of Series type, ""but found type {}".format(return_type))dtype=return_type.dtypespark_type=return_type.spark_typedata_fields=[InternalField(dtype=dtype,struct_field=StructField(name=c,dataType=spark_type))forcinpsdf._internal.data_spark_column_namesifcnotingroupkey_names]return_schema=StructType([field.struct_fieldforfieldindata_fields])sdf=GroupBy._spark_group_map_apply(psdf,pandas_transform,[psdf._internal.spark_column_for(label)forlabelingroupkey_labels],return_schema,retain_index=False,)# Otherwise, it loses index.internal=InternalFrame(spark_frame=sdf,index_spark_columns=None,data_fields=data_fields)returnself._cleanup_and_return(DataFrame(internal))
[docs]defnunique(self,dropna:bool=True)->FrameLike:""" Return DataFrame with number of distinct observations per group for each column. Parameters ---------- dropna : boolean, default True Don’t include NaN in the counts. Returns ------- nunique : DataFrame or Series Examples -------- >>> df = ps.DataFrame({'id': ['spam', 'egg', 'egg', 'spam', ... 'ham', 'ham'], ... 'value1': [1, 5, 5, 2, 5, 5], ... 'value2': list('abbaxy')}, columns=['id', 'value1', 'value2']) >>> df id value1 value2 0 spam 1 a 1 egg 5 b 2 egg 5 b 3 spam 2 a 4 ham 5 x 5 ham 5 y >>> df.groupby('id').nunique().sort_index() # doctest: +SKIP value1 value2 id egg 1 1 ham 1 2 spam 2 1 >>> df.groupby('id')['value1'].nunique().sort_index() # doctest: +NORMALIZE_WHITESPACE id egg 1 ham 1 spam 2 Name: value1, dtype: int64 """ifdropna:defstat_function(col:Column)->Column:returnF.countDistinct(col)else:defstat_function(col:Column)->Column:returnF.countDistinct(col)+F.when(F.count(F.when(col.isNull(),1).otherwise(None))>=1,1).otherwise(0)returnself._reduce_for_stat_function(stat_function,only_numeric=False)
defrolling(self,window:int,min_periods:Optional[int]=None)->"RollingGroupby[FrameLike]":""" Return an rolling grouper, providing rolling functionality per group. .. note:: 'min_periods' in pandas-on-Spark works as a fixed window size unlike pandas. Unlike pandas, NA is also counted as the period. This might be changed in the near future. Parameters ---------- window : int, or offset Size of the moving window. This is the number of observations used for calculating the statistic. Each window will be a fixed size. min_periods : int, default 1 Minimum number of observations in window required to have a value (otherwise result is NA). See Also -------- Series.groupby DataFrame.groupby """frompyspark.pandas.windowimportRollingGroupbyreturnRollingGroupby(self,window,min_periods=min_periods)defexpanding(self,min_periods:int=1)->"ExpandingGroupby[FrameLike]":""" Return an expanding grouper, providing expanding functionality per group. .. note:: 'min_periods' in pandas-on-Spark works as a fixed window size unlike pandas. Unlike pandas, NA is also counted as the period. This might be changed in the near future. Parameters ---------- min_periods : int, default 1 Minimum number of observations in window required to have a value (otherwise result is NA). See Also -------- Series.groupby DataFrame.groupby """frompyspark.pandas.windowimportExpandingGroupbyreturnExpandingGroupby(self,min_periods=min_periods)
[docs]defget_group(self,name:Union[Name,List[Name]])->FrameLike:""" Construct DataFrame from group with provided name. Parameters ---------- name : object The name of the group to get as a DataFrame. Returns ------- group : same type as obj Examples -------- >>> psdf = ps.DataFrame([('falcon', 'bird', 389.0), ... ('parrot', 'bird', 24.0), ... ('lion', 'mammal', 80.5), ... ('monkey', 'mammal', np.nan)], ... columns=['name', 'class', 'max_speed'], ... index=[0, 2, 3, 1]) >>> psdf name class max_speed 0 falcon bird 389.0 2 parrot bird 24.0 3 lion mammal 80.5 1 monkey mammal NaN >>> psdf.groupby("class").get_group("bird").sort_index() name class max_speed 0 falcon bird 389.0 2 parrot bird 24.0 >>> psdf.groupby("class").get_group("mammal").sort_index() name class max_speed 1 monkey mammal NaN 3 lion mammal 80.5 """groupkeys=self._groupkeysifnotis_hashable(name):raiseTypeError("unhashable type: '{}'".format(type(name).__name__))eliflen(groupkeys)>1:ifnotisinstance(name,tuple):raiseValueError("must supply a tuple to get_group with multiple grouping keys")iflen(groupkeys)!=len(name):raiseValueError("must supply a same-length tuple to get_group with multiple grouping keys")ifnotis_list_like(name):name=[name]cond=SF.lit(True)forgroupkey,iteminzip(groupkeys,name):scol=groupkey.spark.columncond=cond&(scol==item)ifself._agg_columns_selected:internal=self._psdf._internalspark_frame=internal.spark_frame.select(internal.index_spark_columns+self._agg_columns_scols).filter(cond)internal=internal.copy(spark_frame=spark_frame,index_spark_columns=[scol_for(spark_frame,col)forcolininternal.index_spark_column_names],column_labels=[s._column_labelforsinself._agg_columns],data_spark_columns=[scol_for(spark_frame,s._internal.data_spark_column_names[0])forsinself._agg_columns],data_fields=[s._internal.data_fields[0]forsinself._agg_columns],)else:internal=self._psdf._internal.with_filter(cond)ifinternal.spark_frame.head()isNone:raiseKeyError(name)returnself._cleanup_and_return(DataFrame(internal))
[docs]defmedian(self,numeric_only:bool=True,accuracy:int=10000)->FrameLike:""" Compute median of groups, excluding missing values. For multiple groupings, the result index will be a MultiIndex .. note:: Unlike pandas', the median in pandas-on-Spark is an approximated median based upon approximate percentile computation because computing median across a large dataset is extremely expensive. Parameters ---------- numeric_only : bool, default True Include only float, int, boolean columns. False is not supported. This parameter is mainly for pandas compatibility. Returns ------- Series or DataFrame Median of values within each group. Examples -------- >>> psdf = ps.DataFrame({'a': [1., 1., 1., 1., 2., 2., 2., 3., 3., 3.], ... 'b': [2., 3., 1., 4., 6., 9., 8., 10., 7., 5.], ... 'c': [3., 5., 2., 5., 1., 2., 6., 4., 3., 6.]}, ... columns=['a', 'b', 'c'], ... index=[7, 2, 4, 1, 3, 4, 9, 10, 5, 6]) >>> psdf a b c 7 1.0 2.0 3.0 2 1.0 3.0 5.0 4 1.0 1.0 2.0 1 1.0 4.0 5.0 3 2.0 6.0 1.0 4 2.0 9.0 2.0 9 2.0 8.0 6.0 10 3.0 10.0 4.0 5 3.0 7.0 3.0 6 3.0 5.0 6.0 DataFrameGroupBy >>> psdf.groupby('a').median().sort_index() # doctest: +NORMALIZE_WHITESPACE b c a 1.0 2.0 3.0 2.0 8.0 2.0 3.0 7.0 4.0 SeriesGroupBy >>> psdf.groupby('a')['b'].median().sort_index() a 1.0 2.0 2.0 8.0 3.0 7.0 Name: b, dtype: float64 """ifnotisinstance(accuracy,int):raiseTypeError("accuracy must be an integer; however, got [%s]"%type(accuracy).__name__)defstat_function(col:Column)->Column:returnF.percentile_approx(col,0.5,accuracy)returnself._reduce_for_stat_function(stat_function,only_numeric=numeric_only)
    def _reduce_for_stat_function(
        self, sfun: Callable[[Column], Column], only_numeric: bool
    ) -> FrameLike:
        groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))]
        groupkey_scols = [s.alias(name) for s, name in zip(self._groupkeys_scols, groupkey_names)]

        agg_columns = [
            psser
            for psser in self._agg_columns
            if isinstance(psser.spark.data_type, NumericType) or not only_numeric
        ]

        sdf = self._psdf._internal.spark_frame.select(
            *groupkey_scols, *[psser.spark.column for psser in agg_columns]
        )

        internal = InternalFrame(
            spark_frame=sdf,
            index_spark_columns=[scol_for(sdf, col) for col in groupkey_names],
            index_names=[psser._column_label for psser in self._groupkeys],
            index_fields=[
                psser._internal.data_fields[0].copy(name=name)
                for psser, name in zip(self._groupkeys, groupkey_names)
            ],
            data_spark_columns=[
                scol_for(sdf, psser._internal.data_spark_column_names[0]) for psser in agg_columns
            ],
            column_labels=[psser._column_label for psser in agg_columns],
            data_fields=[psser._internal.data_fields[0] for psser in agg_columns],
            column_label_names=self._psdf._internal.column_label_names,
        )
        psdf: DataFrame = DataFrame(internal)

        if len(psdf._internal.column_labels) > 0:
            stat_exprs = []
            for label in psdf._internal.column_labels:
                psser = psdf._psser_for(label)
                stat_exprs.append(
                    sfun(psser._dtype_op.nan_to_null(psser).spark.column).alias(
                        psser._internal.data_spark_column_names[0]
                    )
                )
            sdf = sdf.groupby(*groupkey_names).agg(*stat_exprs)
        else:
            sdf = sdf.select(*groupkey_names).distinct()

        internal = internal.copy(
            spark_frame=sdf,
            index_spark_columns=[scol_for(sdf, col) for col in groupkey_names],
            data_spark_columns=[scol_for(sdf, col) for col in internal.data_spark_column_names],
            data_fields=None,
        )
        psdf = DataFrame(internal)

        if self._dropna:
            psdf = DataFrame(
                psdf._internal.with_new_sdf(
                    psdf._internal.spark_frame.dropna(
                        subset=psdf._internal.index_spark_column_names
                    )
                )
            )

        if not self._as_index:
            should_drop_index = set(
                i for i, gkey in enumerate(self._groupkeys) if gkey._psdf is not self._psdf
            )
            if len(should_drop_index) > 0:
                psdf = psdf.reset_index(level=should_drop_index, drop=True)
            if len(should_drop_index) < len(self._groupkeys):
                psdf = psdf.reset_index()

        return self._cleanup_and_return(psdf)

    @staticmethod
    def _resolve_grouping_from_diff_dataframes(
        psdf: DataFrame, by: List[Union[Series, Label]]
    ) -> Tuple[DataFrame, List[Series], Set[Label]]:
        column_labels_level = psdf._internal.column_labels_level

        column_labels = []
        additional_pssers = []
        additional_column_labels = []
        tmp_column_labels = set()
        for i, col_or_s in enumerate(by):
            if isinstance(col_or_s, Series):
                if col_or_s._psdf is psdf:
                    column_labels.append(col_or_s._column_label)
                elif same_anchor(col_or_s, psdf):
                    temp_label = verify_temp_column_name(psdf, "__tmp_groupkey_{}__".format(i))
                    column_labels.append(temp_label)
                    additional_pssers.append(col_or_s.rename(temp_label))
                    additional_column_labels.append(temp_label)
                else:
                    temp_label = verify_temp_column_name(
                        psdf,
                        tuple(
                            ([""] * (column_labels_level - 1)) + ["__tmp_groupkey_{}__".format(i)]
                        ),
                    )
                    column_labels.append(temp_label)
                    tmp_column_labels.add(temp_label)
            elif isinstance(col_or_s, tuple):
                psser = psdf[col_or_s]
                if not isinstance(psser, Series):
                    raise ValueError(name_like_string(col_or_s))
                column_labels.append(col_or_s)
            else:
                raise ValueError(col_or_s)

        psdf = DataFrame(
            psdf._internal.with_new_columns(
                [psdf._psser_for(label) for label in psdf._internal.column_labels]
                + additional_pssers
            )
        )

        def assign_columns(
            psdf: DataFrame, this_column_labels: List[Label], that_column_labels: List[Label]
        ) -> Iterator[Tuple[Series, Label]]:
            raise NotImplementedError(
                "Duplicated labels with groupby() and "
                "'compute.ops_on_diff_frames' option are not supported currently. "
                "Please use unique labels in series and frames."
            )

        for col_or_s, label in zip(by, column_labels):
            if label in tmp_column_labels:
                psser = col_or_s
                psdf = align_diff_frames(
                    assign_columns,
                    psdf,
                    psser.rename(label),
                    fillna=False,
                    how="inner",
                    preserve_order_column=True,
                )

        tmp_column_labels |= set(additional_column_labels)

        new_by_series = []
        for col_or_s, label in zip(by, column_labels):
            if label in tmp_column_labels:
                psser = col_or_s
                new_by_series.append(psdf._psser_for(label).rename(psser.name))
            else:
                new_by_series.append(psdf._psser_for(label))

        return psdf, new_by_series, tmp_column_labels

    @staticmethod
    def _resolve_grouping(psdf: DataFrame, by: List[Union[Series, Label]]) -> List[Series]:
        new_by_series = []
        for col_or_s in by:
            if isinstance(col_or_s, Series):
                new_by_series.append(col_or_s)
            elif isinstance(col_or_s, tuple):
                psser = psdf[col_or_s]
                if not isinstance(psser, Series):
                    raise ValueError(name_like_string(col_or_s))
                new_by_series.append(psser)
            else:
                raise ValueError(col_or_s)
        return new_by_series


class DataFrameGroupBy(GroupBy[DataFrame]):
    @staticmethod
    def _build(
        psdf: DataFrame, by: List[Union[Series, Label]], as_index: bool, dropna: bool
    ) -> "DataFrameGroupBy":
        if any(
            isinstance(col_or_s, Series) and not same_anchor(psdf, col_or_s) for col_or_s in by
        ):
            (
                psdf,
                new_by_series,
                column_labels_to_exclude,
            ) = GroupBy._resolve_grouping_from_diff_dataframes(psdf, by)
        else:
            new_by_series = GroupBy._resolve_grouping(psdf, by)
            column_labels_to_exclude = set()
        return DataFrameGroupBy(
            psdf,
            new_by_series,
            as_index=as_index,
            dropna=dropna,
            column_labels_to_exclude=column_labels_to_exclude,
        )

    def __init__(
        self,
        psdf: DataFrame,
        by: List[Series],
        as_index: bool,
        dropna: bool,
        column_labels_to_exclude: Set[Label],
        agg_columns: Optional[List[Label]] = None,
    ):
        agg_columns_selected = agg_columns is not None
        if agg_columns_selected:
            for label in agg_columns:
                if label in column_labels_to_exclude:
                    raise KeyError(label)
        else:
            agg_columns = [
                label
                for label in psdf._internal.column_labels
                if not any(label == key._column_label and key._psdf is psdf for key in by)
                and label not in column_labels_to_exclude
            ]

        super().__init__(
            psdf=psdf,
            groupkeys=by,
            as_index=as_index,
            dropna=dropna,
            column_labels_to_exclude=column_labels_to_exclude,
            agg_columns_selected=agg_columns_selected,
            agg_columns=[psdf[label] for label in agg_columns],
        )

    def __getattr__(self, item: str) -> Any:
        if hasattr(MissingPandasLikeDataFrameGroupBy, item):
            property_or_func = getattr(MissingPandasLikeDataFrameGroupBy, item)
            if isinstance(property_or_func, property):
                return property_or_func.fget(self)
            else:
                return partial(property_or_func, self)
        return self.__getitem__(item)

    def __getitem__(self, item: Any) -> GroupBy:
        if self._as_index and is_name_like_value(item):
            return SeriesGroupBy(
                self._psdf._psser_for(item if is_name_like_tuple(item) else (item,)),
                self._groupkeys,
                dropna=self._dropna,
            )
        else:
            if is_name_like_tuple(item):
                item = [item]
            elif is_name_like_value(item):
                item = [(item,)]
            else:
                item = [i if is_name_like_tuple(i) else (i,) for i in item]
            if not self._as_index:
                groupkey_names = set(key._column_label for key in self._groupkeys)
                for name in item:
                    if name in groupkey_names:
                        raise ValueError(
                            "cannot insert {}, already exists".format(name_like_string(name))
                        )
            return DataFrameGroupBy(
                self._psdf,
                self._groupkeys,
                as_index=self._as_index,
                dropna=self._dropna,
                column_labels_to_exclude=self._column_labels_to_exclude,
                agg_columns=item,
            )

    def _apply_series_op(
        self,
        op: Callable[["SeriesGroupBy"], Series],
        should_resolve: bool = False,
        numeric_only: bool = False,
    ) -> DataFrame:
        applied = []
        for column in self._agg_columns:
            applied.append(op(column.groupby(self._groupkeys)))
        if numeric_only:
            applied = [col for col in applied if isinstance(col.spark.data_type, NumericType)]
            if not applied:
                raise DataError("No numeric types to aggregate")
        internal = self._psdf._internal.with_new_columns(applied, keep_order=False)
        if should_resolve:
            internal = internal.resolved_copy
        return DataFrame(internal)

    def _cleanup_and_return(self, psdf: DataFrame) -> DataFrame:
        return psdf

    # TODO: Implement 'percentiles', 'include', and 'exclude' arguments.
    # TODO: Add ``DataFrame.select_dtypes`` to See Also when 'include'
    #   and 'exclude' arguments are implemented.
    def describe(self) -> DataFrame:
        """
        Generate descriptive statistics that summarize the central tendency,
        dispersion and shape of a dataset's distribution, excluding
        ``NaN`` values.

        Analyzes both numeric and object series, as well as ``DataFrame``
        column sets of mixed data types. The output will vary depending
        on what is provided. Refer to the notes below for more detail.

        .. note:: Unlike pandas, the percentiles in pandas-on-Spark are based upon
            approximate percentile computation because computing percentiles
            across a large dataset is extremely expensive.

        Returns
        -------
        DataFrame
            Summary statistics of the DataFrame provided.

        See Also
        --------
        DataFrame.count
        DataFrame.max
        DataFrame.min
        DataFrame.mean
        DataFrame.std

        Examples
        --------
        >>> df = ps.DataFrame({'a': [1, 1, 3], 'b': [4, 5, 6], 'c': [7, 8, 9]})
        >>> df
           a  b  c
        0  1  4  7
        1  1  5  8
        2  3  6  9

        Describing a ``DataFrame``. By default only numeric fields are returned.

        >>> described = df.groupby('a').describe()
        >>> described.sort_index()  # doctest: +NORMALIZE_WHITESPACE
              b                                           c
          count mean       std  min  25%  50%  75%  max  count mean       std  min  25%  50%  75%  max
        a
        1   2.0  4.5  0.707107  4.0  4.0  4.0  5.0  5.0    2.0  7.5  0.707107  7.0  7.0  7.0  8.0  8.0
        3   1.0  6.0       NaN  6.0  6.0  6.0  6.0  6.0    1.0  9.0       NaN  9.0  9.0  9.0  9.0  9.0
        """
        for col in self._agg_columns:
            if isinstance(col.spark.data_type, StringType):
                raise NotImplementedError(
                    "DataFrameGroupBy.describe() doesn't support string type for now"
                )

        psdf = self.aggregate(["count", "mean", "std", "min", "quartiles", "max"])
        sdf = psdf._internal.spark_frame
        agg_column_labels = [col._column_label for col in self._agg_columns]
        formatted_percentiles = ["25%", "50%", "75%"]

        # Split "quartiles" columns into first, second, and third quartiles.
        for label in agg_column_labels:
            quartiles_col = name_like_string(tuple(list(label) + ["quartiles"]))
            for i, percentile in enumerate(formatted_percentiles):
                sdf = sdf.withColumn(
                    name_like_string(tuple(list(label) + [percentile])),
                    scol_for(sdf, quartiles_col)[i],
                )
            sdf = sdf.drop(quartiles_col)

        # Reorder columns lexicographically by agg column followed by stats.
        stats = ["count", "mean", "std", "min"] + formatted_percentiles + ["max"]
        column_labels = [
            tuple(list(label) + [s]) for label, s in product(agg_column_labels, stats)
        ]
        data_columns = map(name_like_string, column_labels)

        # Reindex the DataFrame to reflect initial grouping and agg columns.
        internal = psdf._internal.copy(
            spark_frame=sdf,
            column_labels=column_labels,
            data_spark_columns=[scol_for(sdf, col) for col in data_columns],
            data_fields=None,
        )

        # Cast columns to ``"float64"`` to match `pandas.DataFrame.groupby`.
        return DataFrame(internal).astype("float64")
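    # Sketch: the frame returned by ``describe`` uses MultiIndex columns of the form
    # (original column, statistic), so a single statistic can be pulled out with a
    # tuple key. Continuing the doctest above (illustrative only):
    #
    #     described[('b', 'mean')]   # per-group mean of column 'b'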
class SeriesGroupBy(GroupBy[Series]):
    @staticmethod
    def _build(
        psser: Series, by: List[Union[Series, Label]], as_index: bool, dropna: bool
    ) -> "SeriesGroupBy":
        if any(
            isinstance(col_or_s, Series) and not same_anchor(psser, col_or_s) for col_or_s in by
        ):
            psdf, new_by_series, _ = GroupBy._resolve_grouping_from_diff_dataframes(
                psser.to_frame(), by
            )
            return SeriesGroupBy(
                first_series(psdf).rename(psser.name),
                new_by_series,
                as_index=as_index,
                dropna=dropna,
            )
        else:
            new_by_series = GroupBy._resolve_grouping(psser._psdf, by)
            return SeriesGroupBy(psser, new_by_series, as_index=as_index, dropna=dropna)

    def __init__(
        self, psser: Series, by: List[Series], as_index: bool = True, dropna: bool = True
    ):
        if not as_index:
            raise TypeError("as_index=False only valid with DataFrame")
        super().__init__(
            psdf=psser._psdf,
            groupkeys=by,
            as_index=True,
            dropna=dropna,
            column_labels_to_exclude=set(),
            agg_columns_selected=True,
            agg_columns=[psser],
        )
        self._psser = psser

    def __getattr__(self, item: str) -> Any:
        if hasattr(MissingPandasLikeSeriesGroupBy, item):
            property_or_func = getattr(MissingPandasLikeSeriesGroupBy, item)
            if isinstance(property_or_func, property):
                return property_or_func.fget(self)
            else:
                return partial(property_or_func, self)
        raise AttributeError(item)

    def _apply_series_op(
        self,
        op: Callable[["SeriesGroupBy"], Series],
        should_resolve: bool = False,
        numeric_only: bool = False,
    ) -> Series:
        if numeric_only and not isinstance(self._agg_columns[0].spark.data_type, NumericType):
            raise DataError("No numeric types to aggregate")
        psser = op(self)
        if should_resolve:
            internal = psser._internal.resolved_copy
            return first_series(DataFrame(internal))
        else:
            return psser.copy()

    def _cleanup_and_return(self, psdf: DataFrame) -> Series:
        return first_series(psdf).rename().rename(self._psser.name)

    def agg(self, *args: Any, **kwargs: Any) -> None:
        return MissingPandasLikeSeriesGroupBy.agg(self, *args, **kwargs)

    def aggregate(self, *args: Any, **kwargs: Any) -> None:
        return MissingPandasLikeSeriesGroupBy.aggregate(self, *args, **kwargs)

    def size(self) -> Series:
        return super().size().rename(self._psser.name)

    size.__doc__ = GroupBy.size.__doc__

    # TODO: add keep parameter
    def nsmallest(self, n: int = 5) -> Series:
        """
        Return the smallest `n` elements.

        Parameters
        ----------
        n : int
            Number of items to retrieve.

        See Also
        --------
        pyspark.pandas.Series.nsmallest
        pyspark.pandas.DataFrame.nsmallest

        Examples
        --------
        >>> df = ps.DataFrame({'a': [1, 1, 1, 2, 2, 2, 3, 3, 3],
        ...                    'b': [1, 2, 2, 2, 3, 3, 3, 4, 4]}, columns=['a', 'b'])

        >>> df.groupby(['a'])['b'].nsmallest(1).sort_index()  # doctest: +NORMALIZE_WHITESPACE
        a
        1  0    1
        2  3    2
        3  6    3
        Name: b, dtype: int64
        """
        if self._psser._internal.index_level > 1:
            raise ValueError("nsmallest does not support multi-index now")

        groupkey_col_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))]
        sdf = self._psser._internal.spark_frame.select(
            *[scol.alias(name) for scol, name in zip(self._groupkeys_scols, groupkey_col_names)],
            *[
                scol.alias(SPARK_INDEX_NAME_FORMAT(i + len(self._groupkeys)))
                for i, scol in enumerate(self._psser._internal.index_spark_columns)
            ],
            self._psser.spark.column,
            NATURAL_ORDER_COLUMN_NAME,
        )

        window = Window.partitionBy(*groupkey_col_names).orderBy(
            scol_for(sdf, self._psser._internal.data_spark_column_names[0]).asc(),
            NATURAL_ORDER_COLUMN_NAME,
        )

        temp_rank_column = verify_temp_column_name(sdf, "__rank__")
        sdf = (
            sdf.withColumn(temp_rank_column, F.row_number().over(window))
            .filter(F.col(temp_rank_column) <= n)
            .drop(temp_rank_column)
        ).drop(NATURAL_ORDER_COLUMN_NAME)

        internal = InternalFrame(
            spark_frame=sdf,
            index_spark_columns=(
                [scol_for(sdf, col) for col in groupkey_col_names]
                + [
                    scol_for(sdf, SPARK_INDEX_NAME_FORMAT(i + len(self._groupkeys)))
                    for i in range(self._psdf._internal.index_level)
                ]
            ),
            index_names=(
                [psser._column_label for psser in self._groupkeys]
                + self._psdf._internal.index_names
            ),
            index_fields=(
                [
                    psser._internal.data_fields[0].copy(name=name)
                    for psser, name in zip(self._groupkeys, groupkey_col_names)
                ]
                + [
                    field.copy(name=SPARK_INDEX_NAME_FORMAT(i + len(self._groupkeys)))
                    for i, field in enumerate(self._psdf._internal.index_fields)
                ]
            ),
            column_labels=[self._psser._column_label],
            data_spark_columns=[scol_for(sdf, self._psser._internal.data_spark_column_names[0])],
            data_fields=[self._psser._internal.data_fields[0]],
        )
        return first_series(DataFrame(internal))
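    # Sketch: requesting more than one row per group keeps the original index values
    # alongside the group key, as in the doctest above. Illustrative call only:
    #
    #     df.groupby(['a'])['b'].nsmallest(2).sort_index()
    #
    # which yields the two smallest 'b' values within each 'a' group, indexed by
    # (group key, original row index).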
    # TODO: add keep parameter
    def nlargest(self, n: int = 5) -> Series:
        """
        Return the first n rows ordered by columns in descending order in group.

        Return the first n rows with the largest values in columns, in descending order.
        The columns that are not specified are returned as well, but not used for ordering.

        Parameters
        ----------
        n : int
            Number of items to retrieve.

        See Also
        --------
        pyspark.pandas.Series.nlargest
        pyspark.pandas.DataFrame.nlargest

        Examples
        --------
        >>> df = ps.DataFrame({'a': [1, 1, 1, 2, 2, 2, 3, 3, 3],
        ...                    'b': [1, 2, 2, 2, 3, 3, 3, 4, 4]}, columns=['a', 'b'])

        >>> df.groupby(['a'])['b'].nlargest(1).sort_index()  # doctest: +NORMALIZE_WHITESPACE
        a
        1  1    2
        2  4    3
        3  7    4
        Name: b, dtype: int64
        """
        if self._psser._internal.index_level > 1:
            raise ValueError("nlargest does not support multi-index now")

        groupkey_col_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))]
        sdf = self._psser._internal.spark_frame.select(
            *[scol.alias(name) for scol, name in zip(self._groupkeys_scols, groupkey_col_names)],
            *[
                scol.alias(SPARK_INDEX_NAME_FORMAT(i + len(self._groupkeys)))
                for i, scol in enumerate(self._psser._internal.index_spark_columns)
            ],
            self._psser.spark.column,
            NATURAL_ORDER_COLUMN_NAME,
        )

        window = Window.partitionBy(*groupkey_col_names).orderBy(
            scol_for(sdf, self._psser._internal.data_spark_column_names[0]).desc(),
            NATURAL_ORDER_COLUMN_NAME,
        )

        temp_rank_column = verify_temp_column_name(sdf, "__rank__")
        sdf = (
            sdf.withColumn(temp_rank_column, F.row_number().over(window))
            .filter(F.col(temp_rank_column) <= n)
            .drop(temp_rank_column)
        ).drop(NATURAL_ORDER_COLUMN_NAME)

        internal = InternalFrame(
            spark_frame=sdf,
            index_spark_columns=(
                [scol_for(sdf, col) for col in groupkey_col_names]
                + [
                    scol_for(sdf, SPARK_INDEX_NAME_FORMAT(i + len(self._groupkeys)))
                    for i in range(self._psdf._internal.index_level)
                ]
            ),
            index_names=(
                [psser._column_label for psser in self._groupkeys]
                + self._psdf._internal.index_names
            ),
            index_fields=(
                [
                    psser._internal.data_fields[0].copy(name=name)
                    for psser, name in zip(self._groupkeys, groupkey_col_names)
                ]
                + [
                    field.copy(name=SPARK_INDEX_NAME_FORMAT(i + len(self._groupkeys)))
                    for i, field in enumerate(self._psdf._internal.index_fields)
                ]
            ),
            column_labels=[self._psser._column_label],
            data_spark_columns=[scol_for(sdf, self._psser._internal.data_spark_column_names[0])],
            data_fields=[self._psser._internal.data_fields[0]],
        )
        return first_series(DataFrame(internal))
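    # Sketch: symmetric to ``nsmallest`` above; only the window ordering flips to
    # descending, and ties are broken by the natural row order. Illustrative call only:
    #
    #     df.groupby(['a'])['b'].nlargest(2).sort_index()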
    # TODO: add bins, normalize parameter
    def value_counts(
        self, sort: Optional[bool] = None, ascending: Optional[bool] = None, dropna: bool = True
    ) -> Series:
        """
        Compute counts of unique values for each group.

        Parameters
        ----------
        sort : boolean, default None
            Sort by frequencies.
        ascending : boolean, default None
            Sort in ascending order. If not specified, results are sorted in
            descending order when ``sort`` is True.
        dropna : boolean, default True
            Don't include counts of NaN.

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby

        Examples
        --------
        >>> df = ps.DataFrame({'A': [1, 2, 2, 3, 3, 3],
        ...                    'B': [1, 1, 2, 3, 3, np.nan]},
        ...                   columns=['A', 'B'])
        >>> df
           A    B
        0  1  1.0
        1  2  1.0
        2  2  2.0
        3  3  3.0
        4  3  3.0
        5  3  NaN

        >>> df.groupby('A')['B'].value_counts().sort_index()  # doctest: +NORMALIZE_WHITESPACE
        A  B
        1  1.0    1
        2  1.0    1
           2.0    1
        3  3.0    2
        Name: B, dtype: int64

        Including counts of NaN when ``dropna`` is False:

        >>> df.groupby('A')['B'].value_counts(
        ...    dropna=False).sort_index()  # doctest: +NORMALIZE_WHITESPACE
        A  B
        1  1.0    1
        2  1.0    1
           2.0    1
        3  3.0    2
           NaN    1
        Name: B, dtype: int64
        """
        groupkeys = self._groupkeys + self._agg_columns
        groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(groupkeys))]
        groupkey_cols = [s.spark.column.alias(name) for s, name in zip(groupkeys, groupkey_names)]

        sdf = self._psdf._internal.spark_frame

        agg_column = self._agg_columns[0]._internal.data_spark_column_names[0]
        sdf = sdf.groupby(*groupkey_cols).count().withColumnRenamed("count", agg_column)

        if self._dropna:
            _groupkey_column_names = groupkey_names[: len(self._groupkeys)]
            sdf = sdf.dropna(subset=_groupkey_column_names)

        if dropna:
            _agg_columns_names = groupkey_names[len(self._groupkeys) :]
            sdf = sdf.dropna(subset=_agg_columns_names)

        if sort:
            if ascending:
                sdf = sdf.orderBy(scol_for(sdf, agg_column).asc())
            else:
                sdf = sdf.orderBy(scol_for(sdf, agg_column).desc())

        internal = InternalFrame(
            spark_frame=sdf,
            index_spark_columns=[scol_for(sdf, col) for col in groupkey_names],
            index_names=[psser._column_label for psser in groupkeys],
            index_fields=[
                psser._internal.data_fields[0].copy(name=name)
                for psser, name in zip(groupkeys, groupkey_names)
            ],
            column_labels=[self._agg_columns[0]._column_label],
            data_spark_columns=[scol_for(sdf, agg_column)],
        )
        return first_series(DataFrame(internal))
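    # Sketch: the ``sort``/``ascending`` branch above orders by the counts themselves,
    # so the most frequent values per group come first by default. Illustrative call:
    #
    #     df.groupby('A')['B'].value_counts(sort=True)
    #
    # The result is ordered by count (descending unless ``ascending=True``) rather
    # than by the index.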
    def unique(self) -> Series:
        """
        Return unique values in group.

        Uniques are returned in an arbitrary order; the result is NOT sorted.

        See Also
        --------
        pyspark.pandas.Series.unique
        pyspark.pandas.Index.unique

        Examples
        --------
        >>> df = ps.DataFrame({'a': [1, 1, 1, 2, 2, 2, 3, 3, 3],
        ...                    'b': [1, 2, 2, 2, 3, 3, 3, 4, 4]}, columns=['a', 'b'])

        >>> df.groupby(['a'])['b'].unique().sort_index()  # doctest: +SKIP
        a
        1    [1, 2]
        2    [2, 3]
        3    [3, 4]
        Name: b, dtype: object
        """
        return self._reduce_for_stat_function(F.collect_set, only_numeric=False)
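    # Sketch: because ``unique`` is built on ``F.collect_set``, the order of values
    # inside each array is nondeterministic (hence the ``+SKIP`` doctest above). For a
    # deterministic view the arrays can be sorted on the driver, e.g. (illustrative):
    #
    #     df.groupby(['a'])['b'].unique().to_pandas().apply(sorted)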
def is_multi_agg_with_relabel(**kwargs: Any) -> bool:
    """
    Check whether the kwargs passed to .agg look like multi-agg with relabeling.

    Parameters
    ----------
    **kwargs : dict

    Returns
    -------
    bool

    Examples
    --------
    >>> is_multi_agg_with_relabel(a='max')
    False
    >>> is_multi_agg_with_relabel(a_max=('a', 'max'),
    ...                           a_min=('a', 'min'))
    True
    >>> is_multi_agg_with_relabel()
    False
    """
    if not kwargs:
        return False
    return all(isinstance(v, tuple) and len(v) == 2 for v in kwargs.values())


def normalize_keyword_aggregation(
    kwargs: Dict[str, Tuple[Name, str]],
) -> Tuple[Dict[Name, List[str]], List[str], List[Tuple]]:
    """
    Normalize user-provided kwargs.

    Transforms from the new ``Dict[str, NamedAgg]`` style kwargs
    to the old defaultdict[str, List[scalar]].

    Parameters
    ----------
    kwargs : dict

    Returns
    -------
    aggspec : dict
        The transformed kwargs.
    columns : List[str]
        The user-provided keys.
    order : List[Tuple[str, str]]
        Pairs of the input column names and aggregation functions.

    Examples
    --------
    >>> normalize_keyword_aggregation({'output': ('input', 'sum')})
    (defaultdict(<class 'list'>, {'input': ['sum']}), ['output'], [('input', 'sum')])
    """
    aggspec: Dict[Union[Any, Tuple], List[str]] = defaultdict(list)
    order: List[Tuple] = []
    columns, pairs = zip(*kwargs.items())

    for column, aggfunc in pairs:
        if column in aggspec:
            aggspec[column].append(aggfunc)
        else:
            aggspec[column] = [aggfunc]

        order.append((column, aggfunc))
    # For MultiIndex, we need to flatten the tuple, e.g. (('y', 'A'), 'max') needs to be
    # flattened to ('y', 'A', 'max'); it won't do anything on a normal Index.
    if isinstance(order[0][0], tuple):
        order = [(*levs, method) for levs, method in order]
    return aggspec, list(columns), order


def _test() -> None:
    import os
    import doctest
    import sys
    import numpy
    from pyspark.sql import SparkSession
    import pyspark.pandas.groupby

    os.chdir(os.environ["SPARK_HOME"])

    globs = pyspark.pandas.groupby.__dict__.copy()
    globs["np"] = numpy
    globs["ps"] = pyspark.pandas
    spark = (
        SparkSession.builder.master("local[4]")
        .appName("pyspark.pandas.groupby tests")
        .getOrCreate()
    )
    (failure_count, test_count) = doctest.testmod(
        pyspark.pandas.groupby,
        globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
    )
    spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()
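

# A minimal, self-contained sketch of how the named-aggregation helpers above fit
# together; it only exercises the pure-Python helpers, so no Spark session is needed.
# The function name below is illustrative and not part of the module's API.
def _named_aggregation_sketch() -> None:
    # User-facing kwargs in the "named aggregation" style: output name -> (column, func).
    kwargs = {"b_max": ("b", "max"), "b_min": ("b", "min")}
    assert is_multi_agg_with_relabel(**kwargs)

    aggspec, columns, order = normalize_keyword_aggregation(kwargs)
    # aggspec groups the requested functions by input column, columns keeps the
    # user-provided output names, and order pairs each input column with its function.
    assert dict(aggspec) == {"b": ["max", "min"]}
    assert columns == ["b_max", "b_min"]
    assert order == [("b", "max"), ("b", "min")]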