Thanks!

The bleeding edge: Spark, Parquet, and S3

By Arnon Rotem-Gal-Oz
spark parquet s3 - Square

Spark is shaping up as the leading alternative to Map/Reduce for several reasons including the wide adoption by the different Hadoop distributions, combining both batch and streaming on a single platform and a growing library of machine-learning integration (both in terms of included algorithms and the integration with machine learning languages namely R and Python).

At AppsFlyer, we’ve been using Spark for a while now as the main framework for ETL (Extract, Transform & Load) and analytics. A recent example is the new version of our retention report that we recently released, which utilized Spark  to crunch several data streams (> 1TB a day) with ETL (mainly data cleansing) and analytics (a stepping stone towards full click-fraud detection) to produce the report.

One of the main changes we introduced in this report is the move from building on Sequence files to using Parquet files.

Parquet is a columnar data format, which is probably the best option today for storing long term big data for analytics purposes (unless you are heavily invested in Hive, where Orc is the more suitable format). The advantages of Parquet  vs. Sequence files are performance and compression without losing the benefit of wide support by big-data tools  (Spark, Hive, Drill, Tajo, Presto etc.).

One relatively unique aspect of our infrastructure for big data is that we do not use Hadoop (perhaps that’s a topic for a separate post). We are using Mesos as a resource manager instead of YARN and we use Amazon S3 instead of HDFS as a distributed storage solution. HDFS has several advantages over S3, however, the cost/benefit for running long running HDFS clusters on AWS  vs. using S3 are overwhelming in favor of S3.

That said, the combination of Spark, Parquet and S3 posed several challenges for us and this post will list the major ones and the solutions we came up with to cope with them.

Parquet and Spark

Parquet and Spark seem to have been in a love-hate relationship for a while now.

On the one hand, the Spark documentation touts  Parquet as one of the best formats for analytics of big data (it is) and on the other hand the support for Parquet in Spark is incomplete and annoying to use. Things are surely moving in the right direction but there are still a few quirks and pitfalls to watch out for.

To start on a positive note,Spark and Parquet integration has come a  long way in the past few months. Previously, one had to jump through hoops just to be able to convert existing data to Parquet.

The introduction of DataFrames to Spark made this process much, much simpler. When the input format is supported by the DataFrame API e.g. the input is JSON  (built-in) or Avro (which isn’t built in Spark yet, but you can use a library to read it) converting to Parquet is just a matter of reading the input format on one side and persisting it as Parquet on the other. Consider for example the following snippet in Scala:

val inputPath = "../data/json"
val outputPath = "../data/parquet"
val data = sqlContext.read.json(inputPath)
date.write.parquet(outputPath)

Even when you are handling a format where the schema isn’t part of the data, the conversion process is quite simple as Spark lets you specify the schema programmatically.

The Spark documentation is pretty straightforward and contains examples in Scala, Java and Python.

Furthermore, it isn’t too complicated to define schemas in other languages. For instance, here (AppsFlyer), we use Clojure as our main development language so we developed a couple of helper functions to do just that.

The sample code below provides the details: The first thing is to extract the data from whatever structure we have and specify the schema we like. The code below takes an event-record and extracts various data points from it into a vector of the form [:column_name value optional_data_type]. The data type is optional since it is assumed to be a string unless otherwise specified.

(defn record-builder
  [event-record]
  (let [..
        raw-device-params (extract event-record "raw_device_params")
        result [...
                [:operator (get raw-device-params "operator")]
                [:model (get raw-device-params "model")]
                ...
                [:launch_counter counter DataTypes/LongType]]]
  result))

The next step is to use the above mentioned structure to both extract the schema and convert to DataFrame Rows:

(defn extract-dataframe-schema
  [rec]
  (let [fields (reduce (fn [lst schema-line]
                         (let [k (first schema-line)
                               t (if (= (count schema-line) 3) (last schema-line) DataTypes/StringType) ]
                           (conj lst (DataTypes/createStructField (name k) t NULLABLE)))) [] rec)
        arr (ArrayList. fields)]
    (DataTypes/createStructType arr)))

