"""Rhythm analysis for niimpy (niimpy.analysis.rhythms)."""

import pandas as pd
import datetime


def _align_data(data: pd.DataFrame, period: int, freq: str) -> pd.DataFrame:
    """
    Returns a pandas DataFrame aligned by timestamp, starting either at 00:00 daily or 00:00 of a Monday weekly.

    Parameters
    ----------
    data : pandas.DataFrame
        Input dataframe with a Timestamp index.
    period : int
        Number of hours for the desired time span (e.g., 24 for daily, 168 for weekly).
    type : str
        Type of alignment to perform, can be 'daily' or 'weekly'.

    Returns
    -------
    aligned_data : pandas.DataFrame
        Dataframe aligned based on the provided alignment type.

    Raises
    ------
    ValueError:
        Raised if an invalid alignment type is provided.
    """
    
    if freq not in ["daily", "weekly"]:
        raise ValueError(f"Incorrect type '{type}'. Choose between 'daily' and 'weekly'.")
    
    results = []

    for user in data['user'].unique():
        user_data = data[data['user'] == user].sort_index()
        
        # Identify the first index starting at hour 0
        start_indices = user_data.index[user_data.index.hour == 0]

        # If freq is weekly, further refine to start on a Monday
        if freq == "weekly":
            start_indices = start_indices[start_indices.dayofweek == 0]

        if start_indices.empty:
            continue
        
        start_time = start_indices[0]
        end_time = start_time + datetime.timedelta(hours=period)

        # Check if data spans the given period
        if end_time <= user_data.index.max():
            filtered_data = user_data[start_time:end_time]
            results.append(filtered_data)

    results = pd.concat(results)

    return results


def _aggregate(data: pd.DataFrame, groupby_cols: list, freq: str) -> pd.DataFrame:
    """
    Returns a pandas DataFrame aggregated based on the given frequency ('daily' or 'weekly').

    Parameters
    ----------
    data : pandas.DataFrame
        Input dataframe with a Timestamp index and 'user' column.
    freq : str
        Desired frequency for aggregation, either 'daily' or 'weekly'.

    Returns
    -------
    aggregated_data : pandas.DataFrame
        Dataframe aggregated by the specified frequency.

    Raises
    ------
    ValueError:
        Raised if an invalid aggregation frequency is provided.
    """
    
    if freq not in ["daily", "weekly"]:
        raise ValueError(f"Incorrect frequency '{freq}'. Choose between 'daily' and 'weekly'.")
    
    agg_data = []

    for name, user_data in data.groupby(groupby_cols):
        if freq == 'daily':

            agg_features = user_data.groupby(user_data.index.hour).sum(numeric_only=True)
        else:  # freq == 'weekly'
            user_data['hour_dayofweek'] = user_data.index.strftime('%A %H')
            agg_features = user_data.groupby(['hour_dayofweek']).sum(numeric_only=True)
            agg_features.index.names = ['time']

        for idx, col in enumerate(groupby_cols):
            agg_features[col] = name if isinstance(name, str) else name[idx]
        agg_data.append(agg_features)

    return pd.concat(agg_data)


def _compute_distribution(data: pd.DataFrame,  cols: list, groupby_cols: list, freq: str) -> pd.DataFrame:
    """
    Returns a pandas DataFrame with count distribution for each unique value in specified columns, aggregated by frequency.

    Parameters
    -------
    data : pandas.DataFrame
        Input dataframe with a Timestamp index.
    cols : list
        List of column name(s) for which to compute distribution.
    frequency : str
        Aggregation frequency, either 'daily' or 'weekly'.

    Returns
    -------
    distribution : pandas.DataFrame
        Dataframe with count distribution for specified columns.

    Raises
    -------
    ValueError:
        Raised if a specified column is not present in the input dataframe.
    """
    
    # Validate frequency
    if freq not in ["daily", "weekly"]:
        raise ValueError(f"The specified frequency '{freq}' is not valid. Choose 'daily' or 'weekly'.")

    for col in cols:
        if col not in data.columns:
            raise ValueError(f"The specified column '{col}' does not exist in the input dataframe.")
        
        values_sum = data.groupby(by=groupby_cols)[col].transform('sum')
        data[f'{col}_distr'] = data[col] / values_sum
    return data


