Question

Multi-key GroupBy with shared data on one key

I am working with a large dataset that includes multiple unique groups of data identified by a date and a group ID. Each group contains multiple IDs, each with several attributes. Here’s a simplified structure of my data:

| date       | group_id | inner_id | attr_a | attr_b | attr_c |
|------------|----------|----------|--------|--------|--------|
| 2023-06-01 | A1       | 001      | val    | val    | val    |
| 2023-06-01 | A1       | 002      | val    | val    | val    |
...

Additionally, each date has a large matrix associated with it:

| date       | matrix       |
|------------|--------------|
| 2023-06-01 | [[...], ...] |
...

I need to apply a function for each date and group_id that processes data using both the group attributes and the matrix associated with that date. The function looks like this:

def run(group_data: pd.DataFrame, matrix) -> pd.DataFrame:
    # process data
    return processed_data

Here, group_data contains the attributes for a specific group:

| inner_id | attr_a | attr_b | attr_c |
|----------|--------|--------|--------|
| 001      | val    | val    | val    |
...

Here is my current implementation. It works, but I can only run ~200 dates at a time because I am broadcasting all of the matrices to all the workers (I have ~2k dates, ~100 groups per date, and ~150 inner elements per group):

def calculate_metrics(data: DataFrame, matrices: DataFrame) -> DataFrame:
    # Convert matrices to a dictionary mapping dates to matrix
    date_matrices = matrices.rdd.collectAsMap()

    # Broadcast the matrices
    broadcasted_matrices = spark_context.broadcast(date_matrices)

    # Function to apply calculations
    def apply_calculation(group_key: Tuple[str, str], data_group: pd.DataFrame) -> pd.DataFrame:
        date = group_key[1]
        return custom_calculation_function(broadcasted_matrices.value[date], data_group)

    # Apply the function to each group
    return data.groupby('group_id', 'date').applyInPandas(apply_calculation, schema_of_result)

How can I parallelize this computation effectively while ensuring that the matrices are not redundantly loaded into memory?

Solution

It seems like you don't want to broadcast all the matrices to all the workers, since that results in a rather large overhead. This answer also seems to tackle a similar problem, where data accessed from outside the function scope results in a large amount of extra computation.

I'm not completely familiar with PySpark myself yet, but I would assume that Spark can handle a join (basically your broadcasted_matrices.value[date] lookup) and a groupby rather efficiently. Maybe you can try something like this:

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import DataFrame
from typing import Tuple

# placeholders so the snippet type-checks; use your real context and result schema
spark_context = SparkContext()
schema = None

def custom_calculation_function(group_key: Tuple[str, str], data_group: pd.DataFrame) -> pd.DataFrame:
    # after the join, the same matrix is repeated on every row of the group
    matrix = data_group["matrix"].iloc[0]
    rest_of_data = data_group.loc[:, data_group.columns != 'matrix']
    ... # whatever you want to do

def calculate_metrics(data: DataFrame, matrices: DataFrame) -> DataFrame:
    return (data
            .join(matrices, on="date", how="left")
            .groupby('date', 'group_id')  # one pandas group per (date, group_id), matching the original run()
            .applyInPandas(custom_calculation_function, schema)
            )
# maybe some additional work / intermediate schema is needed to construct your final schema
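
If it helps, here is a minimal sketch of how that result schema could be declared up front. The column names and types below are assumptions based on the tables in the question, so replace them with whatever custom_calculation_function actually returns.

from pyspark.sql import types as T

# assumed result columns -- adjust to the real output of custom_calculation_function
schema = T.StructType([
    T.StructField("date", T.DateType()),
    T.StructField("group_id", T.StringType()),
    T.StructField("inner_id", T.StringType()),
    T.StructField("metric", T.DoubleType()),
])

Also, since the matrices DataFrame has only one row per date (~2k rows), it may be worth hinting the join with pyspark.sql.functions.broadcast(matrices), so that only the small matrices table is shipped to each executor instead of collecting everything onto the driver; whether that pays off depends on how large the individual matrices are.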
2024-07-06
mortom123