Parsing Array of Strings in Spark

Let’s say you have a column that is an array of strings, where each string is in turn a JSON document, like {"id": 1, "name": "whatever"}. How would you parse it into an array of proper structs?

Spark has a handy higher-order function called transform that applies an expression to each array element. Combined with json_tuple, the code would ideally look like:

from pyspark.sql import functions as f

df = (df
      .withColumn("sa", f.expr("transform(sa, x -> struct(json_tuple(x, 'id') as id, json_tuple(x, 'name') as name))")))

However, it won’t work:

Error: org.apache.spark.sql.AnalysisException: Generators are not supported when it’s nested in expressions bla bla bla…

This is because json_tuple is a generator, and Spark does not allow generators to be nested inside other expressions, which includes the lambda passed to transform (transform itself is a higher-order function, not a generator). One could also explode the array, apply json_tuple, then group the results and join them back to the main dataframe, but that’s just ugly.
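For reference, the explode-and-regroup route might look roughly like the sketch below. It assumes a hypothetical unique key column (called row_id here) that the real dataframe may not have; the function name and its parameters are illustrative only.

```python
def explode_parse_regroup(df, key="row_id", column="sa", fields=("id", "name")):
    """Parse an array of JSON strings via explode + json_tuple + groupBy.

    `key` is an assumed unique row identifier; it is not part of the
    original example and has to exist in `df` for this to work.
    """
    # Imported lazily so the function can be defined without Spark installed.
    from pyspark.sql import functions as f

    # One row per array element. A generator is allowed here because it sits
    # at the top level of the select instead of inside another expression.
    exploded = df.select(key, f.explode(column).alias("x"))

    # json_tuple is also a generator, so it gets its own top-level select.
    parsed = exploded.select(key, f.json_tuple("x", *fields).alias(*fields))

    # Collect the structs back into one array per key.
    # Note: collect_list does not guarantee the original element order.
    regrouped = parsed.groupBy(key).agg(
        f.collect_list(f.struct(*fields)).alias(column)
    )

    # Left join so that rows with empty or null arrays are not lost.
    return df.drop(column).join(regrouped, on=key, how="left")
```

Two extra selects, a shuffle for the groupBy and a join, all to rebuild a column that was already there, which is why this route is not attractive.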

My only solution so far was to just create a UDF:

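import json
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# UDF return type: array<struct<id: string, name: string>>
extract_schema = ArrayType(StructType([
    StructField("id", StringType()),
    StructField("name", StringType())
]))

@udf(returnType=extract_schema)
def my_extract(ar):
    r = []
    if isinstance(ar, list):
        for a in ar:
            j = json.loads(a)
            r.append({"id": str(j.get("id")), "name": str(j.get("name"))})

    return r if len(r) else None

df = df.withColumn("sa", my_extract("sa"))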

I really don’t like it - classic UDF and parsing JSON for each value, but it has to do for now.

Update: Better Solution

A better solution looks crippled, but doesn’t use any UDFs. The idea is to use one of the built-in Spark functions that is not a generator:

df = (df
      .withColumn("sa",
                  f.expr("""transform(sa, x -> struct(
                      from_json(x, 'id string')['id'] as id,
                      from_json(x, 'name string')['name'] as name))""")))

In this case we use from_json, which is not a generator (I couldn’t find the docs, so I’m pasting the PySpark source wrapper):

def from_json(col, schema, options={}):
    """
    Parses a column containing a JSON string into a :class:`MapType` with :class:`StringType`
    as keys type, :class:`StructType` or :class:`ArrayType` with
    the specified schema. Returns `null`, in the case of an unparseable string.

    :param col: string column in json format
    :param schema: a StructType or ArrayType of StructType to use when parsing the json column.
    :param options: options to control parsing. accepts the same options as the json datasource

    .. note:: Since Spark 2.3, the DDL-formatted string or a JSON format string is also
              supported for ``schema``.

    >>> from pyspark.sql.types import *
    >>> data = [(1, '''{"a": 1}''')]
    >>> schema = StructType([StructField("a", IntegerType())])
    >>> df = spark.createDataFrame(data, ("key", "value"))
    >>> df.select(from_json(df.value, schema).alias("json")).collect()
    [Row(json=Row(a=1))]
    >>> df.select(from_json(df.value, "a INT").alias("json")).collect()
    [Row(json=Row(a=1))]
    >>> df.select(from_json(df.value, "MAP<STRING,INT>").alias("json")).collect()
    [Row(json={u'a': 1})]
    >>> data = [(1, '''[{"a": 1}]''')]
    >>> schema = ArrayType(StructType([StructField("a", IntegerType())]))
    >>> df = spark.createDataFrame(data, ("key", "value"))
    >>> df.select(from_json(df.value, schema).alias("json")).collect()
    [Row(json=[Row(a=1)])]
    >>> schema = schema_of_json(lit('''{"a": 0}'''))
    >>> df.select(from_json(df.value, schema).alias("json")).collect()
    [Row(json=Row(a=None))]
    >>> data = [(1, '''[1, 2, 3]''')]
    >>> schema = ArrayType(IntegerType())
    >>> df = spark.createDataFrame(data, ("key", "value"))
    >>> df.select(from_json(df.value, schema).alias("json")).collect()
    [Row(json=[1, 2, 3])]
    """

    sc = SparkContext._active_spark_context
    if isinstance(schema, DataType):
        schema = schema.json()
    elif isinstance(schema, Column):
        schema = _to_java_column(schema)
    jc = sc._jvm.functions.from_json(_to_java_column(col), schema, _options_to_str(options))
    return Column(jc)

This is still not efficient, as from_json parses the entire string on every call, and we call it twice for the identical object. Even so, it is much better than using a UDF.
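One possible way to avoid the double parse is to list both fields in a single DDL schema, so that from_json is called once per element and already returns the struct we want. The sketch below is untested against the original data; the helper names build_parse_expr and parse_json_array are hypothetical, and every field is assumed to be read as a string, as in the example above.

```python
def build_parse_expr(column, fields):
    """Build a Spark SQL expression that parses each JSON string only once.

    `fields` are the JSON keys to extract; all are declared as strings
    (an assumption carried over from the 'id string' schema above).
    """
    ddl = ", ".join(f"{name} string" for name in fields)
    # from_json with a struct schema returns a struct, so there is no need
    # to wrap the result in struct(...) or to index into it.
    return f"transform({column}, x -> from_json(x, '{ddl}'))"


def parse_json_array(df, column="sa", fields=("id", "name")):
    """Replace `column` (array of JSON strings) with an array of structs."""
    # Imported lazily so the expression builder can be used without Spark.
    from pyspark.sql import functions as f

    return df.withColumn(column, f.expr(build_parse_expr(column, fields)))
```

With the defaults, parse_json_array(df) evaluates transform(sa, x -> from_json(x, 'id string, name string')). Elements that cannot be parsed should come back as null, following the from_json docstring quoted above.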
