How to read CSV data from memory in Apache Spark
Today I had an issue with Spark: I needed to read CSV data, but the CSV file itself was stored on an FTP server, inside a zip archive containing a single file, the CSV. Unfortunately, I couldn't use the local disk for that (genuine reasons!).
Here is sample code to do that:
import io
import zipfile

from pyspark.sql import DataFrame

def read_zipped(file_path: str) -> DataFrame:
    # assumes `ftp` (FTP connection), `spark` (SparkSession) and `log` (logger) exist
    log.info(f"downloading {file_path}...")
    # download from FTP as binary data
    with ftp.open(file_path, "r") as ff:
        zip_bin_data = ff.read()
    log.info(f"unzipping {file_path}...")
    byte_file = io.BytesIO(zip_bin_data)
    with zipfile.ZipFile(byte_file, "r") as zip_ref:
        entry_name = zip_ref.namelist()[0]
        log.info(f"first entry: {entry_name}")
        csv_bin_data = zip_ref.read(entry_name)
    csv_data = csv_bin_data.decode("utf-8")
    log.info(f"got CSV text of {len(csv_data)} char(s)")
    lines = csv_data.splitlines()
    log.info(f"{len(lines)} line(s)")
    # read as CSV from memory
    dt = spark.sparkContext.parallelize(lines)
    df = spark.read.csv(dt, header=True)
    # fix column names: strip spaces
    for col_name in df.columns:
        if " " in col_name:
            df = df.withColumnRenamed(col_name, col_name.replace(" ", ""))
    return df
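The unzip step works entirely in memory, with no temporary files. Here is a minimal, self-contained sketch of just that part; the in-memory zip built at the top is my own test fixture standing in for the bytes downloaded from FTP, not part of the original pipeline:

```python
import io
import zipfile

# build a throwaway zip in memory, standing in for the FTP download
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("data.csv", "id,name\n1,alice\n2,bob\n")
zip_bin_data = buf.getvalue()

# the same unzip logic as in read_zipped: wrap the bytes, read the first entry
with zipfile.ZipFile(io.BytesIO(zip_bin_data), "r") as zip_ref:
    entry_name = zip_ref.namelist()[0]
    csv_data = zip_ref.read(entry_name).decode("utf-8")

print(entry_name)               # data.csv
print(csv_data.splitlines())    # ['id,name', '1,alice', '2,bob']
```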
The trick here is using csv_data.splitlines() first to divide the text into lines. Unlike csv_data.split("\n"), it correctly handles the line endings used on different OSes.
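The difference is easy to see with a CSV that uses Windows-style \r\n line endings, where a plain split("\n") leaves stray carriage returns behind:

```python
csv_data = "id,name\r\n1,alice\r\n2,bob"

# split("\n") leaves a trailing \r on every line but the last
print(csv_data.split("\n"))    # ['id,name\r', '1,alice\r', '2,bob']

# splitlines() recognizes \n, \r\n and \r alike
print(csv_data.splitlines())   # ['id,name', '1,alice', '2,bob']
```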
Then an RDD of strings is created with sc.parallelize(lines) and passed into spark.read.csv, which on its own doesn't make it obvious that you can pass it something other than a path to files on disk.
This function returns a well-formatted DataFrame I can use further.
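The column cleanup at the end of the function is plain string manipulation, so its effect can be previewed without Spark. The column names below are a made-up example, not from a real dataset:

```python
# hypothetical column names as Spark might report them after reading the CSV
columns = ["Order ID", "Customer Name", "amount"]

# same transformation the withColumnRenamed loop applies: strip spaces
renamed = [c.replace(" ", "") if " " in c else c for c in columns]
print(renamed)  # ['OrderID', 'CustomerName', 'amount']
```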
To contact me, send an email anytime or leave a comment below.