def compute_rhythms(df: pd.DataFrame, timebin: str, cols: list, groupby_cols: list, period: int, freq: str, group: str = None) -> pd.DataFrame:
    """
    Returns a pandas DataFrame containing rhythm computations for the input data based on specified frequency.

    Parameters
    ----------
    df : pandas.DataFrame
        Input dataframe with a Timestamp index and 'user' column.
    timebin : str
        Time bin for grouping, using pandas frequency string (e.g., '1h', '2h', '4h', '6h').
    cols : list
        List of columns to compute the count distribution for.
    groupby_cols : list
        Columns by which to group the data.
    period : int
        Duration in hours for which rhythm is computed. E.g., 8 weeks would be 8*7*24 hours.
    freq : str
        Sampling frequency, either 'daily' or 'weekly'.
    group : str, optional
        Unused; kept for interface compatibility.

    Steps
    -----
    1. Data alignment based on the given frequency:
        - 'daily' starts at the first 00 hour.
        - 'weekly' starts at the first Monday 00 hour.
    2. Resample data based on the given timebin.
    3. Aggregate columns by summing values within each bin.
    4. Normalize data to compute bin's percentage contribution to the total.

    Returns
    -------
    rhythms : pandas.DataFrame
        Dataframe detailing call count distribution rhythms.

    Raises
    ------
    ValueError:
        If provided frequency is incorrect or a specified column isn't in the dataframe.
    """
    # Check if index is a DateTime index
    if not isinstance(df.index, pd.DatetimeIndex):
        raise ValueError("The input DataFrame must have a DateTime index.")

    # 1. Align each user's data to a daily/weekly boundary.
    aligned_df = _align_data(df, period=period, freq=freq)

    # 2. Resample within each group; reset only the first index level so the
    #    time index is kept. NOTE(review): with several groupby_cols only the
    #    first level is reset here — confirm multi-key grouping is intended.
    resampled_df = aligned_df.groupby(groupby_cols).resample(timebin, include_groups=False).sum().reset_index(level=0)

    # 3. Aggregate onto hour-of-day (daily) or day+hour (weekly) bins.
    agg_data = _aggregate(resampled_df, groupby_cols=groupby_cols, freq=freq)

    # 4. Normalize each bin by its group's total.
    rhythms = _compute_distribution(agg_data, cols=cols, groupby_cols=groupby_cols, freq=freq)
    return rhythms
[docs] def rhythm( df, period = "4W", freq = "1W", bin = "1D", cols= [], groupby_cols= None, ): """ Compute rhythms from the input data. - Resample data to the given frequency. - Aggregate data to the specified period. - Compute the distribution over the frequency for the specified columns. Parameters ---------- df : pandas.DataFrame Input dataframe with a Timestamp index. period : str Period for which to compute the rhythm. Default is 'M' for monthly. freq : str Frequency for the rhythm. Default is 'W' for weekly. bin : str Time bin for the rhythm computation. Defaults to 'D' for daily. cols : list List of columns to compute the count distribution for. groupby_cols : list Columns by which to group the data. Defaults to user and device Returns ------- rhythms : pandas.DataFrame Dataframe detailing rhythms. """ if groupby_cols is None: groupby_cols = ["user", "device"] groupby_cols = [col for col in groupby_cols if col in df.columns] df = df[groupby_cols+cols] # if column type is not numeric, use count, otherwise sum for col in cols: if df[col].dtype == 'object': df[col] = 1 df = df.groupby(groupby_cols).resample(bin, include_groups=False).sum() df.reset_index(groupby_cols, inplace=True) freq_in_bins = pd.to_timedelta(freq) // pd.to_timedelta(bin) period_in_bins = pd.to_timedelta(period) // pd.to_timedelta(bin) def _get_bin_index(df): df = df.reset_index() df["bin"] = df.index % period_in_bins df = df.set_index("index") return df df = df.groupby(groupby_cols).apply(_get_bin_index, include_groups=False) df.reset_index(groupby_cols, inplace=True) df = df.groupby(groupby_cols+["bin"]).sum() df.reset_index(groupby_cols, inplace=True) df["freq"] = df.index // freq_in_bins freq_sum = df.groupby(groupby_cols+["freq"], as_index=False).sum() df = pd.merge(df, freq_sum, on=groupby_cols+["freq"], how="left", suffixes=('', '_sum')) for col in cols: df[col+"_rhythm"] = df[col] / df[col+"_sum"] df.fillna(0, inplace=True) df.drop(columns=[col, col+"_sum"], inplace=True) 
df.drop(columns=["freq"], inplace=True) return df