(defn as-rows
  [rec]
  (let [values (object-array (reduce (fn [lst v] (conj lst v)) [] rec))]
    (RowFactory/create values)))

Finally we apply these functions over an RDD, convert it to a data frame and save as Parquet:

(let [..
     schema (trans/extract-dataframe-schema (record-builder nil))
     ..
     rdd (spark/map record-builder some-rdd-we-have)
     rows (spark/map trans/as-rows rdd)
     dataframe (spark/create-data-frame sql-context rows schema)
    ]
(spark/save-parquert dataframe output-path :overwrite))

As mentioned above, things are on the up and up for Parquet and Spark but the road is not clear yet.

Some of the problems we encountered include:

  • A critical bug in the version 1.4 release where a race condition when writing Parquet files caused significant data loss on jobs (This bug is fixed in version 1.4.1 – so if you are using Spark 1.4 and Parquet upgrade yesterday!)
  • Filter pushdown optimization, which is turned off  by default since Spark still uses Parquet 1.6.0rc3  – even though 1.6.0 has been out for awhile (it seems Spark 1.5 will use parquet 1.7.0 so the problem will be solved)
  • Parquet is not “natively” supported in Spark, instead, Spark relies on Hadoop support for the Parquet format – this is not a problem in itself, but for us it caused major performance issues when we tried to use Spark and Parquet with S3 – more on that in the next section

Parquet, Spark, and S3

Amazon S3 (Simple Storage Services) is an object storage solution that is relatively cheap to use.

It does have a few disadvantages vs. a “real” file system; the major one is eventual consistency i.e. changes made by one process are not immediately visible to other applications. (If you are using Amazon’s EMR you can use EMRFS “consistent view” to overcome this.) However, if you understand this limitation, S3 is still a viable input and output source, at least for batch jobs.

As mentioned above, Spark doesn’t have a native S3 implementation and relies on Hadoop classes to abstract the data access to Parquet.

