Spark: Add or Remove Struct Member/Field

To drop one or more members from a struct column:

from typing import Iterable, Union

# Assumes the snippet is used together with:
#   from pyspark.sql import DataFrame, functions as f


def drop_struct_member(df: DataFrame,
                       field_name: str,
                       member_name: Union[str, Iterable[str]]) -> DataFrame:
    """
    Drops one or more members from a struct field
    :param df: source DataFrame
    :param field_name: name of a top-level struct column in df
    :param member_name: a member name, or a list (any iterable) of member names,
        to remove; names that are not members of the struct are ignored
    :return: DataFrame in which field_name is rebuilt without the given members;
        the remaining members keep their original order and rows whose struct
        value is null stay null
    NOTE: on Spark 3.1+ Column.dropFields is a built-in alternative.
    """
    # A single string is one name; anything else is treated as a collection of names.
    # (Checking for str first avoids iterating a name character by character.)
    ignore = {member_name} if isinstance(member_name, str) else set(member_name)

    struct_col = f.col(field_name)
    member_names = df.schema[field_name].dataType.fieldNames()
    # getField + alias reads each member by its exact name instead of re-parsing
    # "field.member", so member names containing dots etc. are handled correctly.
    kept = [struct_col.getField(name).alias(name)
            for name in member_names if name not in ignore]

    # f.struct on its own would turn a null struct into a struct of null members;
    # when() without otherwise() yields null for those rows instead.
    return df.withColumn(field_name,
                         f.when(struct_col.isNotNull(), f.struct(*kept)))

Usage:

# Drop a single member (several at once: pass a list of member names instead,
# e.g. drop_struct_member(df, "struct_field_name", ["member_a", "member_b"])):
drop_struct_member(df, "struct_field_name", "struct_member_name")

To add a new member to a struct column (an existing member with the same name is replaced):

from typing import Union

# Assumes the snippet is used together with:
#   from pyspark.sql import Column, DataFrame, functions as f


def add_struct_member(df: DataFrame,
                      field_name: str,
                      member_name: str,
                      src: Union[Column, str],
                      keep_src: bool = False) -> DataFrame:
    """
    Adds a new member to struct field
    :param df: source DataFrame
    :param field_name: name of a top-level struct column in df
    :param member_name: name of the member to add; if the struct already has a
        member with this name it is replaced, and the new one is placed last
    :param src: values for the new member, either a Column or a column name
    :param keep_src: whether to keep the source column; when False, src is
        passed to DataFrame.drop after the struct is rebuilt (presumably a no-op
        when src is a computed expression rather than a column reference —
        confirm for your Spark version)
    :return: DataFrame in which field_name is rebuilt with member_name as its
        last member
    NOTE: on Spark 3.1+ Column.withField is a built-in alternative.
    """
    if isinstance(src, str):
        src = f.col(src)

    struct_col = f.col(field_name)
    # gets all member names of the struct as a list of strings
    member_names = df.schema[field_name].dataType.fieldNames()
    # getField + alias reads each member by its exact name instead of re-parsing
    # "field.member", so member names containing dots etc. are handled correctly.
    # A member that already has member_name is skipped so the new one replaces it.
    existing = [struct_col.getField(name).alias(name)
                for name in member_names if name != member_name]

    df = df.withColumn(field_name, f.struct(*existing, src.alias(member_name)))
    if not keep_src:
        df = df.drop(src)

    return df

Usage:

# Build the new member from an existing column (a Column or a column name);
# by default that source column is dropped afterwards.
add_struct_member(
    df,
    field_name="struct_field_name",
    member_name="new_member_name",
    src=column_to_use_as_new_member,
)


To contact me, send an email anytime or leave a comment below.