Why CPU is the main bottleneck instead of IO:
1. Hardware has improved (e.g., faster networks and SSDs).
2. Spark's IO has been optimized (e.g., unneeded input data is pruned).
3. Data formats have improved (e.g., columnar formats such as Parquet).
4. What remains on the critical path, serialization and hashing, is CPU-bound.
1. Memory Management and Binary Processing
Though the JVM is pretty good, as Spark applications push the boundary of performance, the overhead of JVM objects and garbage collection becomes non-negligible.
Spark, however, understands how data flows through the various stages of computation and the scope of jobs and tasks. As a result, Spark knows much more than the JVM garbage collector about the life cycle of memory blocks, and thus should be able to manage memory more efficiently.
(Tungsten uses an explicit memory manager to convert most Spark operations to operate directly on binary data rather than Java objects. This builds on sun.misc.Unsafe, an advanced JVM facility that exposes C-style memory access.)
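To make this concrete, here is a minimal sketch (not Spark's internal code) of the kind of C-style, off-heap access that sun.misc.Unsafe exposes: allocate raw memory outside the garbage-collected heap, write a value, read it back, and free it manually.

import sun.misc.Unsafe

object UnsafeSketch {
  def main(args: Array[String]): Unit = {
    // Unsafe's constructor is private, so grab the singleton via reflection.
    val field = classOf[Unsafe].getDeclaredField("theUnsafe")
    field.setAccessible(true)
    val unsafe = field.get(null).asInstanceOf[Unsafe]

    val address = unsafe.allocateMemory(8)  // 8 raw bytes, outside the GC heap
    unsafe.putLong(address, 42L)            // write directly to that address
    println(unsafe.getLong(address))        // prints 42
    unsafe.freeMemory(address)              // manual free; no garbage collector involved
  }
}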
2. Cache-aware computation
Cache-aware computation improves the speed of data processing through more effective use of the L1/L2/L3 CPU caches, which are orders of magnitude faster than main memory. When profiling Spark user applications, we have found that a large fraction of the CPU time is spent waiting for data to be fetched from main memory.
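As an illustration of the idea (a hedged sketch, not Spark's actual sort), a cache-friendly sort keeps the sort key next to a record "pointer" in one contiguous array, so comparisons scan sequential memory instead of chasing a reference (and likely missing the cache) for every record. The Record type and field names below are hypothetical.

object CacheAwareSortSketch {
  // Hypothetical record type: only its key matters during the sort.
  final case class Record(key: Long, payload: String)

  def sortCacheAware(records: Array[Record]): Array[Record] = {
    // Pack (sort key, index-as-pointer) pairs into one contiguous array so the
    // sort touches sequential memory; payloads are only dereferenced at the end.
    val keysAndIndices = Array.tabulate(records.length)(i => (records(i).key, i))
    keysAndIndices.sortBy(_._1).map { case (_, i) => records(i) }
  }
}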
3. Code Generation
At runtime, Spark dynamically generates bytecode for evaluating SQL expressions, rather than stepping through a slower interpreter for each row. Compared with interpretation, code generation reduces the boxing of primitive data types and, more importantly, avoids expensive polymorphic function dispatch.
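A toy sketch of the difference (not Catalyst's actual generated code): the interpreted form walks an expression tree with a virtual call and boxing at every node for every row, while the "generated" form is a single specialized function over primitives.

// Interpreted expression tree: one virtual eval call per node per row, with boxing.
sealed trait Expr { def eval(row: Array[Any]): Any }
final case class Col(i: Int) extends Expr { def eval(row: Array[Any]): Any = row(i) }
final case class Add(l: Expr, r: Expr) extends Expr {
  def eval(row: Array[Any]): Any =
    l.eval(row).asInstanceOf[Int] + r.eval(row).asInstanceOf[Int]
}

object CodegenSketch {
  // What code generation would emit for Add(Col(0), Col(1)):
  // one monomorphic, primitive-typed function with no tree walk and no boxing.
  val generated: Array[Int] => Int = row => row(0) + row(1)

  def main(args: Array[String]): Unit = {
    println(Add(Col(0), Col(1)).eval(Array[Any](1, 2)))  // 3 (interpreted)
    println(generated(Array(1, 2)))                      // 3 (compiled form)
  }
}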
Several parts of Project Tungsten leverage the DataFrame model:
Spark is powerful, but as it grows we want wider audiences beyond "Big Data" engineers to be able to leverage the power of distributed computing. Here comes the new DataFrame API (an extension of the existing RDD API), inspired by data frames in R and Python (pandas). What's more, DataFrames go through the Catalyst optimizer, enabling optimized execution similar to that of Spark SQL queries.
// Assumes an existing SparkContext `sc` (e.g., in the spark-shell).
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._              // enables the .toDF conversion below

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)        // RDD[Int]
val a = distData.toDF                      // DataFrame with a single column
val b = a.collect()                        // b is an Array[Row]; access rows with b(2)

// DataFrames can also be created directly from data sources such as JSON.
val df = sqlContext.read.json("../examples/src/main/resources/people.json")
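Because DataFrame operations are declarative, Catalyst can optimize the whole plan before execution, just as it would for the equivalent SQL query. A small hedged example (the column names assume the people.json sample file with "name" and "age" fields):

df.filter(df("age") > 21)
  .select(df("name"), df("age") + 1)
  .show()

// The same query written in SQL goes through the same optimizer.
df.registerTempTable("people")
sqlContext.sql("SELECT name, age + 1 FROM people WHERE age > 21").show()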