How to read CSV data from memory in Apache Spark

Today I had an issue with Spark - there was a need to read a CSV data, however the CSV file itself was stored on an FTP server, in a zip archive, which contained one single file - CSV. Unfortunately I couldn’t use local disk for that (genuine reasons!).

Here is a sample code to do that:

def read_zipped(file_path: str) -> DataFrame:
    log.info(f"downloading {name}...")

    # download from FTP as binary data
    with ftp.open(file_path, "r") as ff:
        zip_bin_data = ff.read()

    log.info(f"unzipping {name}...")
    byte_file = io.BytesIO(zip_bin_data)
    with zipfile.ZipFile(byte_file, "r") as zip_ref:
        entry_name = zip_ref.filelist[0]
        log.info(f"first entry: {entry_name}")
        csv_bin_data = zip_ref.read(entry_name)

    csv_data = csv_bin_data.decode("utf-8")
    log.info(f"got CSV line of {len(csv_data)} char(s)")

    lines = csv_data.splitlines()
    log.info(f"{len(lines)} line(s)")

    # read as csv from memory
    dt = spark.sparkContext.parallelize(lines)
    df = spark.read.csv(dt, header=True)

    # fix column names
    for col_name in df.columns:
        if " " in col_name:
            df = df.withColumnRenamed(col_name, col_name.replace(" ", ""))

    return df

The trick here is using csv_data.splitlines first to divide text into lines - unlike csv_data.split("\n") which behaves differently on different OSes.

Then creating DataSet[Row] by using sc.parallelize(lines) and passing into spark.read.csv which on it’s own doesn’t document the fact that you can pass something other than path to files to read which must be on a local disk.

This function returns a well formatted DataFrame I can use further.


To contact me, send an email anytime or leave a comment below.