Spark: Add or Remove a Column from a DataFrame

Adding a Column

There is no way to natively modify a struct in Spark, so instead one has to rewrite it with a new struct that differs from previous one. I’ve written a utility function to do just that:

def add_struct_member(df: DataFrame,
                      field_name: str,
                      member_name: str,
                      src: Column or str,
                      keep_src: bool = False) -> DataFrame:
    Adds a new member to struct field
    :param df: source DataFrame
    :param field_name: struct field name inside the DF
    :param member_name: member name inside the struct to add
    :param src: source column, can be a column or a string
    :param keep_src: whether to keep source column or drop it

    # gets all field names as string array
    col_names = df.schema[field_name].dataType.fieldNames()
    if isinstance(src, str):
        src = f.col(src)
    existing_cols = [f.col(f"{field_name}.{x}") for x in col_names if x != member_name]
    df = (df.withColumn(field_name,
                        f.struct(*existing_cols, src.alias(member_name))))
    if not keep_src:
        df = df.drop(src)

    return df

Removing a Column

Very similar to before, and allows to supply a list of columns to delete:

def drop_struct_member(df: DataFrame, field_name: str, member_name: str or List[str]) -> DataFrame:
    col_names = df.schema[field_name].dataType.fieldNames()
    ignore = member_name if isinstance(member_name, list) else [member_name]
    df = (df
                      f.struct([f.col(f"{field_name}.{x}") for x in col_names if x not in ignore])))
    return df

To contact me, send an email anytime or leave a comment below.