Parsing Array of Strings in Spark

Let’s say you have a column that is an array of strings, where each string is in turn a JSON document like {"id": 1, "name": "whatever"}. How would you parse it into an array of proper structs?

There is a good higher-order function called transform that applies an expression to each array element. Combined with json_tuple, the code could ideally look like this:

df = (df
      .withColumn("sa", f.expr("transform(sa, x -> struct(json_tuple(x, 'id') as id, json_tuple(x, 'name') as name))")))

However, it won’t work:

Error: org.apache.spark.sql.AnalysisException: Generators are not supported when it’s nested in expressions bla bla bla…

This is because json_tuple is a generator function, and Spark does not allow a generator to be nested inside another expression, which here is the lambda passed to transform. One could also explode the array, apply json_tuple, then group the rows back and join them to the main dataframe, but that’s just ugly.

My only solution so far was to just create a UDF:

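import json
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Return type of the UDF: an array of (id, name) structs
schema = ArrayType(StructType([
    StructField("id", StringType()),
    StructField("name", StringType())
]))

@f.udf(returnType=schema)
def my_extract(ar):
    r = []
    if isinstance(ar, list):
        for a in ar:
            j = json.loads(a)
            r.append({"id": str(j.get("id")), "name": str(j.get("name"))})

    return r if len(r) else None

df = df.withColumn("sa", my_extract("sa"))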

I really don’t like it - classic UDF and parsing JSON for each value, but it has to do for now.
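Since the UDF body is plain Python, its logic can be checked without a Spark session. Below is a minimal sketch of that check; parse_items is a hypothetical name for the undecorated function, and it assumes the JSON field is called name, as in the example at the top.

```python
import json


def parse_items(ar):
    """Plain-Python version of the UDF body: list of JSON strings -> list of dicts."""
    r = []
    if isinstance(ar, list):
        for a in ar:
            j = json.loads(a)
            # str() is applied to whatever .get() returns, including None
            r.append({"id": str(j.get("id")), "name": str(j.get("name"))})
    return r if len(r) else None


sample = ['{"id": 1, "name": "whatever"}', '{"id": 2, "name": "other"}']
parsed = parse_items(sample)
```

One thing this makes visible: because of the str() call, a missing key ends up as the string "None" rather than a real null, which may or may not be what you want downstream.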

Update: Better Solution

A better solution looks clumsy, but doesn’t use any UDFs. The idea is to replace the generator with one of the built-in Spark functions that is not a generator:

df = (df
      .withColumn("sa",
                  f.expr("""transform(sa, x -> struct(
                  	from_json(x, 'id string')['id'] as id,
                  	from_json(x, 'name string')['name'] as name))""")))

In this case we use from_json, which is not a generator (I couldn’t find the docs, so I’m pasting the PySpark source wrapper):

def from_json(col, schema, options={}):
    """
    Parses a column containing a JSON string into a :class:`MapType` with :class:`StringType`
    as keys type, :class:`StructType` or :class:`ArrayType` with
    the specified schema. Returns `null`, in the case of an unparseable string.

    :param col: string column in json format
    :param schema: a StructType or ArrayType of StructType to use when parsing the json column.
    :param options: options to control parsing. accepts the same options as the json datasource

    .. note:: Since Spark 2.3, the DDL-formatted string or a JSON format string is also
              supported for ``schema``.

    >>> from pyspark.sql.types import *
    >>> data = [(1, '''{"a": 1}''')]
    >>> schema = StructType([StructField("a", IntegerType())])
    >>> df = spark.createDataFrame(data, ("key", "value"))
    >>> df.select(from_json(df.value, schema).alias("json")).collect()
    >>> df.select(from_json(df.value, "a INT").alias("json")).collect()
    >>> df.select(from_json(df.value, "MAP<STRING,INT>").alias("json")).collect()
    [Row(json={u'a': 1})]
    >>> data = [(1, '''[{"a": 1}]''')]
    >>> schema = ArrayType(StructType([StructField("a", IntegerType())]))
    >>> df = spark.createDataFrame(data, ("key", "value"))
    >>> df.select(from_json(df.value, schema).alias("json")).collect()
    >>> schema = schema_of_json(lit('''{"a": 0}'''))
    >>> df.select(from_json(df.value, schema).alias("json")).collect()
    >>> data = [(1, '''[1, 2, 3]''')]
    >>> schema = ArrayType(IntegerType())
    >>> df = spark.createDataFrame(data, ("key", "value"))
    >>> df.select(from_json(df.value, schema).alias("json")).collect()
    [Row(json=[1, 2, 3])]
    """

    sc = SparkContext._active_spark_context
    if isinstance(schema, DataType):
        schema = schema.json()
    elif isinstance(schema, Column):
        schema = _to_java_column(schema)
    jc = sc._jvm.functions.from_json(_to_java_column(col), schema, _options_to_str(options))
    return Column(jc)

This is still not efficient, as from_json parses the entire string and constructs a full map of properties, and we do it twice for the identical object. Still, it is much better than using UDFs.
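The double parsing can probably be avoided by giving from_json a schema that lists both fields, so that each element is parsed once and the resulting struct is used directly. The sketch below only builds the SQL expression string; parse_array_expr is a hypothetical helper name, not something from Spark, and I haven’t measured how much this actually saves.

```python
def parse_array_expr(column, fields):
    """Build a Spark SQL expression that parses every JSON string in `column`
    into a struct, calling from_json once per array element.

    `fields` maps field name -> Spark SQL type, e.g. {"id": "string"}.
    """
    # DDL-formatted schema, e.g. "id string, name string"
    ddl = ", ".join(f"{name} {dtype}" for name, dtype in fields.items())
    return f"transform({column}, x -> from_json(x, '{ddl}'))"


expr = parse_array_expr("sa", {"id": "string", "name": "string"})
# expr == "transform(sa, x -> from_json(x, 'id string, name string'))"

# With a live DataFrame (needs pyspark, not run here):
# from pyspark.sql import functions as f
# df = df.withColumn("sa", f.expr(expr))
```

Building the string in a helper keeps the field list in one place, which helps once the documents have more than two properties.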
