Spark Driver Java

Running Spark SQL in standalone mode

pom.xml

  <properties>
      <spark.version>2.3.0</spark.version>
  </properties>

  <dependencies>
      <!-- the _2.11 suffix must match the Scala version Spark was built against -->
      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>${spark.version}</version>
      </dependency>
  </dependencies>

run.sh

#
# 1) Unpack the Spark archive to $SPARK_HOME
# 2) cp conf/spark-env.sh.template conf/spark-env.sh
# 3) Edit conf/spark-env.sh (see below)
# 4) Make sure spark-env.sh is identical on the master and all worker nodes
#

# 5) To start the Spark master and worker(s), run from the $SPARK_HOME directory:

# .. on the master
./sbin/start-master.sh

# .. on the worker(s), pointing at the master URL configured below
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://127.0.0.1:8091

spark-env.sh

# Options for the daemons used in the standalone deploy mode
# (SPARK_MASTER_IP is deprecated since Spark 2.0; use SPARK_MASTER_HOST instead)
export SPARK_MASTER_HOST=127.0.0.1
export SPARK_MASTER_PORT=8091
# master web UI will be served at http://127.0.0.1:8090
export SPARK_MASTER_WEBUI_PORT=8090
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=24g

SparkDriver.java

import org.apache.spark.sql.SparkSession;

public class SparkDriver {
    public static void main(String[] args) {
        // suffix the app name with a timestamp so repeated runs are distinguishable in the web UI
        final SparkSession session = SparkSession.builder().
            appName("MySparkApp" + System.currentTimeMillis()).
            master(SparkEnvCfg.sparkMasterUrl()).
            config(SparkEnvCfg.SPARK_EXECUTOR_MEMORY, "1g").
            config(SparkEnvCfg.SPARK_SERIALIZER, SparkEnvCfg.KRYO).
            config(SparkEnvCfg.SPARK_SQL_SHUFFLE_PARTITIONS, "2").
            config(SparkEnvCfg.SPARK_EXECUTOR_EXTRA_JAVA_OPTIONS, SparkEnvCfg.JMXREMOTE_ENABLED).
            getOrCreate();

        // TODO use session to create and manipulate datasets
    }
}
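
For reference, a minimal sketch of what the TODO might look like: build a small in-memory Dataset, register it as a view, and query it with SQL. The class name and the data are made up for illustration; only the SparkEnvCfg helper from this snippet is reused:

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkDriverExample {
    public static void main(String[] args) {
        final SparkSession session = SparkSession.builder().
            appName("MySparkAppExample").
            master(SparkEnvCfg.sparkMasterUrl()).
            getOrCreate();

        // primitive Datasets expose a single column named "value"
        final Dataset<Integer> numbers =
            session.createDataset(Arrays.asList(1, 2, 3, 4), Encoders.INT());
        numbers.createOrReplaceTempView("numbers");

        final Dataset<Row> even = session.sql("SELECT value FROM numbers WHERE value % 2 = 0");
        even.show();

        session.stop();
    }
}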

SparkDriver.scala

import java.io.File
import org.apache.spark.sql.SparkSession

object SparkDriver {
  def sparkSession(name : String) : SparkSession = {
    SparkSession.builder().
      appName(name).
      master("local").
      config(SparkCtxCfg.SPARK_EXECUTOR_MEMORY, "1g").
      config(SparkCtxCfg.SPARK_SERIALIZER, SparkCtxCfg.KRYO).
      config(SparkCtxCfg.SPARK_SQL_SHUFFLE_PARTITIONS, "2").
      config(SparkCtxCfg.SPARK_WAREHOUSE_DIR, "target/spark-warehouse").
      // empty arguments mean no extra jars are shipped; fill in real paths when needed
      config(SparkCtxCfg.SPARK_JARS, SparkCtxCfg.toAbsolutePaths("", "")).
      // pinning the driver host and port makes firewall rules predictable
      config(SparkCtxCfg.SPARK_DRIVER_HOST, "localhost").
      config(SparkCtxCfg.SPARK_DRIVER_PORT, "31000").
      getOrCreate()
  }
}

object SparkCtxCfg {
  val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"

  val SPARK_SERIALIZER = "spark.serializer"

  val ALLOW_MULTIPLE_CONTEXTS = "spark.driver.allowMultipleContexts"

  val SPARK_JARS = "spark.jars"

  val SPARK_WAREHOUSE_DIR = "spark.sql.warehouse.dir"

  val KRYO = "org.apache.spark.serializer.KryoSerializer"

  val SPARK_SQL_SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"

  val DEFAULT_SPARK_MASTER_URL = "spark://127.0.0.1:7077"

  val SPARK_DRIVER_HOST = "spark.driver.host"

  val SPARK_DRIVER_PORT = "spark.driver.port"

  def envProperty(name : String, otherwise : String) : String = {
    val prop = System.getProperty(name)
    if (prop == null) otherwise else prop
  }

  def availableProcessors() : String = {
    Integer.toString(Runtime.getRuntime.availableProcessors())
  }

  def toAbsolutePaths(jarsString: String, baseDir: String): String = {
    if (jarsString == null || jarsString.isEmpty) ""
    else {
      val libDir = if (baseDir.endsWith(File.separator)) baseDir else baseDir + File.separator
      jarsString.split(",").map(jar => libDir + jar).mkString(",")
    }
  }
}
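
A minimal sketch of exercising the factory above end to end; the object name and the toy data are made up for illustration:

object SparkDriverApp {
  def main(args: Array[String]): Unit = {
    val session = SparkDriver.sparkSession("MyLocalApp")
    import session.implicits._

    // small in-memory Dataset and a trivial transformation
    val numbers = Seq(1, 2, 3, 4).toDS()
    numbers.filter(_ % 2 == 0).show()

    session.stop()
  }
}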

SparkEnvCfg.java

public final class SparkEnvCfg {
    public static final String SPARK_EXECUTOR_MEMORY = "spark.executor.memory";

    public static final String SPARK_SERIALIZER = "spark.serializer";

    public static final String SPARK_EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";

    public static final String KRYO = "org.apache.spark.serializer.KryoSerializer";

    // note: with a fixed JMX port, at most one executor per node can bind it
    public static final String JMXREMOTE_ENABLED = "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=" + executorJmxPort();

    public static final String SPARK_SQL_SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions";

    public static String sparkMasterUrl() {
        return envProperty("spark.master.url", "spark://127.0.0.1:8091");
    }

    public static String cacheBatchSize() {
        return envProperty("spark.cache.batch", "100000");
    }

    public static String executorMemory() {
        return envProperty("spark.executor.memory", "24g");
    }

    private static String executorJmxPort() {
        return envProperty("spark.executor.jmx", "20000");
    }

    public static String envProperty(String name, String otherwise) {
        final String prop = System.getProperty(name);
        return prop == null ? otherwise : prop;
    }    
}
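
All of the defaults above can be overridden per run through JVM system properties, which is what envProperty reads. A sketch, again assuming the driver is packaged as the placeholder my-spark-app.jar:

java -Dspark.master.url=spark://127.0.0.1:8091 -Dspark.executor.jmx=21000 -jar my-spark-app.jar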
