Parsing Array of Strings in Spark
Let’s say you have a column that is an array of strings, where each string is in turn a JSON document like {"id": 1, "name": "whatever"}. How would you parse it into an array of proper structs?
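For illustration, a toy dataframe in that shape could look like this (a hypothetical setup, not from real data; the "key" column is just an example identifier):

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()

# "sa" is an array of JSON strings, one document per element.
df = spark.createDataFrame(
    [(1, ['{"id": 1, "name": "whatever"}', '{"id": 2, "name": "other"}'])],
    ("key", "sa"))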
There is a nice higher-order function called transform that can apply an expression to each array element, so with json_tuple the code would ideally look like:
df = (df
    .withColumn("sa", f.expr(
        "transform(sa, x -> struct(json_tuple(x, 'id') as id, json_tuple(x, 'name') as name))")))
However, it won’t work:
Error: org.apache.spark.sql.AnalysisException: Generators are not supported when it’s nested in expressions bla bla bla…
This is because json_tuple is a generator function, and Spark doesn’t allow generators nested inside other expressions, such as the lambda passed to transform. One could instead explode the array, apply json_tuple, then group back and join to the main dataframe (as sketched below), but that’s just ugly.
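For completeness, that explode-based workaround might look roughly like this (an untested sketch; it assumes the dataframe has a unique "key" column to group back on):

# Explode the array so json_tuple can run on plain string rows.
exploded = (df
    .select("key", f.explode("sa").alias("x"))
    .select("key", f.json_tuple("x", "id", "name").alias("id", "name")))

# Collect the extracted fields back into an array of structs per key.
regrouped = (exploded
    .groupBy("key")
    .agg(f.collect_list(f.struct("id", "name")).alias("sa")))

# Left join so rows with empty/null arrays survive the explode.
df = df.drop("sa").join(regrouped, on="key", how="left")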
My only solution so far was to just create a UDF:
import json

from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

@f.udf(returnType=ArrayType(StructType([
    StructField("id", StringType()),
    StructField("name", StringType())
])))
def my_extract(ar):
    # Parse every JSON string in the array and keep only the fields we need.
    r = []
    if isinstance(ar, list):
        for a in ar:
            j = json.loads(a)
            r.append({"id": str(j.get("id")), "name": str(j.get("name"))})
    return r if len(r) else None

df = df.withColumn("sa", my_extract("sa"))
I really don’t like it - a classic Python UDF that parses JSON for every single value - but it has to do for now.
Update: Better Solution
A better solution looks clumsy, but it doesn’t use any UDFs. The idea is to use a non-generator function instead, one of the built-in Spark ones:
df = (df
    .withColumn("sa",
        f.expr("""transform(sa, x -> struct(
            from_json(x, 'id string')['id'] as id,
            from_json(x, 'name string')['name'] as name))""")))
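A quick sanity check that the column now has the expected type (output abbreviated; exact nullability flags may differ):

df.select("sa").printSchema()
# root
#  |-- sa: array (nullable = true)
#  |    |-- element: struct
#  |    |    |-- id: string
#  |    |    |-- name: string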
In this case we use the non-generator from_json (I can’t find it in the docs, so I’m pasting the PySpark source wrapper):
@ignore_unicode_prefix
@since(2.1)
def from_json(col, schema, options={}):
    """
    Parses a column containing a JSON string into a :class:`MapType` with :class:`StringType`
    as keys type, :class:`StructType` or :class:`ArrayType` with
    the specified schema. Returns `null`, in the case of an unparseable string.

    :param col: string column in json format
    :param schema: a StructType or ArrayType of StructType to use when parsing the json column.
    :param options: options to control parsing. accepts the same options as the json datasource

    .. note:: Since Spark 2.3, the DDL-formatted string or a JSON format string is also
        supported for ``schema``.

    >>> from pyspark.sql.types import *
    >>> data = [(1, '''{"a": 1}''')]
    >>> schema = StructType([StructField("a", IntegerType())])
    >>> df = spark.createDataFrame(data, ("key", "value"))
    >>> df.select(from_json(df.value, schema).alias("json")).collect()
    [Row(json=Row(a=1))]
    >>> df.select(from_json(df.value, "a INT").alias("json")).collect()
    [Row(json=Row(a=1))]
    >>> df.select(from_json(df.value, "MAP<STRING,INT>").alias("json")).collect()
    [Row(json={u'a': 1})]
    >>> data = [(1, '''[{"a": 1}]''')]
    >>> schema = ArrayType(StructType([StructField("a", IntegerType())]))
    >>> df = spark.createDataFrame(data, ("key", "value"))
    >>> df.select(from_json(df.value, schema).alias("json")).collect()
    [Row(json=[Row(a=1)])]
    >>> schema = schema_of_json(lit('''{"a": 0}'''))
    >>> df.select(from_json(df.value, schema).alias("json")).collect()
    [Row(json=Row(a=None))]
    >>> data = [(1, '''[1, 2, 3]''')]
    >>> schema = ArrayType(IntegerType())
    >>> df = spark.createDataFrame(data, ("key", "value"))
    >>> df.select(from_json(df.value, schema).alias("json")).collect()
    [Row(json=[1, 2, 3])]
    """
    sc = SparkContext._active_spark_context
    if isinstance(schema, DataType):
        schema = schema.json()
    elif isinstance(schema, Column):
        schema = _to_java_column(schema)
    jc = sc._jvm.functions.from_json(_to_java_column(col), schema, _options_to_str(options))
    return Column(jc)
This is still not efficient: from_json parses the entire string and constructs a full map of properties, and we do it twice for the identical object. Still, it’s way better than using a UDF.
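As an aside, the double parse should be avoidable by giving from_json both fields in a single DDL schema, since it already returns a struct (a sketch under that assumption, not verified on the original data):

# Parse each element once; the resulting struct<id,name> is used directly.
df = (df
    .withColumn("sa",
        f.expr("transform(sa, x -> from_json(x, 'id string, name string'))")))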
To contact me, send an email anytime or leave a comment below.