Spark: Add or Remove Struct Member/Field
To drop a struct member:
def drop_struct_member(df: DataFrame, field_name: str, member_name: "str | List[str]") -> DataFrame:
    """
    Drops one or more members from a struct field
    :param df: source DataFrame
    :param field_name: struct field name inside the DF
    :param member_name: a single member name, or a list/tuple/set of member names, to drop
    :return: DataFrame with the struct field rebuilt from the remaining members
    """
    # The union annotation is quoted on purpose: the previous `str or List[str]`
    # is a boolean expression that evaluates to plain `str`, and the quoted
    # form states the real contract without needing any additional import.

    # gets all field names as string array
    col_names = df.schema[field_name].dataType.fieldNames()

    # Any common collection means "several names"; everything else is treated
    # as one name. Previously only an exact `list` was recognised, so a tuple
    # or set was wrapped as a single bogus name and nothing was dropped.
    if isinstance(member_name, (list, tuple, set, frozenset)):
        ignore = list(member_name)
    else:
        ignore = [member_name]

    # Rebuild the struct from the members that are not being dropped.
    kept = [f.col(f"{field_name}.{x}") for x in col_names if x not in ignore]
    return df.withColumn(field_name, f.struct(kept))
Usage:
drop_struct_member(df, "struct_field_name", "struct_member_name")
To add a new member to a struct:
def add_struct_member(df: DataFrame,
                      field_name: str,
                      member_name: str,
                      src: "Column | str",
                      keep_src: bool = False) -> DataFrame:
    """
    Adds a new member to struct field
    :param df: source DataFrame
    :param field_name: struct field name inside the DF
    :param member_name: member name inside the struct to add; an existing member
        with the same name is replaced
    :param src: source column, can be a column or a string
    :param keep_src: whether to keep source column or drop it
    :return: DataFrame whose struct field holds its other members followed by the
        new member, with the source column dropped unless keep_src is True
    """
    # The union annotation is quoted on purpose: the previous `Column or str`
    # is a boolean expression that evaluates to plain `Column`, which
    # contradicted the documented string input.

    # gets all field names as string array
    col_names = df.schema[field_name].dataType.fieldNames()

    if isinstance(src, str):
        src = f.col(src)

    # Leave out any member already called member_name so the rebuilt struct
    # does not end up with two members of the same name.
    existing_cols = [f.col(f"{field_name}.{x}") for x in col_names if x != member_name]

    df = df.withColumn(field_name,
                       f.struct(*existing_cols, src.alias(member_name)))

    if not keep_src:
        df = df.drop(src)
    return df
Usage:
add_struct_member(df, "struct_field_name", "new_member_name", column_to_use_as_new_member)
To contact me, send an email anytime or leave a comment below.