
Spark

Introduction

  • spark - developed at uc berkeley as an improvement over hadoop
  • it borrowed concepts from hadoop but now works independently of it
  • unlike hive etc, spark does not translate our code into the map reduce equivalent
  • it is much more performant than hadoop
  • it offered much more flexibility when compared to hadoop -
    • unlike hadoop, which relies on yarn as its cluster manager, spark can work with different cluster managers like mesos and kubernetes. cluster manager, resource manager, container orchestrator, etc all refer to the same role here
    • unlike hadoop which relies on hdfs for storage, it can work with cloud storage options as well
  • eventually, it became its own thing as it has much more features
  • why spark is popular - we write, for e.g., sql and all the complexities around distributed processing are abstracted away from us. also, it is a unified platform - batch processing, stream processing and ml are all available in one place
  • databricks - founded by the original developers of spark. for e.g. it makes deploying spark applications much easier, has an optimized runtime for spark, etc

Driver, Executor and Deployment Modes

  • we submit our spark application to spark
  • spark creates a master (called the driver in spark) and slaves (called executors in spark)
  • the driver just assigns work to the executors, while the executors perform all the heavy lifting
  • when we used local[*] in the java program, we basically used the local cluster manager. this is why we never had to build a jar and submit it to a spark cluster. * means spark will use as many threads as there are cores on the machine, but we can specify a number as well. a single jvm is used in this case - both the driver and the executors live inside that one jvm. this is a useful alternative when testing things out locally
  • apart from local, the real cluster managers supported by spark include yarn, mesos and kubernetes
  • now, there are two deployment modes for running spark in an actual cluster -
    • client mode - the driver runs on the client side, and it spawns executors in the spark cluster. this is what happens when we use interactive clients like spark-shell, which is why the driver dies automatically when the interactive shell is closed
    • cluster mode - both the driver and the executors will run on the cluster. this is what happens when we submit built jars to a spark cluster
  • i think deployment modes are not applicable when using the local cluster manager, since there is no actual cluster in the first place - both the driver and the executors run inside the same jvm

Getting Started

  • download spark from here
  • tar -xvzf spark-3.5.0-bin-hadoop3.tgz
  • pom.xml - the surefire argLine configuration was needed because otherwise i was getting “cannot access class sun.nio.ch.DirectBuffer”. i am using java 8 and the latest possible versions of spark and junit
    <?xml version="1.0" encoding="UTF-8"?>
    <project
      xmlns="http://maven.apache.org/POM/4.0.0"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>com.example</groupId>
      <artifactId>spark-batch</artifactId>
      <version>1.0-SNAPSHOT</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.13</scala.version>
        <spark.version>3.5.0</spark.version>
        <junit.version>5.10.1</junit.version>
        <surefire.version>3.2.1</surefire.version>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.junit.jupiter</groupId>
          <artifactId>junit-jupiter</artifactId>
          <scope>test</scope>
        </dependency>
      </dependencies>
    
      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>org.junit</groupId>
            <artifactId>junit-bom</artifactId>
            <version>${junit.version}</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    
      <build>
        <plugins>
          <plugin>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>${surefire.version}</version>
            <configuration>
              <argLine>
                --add-opens=java.base/java.lang=ALL-UNNAMED
                --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
                --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
                --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED
                --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED
                --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
                --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
                --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
                --add-opens=java.base/sun.nio.cs=ALL-UNNAMED
                --add-opens=java.base/sun.security.action=ALL-UNNAMED
                --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
                --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
              </argLine>
            </configuration>
          </plugin>
        </plugins>
      </build>
    </project>
    
  • app - notice how we derive the master and file name from args, so that we can use the same spark code in both cases - when we use locally installed hadoop in pseudo distributed mode and when we use the local cluster manager
    import static org.apache.spark.sql.functions.col;

    // assuming the log4j 1.x bridge api that spark pulls in transitively; swap for
    // java.util.logging.Logger if preferred, the getLogger / info calls stay the same
    import org.apache.log4j.Logger;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class Main {
    
      private static final Logger log = Logger.getLogger(Main.class.getName());
    
      public static void main(String[] args) {
        String master = args[0];
        SparkSession spark = SparkSession.builder()
            .master(master)
            .appName("TablesDemo")
            .getOrCreate();
    
        log.info("reading file...");
        String fileName = args[1];
        Dataset<Row> surveyDf = read(spark, fileName);
    
        log.info("performing transformations...");
        Dataset<Row> countByCountryDf = countByCountry(surveyDf);
    
        log.info("final stats = " + countByCountryDf.collectAsList());
    
        // try (Scanner sc = new Scanner(System.in)) {
        //   log.info("waiting for user acknowledgement");
        //   sc.nextLine();
        // }
      }
    
      protected static Dataset<Row> countByCountry(Dataset<Row> surveyDf) {
        return surveyDf.filter(col("age").lt(40))
            .select("Age", "Country", "state", "Gender")
            .groupBy("Country")
            .count();
      }
    
      protected static Dataset<Row> read(SparkSession spark, String fileName) {
        return spark.read()
            .format("csv")
            .option("header", true)
            .option("inferSchema", true)
            .option("path", fileName)
            .load();
      }
    }
    
  • we can also pause the program for user input (the commented out scanner block above) when using the local cluster manager, so that we can view the spark ui - it is only available while the job is running, at http://localhost:4040/jobs/
  • we can override defaults at the cluster level from ~/spark-3.5.0-bin-hadoop3/conf/. this includes files like log4j2.properties.template (for logging), spark-defaults.conf.template (for what we would otherwise specify via SparkConf), spark-env.sh.template (for properties like java home), etc - copy them without the .template suffix for them to take effect
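  • for reference, a minimal spark-defaults.conf sketch - the property names are from the spark docs, the values are just illustrative placeholders -
    spark.master                     yarn
    spark.driver.memory              1g
    spark.executor.memory            2g
    spark.sql.shuffle.partitions     200
    spark.eventLog.enabled           true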
  • writing tests - because we had broken our application down into separate methods, we were able to unit test it easily - refactor the transformations into methods that accept and return Dataset<Row>, call these methods from the unit test, then call collectAsList on the output to view it as a list and assert on it
    import static org.junit.jupiter.api.Assertions.assertEquals;

    import java.util.Map;
    import java.util.stream.Collectors;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.junit.jupiter.api.AfterAll;
    import org.junit.jupiter.api.BeforeAll;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.TestInstance;

    @TestInstance(TestInstance.Lifecycle.PER_CLASS)
    public class MainTest {
    
      SparkSession spark;
    
      @BeforeAll
      void setup() {
        System.out.println("setting up spark...");
        spark = SparkSession.builder()
            .master("local[*]")
            .appName("Main")
            .getOrCreate();
      }
    
      @AfterAll
      void cleanup() {
        System.out.println("cleaning up spark...");
        spark.close();
      }
    
      @Test
      void whenCsvIsRead_thenDatasetIsReadSuccessfully() {
        Dataset<Row> input = Main
            .read(spark, "src/main/resources/sample.csv");
    
        assertEquals(9, input.count());
      }
    
      @Test
      void whenCountByCountryIsCalled_thenResultIsOk() {
        Dataset<Row> input = Main
            .read(spark, "src/main/resources/sample.csv");
        Dataset<Row> output = Main.countByCountry(input);
        Map<String, Long> countMap = output.collectAsList().stream()
            .collect(Collectors.toMap((a) -> a.getString(0), (a) -> a.getLong(1)));
    
        assertEquals(4, countMap.get("United States"));
        assertEquals(2, countMap.get("Canada"));
        assertEquals(1, countMap.get("United Kingdom"));
      }
    }
    
  • understand that we could not have performed assertions directly on the dataframe - a dataframe is just partitions of data sitting in different executors. so, we first call collectAsList() to bring all the data into the driver, and then we can easily perform assertions
  • we could also have generated mock data as below, instead of reading from a csv like we did above. both approaches have their pros and cons imho - generating mock data repeatedly means more code but lets us craft data specific to each test, while reading from a csv keeps the test code cleaner but makes the tests slower
    // additional imports needed - java.util.Arrays, java.util.List, org.apache.spark.sql.RowFactory,
    // org.apache.spark.sql.types.DataTypes, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructType
    @Test
    void whenCountByCountryIsCalled_thenResultIsOk_unit() {
      StructType schema = new StructType(new StructField[] {
          DataTypes.createStructField("Age", DataTypes.IntegerType, true),
          DataTypes.createStructField("Gender", DataTypes.StringType, true),
          DataTypes.createStructField("Country", DataTypes.StringType, true),
          DataTypes.createStructField("state", DataTypes.StringType, true) });
    
      List<Row> rows = Arrays.asList(new Row[] {
          RowFactory.create(37, "Female", "United States", "IL"),
          RowFactory.create(44, "M", "United States", "In"),
          RowFactory.create(32, "Male", "Canada", "NA") });
    
      Dataset<Row> input = spark.createDataFrame(rows, schema);
      Dataset<Row> output = Main.countByCountry(input);
      Map<String, Long> countMap = output.collectAsList().stream()
          .collect(Collectors.toMap((a) -> a.getString(0), (a) -> a.getLong(1)));
    
      assertEquals(1, countMap.get("United States"));
      assertEquals(1, countMap.get("Canada"));
    }
    

Using Spark Local Cluster Manager

  • launch.json - the vs code equivalent of run configurations in intellij
    {
      "version": "0.2.0",
      "configurations": [
        {
          "type": "java",
          "name": "Main [Local]",
          "request": "launch",
          "mainClass": "com.example.spark_batch.Main",
          "projectName": "spark-batch",
          "args": ["local[*]", "src/main/resources/sample.csv"]
        }
      ]
    }
    

Using Spark Submit + Hadoop

  • setup hadoop in pseudo distributed mode using this link
  • namenode format - ~/hadoop-3.3.6/bin/hdfs namenode -format
  • start all components using ~/hadoop-3.3.6/sbin/start-all.sh
  • create initial hdfs directories - ~/hadoop-3.3.6/bin/hdfs dfs -mkdir -p /user/$USER
  • copy the necessary files -
    • ~/hadoop-3.3.6/bin/hdfs dfs -put src/main/resources/sample.csv
    • ~/hadoop-3.3.6/bin/hdfs dfs -ls
  • build the jar. note - i am able to work without generating a fat jar / using the shade plugin for now, but based on the use case, that might become necessary -
    mvn clean install
    
  • submitting jobs to spark - note the arguments. we need to specify --master and --deploy-mode explicitly, since the default value of master is local and the default value of deploy-mode is client. the application arguments (master and file name for our Main) come after the jar
    ~/spark-3.5.0-bin-hadoop3/bin/spark-submit \
      --verbose \
      --class com.example.spark_batch.Main \
      --master yarn \
      --deploy-mode cluster \
      target/spark-batch-1.0-SNAPSHOT.jar \
      yarn sample.csv
    
  • other important options which we could have passed to spark-submit include -
    • executor-memory, driver-memory - ram
    • executor-cores, driver-cores - cpu cores
    • num-executors - number of executors
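  • e.g. a hedged sketch of how these options might be passed - the values are arbitrary and would need tuning for a real cluster -
    ~/spark-3.5.0-bin-hadoop3/bin/spark-submit \
      --class com.example.spark_batch.Main \
      --master yarn \
      --deploy-mode cluster \
      --driver-memory 1g \
      --executor-memory 2g \
      --executor-cores 2 \
      --num-executors 4 \
      target/spark-batch-1.0-SNAPSHOT.jar \
      yarn sample.csv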

Using Spark Shell

  • using interactive clients - ./bin/spark-shell
  • on starting spark shell, we can access the ui at http://localhost:4040/jobs/
  • e.g. we can run the following commands -
    val df = spark.read.csv("full_path_to_sample.csv")
    df.show()
    
  • note how we did not have to create a spark session manually here, unlike when writing spark applications

Dataframe + Hadoop

  • dataframe - distributed table with a well defined schema i.e. each column has a specific data type
  • working with dataframe is like working with tables in sql
  • data stored in for e.g. hdfs is broken into smaller splits. these splits are of size 128mb by default
  • the dataframe too will basically be composed of smaller chunks called partitions, where each partition might represent the hdfs split
  • my understanding - the fact that spark keeps intermediate data in memory, instead of writing it to files between steps like map reduce does, is also a big part of why spark is more performant than hadoop
  • above, we talked about storage / data, now we talk about compute
  • we submit the job to spark
  • spark will then with the help of yarn’s resource manager create the application master in one of the worker nodes. recall how containers are used hadoop 2.x onwards, so this application master would be created inside a yarn / hadoop container
  • now, the spark driver will run inside this application master container
  • then, the spark driver will talk to yarn’s resource manager and create more worker containers
  • then, spark will spawn executors inside these worker containers
  • now, each executor will be responsible for some partition(s) of data, which it loads in its memory
  • while doing all this, spark will take care of rack awareness i.e. assigning executors the partitions in such a way that there is minimum network transfer between hdfs and executor / container

Spark CPU and Memory

  • recall we said that we can dictate how many cores an executor can have when submitting the spark job
  • e.g. we say executors should have 4 cores. nowadays, each core can itself be split into virtual cores as well
  • this virtual core is called a slot in spark
  • so, if we have 2 virtual cores per core, and if we assigned our executor 4 cores, it essentially has 8 slots
  • a task uses one slot
  • so, the number of tasks that can run in parallel in the cluster = (number of slots per executor) * (number of executors)
  • now, assume that spark computes that it needs 30 tasks but we only have 20 slots in our cluster
  • spark is intelligent enough to schedule the 20 tasks and queue the remaining 10 tasks
  • my understanding - we can control the “slots” (yes, slots not cores) via spark.driver.cores / spark.executor.cores, and i think these are same as passing --driver-cores / --executor-cores to spark-submit
  • till now, we talked about cpu, and now we talk about memory
  • note for pyspark -
    • the driver will have a python component running alongside it. so, if for e.g. using yarn, inside the application master container, there will be a python process and the actual jvm driver
    • if we use functionality / libraries of python not available in pyspark, then the executors too will have a python component running alongside them. so, if for e.g. using yarn, inside the container, there will be a python process and the actual jvm executor
  • when setting memory limits, we have two variations for both executor and driver -
    • spark.driver.memory, spark.driver.memoryOverhead - my assumption - spark.driver.memory is same as passing --driver-memory to spark-submit
    • spark.executor.memory, spark.executor.memoryOverhead - my assumption - spark.executor.memory is same as passing --executor-memory to spark-submit
  • the memory variant is for the actual jvm driver / jvm executor, while the memory overhead variant is for non jvm processes (like the one needed when using pyspark)
  • so, e.g. we set spark.executor.memory to 1gb and leave the overhead at 10% (the default overhead factor is 0.1; spark.executor.memoryOverhead can also be set to an absolute size)
    • the spark driver would ask yarn for containers with 1.1gb of memory. note - this value should of course be less than a worker node's physical memory, otherwise we will get exceptions
    • so, out of the 1.1gb for a container, 1gb would be allocated to the executor jvm process, while the remaining 100mb would be left for non jvm processes, like the sidecar needed for pyspark
  • there is more to how memory is broken down in spark, it is too much for my brain for now 😛
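  • a hedged sketch of passing these via --conf on spark-submit - the sizes are placeholders, and spark.executor.memoryOverhead takes an absolute size (e.g. 512m) rather than a fraction -
    ~/spark-3.5.0-bin-hadoop3/bin/spark-submit \
      --class com.example.spark_batch.Main \
      --master yarn \
      --deploy-mode cluster \
      --conf spark.driver.memory=1g \
      --conf spark.executor.memory=1g \
      --conf spark.executor.memoryOverhead=512m \
      --conf spark.executor.cores=2 \
      target/spark-batch-1.0-SNAPSHOT.jar \
      yarn sample.csv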

Job Scheduling

  • there are two sides to this
    • job scheduling across different applications - dynamic resource allocation
    • job scheduling inside the same application - spark schedulers
  • just covering from theoretical perspective, how to configure this can be found here

Dynamic Resource Allocation

  • e.g. we have a spark job that uses up all the resources in our cluster
  • now, we submit another small job
  • but, this job cannot run since all the resources have already been used up
  • a small job has to wait for the large job to complete
  • so, spark has two strategies - static allocation and dynamic allocation
  • static allocation - the default. the driver will ask for all the resources for its executors upfront. it will hold on to them for the entire duration till the entire job is over
  • when we asked for some executors via the num-executors option, it meant that the spark driver would hold on to these resources for the entire duration of the job
  • however, the number of executors the stages actually use can change dynamically
  • remember - number of executors used in a stage depends on the number of tasks a stage has
  • e.g. if a stage has 20 tasks, and we have executors with 5 slots (and sufficient memory), we will actually be using 20 / 5 = 4 executors
  • but clearly, the number of executors actually needed by spark can change across stages
  • so, we can instead use dynamic resource allocation - where instead of us manually specifying the number of executors to use, it is determined dynamically for every stage
  • by default, static allocation is used, but we should consider using dynamic allocation if we are using a shared cluster for multiple jobs
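  • a hedged sketch of what enabling dynamic allocation might look like - the property names are from the spark docs, shuffle tracking is one way of satisfying the shuffle data requirement, and the min / max values are just placeholders -
    SparkSession spark = SparkSession.builder()
        .appName("DynamicAllocationDemo")
        .config("spark.dynamicAllocation.enabled", "true")
        .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
        .config("spark.dynamicAllocation.minExecutors", "1")
        .config("spark.dynamicAllocation.maxExecutors", "10")
        .getOrCreate();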

Spark Schedulers

  • if our spark application has multiple jobs -
    df1.join(df2).count()
    df3.join(df4).count()
    
  • by default, the spark driver executes this code synchronously. so, first all the jobs for the first line finish, and only then do the jobs for the second line start and finish
  • however, what if we use multithreading? - e.g. something like this -
    Thread t1 = new Thread(() -> df1.join(df2).count());
    Thread t2 = new Thread(() -> df3.join(df4).count());
    t1.start(); t2.start();
    t1.join(); t2.join(); // join() throws InterruptedException - declare or handle it in real code
    
  • this means that the jobs for both would be triggered in parallel
  • this is what we might actually want as well - what is the point of stalling the second job for the first job?
  • however, when kicking off the jobs in parallel like this, they will contend with each other for resources
  • solution - by default, spark uses the fifo scheduler, but we can ask it to use the fair scheduler as well
  • fifo scheduler - the first job gets priority. it gets all the resources for itself. then the second job gets the leftover, and would be stalled if not enough resources are available
  • fair scheduler - assigns resources to tasks in a round robin fashion. this prevents issues like starvation (a short job waiting behind a long running job)
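  • a hedged sketch of switching to the fair scheduler - spark.scheduler.mode is the documented property; optionally, pools can be configured via a file pointed to by spark.scheduler.allocation.file -
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("FairSchedulerDemo")
        .config("spark.scheduler.mode", "FAIR")
        .getOrCreate();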

Transformations and Actions

  • spark dataframes are immutable
  • we tell spark driver the transformations we would like to do
  • these transformations are simple sql statements - e.g. filter where age > 40, projection of columns, grouping, etc
  • each transformation then results in a new dataframe
  • transformations can be further categorized into narrow transformation and wide transformation
  • narrow transformation - each partition of data can be processed independently. a transformation on one partition is independent of a transformation on another partition. e.g. filtering
  • wide transformation - partitions need to be repartitioned. e.g. in group by, all rows belonging to the same group need to be brought into the same partition. this process of repartitioning of data for a wide transformation is called a shuffle
  • execution plan - we write the transformations one by one using a builder pattern. but spark might not execute the operations in the same way - it will construct an execution plan, which is an optimized version of our transformations - e.g. if we filter then use project, it would move the projection before the filtering
  • lazy evaluation - spark will not execute the transformations immediately - it builds the execution plan described above and waits for us to call an action. actions include write, collect, show, count, etc. the moment we call an action, the execution plan is triggered, and we see a job
  • collect will basically collect all the data in the driver. so, be mindful of out of memory exceptions when performing this operation
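  • to make the narrow transformation / wide transformation / action distinction concrete, a small sketch - the column names are the ones from the survey example earlier -
    Dataset<Row> filteredDf = surveyDf.filter(col("Age").lt(40));   // narrow transformation - nothing runs yet
    Dataset<Row> groupedDf = filteredDf.groupBy("Country").count(); // wide transformation - still nothing runs
    groupedDf.show();                                               // action - the plan is optimized and triggered as a job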

Jobs, Stages and Tasks

  • our entire spark application is broken down into jobs
  • a job is triggered only once an action is encountered (recall lazy evaluation)
  • jobs are further broken down into stages
  • stages are further broken down into tasks
  • so, tasks are the unit of work
  • a task basically executes on one slot of an executor and is responsible for one partition of data
  • a task is a bunch of narrow transformations
  • all the tasks of a single stage operate in parallel
  • each wide transformation results in a new stage, due to the repartitioning that is needed
  • before the tasks of a next stage start, all tasks of the previous stage should complete, because that was the entire point behind wide transformation - it depends on all the previous stage’s partitions and not just one
  • when going from one stage to another, since data is being shuffled / repartitioned, data is temporarily written to a buffer which spark calls exchange
  • so, the idea probably is to wait for all tasks of a stage to complete and then with the help of exchange, get the right partition of data to the right executor and finally kick off the tasks of the new stage
  • this process of copying data from the write exchange to the read exchange is called shuffle / sort

(figure: job stages tasks)

Debugging Spark

  • debugging spark is not easy - all the code we write is first converted into an execution plan and is lazily evaluated
  • so, when we place breakpoints in our code, we are just stepping through the driver thread (which is not actually performing the transformations). we are not stepping through the executor threads that do perform them
  • we can however use lambda accepting transformations like map, flatMap, foreach, etc
  • when we place breakpoints inside these lambdas, we will be able to see the executor thread performing them (see the sketch after this list)
  • logs are the best way of debugging a production spark application, which is running in a distributed environment
  • the first step is to log using the log4j2 libraries that come as a transitive dependency of the spark libraries, which i did in the code snippet shown earlier
  • second step would be to provide the appropriate log configuration like -
    • log4j2.properties file to use. this can be cluster wide or application specific, depends on use case
    • configure the file and console appenders, specifying file names for the file appenders
    • actually locating the log files for the driver vs the executors in the cluster manager
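  • e.g. a hedged sketch of a lambda accepting transformation where a breakpoint or log line actually runs on the executor side - it reuses the surveyDf dataframe from earlier, and the cast is needed because of overload ambiguity in the java api -
    // needs import org.apache.spark.api.java.function.ForeachFunction
    surveyDf.foreach((ForeachFunction<Row>) row -> {
      // this runs inside an executor thread, so a breakpoint or log statement here
      // shows up on the executor side, not on the driver
      System.out.println("processing row: " + row);
    });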

Spark Structured APIs

  • a bit of history -
    • spark first came up with rdd, which was a better alternative to map reduce
    • then spark came up with dataframe api, which was easier to work with
    • however, we could not use the regular lambda transformations like map, filter, etc which rdd had
    • so, the idea would be to convert between dataframe and rdd to use these, e.g. dataframe has toJavaRDD()
    • however, on going from dataframe to rdd, we would lose out on the optimizer
    • so, spark then came up with dataset api -
      • we can use for e.g. java pojos which would give us compile time safety
      • it supported the regular lambda transformations
      • dataframe is now Dataset<Row>. row is a generic object, so it does not have the compile time safety unlike if we use pojos
      • note - apparently, in java, spark does not have the concept of DataFrame, so we should instead use Dataset<Row> anyway
  • rdd stands for resilient distributed dataset
    • resilient because if for e.g. there is a failure in one of the executors, spark knows how to load the partitions the failed executor was responsible for into a new executor
    • distributed because spark partitions the data into smaller chunks and processes them in parallel
  • spark calls its dataset / dataframe apis the structured apis
  • structured apis basically use rdd underneath
  • spark asks us to use structured apis where possible, since the catalyst optimizer (along with the tungsten execution engine) sits between the structured apis and rdds, so we lose out on those optimizations when using rdds directly
  • use rdd only for specific use cases like custom partitioning
  • using dataframe (i.e. dataset of row) vs dataset (i.e. dataset of a specific pojo) -
    • dataset will have compile time safety - using .filter((person) -> person.age > 40) has compile time safety unlike .where(col("age").gt(40))
    • dataset is less optimal when compared to dataframe - serialization is an important step in distributed computing. dataset serialization will use java serializers, while dataframe serialization will be able to use tungsten underneath, which is more performant
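  • a hedged sketch contrasting the two styles - it assumes a hypothetical Person bean with an age field, a typed Dataset<Person> (peopleDs) already obtained via Encoders.bean, and the equivalent Dataset<Row> (peopleDf) -
    // needs import org.apache.spark.api.java.function.FilterFunction
    // typed dataset - the lambda is checked at compile time
    Dataset<Person> youngDs = peopleDs.filter((FilterFunction<Person>) p -> p.getAge() < 40);

    // untyped dataframe - a typo in the column name only fails at runtime
    Dataset<Row> youngDf = peopleDf.where(col("age").lt(40));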
  • there is also something called spark sql - we can write a string which will exactly look like sql. this long string would be an alternative to chaining methods in spark dataframe api -
    Dataset<Row> inputDf = // read....
    inputDf.createOrReplaceTempView("survey");
    Dataset<Row> countDf = spark.sql("select Country, count(*) from survey " + 
      "where age > 40 " + 
      "group by Country");
    
  • this sql works just like dataframe, so there is no performance impact there
  • based on everything above, i will probably use dataframe all the way. i would also use the java apis and not sql, since it has a bit better compile time safety / auto complete as compared to writing sql strings
  • spark engine - this sits on top of the chosen cluster manager. recall how, unlike hadoop which ships with yarn, spark does not come with its own cluster manager, and supports yarn, mesos and kubernetes. the spark engine acts as an interface between spark and the chosen cluster manager

(figure: spark ecosystem)

Execution Plan / Catalyst Optimizer Working

  • the catalyst optimizer works internally in the following steps - or, put differently, spark builds and executes the execution plan in the following steps -
  • generate an ast (abstract syntax tree). any errors in our field names, sql function usage, etc would be caught here
  • now we will have a logical plan
  • perform optimization on our logical plan. the optimization here includes techniques like -
    • predicate pushdown - push filtering operations earlier to reduce the amount of data transfer
    • partition pruning - when writing to internal sources, we can specify a partitioning scheme, so there is a different directory for each partition (discussed under data sources). predicate pushdown can then go as far as reading only certain partitions of data when loading it into dataframes
    • projection pruning - push projection operations earlier to reduce the amount of data transfer
  • generate a bunch of physical plans, and associate a cost with each of them. e.g. one plan uses shuffle join, another uses broadcast join
  • finally, a cost model evaluates the most optimal physical plan
  • wholestage code generation - generate the bytecode to run on each executor
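  • to actually see these plans, we can call explain on a dataframe - a minimal sketch, reusing the survey example -
    Dataset<Row> countByCountryDf = surveyDf
        .filter(col("Age").lt(40))
        .groupBy("Country")
        .count();

    // "extended" prints the parsed and analyzed logical plans, the optimized logical plan and the physical plan
    countByCountryDf.explain("extended");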

(figure: execution plan)

Data Sources

  • data sources in spark can be external or internal
  • external - external to spark. some notable ones include
    • jdbc data sources - oracle, ms sql, postgres, mysql
    • no sql - cassandra, mongo db
    • cloud data warehouses - snowflake, redshift
    • streaming sources - kinesis, kafka
  • internal - this can be either hdfs or cloud based storage e.g. s3 (preferred)
  • for internal source, there are several file formats which we have to consider. again, spark supports various file formats like parquet, json, csv, avro, etc
  • there are two ways to access external sources -
    • ingest using external tools to write data from external sources to internal sources. data goes unmodified from different sources into internal sources. then spark reads from these internal sources directly. useful when using spark for batch processing
    • make spark directly read from these different external sources
  • batch processing prefers the first option because, for e.g., our db capacity was provisioned with oltp workloads in mind, and might not be optimized for spark based big data workloads. thus, it helps decouple the two from a performance, security, etc perspective
  • stream processing prefers second option
  • so basically, while we established the architecture that data from external sources -> some tools -> internal sources -> spark, we can however, directly do data from external sources -> spark
    (figure: spark architecture)
  • finally, sinks work in the same way in spark - they can be internal or external
  • we use DataFrameReader to read from internal / external sources, which we obtain via spark.read()
  • we specify the type using format
  • we provide configuration using option
  • we can also provide a mode, which determines the behavior when spark encounters a malformed record. it can be -
    • permissive (default) - set the malformed columns to null and place the raw record in a separate column (_corrupt_record by default)
    • drop malformed - ignore the malformed records
    • fail fast - terminate the program
  • schema -
    • for file formats like csv and json, either the schema can be defined explicitly using schema (preferred), or spark can infer it automatically (prone to errors)
    • for file formats like avro / parquet, the schema is part of the file itself, so spark derives the schema from the file
  • so basically, while we can define the schema explicitly, remember this is only needed for formats like csv and json - the best option is to avoid these file formats altogether and use parquet / avro where possible
  • spark has its own data types, and they map to different types specific to the language we use, e.g. we can see how spark types map to java types here
    (figure: spark to java types)
  • the last boolean flag specifies whether the field is nullable or not
    StructType schema = new StructType(new StructField[] {
        DataTypes.createStructField("FL_DATE", DataTypes.DateType, true),
        DataTypes.createStructField("OP_CARRIER", DataTypes.StringType, true),
        DataTypes.createStructField("ORIGIN", DataTypes.StringType, true),
        DataTypes.createStructField("DEST", DataTypes.StringType, true),
        DataTypes.createStructField("CANCELLED", DataTypes.IntegerType, true),
        DataTypes.createStructField("DISTANCE", DataTypes.IntegerType, true) });
    
    Dataset<Row> flightDf = spark.read()
        .format("csv")
        .option("header", "true")
        .option("path", "src/main/resources/flight*.csv")
        .option("dateFormat", "M/d/y")
        .option("mode", "FAILFAST")
        .schema(schema)
        .load();
    
    flightDf.printSchema();
    flightDf.show();
    
  • note how we specified the date format in configuration as well - for column specific configuration, maybe we can use to_date etc to convert from string to date type
  • writing data - the default format used by spark is parquet if not specified
  • the mode can be -
    • append - append to the existing data
    • overwrite
    • error if exists
    • ignore - write if location is empty, ignore otherwise
  • so, when i use the code below -
    flightDf.write()
        .format("avro")
        .mode("overwrite")
        .option("path", "src/main/resources/output/sinks_demo")
        .save();
    
  • i get the following output -
    (figure: simple output)
  • note - df.write() has return type of DataFrameWriter (recall spark.read() had return type of DataFrameReader)
  • note - for spark to avro, i had to add following dependencies, since avro related dependencies are bundled separately from spark
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-avro_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- for some reason, was getting jackson related issues -->
    <!-- but pinning the version manually as below solved it 🤔 -->
    <!-- (the property goes under <properties>, the dependency under <dependencies>) -->
    <jackson-databind.version>2.15.3</jackson-databind.version>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson-databind.version}</version>
    </dependency>
    
  • partition by - the code is as follows - df.write().partitionBy("OP_CARRIER", "ORIGIN")
  • notice how it is chained to DataFrameWriter and not Dataset<Row> unlike repartition - so, my understanding - partition by is when we want to write the output and make it optimized for future jobs that might read this output. however, repartition can help with optimizing the current job itself
  • the columns we partition on are not visible in the output files, because they are essentially part of the directory names!
  • note how directories for origin are nested inside directories for carrier
    (figure: partition by output)
  • we can also set the maxRecordsPerFile option on the DataFrameWriter (option("maxRecordsPerFile", n)), just like we chained partitionBy. it is useful when some partitions become too big for spark to process. e.g. in the above example, if for carrier nw and origin den the number of flights were too many, with this option that directory too would contain multiple files
  • why use bucketing - since partitioning results in a unique directory for each value, partitioning by a column having too many unique values might not be a good idea, since it would result in too many directories (partitions) with too less data. so, we can instead use bucketing for columns having too many unique values
  • how it works - we specify the number of buckets and the column to bucket using. then, spark will do hash(column_value) % number_of_buckets to get the bucket in which the row should be stored
  • sorting - sorting can further improve the performance - e.g. if we had to perform joins, and the data is already sorted on the columns used for join, we can skip the sort phase in the shuffle join (described later)
  • so, just like partitionBy, i chained the following to DataFrameWriter -
    df.write()
      .bucketBy(2, "OP_CARRIER", "ORIGIN")
      .sortBy("OP_CARRIER", "ORIGIN")
      .mode("overwrite")
      .option("path", "src/main/resources/output/sinks_demo/")
      .save();
    
  • however, i got the following exception on running the above - 'save' does not support bucketBy and sortBy right now

Spark + Hive

  • so, we have to use saveAsTable. my understanding - till now, we were simply storing data as normal files, and they were accessible like a regular directory structure, but for bucketing and sorting, we need to bring in “database support” of spark
  • my understanding - whatever we discuss in this part has been borrowed from hive
  • since spark too has concepts of database and tables, there are two things spark needs to store -
    • the actual data - this is what we have seen till now, when for e.g. we saw files being stored inside folders (i.e. partitions)
    • the metadata - the table name, database name, etc. this is stored in something called metastore. by default, an in memory implementation is used i.e. the duration of this metastore is the same as the spark session
  • there are two kinds of tables in spark -
    • managed tables - spark will manage both the metadata and the actual data. by default, the actual data is stored inside spark.sql.warehouse.dir. when we for e.g. drop a table, both the metadata and the actual data get deleted
    • unmanaged tables - also called external tables. spark will only manage the metadata. when creating a table, we specify the location of the actual data. useful when for e.g. the actual data already exists somewhere and is not managed by us. when we for e.g. drop a table, only the metadata is deleted, and the actual data is untouched
  • managed tables are preferred - we can do optimizations like bucketing and sorting. with unmanaged tables, we have to rely on the existing data structure. we need unmanaged tables when we need to perform spark operations on already existing data
  • my thought - one technique might be to port data from unmanaged to managed tables for better performance / more flexibility? this should ideally again be something spark can do -
    • read from unmanaged tables
    • perform some transformations like sorting and bucketing
    • finally write to managed tables
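  • a hedged sketch of that idea - the table names and the location are made up, and the sql uses the create table ... using ... location form for external tables -
    // unmanaged / external table pointing at data that already exists somewhere
    spark.sql("create table if not exists raw_flights using csv "
        + "options (header 'true') location '/data/raw/flights'");

    // read it back, then rewrite it as a bucketed + sorted managed table
    spark.table("raw_flights")
        .write()
        .bucketBy(2, "OP_CARRIER", "ORIGIN")
        .sortBy("OP_CARRIER", "ORIGIN")
        .mode("overwrite")
        .saveAsTable("flights_managed");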
  • we need to add the spark + hive dependency
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    
  • then, chain the hive support in the spark session builder -
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("TablesDemo")
        .enableHiveSupport()
        .getOrCreate();
    
  • now, we first create a database - (otherwise default would be used)
    spark.sql("create database if not exists tables_demo");
    spark.catalog().setCurrentDatabase("tables_demo");
    spark.catalog().listDatabases().show();
    
  • output of list databases -
    (figure: list databases)
  • finally, we write a dataframe as follows. note - we do not provide the path option, since this is managed table territory (spark.sql.warehouse.dir will be used), and we call saveAsTable(db_name.table_name) instead of save()
    flightsDf.write()
        .bucketBy(2, "OP_CARRIER", "ORIGIN")
        .sortBy("OP_CARRIER", "ORIGIN")
        .mode("overwrite")
        .saveAsTable("tables_demo.bucketed_by");
    
  • output - notice how two new directories are created - one for metadata (metastore_db) and one for the actual data (spark-warehouse). the data is stored inside <<database>>.db/<<table>>
    (figure: bucket by)

Transformations

  • for transformations, we can either use spark functions like we do in sql, or we can use lambda accepting transformations like groupByKey
  • for specifying columns in transformations, either use the column name directly as a string, or use df.col("column_name"). note - we cannot mix the two styles in the same call, since the overloads accept either all strings or all column objects
  • udf or user defined functions - register custom functions to use inside spark -
    UserDefinedFunction parse_gender = udf((String gender) -> {
      Pattern malePattern = Pattern.compile("^m$|^male$|^m.n$", Pattern.CASE_INSENSITIVE);
      Pattern femalePattern = Pattern.compile("^f$|^female$|^wom.n$", Pattern.CASE_INSENSITIVE);
      return malePattern.matcher(gender).find() ? "male"
          : (femalePattern.matcher(gender).find() ? "female" : "unknown");
    }, DataTypes.StringType);
    spark.udf().register("parse_gender", parse_gender);
    
    Dataset<Row> gendersDf = surveyDf
        .select("Gender")
        .withColumn("gender_cleaned", expr("parse_gender(Gender)"));
    
    gendersDf.show();
    
  • a function for adding a unique identifier to each record - monotonically_increasing_id. this number would be unique across all partitions but remember that it would not necessarily be continuous
  • usual sql constructs like renaming using alias, changing data type using cast, etc are available
  • explode - e.g. our record contains an array field. this field will ensure our result contains a record for each element of the array. e.g. our input has 2 elements in the array for the first record, and 3 elements in the array for the second record. the output will have 5 elements
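  • a small sketch of explode - the dataframe and column names here are made up -
    // ordersDf has columns (order_id, items), where items is an array column
    Dataset<Row> explodedDf = ordersDf
        .withColumn("item", explode(col("items")))
        .drop("items");
    // one output row per array element - 2 + 3 input elements become 5 output rows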

Example

  • input -

    | day | month | year |
    | 28  | 1     | 2002 |
    | 23  | 5     | 81   |
    | 12  | 12    | 6    |
    | 7   | 8     | 63   |
    | 23  | 5     | 81   |
  • transformation -
    Dataset<Row> cleanedDobDf = dobDf.withColumn("year_parsed",
      when(col("year").leq(23), col("year").plus(2000))
          .when(col("year").leq(99), col("year").plus(1900))
          .otherwise(col("year")))
          .withColumn("date", concat_ws("/", col("day"), col("month"), col("year_parsed")))
          .withColumn("parsed_date", to_date(col("date"), "d/M/yyyy"));
    
  • output -

    | day | month | year | year_parsed | date       | parsed_date |
    | 28  | 1     | 2002 | 2002        | 28/1/2002  | 2002-01-28  |
    | 23  | 5     | 81   | 1981        | 23/5/1981  | 1981-05-23  |
    | 12  | 12    | 6    | 2006        | 12/12/2006 | 2006-12-12  |
    | 7   | 8     | 63   | 1963        | 7/8/1963   | 1963-08-07  |
    | 23  | 5     | 81   | 1981        | 23/5/1981  | 1981-05-23  |

Aggregations

  • simple aggregations - note the different aggregations carefully, e.g. difference between count("*") vs count("Description")
    Dataset<Row> aggDf = inputDf.select(
        count("*").alias("total_count"),
        count("Description").alias("non_null_description_count"),
        countDistinct("InvoiceNo").alias("unique_invoices"),
        sum("Quantity").alias("total_quantity"),
        avg("UnitPrice").alias("avg_unit_price"));
    
  • grouping aggregations - we can also perform groupings using groupBy
    Dataset<Row> aggByCountryAndInvoiceDf = inputDf
        .groupBy("Country", "InvoiceNo")
        .agg(count("Quantity").alias("total_quantity"),
            round(sum(col("UnitPrice").multiply(col("Quantity"))), 2).alias("invoice_value"));
    aggByCountryAndInvoiceDf.show();
    
  • note - when we chained groupBy, it returns a RelationalGroupedDataset, and when we again chained agg to it, it was converted back to our usual Dataset
  • window aggregations - e.g. we need the running total by week for every country. three things to keep in mind for windowing aggregations -
    • identify the partitioning columns - e.g. here, restart the running total for every country
    • identify the ordering of columns - e.g. here, ensure that the data is ordered by the week number, week 3’s running total = week 1’s sale + week 2’s sale + week 3’s sale, and this is only possible when we order by week
    • identify the window bounds - e.g. here, it starts at the first record and ends at the current record, like described in the week 3 example above
  • example - note - for the bounds, we also have something called unboundedFollowing, but for our use case, unboundedPreceding and currentRow was enough
    WindowSpec windowSpec = Window.partitionBy("country")
        .orderBy("week_number")
        .rowsBetween(Window.unboundedPreceding(), Window.currentRow());
    
    Dataset<Row> outputDf = inputDf
        .withColumn("running_total", sum("invoice_value").over(windowSpec));
    outputDf.show();
    
  • output would be automatically sorted by country and week number based on the window we specified, and it would have the running total column added to it, which automatically resets for every country

Joins

  • bringing “left” and “right” dataframe together
  • we combine them using the join expression and the join type
    Dataset<Row> orderWithProductsDf = orderDf.join(productDf,
        orderDf.col("prod_id").equalTo(productDf.col("prod_id")),
        "inner");
    orderWithProductsDf.show();
    
  • order schema - (order_id, prod_id, unit_price, qty)
  • product schema - (prod_id, prod_name, list_price, qty)
  • therefore, the joined table’s schema - (order_id, prod_id, unit_price, qty, prod_id, prod_name, list_price, qty)
  • note how the joined table’s schema contains two columns for quantity
    • one is from the product - it probably indicates in stock
    • one is from the order - it indicates quantity of product ordered
  • assume we wanted to select only some columns (only order’s quantity, not product’s quantity) -
    Dataset<Row> orderWithProductsDf = orderDf.join( ... )
      .select("order_id", "prod_name", "unit_price", "qty");
    
  • we get the following exception - [AMBIGUOUS_REFERENCE] Reference 'qty' is ambiguous, could be: ['qty', 'qty'].
  • how it works - we pass column names, and internally spark converts them to the right identifiers. when we pass qty, it finds two identifiers and hence gets confused
  • some solutions -
    • rename columns before joining (withColumnRenamed)
      productDf = productDf.withColumnRenamed("qty", "product_qty");
      
    • drop one of the ambiguous columns -
      Dataset<Row> orderWithProductsDf = orderDf.join( ... )
          .drop(productDf.col("qty"))
          .select("order_id", "prod_name", "unit_price", "qty");
      
    • specify explicitly which dataframe’s quantity to use - notice the end of the select clause
      Dataset<Row> orderWithProductsDf = orderDf.join( ... )
          .select(col("order_id"), col("prod_name"), col("unit_price"), orderDf.col("qty"));
      
  • outer joins - e.g. when we use an outer join like left, we might receive nulls for some columns. to get rid of the nulls, we can use coalesce, which sets the final value to the first non null value from the list it receives. e.g. below, we do a left join to get all the orders. prod_name comes from the product dataframe, so if one of the products is missing, prod_name would be null. so, we tell spark to use the order dataset's prod_id if the product dataset's prod_name is missing
    Dataset<Row> orderWithProductsDf = orderDf.join(productDf,
        orderDf.col("prod_id").equalTo(productDf.col("prod_id")),
        "left")
        .withColumn("prod_name", coalesce(col("prod_name"), orderDf.col("prod_id")))
        .select(col("order_id"), col("prod_name"), col("unit_price"), orderDf.col("qty"));
    
  • one small trick for clean code - note how we wrote the entire chain in one place. we could, for e.g., extract some parts into variables - join conditions, complex transformations, etc
  • another technique might be to extract parts of logic to functions that accept and return dataframes. this also helps unit test these bits of logic
  • there are two kinds of joining techniques used by spark - shuffle join and broadcast join

Shuffle Joins

  • imagine we have two datasets with 3 partitions each, and three executors
  • so first a shuffle + sort happens to ensure that the same keys from both datasets belong to the same executor
  • now, we can simply perform a merge to join the two datasets
  • refer diagram here to recall breakdown of exchange and shuffle + sort
  • small note - i am defaulting to thinking of shuffle joins as shuffle sort merge joins. there is another variation - shuffle hash joins, which is less optimal when compared to shuffle sort merge joins, so i am ignoring shuffle hash joins for now

(figure: shuffle join working)

Optimizing Shuffle Joins

  • reduce data being joined - because this shuffle and sort process is of course the bottleneck and can cause out of memory like issues, we should consider techniques like filtering (basically reducing) the amount of data we are joining, performing aggregations before joining, etc. basically, code intelligently
  • maximize parallelism - the maximum parallelism possible when performing a join is the minimum of the three parameters below, so try maximizing the three parameters below -
    • maximum number of executors our cluster allows
    • spark.sql.shuffle.partitions - this determines the number of partitions after a shuffle i.e. after a wide transformation happens. the default value of this is 200. so, in case of join, this will give the number of partitions of the joined dataset?
    • number of unique keys in the datasets involved in the join - handling key skews / hot partitions - discussed later
  • bucketed joins - if the datasets are already bucketed and sorted using the keys involved in the join, we will not have to rely on the shuffling and sorting done by spark at all! the idea is to partition, sort and store the datasets in bucketed fashion before the join starts. we then load the datasets and perform the joins, and there would be no shuffle involved in the joins
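  • a hedged sketch of the bucketed join idea - the table names are made up, and both sides must use the same bucket count and key -
    // one-time preparation - write both datasets bucketed + sorted on the join key
    ordersDf.write().bucketBy(16, "prod_id").sortBy("prod_id")
        .mode("overwrite").saveAsTable("orders_bucketed");
    productsDf.write().bucketBy(16, "prod_id").sortBy("prod_id")
        .mode("overwrite").saveAsTable("products_bucketed");

    // later jobs can join the two bucketed tables without a shuffle on either side, which is the idea described above
    Dataset<Row> joinedDf = spark.table("orders_bucketed")
        .join(spark.table("products_bucketed"), "prod_id");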

Broadcast Joins

  • shuffle joins are used when we join two large datasets
  • however, we can use a broadcast join when at least one of the datasets is small
  • assume the smaller dataset can be stored inside one partition, while the larger dataset has 200 partitions
  • if using the shuffle join technique, first all of the 200 + 1 partitions will be sent for shuffle and sort
  • this means there is network transfer involved to first send the data to exchanges and then load it back in a sorted manner into the executors
  • however, in the broadcast join technique, the partitions of the larger dataset can stay where they were, and the smaller dataset can be copied over to all the executors having the larger dataset partitions
  • this way, we avoid having to move (shuffle) the larger dataset’s partition over the network
  • essentially, we are broadcasting the smaller dataset to all the executors having the larger dataset
  • the driver and executor memory should be greater than the size of the smaller dataset, so that it can fit in memory
  • notice how below the larger dataset stays as it is unlike earlier where the larger dataset was sorted
    (figure: broadcast join working)
  • the threshold which decides when to use a broadcast join is spark.sql.autoBroadcastJoinThreshold, which is 10mb by default
  • note - unlike shuffle join, broadcast join is a hash join always. recall in shuffle joins, we have the concept of both hash joins (rarely used) and sort merge joins
  • we can also provide spark the hint to use broadcast join like so, if we are not happy with the defaults -
    import static org.apache.spark.sql.functions.broadcast;
    
    Dataset<Row> joinedDf = flightsDf1.join(broadcast(flightsDf2), ...
    
  • note - to confirm all this, go to http://localhost:4040/ -> Sql / Dataframe tab -> select the sql query
  • we can tell from this if an exchange was involved or we skipped it by using bucketed joins, if shuffle join was used or broadcast join was used, etc

Spark AQE

  • aqe - adaptive query execution
  • it includes optimizations discussed below
  • set spark.sql.adaptive.enabled to true for this (it is enabled by default in newer versions), and the rest of the optimizations discussed in this section will be enabled automatically
  • very granular configuration options can be seen here - use the documentation for specifics, i am just learning this from a theoretical perspective for now
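  • a hedged sketch of enabling it explicitly - the property names are from the spark docs -
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("AqeDemo")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.sql.adaptive.skewJoin.enabled", "true")
        .getOrCreate();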

Dynamically Deciding Shuffle Partitions

  • earlier, after a wide transformation, for e.g. group by, the number of output partitions from a stage would be = spark.sql.shuffle.partitions (default 200), but lets say we set it to 10
  • what if i only have e.g. 5 groups after a group by statement?
  • spark would still create a total of 10 partitions, therefore 10 tasks in the subsequent stage
  • now, our spark job would eat up the resources for 5 empty tasks as well
  • remember that for a wide transformation, the next stage waits for all the tasks of the previous stage, so the empty tasks are just sitting idle
  • this optimization by aqe resolves this issue
  • spark will look at the number of unique groups, and then dynamically adjust the number of output partitions
  • now, assume one of the partitions was relatively larger
  • spark used one task for one partition of data
  • spark would complete all the tasks except this one quickly
  • again, remember that for a wide transformation, the next stage waits for all the tasks of the previous stage, so the slots of the tasks that get over quickly just sit idle
  • this optimization by aqe resolves this issue as well
  • spark would now also look at the number of records in each group
  • spark can merge some partitions to be handled by one task
  • so, since one task = one slot, that task would process multiple partitions of data one by one serially
  • e.g. this way, our job ended up using only 4 slots optimally - this is better than for e.g. using 5 slots, out of which 4 would get over pretty quickly, since the 5th slot now can be allocated to some other job
  • remember how this is different from dynamic resource allocation - dynamic resource allocation changes the executors dynamically, while dynamically deciding shuffle partitions changes the number of output partitions and what partition goes to what executor dynamically
  • so, recap - two optimizations -
    • determine the number of shuffle partitions dynamically
    • dynamically coalesce the smaller shuffle partitions

(figure: aqe shuffle partitions)

Dynamically Switching Join Strategies

  • we already know that broadcast joins are more optimal than the regular shuffle joins
  • however, assume one of the tables have a lot of complex transformations before being involved in the join
  • spark may not be able to decide whether or not to use broadcast join, and would default to using shuffle join
  • however, with aqe enabled, spark can after shuffling decide to go for a broadcast join
  • the optimization here is that while we are involved in the shuffle process (therefore the network transfer) of the shuffle join, we still get rid of the sort and merge process, which is more expensive than a simple broadcast join

Dynamically Optimizing Skew Joins

  • if for e.g. we are joining two tables, and we have a hot / skewed partition
  • before aqe, all records for a given join key land in the same shuffle partition, so a single task has to handle the entire hot / skewed partition
  • after aqe, spark is intelligent enough to break the hot partition into smaller chunks
  • now, these smaller chunks can be processed in parallel in different tasks
  • thus, we will not have an overly sized task (and thus out of memory exceptions) anymore
  • note how we had 3 tasks without aqe, but now have 4 tasks with aqe
  • note how the partition of the smaller dataset is copied

aqe skew joins
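  • for reference, the knobs behind this - the config names come from the spark documentation, the values below are just example values -

    spark.conf().set("spark.sql.adaptive.skewJoin.enabled", "true");
    // a partition is treated as skewed if it is this many times larger than the median partition ...
    spark.conf().set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5");
    // ... and also larger than this absolute threshold
    spark.conf().set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m");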

Dynamic Partition Pruning

  • it is enabled by default
  • first, recall partition pruning
  • it is usually used for efficiency gains in a star schema design, so that's the lingo used in this section
  • e.g. we have sql like below -
    1
    2
    3
    4
    
    select *
    from fact join dimension
    on fact.dimension_id = dimension.id
    where dimension.some_attribute = 'xyz'
    
  • my understanding of constraints needed for dynamic partition pruning -
    • should be a broadcast join (dimension table would be broadcasted)
    • fact table should be partitioned using dimension_id
    • and of course, dynamic partition pruning should be enabled
  • now, how this join would work is -
    • the dimension table would be filtered using some_attribute = ‘xyz’
    • the filtered dimension table would be broadcast everywhere
    • spark would be intelligent enough to only load the partitions of the fact table where dimension_id is present in the ids of the filtered dimension table
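  • a minimal sketch of the same idea via the dataframe api - the paths and column names here are hypothetical, and spark.sql.optimizer.dynamicPartitionPruning.enabled is the documented flag that keeps this optimization on -

    import static org.apache.spark.sql.functions.broadcast;
    import static org.apache.spark.sql.functions.col;

    // hypothetical datasets - fact is partitioned on dimension_id on disk
    Dataset<Row> fact = spark.read().parquet("warehouse/fact");
    Dataset<Row> dimension = spark.read().parquet("warehouse/dimension");

    Dataset<Row> filteredDimension = dimension.filter(col("some_attribute").equalTo("xyz"));

    // broadcasting the filtered dimension lets spark prune fact partitions at runtime
    Dataset<Row> joinedDf = fact.join(
        broadcast(filteredDimension),
        fact.col("dimension_id").equalTo(filteredDimension.col("id")));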

Caching

  • two methods for caching - chain cache() or persist() on the dataframe
  • cache() uses the default storage level and does not allow any configuration
  • persist allows for more configuration around storage
    • use disk or memory or a combination of both
    • when storing on disk, data would of course be serialized. when storing in memory, we can store it in either deserialized or serialized format. serialized format advantage - it is compact and therefore takes up less space. serialized format disadvantage - it needs to be serialized before storing and deserialized after reading from memory, so it costs cpu
    • use replication
  • the default storage level for persist / cache is memory + disk, deserialized in memory, with no replication - see the sketch at the end of this section for picking a level explicitly
  • both cache and persist are lazy like transformations - they are only triggered once there is an action
    1
    2
    
    Dataset<Row> cachedDf = df.cache();
    cachedDf.count();
    
  • spark need not cache all partitions - it only caches the partitions needed by the actions we use. e.g. with take(10), it would just cache the first partition, since that should be enough to provide the 10 records. with an action like count() however, it would have to cache all partitions
  • however, spark will always either cache the entire partition or nothing, it will never cache a portion of the partition
  • when to cache - when we use the same dataframe in multiple actions
  • to evict from cache, chain unpersist() on the dataframe
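  • a small sketch of persist with an explicit storage level - the level chosen below is just an example -

    import org.apache.spark.storage.StorageLevel;

    // serialized in memory, spilling to disk, no replication
    Dataset<Row> cachedDf = df.persist(StorageLevel.MEMORY_AND_DISK_SER());
    cachedDf.count(); // an action, to actually materialize the cache

    cachedDf.unpersist(); // evict when no longer needed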

Repartition and Coalesce

  • repartition - the code is as follows - partitionedDf = df.repartition(3)
  • when we try to write this repartitioned dataframe, the output looks as follows - repartition output
  • note - above, we saw repartition(number), but we can also use repartition(columns...) or repartition(number, columns...)
  • when we do not specify a number to repartition and just column names, the number of partitions created = spark.sql.shuffle.partitions
  • so basically, the number of partitions in repartition = either specified by us in the function call, or set via spark.sql.shuffle.partitions, and the column used for this partitioning can be specified by us as well
  • when to use repartition - to improve performance, but we should be absolutely sure, since repartition would cause a shuffle
  • when we are reducing number of partitions, do not use repartition, use coalesce
  • coalesce merges existing partitions (preferring partitions that sit on the same executor) instead of redistributing the data, thus avoiding a full shuffle
  • note - coalesce(10) does give us 10 partitions even if the data was spread across e.g. 11 worker nodes - some of the merged partitions simply end up reading blocks from more than one node. a small sketch follows below
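  • a small sketch contrasting the two - the column name here is hypothetical -

    import static org.apache.spark.sql.functions.col;

    // full shuffle - 10 partitions, hash partitioned on the (hypothetical) country column
    Dataset<Row> repartitionedDf = df.repartition(10, col("country"));

    // narrow operation - merge existing partitions down to 5, avoiding a full shuffle
    Dataset<Row> coalescedDf = df.coalesce(5);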

Hints

  • we can add hints related to partitioning and joins
  • hints - no guarantee that they would be used
  • in the dataframe api, we can either use spark sql functions, or use dataframe.hint()
  • join hint example using both techniques -
    1
    2
    
    df1.join(broadcast(df2))
    df1.join(df2.hint("broadcast"))
    
  • partitioning hint example - df.hint("coalesce", 4)
  • note - i don’t think there is any difference between chaining coalesce directly vs using it as a hint
  • when writing the same using sql, there is a special comment syntax we use - see the sketch below
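  • e.g. a sketch of that comment syntax via spark.sql() - it assumes fact and dimension are registered as views, and the column names are hypothetical -

    Dataset<Row> joinedDf = spark.sql(
        "SELECT /*+ BROADCAST(d) */ * "
            + "FROM fact f JOIN dimension d ON f.dimension_id = d.id");

    // partitioning hints use the same syntax
    Dataset<Row> coalescedDf = spark.sql("SELECT /*+ COALESCE(4) */ * FROM fact");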

Shared Variables

  • these were both primarily used in rdd apis, but can have a niche use case in dataframe world as well

Broadcast Variables

  • broadcast variables use case - e.g. our udf uses some static reference data
  • the reference data is for e.g. 5-10 mb, i.e. too big to store in plain code
  • so, we can for e.g. store it in a file, and broadcast it to all the nodes
  • this way, this variable can then be used inside the udf
  • my understanding - maybe we could use a closure as well, i.e. store the data in a variable outside the udf and access it inside the udf
  • disadvantage of using closure - if for e.g. we have 1000 tasks running on 30 nodes, there would be 1000 deserializations. in case of broadcast variables however, there would only be 30 deserializations
  • example -
    1
    2
    3
    4
    
    SparkSession spark = // ...
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
    Broadcast<int[]> broadcastVar = jsc.broadcast(new int[] {1, 2, 3});
    broadcastVar.value(); // can be used inside a udf - returns [1, 2, 3]
    
  • note - a better technique could also have been to somehow load this reference data as a dataframe if possible
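  • e.g. a minimal sketch of using a broadcast variable from inside a udf - the reference data, udf name and return type here are all hypothetical -

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.types.DataTypes;

    // pretend this map was loaded from a reference data file
    Map<String, String> categories = new HashMap<>();
    categories.put("P1", "electronics");

    Broadcast<Map<String, String>> categoriesBroadcast =
        JavaSparkContext.fromSparkContext(spark.sparkContext()).broadcast(categories);

    // the udf reads the broadcast value instead of capturing the map via its closure
    spark.udf().register("category_of",
        (UDF1<String, String>) code -> categoriesBroadcast.value().getOrDefault(code, "unknown"),
        DataTypes.StringType);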

Accumulators

  • accumulators are like a global variable that we can update
  • e.g. from our udf, we would like to update a variable based on some condition
  • so, these variables can be updated on a per row basis
  • these variables basically live in the driver, and the executors internally communicate with the driver to update this variable
  • example -
    1
    2
    3
    4
    
    SparkSession spark = // ...
    LongAccumulator accum = spark.sparkContext().longAccumulator();
    numberDf.foreach((x) -> accum.add(1));
    accum.value(); // returns the count of rows
    
  • note - there is no shuffle etc involved in this process of realizing the final value of the accumulator - it is being mutated inside the driver by the executor communicating the changes to the driver
  • so, these accumulators can either be updated from transformations like udf, or actions like forEach like we saw
  • however, understand - if we update an accumulator from within transformations (e.g. a udf), its value can go bad - if a task fails and is retried by the executor, the partial updates made by the failed attempt are not discarded, so the same rows can get counted more than once
  • this does not happen when updating an accumulator from inside actions like foreach - there, spark guarantees that each task's update is applied only once, even across retries

Spark Speculation

  • can be enabled via spark.speculation, false by default
  • e.g. we have 10 tasks - all of them complete under 2 seconds, but one of them takes 10 seconds
  • spark will automatically identify the slow running tasks and run a duplicate copy of this task
  • this way, whichever one of the two finishes faster is used by spark, and the other task is killed
  • useful when e.g. the original task was running slow due to a fault in the worker node it was running on
  • running speculative tasks does have overhead in terms of resources
  • e.g. if there are data skews or out of memory issues in our application, spark would still run copies of this task (which too will run slow or maybe fail) without realizing that the root cause is actually the data / faulty configuration itself
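  • for reference, the related knobs - the config names come from the spark configuration docs, the values below are just examples. these are core scheduler configs, so they are set at startup (spark-submit / builder) rather than toggled at runtime -

    SparkSession spark = SparkSession.builder()
        .appName("Speculation Demo")
        .config("spark.speculation", "true")
        // how often to check for slow tasks
        .config("spark.speculation.interval", "100ms")
        // a task qualifies if it is this many times slower than the median task
        .config("spark.speculation.multiplier", "1.5")
        // fraction of tasks that must finish before speculation kicks in
        .config("spark.speculation.quantile", "0.75")
        .getOrCreate();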

Streaming Introduction

  • earlier convention was batch processing - data first comes and sits in the lake
  • then, there would be jobs that can be run for e.g. daily to perform the processing
  • however, with time, use cases started demanding smaller and quicker batches
  • the idea is not to just schedule the same batch jobs at smaller intervals
  • instead, we start viewing data as a stream that is in motion and not at rest
  • spark streaming is an extension of the dataframe apis
  • spark uses micro batches for achieving stream processing
  • spark automatically takes care of lot of challenges like start and end time of batches, intermediate state management, etc
  • initially, spark used dstreams - built on top of rdd
  • now, spark offers the structured streaming apis - built on top of the dataframe apis, i.e. they support sql
  • additionally, event time semantics are supported by the structured streaming apis as well, which were not available in the dstream apis
  • word count example using netcat - notice how for reading data, read() changed to readStream(), but otherwise, everything else stays the same. readStream() returns a DataStreamReader (recall read used to return DataFrameReader)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("Streaming Demo")
        .getOrCreate();
    
    Dataset<Row> lines = spark.readStream()
        .format("socket")
        .option("host", "localhost")
        .option("port", "9999")
        .load();
    
  • data from the socket comes in a column value. we want to split each line into its constituent words, and create a separate row for each word
    1
    2
    3
    
    Dataset<Row> wordCount = lines.select(explode(split(col("value"), " ")).alias("word"))
      .groupBy("word")
      .count();
    
  • finally, we try writing it to the console. again, write() changes to writeStream(). writeStream returns a DataStreamWriter (recall write used to return a DataFrameWriter)
    1
    2
    3
    4
    5
    6
    
    StreamingQuery streamingQuery = wordCount.writeStream()
        .format("console")
        .option("checkpointLocation", "checkpoint")
        .outputMode("complete")
        .start();
    streamingQuery.awaitTermination();
    
  • note - we used streamingQuery.awaitTermination() above to simulate running an application indefinitely, and we got streamingQuery from the result of writing to a streaming sink
  • note - sinks terminate when application is stopped / due to some error condition
  • however, what if we were writing to multiple sinks?
    • we can use spark.streams().awaitAnyTermination(), which returns when any one of the streaming queries terminates
    • remember to have multiple checkpoint locations - do not use the same checkpoint location for multiple streaming sinks
  • start the netcat utility using nc -lk 9999, and run the app to see the streaming output in the console
  • working - first, spark creates an optimized logical plan, just like it did in case of dataframes
  • now, it would create a job that reads from the source, processes it and finally writes it to the sink
  • underneath, spark runs a background thread
  • based on our trigger configuration, a new spark job is created for each micro batch. so, a spark job is not created at some fixed interval on its own - it is only created based on our trigger configuration, and all of this is taken care of for us by a background thread
    spark streaming jobs
  • trigger determines how often to trigger the micro batch
  • the default is unspecified - a new micro batch is triggered as soon as the previous one finishes, but it waits until there is some input available in the source
  • trigger can also be based on a time interval - if the previous micro batch runs longer than the interval, the new micro batch starts as soon as the previous one finishes; if the previous micro batch finishes early, the new one waits until the interval boundary is reached. for this, just chain the below to writeStream() -
    1
    
    .trigger(Trigger.ProcessingTime("1 minute"))
    
  • finally, trigger can also be continuous - an experimental mode that offers even lower latency than the micro batch approach
  • some popular streaming sources / sinks - netcat (already seen above), file and kafka
  • the file source is capable of monitoring the path for new files. it can also use archival i.e. move the processed files to a different directory / delete the processed files altogether
  • so, the built in sinks available for streaming requirements are limited - kafka, file and console as seen here. how do we e.g. write using jdbc? we can use foreachBatch, which is called once for every micro batch with that micro batch's dataframe and the batch id -
    1
    2
    3
    4
    5
    6
    
    outputDf.writeStream().foreachBatch((df, batchId) -> {
        df.write()
            .format("xyz")
            // ...
            .save();
    }).start();
    
  • output modes -
    • append - like insert only. used when previous outputs are not affected
    • update - like upsert i.e. either new records are added or old records are updated
    • complete - overwrite the complete result every time
  • update vs complete example -
    • input -
      streaming input
    • complete -
      streaming output complete
    • update - (look at batch 2 in particular)
      streaming output update
  • append does not make sense with aggregations like count, so spark throws an error like this - Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;. the why - append means immutable output: complete and update have ways of reflecting updates made to previously output groups, but append can only add new rows, never update existing ones. and that is exactly how aggregations work in spark streaming - spark receives a record, decides which group it belongs to, and updates that group's aggregate. this updating is not allowed in append mode, hence append mode does not support aggregations (unless a watermark is used - more on that later)
  • a spark streaming application is like a web server i.e. keeps running unlike when submitting batch jobs to spark
  • even a streaming application will stop at some point, due to reasons like failures, maintenance, etc
  • so, we need to be able to handle this stopping and restarting gracefully
  • gracefully = exactly once processing
  • exactly once processing basically means neither should we end up reading an input twice, nor missing an input record
  • this is what checkpoint location helps achieve
  • checkpoint location maintains things like -
    • the input boundaries of the last micro batch
    • state information (e.g. running total of the word count)
  • we just saw how checkpointing helps spark achieve exactly once processing. however, exactly once processing also depends on the sources and sinks - the source should be replayable i.e. allow re-reading of old messages (e.g. kafka and files allow this), and the sink should be idempotent i.e. it should recognize duplicates instead of writing them to the data again
  • what if our application has a bug? - we fix the spark code, we rerun spark-submit. now, can we rely on check pointing to continue the job from where it left off after the job was stopped and restarted?
    • yes, if our fix was something like filter out malformed records
    • no, if our fix changed the aggregation strategy etc, since maybe it messes up the checkpoint state altogether

Streaming Using Kafka

  • add the following dependency -
    1
    2
    3
    4
    5
    
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    
  • use the following to establish a connection -
    1
    2
    3
    4
    5
    
    Dataset<Row> kafkaSourceDf = spark.readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "invoices")
      .load();
    
  • when we try printing the schema - kafkaSourceDf.printSchema();, we get the following -
    1
    2
    3
    4
    5
    6
    7
    
    |-- key: binary (nullable = true)
    |-- value: binary (nullable = true)
    |-- topic: string (nullable = true)
    |-- partition: integer (nullable = true)
    |-- offset: long (nullable = true)
    |-- timestamp: timestamp (nullable = true)
    |-- timestampType: integer (nullable = true)
    
  • the value is in binary format. here is how to extract all fields into dataframe friendly format
    • assume we create the schema of the payload somewhere
    • then, we can cast the value field to a string
    • then, call from_json on it, which also needs the schema
    • this means all our data would be available as a struct type under the attribute value
    • finally, based on this, i chained a .select, so that i do not have to access fields using value.attribute, but just using attribute -
    1
    2
    3
    
    Dataset<Row> flattenedDf = kafkaSourceDf
      .select(from_json(col("value").cast("string"), schema).alias("value"))
      .select("value.*");
    
  • this doc is great for debugging when writing kafka related code - creating topics, publishing to topics using kafka-producer, consuming from kafka-consumer, etc
  • now, when we try flattenedDf.printSchema();, we get the right schema which we can use in our transformations
  • note on how kafka + spark actually work together - structured streaming does not rely on kafka's consumer offset commits; it tracks the consumed offsets itself inside the checkpoint directory, which is what makes replays after a restart possible
  • writing to kafka - while reading from kafka, we deserialized the value attribute. while writing to kafka, we need to convert our dataframe into two fields of key and value
    • combine all fields into a struct
    • convert this field to json
    • rename this condensed field to value
    • pick any other attribute to act as key
    1
    2
    3
    
    .select(
        to_json(struct("*")).alias("value"),
        col("InvoiceNumber").alias("key"));
    
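  • for completeness, a sketch of wiring this up as a kafka sink - kafkaTargetDf stands for the dataframe with the key / value columns built above, and the topic / checkpoint names are hypothetical -

    StreamingQuery streamingQuery = kafkaTargetDf.writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "invoices-enriched")
        .option("checkpointLocation", "checkpoint-kafka-sink")
        .outputMode("append")
        .start();
    streamingQuery.awaitTermination();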

Streaming Transformations

  • stateless transformations - do not need to maintain state across micro batches. e.g. filter, map, flatMap, explode, etc
  • stateful transformations - need to maintain state across micro batches. e.g. for computing totals etc as we process new records, the state needs to be stored as a part of the checkpoint. e.g. grouping, aggregations
  • now, stateless transformations do not support complete output mode. think why -
    • if our streaming transformations are only stateless, 10 input records would contain 10 output records
    • this means we will have to include input records as a part of the output every time
    • this means all records need to be stored in the state, which is not efficient for spark
  • so, as a side effect - we can run into out of memory issues when using spark streaming due to excessive state. spark stores all this state inside memory for efficiency
  • it also stores it in the checkpoint location so that for e.g. when the application dies / is stopped due to some reason, it can resume from where it left off
  • so, we have two concepts - time bound state and unbounded state
  • time bound state - e.g. we calculate a weekly running total. spark knows that it can get rid of records older than a week, since they no longer contribute to the total. this is also called managed state, since spark can manage this state for us
  • unbounded state - there is no time bounds we can specify for the state. therefore, we ourselves need to specify some kind of cleanup logic for the state, so that our application does not encounter out of memory issues. this is also called unmanaged state, since the cleanup logic is on us to implement

Window Aggregations

  • this is the time bound state / managed state that we talked about above
  • trigger time - determines when a micro batch starts and ends
  • event time - the actual time when the event occurred
  • important - the bounds of the window we specify have nothing to do with the trigger time
  • the window we specify uses the event time to decide which window the record should be a part of
  • spark also handles late events - e.g. we get an event for 10.00-10.15 when we have already performed processing for 10.15-10.30 and 10.30-10.45
  • e.g. we create a window of 15 minutes -
    • this basically means a new column called window of type struct would be added to our dataset, with two fields - start and end
    • spark will automatically decide for us which of these groups a record belongs to, based on the column name we specify. this column acts as the event time - e.g. created time in this example
    • since this is basically inside a group, we can specify more columns to group on. e.g. we specify type column in the group by clause. then, we get windows for each of the type separately
    • finally, we perform an aggregation - all records where type is buy, have their amount attribute added to total buy, all records where type is sell, have their amount added to total sell
    • so basically think about whats in state of spark - for all groups i.e. windows, spark is storing the computed aggregate and updating it as and when new records arrive
    • confusion, note - this window is very different from the windowing aggregation we saw earlier - there, no grouping or aggregation was involved; based on our window specification, we simply added a new column (e.g. a running total) to every row
    1
    2
    3
    4
    5
    
    Dataset<Row> outputDf = stockSourceDf
      .groupBy(window(col("CreatedTime"), "15 minute"))
      .agg(
          sum(when(col("Type").equalTo("BUY"), col("Amount")).otherwise(lit(0))).alias("TotalBuy"),
          sum(when(col("Type").equalTo("SELL"), col("Amount")).otherwise(lit(0))).alias("TotalSell"));
    
  • remember - spark had to maintain old windows inside its state as well, to help it with late events
  • watermark - helps expire old window state, so that out of memory etc exceptions are not caused. remember how this is the biggest advantage of using managed state
  • so, we need to decide how late an event can be, post which -
    • we can simply ignore the event
    • we can clean up the state for that window
  • for this, we simply need to chain the withWatermark. note -
    • chain it before the group by clause
    • column name used for windowing and column name specified inside watermark should be the same
    1
    2
    
    .withWatermark("CreatedTime", "30 minutes")
    .groupBy(window(col("CreatedTime"), "15 minute"))
    
  • how does the cleanup happen? - all windows with end_time < (max_event_time - watermark) can be ejected from the state, where max_event_time is the maximum event time spark has seen so far (it is updated at the end of every micro batch). e.g. say our watermark is 30 minutes and the maximum event time seen is 10.48 - all windows with end time before 10.48 - 30 minutes = 10.18 are ejected from the spark state. this is the managed state / automatic cleanup that we were talking about in time bound state
  • watermark and complete output mode do not make sense together - spark cannot cleanup state if it has to output all the records for every micro batch
  • recall how append mode did not work when we had group by etc in our streaming jobs, because append cannot update groups. however, think about watermarks - as (max_event_time - watermark) moves forward, all windows whose end falls below this line can be declared closed, i.e. they will never be updated again. hence, when we combine watermarks with windowed aggregations, spark does support append mode - the windows that get closed during a micro batch are output once that micro batch gets over
  • summary of the difference between output modes when using watermark + windowing -
    • complete - output all windows, ignore watermark concept
    • update - output all windows which were updated by the micro batch, eject all windows from state which are declared stale by spark via watermark concept
    • append - only output the windows that have been declared closed / stale by spark via the watermark, and eject them from the state - unlike update mode, windows that were merely updated are not output
  • tumbling windows vs sliding windows -
    • tumbling windows do not overlap, while sliding windows can have an overlap
    • my understanding - in tumbling windows, window duration = sliding interval, whereas in sliding windows, both are unequal
    • in tumbling windows, an event can be a part of only one window. in sliding windows, an event can be a part of multiple windows, e.g. 10.18 can be a part of 10.10-10.20 and 10.15-10.25
    • so, the only difference in syntax is we now pass two parameters - window duration and sliding window size
    1
    
    .groupBy(window(col("CreatedTime"), "15 minute", "5 minute"))
    

Streaming Joins

Streaming to Static

  • commonly used for stream enrichment
  • stateless - spark does not have to maintain any state - this is because every time we get an event, we can simply compute the rows it produces as a result of the join and output these results, since they would not change / the event would not be needed for computing future joins anymore
  • for each micro batch, spark is smart enough to refresh the static dataframe i.e. imagine when the application is already running, we insert new data into the static dataframe underlying source, e.g. jdbc. spark will reload the static dataframe with the new data when a new event comes in for the streaming dataframe
  • inner join is supported
  • left outer join is possible when the streaming dataframe is on the left. why - assume the static side had to be preserved (right / full outer here). spark would then have to decide, for a static record with no match yet, whether a matching row will ever arrive in the stream - this can never be concluded, since the stream is unbounded. this is why right (and full) outer joins are not supported. a sketch follows below
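  • a minimal sketch of a stream-static join for enrichment - the sources, paths and the join column here are hypothetical -

    // streaming side, e.g. events coming in from kafka (already flattened)
    Dataset<Row> ordersStreamDf = // ...

    // static side, e.g. a customer table read from parquet / jdbc
    Dataset<Row> customersDf = spark.read().parquet("warehouse/customers");

    // stateless enrichment - inner and left outer (stream on the left) are supported
    Dataset<Row> enrichedDf = ordersStreamDf.join(
        customersDf,
        ordersStreamDf.col("customer_id").equalTo(customersDf.col("customer_id")),
        "left_outer");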

Streaming to Streaming

  • stateful - we need to maintain both sides of data forever in the state, unlike when joining streaming dataframe to static dataframe. remember how this is stateful, but streaming to static can be stateless
  • we can solve this problem using 🥁 withWatermark. specify a watermark on both streams being joined, so that spark can remove events that are stale
  • inner join is supported
  • left outer join is possible but with some limitations, TODO
  • TODO: spark interview question of memory
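  • a minimal sketch of an inner stream-stream join with watermarks on both sides - the dataframes, column names and time bounds here are hypothetical, modeled on the ad impressions / clicks example from the structured streaming guide -

    import static org.apache.spark.sql.functions.expr;

    Dataset<Row> impressionsDf = // ... a streaming dataframe (e.g. readStream from kafka)
    Dataset<Row> clicksDf = // ... another streaming dataframe

    Dataset<Row> impressionsWithWatermark = impressionsDf.withWatermark("impressionTime", "2 hours");
    Dataset<Row> clicksWithWatermark = clicksDf.withWatermark("clickTime", "3 hours");

    // an inner join with a time range condition, so spark knows when state can be cleaned up
    Dataset<Row> joinedDf = impressionsWithWatermark.join(
        clicksWithWatermark,
        expr("clickAdId = impressionAdId AND "
            + "clickTime >= impressionTime AND "
            + "clickTime <= impressionTime + interval 1 hour"));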
This post is licensed under CC BY 4.0 by the author.