Hadoop provides 3 file system clients to S3:

  • S3 block file system (URI schema of the form “s3://..”) which doesn’t seem to work with Spark
  • S3 native file system (“s3n://..” URIs) – download  Spark distribution that supports Hadoop 2.* and up if you want to use this (tl;dr – you don’t)
  • s3a – a replacement for s3n that removes some of the limitations and problems of s3n. Download Spark with Hadoop 2.6 and up to use this one

When we used Spark 1.3 we encountered many problems when we tried to use S3, so we started out using s3n – which worked for the most part, i.e. we got jobs running and completing but a lot of them failed with various read timeout and host unknown exceptions.

Looking at the tasks within the jobs the picture was even grimmer with high percentages of failures that pushed us to increase timeouts and retries to ridiculous levels. When we moved to Spark 1.4.1, we took another stab at trying s3a.

This time around we got it to work.

The first thing we had to do was to set both spark.executor.extraClassPath and spark.executor.extraDriverPath to point at the aws-java-sdk and the hadoop-aws jars since apparently both are missing from the “Spark with Hadoop 2.6” build. Naturally we used the 2.6 version of these files but then we were hit by this little problem. Hadoop 2.6 AWS implementation has a bug which causes it to split S3 files  in unexpected ways (e.g. a 400 files jobs ran with 18 million tasks) luckily replacing Hadoop AWS jar to version 2.7.0 for Spark solved this problem and using s3a prefixes works without hitches (and provides better performance than s3n).

Finding the right S3 Hadoop library contributes to the stability of our jobs but  regardless of S3 library (s3n or s3a) the performance of Spark jobs that use Parquet files was abysmal.

When looking at the Spark UI, the actual work of handling the data seemed quite reasonable but Spark spent a huge amount of time before actually starting the work and after the job was “completed” before it actually terminated. We like to call this phenomena the “Parquet Tax.”

Obviously we couldn’t live with the “Parquet Tax” so we delved into the log files of our jobs and discovered several issues.

This first one has to do with startup times of Parquet jobs. The people that built Spark understood that schema can evolve over time and provides a nice feature for DataFrames called “schema merging.” If you look at schema in a big data lake/reservoir (or whatever it is called today) you can definitely expect the schema to evolve over time.

However if you look at a directory that is the result of a single job there is no difference in the schema… It turns out that when Spark initializes a job, it reads the footers of all the Parquet files to perform the schema merging.

All this work is done from the driver before any tasks are allocated to the executor and can take long minutes, even hours (e.g. we have jobs that look back at half a year of install data). It isn’t documented but looking at the Spark code  you can override this behavior by specifying mergeSchema as false :

In Scala:

val file = sqx.read.option("mergeSchema", "false").parquet(path) 

and in Clojure:

(-> ^SQLContext sqtx
    (.read)
    (.format "parquet")
    (.options (java.util.HashMap. {"mergeSchema" "false" "path" path}))
    (.load))

Note that this doesn’t work in Spark 1.3. In Spark 1.4 it works as expected and in Spark 1.4.1 it causes Spark only to look at _common_metadata file which is not the end of the world since it is a small file and there’s only one of these per directory. However, this brings us to another aspect of the “Parquet Tax” – the “end of job” delays.

Turning off schema merging and controlling the schema used by Spark helped cut down the job start up times but, as mentioned we still suffered from long delays at the end of jobs. We already knew of one Hadoop<->S3 related problem when using text files. Hadoop  being immutable first writes files to a temp directory and then copies them over. With S3 that’s not a problem but the copy operation is very very expensive.

With text files, DataBricks created DirectOutputCommitter  (probably for their Spark SaaS offering). Replacing the output committer for text files is fairly easy – you just need to set “spark.hadoop.mapred.output.committer.class” on the Spark configuration e.g.:

(spark-conf/set "spark.hadoop.mapred.output.committer.class" "com.appsflyer.spark.DirectOutputCommitter")

A similar solution exists for Parquet and unlike the solution for text files it is even part of the Spark distribution.

However, to make things complicated you have to configure it on Hadoop configuration and not on the Spark configuration. To get the Hadoop configuration you first need to create a Spark context from the Spark configuration, call hadoopConfiguration on it and then set “spark.sql.parquet.output.committer.class” as in:

(let [ctx (spark/spark-context conf)
      hadoop-conf (.hadoopConfiguration ^JavaSparkContext ctx)]
     (.set hadoop-conf "spark.sql.parquet.output.committer.class" "org.apache.spark.sql.parquet.DirectParquetOutputCommitter"))

Using the DirectParquetOutputCommitter provided a significant reduction in the “Parquet Tax” but we still found that some jobs were taking a very long time to complete.

Again the problem was the file system assumptions Spark and Hadoop hold which were  the culprits. Remember the “_common_metadata” Spark looks at the onset of a job – well, Spark spends a lot of time at the end of the job creating both this file and an additional MetaData file with additional info from the files that are in the directory. Again this is all done from one place (the driver) rather than being handled by the executors.

When the job results in small files (even when there are couple of thousands of those) the process takes reasonable time. However, when the job results in larger files (e.g. when we ingest a full day of application launches) this takes upward of an hour. As with mergeSchema the solution is to manage metadata manually so we set “parquet.enable.summary-metadata” to false (again on the Hadoop configuration and generate the _common_metadata file ourselves (for the large jobs)

To sum up, Parquet and especially Spark are works in progress – making cutting edge technologies work for you can be a challenge and require a lot of digging.

The documentation is far from perfect at times but luckily all the relevant technologies are open source (even the Amazon SDK), so you can always dive into the bug reports, code etc. to understand how things actually work and find the solutions you need. Also, from time to time you can find articles and blog posts that explain how to overcome the common issues in technologies you are using. I hope this post clears off some of the complications  of integrating Spark, Parquet and S3, which are, at the end of the day, all great technologies with a lot of potential.

Morri Feldman and Michael Spector contributed to this post.

Arnon Rotem-Gal-Oz

Arnon Rotem-Gal-Oz was the Chief Data Officer of Appsflyer. Arnon has more than 20 years of experience developing, managing and architecting large distributed systems using varied platforms and technologies. Arnon is the author of SOA Patterns from Manning publications.
Background
Ready to start making good choices?