
Spark helpers
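
Three snippets for running Spark on a Mesos cluster with Dockerized executors: call_pyspark.sh and call_spark_shell.sh launch the interactive PySpark and Scala shells with a shared set of --conf options, and pyspark_program_host.py builds the same kind of configuration programmatically with SparkConf.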

call_pyspark.sh

#!/usr/bin/env bash
# Launch an interactive PySpark shell on a Mesos cluster, running the
# executors inside a Docker image. Run from the Spark installation root.

MESOS_IP=mesos://zk://10.2.95.5:2181/qs-dmitry-dgqt    # Mesos master via ZooKeeper
DRIVER_IP=10.2.95.5                                    # address executors use to reach the driver
EXECUTOR_IMAGE=docker-dev.hli.io/ccm/hli-rspark-plink:2.0.1
CORES=16                                               # total cores across all executors (spark.cores.max)
RAM=50g                                                # memory per executor

./bin/pyspark \
    --conf spark.master=${MESOS_IP} \
    --conf spark.driver.host=${DRIVER_IP} \
    --conf spark.driver.maxResultSize=5g \
    --conf spark.driver.memory=1g \
    --conf spark.driver.cores=1 \
    --conf spark.mesos.coarse=true \
    --conf spark.mesos.executor.docker.image=${EXECUTOR_IMAGE} \
    --conf spark.mesos.executor.home=/usr/local/spark \
    --conf spark.task.maxFailures=10 \
    --conf spark.executor.extraClassPath=/usr/local/spark/extra_jars/* \
    --conf spark.driver.extraClassPath=/usr/local/spark/extra_jars/* \
    --conf spark.sql.parquet.compression.codec=gzip \
    --conf spark.sql.warehouse.dir=file:///tmp \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.kryoserializer.buffer.max=1g \
    --conf spark.task.cpus=1 \
    --conf spark.executor.memory=${RAM} \
    --conf spark.cores.max=${CORES} \
    --conf spark.sql.shuffle.partitions=2000 \
    --conf spark.shuffle.spill=true \
    --conf spark.executor.heartbeatInterval=10s \
    --conf spark.mesos.executor.docker.volumes=/docker:/docker
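
Once the shell comes up, a quick way to confirm that executors are actually starting on the Mesos agents is to run a small job against the sc object that pyspark creates automatically. The check below is a minimal illustration, not part of the original script:

# Run inside the PySpark shell started by call_pyspark.sh;
# sc is the SparkContext that the shell creates for you.
nums = sc.parallelize(range(1000), numSlices=8)
print('sum: {}'.format(nums.sum()))                      # expect 499500
print('partitions: {}'.format(nums.getNumPartitions()))  # expect 8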

call_spark_shell.sh

#!/usr/bin/env bash
# Same launch settings as call_pyspark.sh, but for the Scala spark-shell.

MESOS_IP=mesos://zk://10.2.95.5:2181/qs-dmitry-dgqt    # Mesos master via ZooKeeper
DRIVER_IP=10.2.95.5                                    # address executors use to reach the driver
EXECUTOR_IMAGE=docker-dev.hli.io/ccm/hli-rspark-plink:2.0.1
CORES=16                                               # total cores across all executors (spark.cores.max)
RAM=50g                                                # memory per executor

./bin/spark-shell \
    --conf spark.master=${MESOS_IP} \
    --conf spark.driver.host=${DRIVER_IP} \
    --conf spark.driver.maxResultSize=5g \
    --conf spark.driver.memory=1g \
    --conf spark.driver.cores=1 \
    --conf spark.mesos.coarse=true \
    --conf spark.mesos.executor.docker.image=${EXECUTOR_IMAGE} \
    --conf spark.mesos.executor.home=/usr/local/spark \
    --conf spark.task.maxFailures=10 \
    --conf spark.executor.extraClassPath=/usr/local/spark/extra_jars/* \
    --conf spark.driver.extraClassPath=/usr/local/spark/extra_jars/* \
    --conf spark.sql.parquet.compression.codec=gzip \
    --conf spark.sql.warehouse.dir=file:///tmp \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.kryoserializer.buffer.max=1g \
    --conf spark.task.cpus=1 \
    --conf spark.executor.memory=${RAM} \
    --conf spark.cores.max=${CORES} \
    --conf spark.sql.shuffle.partitions=2000 \
    --conf spark.shuffle.spill=true \
    --conf spark.executor.heartbeatInterval=10s \
    --conf spark.mesos.executor.docker.volumes=/docker:/docker
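
call_spark_shell.sh mirrors call_pyspark.sh; only the entry point differs. If the duplicated --conf flags become a maintenance burden, they can be collected into a properties file and passed to either shell with --properties-file instead of being repeated in both scripts.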

pyspark_program_host.py

# Dependencies:
#   export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
#   pip install py4j
# See: http://spark.apache.org/docs/latest/programming-guide.html#initializing-spark

from pyspark import SparkContext, SparkConf
from time import sleep

master = 'mesos://zk://10.2.95.5:2181/qs-dmitry-dgqt'         # Mesos master via ZooKeeper
spark_image = 'docker-dev.hli.io/ccm/hli-rspark-plink:2.0.1'  # executor Docker image
cores = '32'       # total cores across all executors (spark.cores.max)
ram = '50g'        # memory per executor
appName = 'worker-01'

conf = SparkConf()\
    .setAppName(appName)\
    .setMaster(master)\
    .set('spark.driver.maxResultSize','1g')\
    .set('spark.driver.memory','1g')\
    .set('spark.driver.cores','1')\
    .set('spark.driver.extraClassPath','/usr/local/spark/extra_jars/*')\
    .set('spark.mesos.coarse','true')\
    .set('spark.mesos.executor.docker.image',spark_image)\
    .set('spark.mesos.executor.home','/usr/local/spark')\
    .set('spark.mesos.executor.docker.volumes','/docker:/docker')\
    .set('spark.executor.extraClassPath','/usr/local/spark/extra_jars/*')\
    .set('spark.task.maxFailures','100')\
    .set('spark.task.cpus','1')\
    .set('spark.executor.memory',ram)\
    .set('spark.cores.max',cores)\
    .set('spark.executor.heartbeatInterval','10s')\
    .set('spark.sql.parquet.compression.codec','gzip')\
    .set('spark.sql.warehouse.dir','file:///tmp')\
    .set('spark.serializer','org.apache.spark.serializer.KryoSerializer')\
    .set('spark.sql.shuffle.partitions','200')\
    .set('spark.shuffle.spill','true')

sc = SparkContext(conf=conf)

# Quick sanity check: run a small job on the cluster.
res = sc.parallelize(range(10)).count()
print('res: {}'.format(res))
sleep(3)

# Shut down the SparkContext; ignore errors if it is already stopped.
try:
    sc.stop()
except Exception:
    pass
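
The executor image tag suggests Spark 2.x, where the same program can also be written against SparkSession, which wraps the SparkContext and the SQL support in a single entry point. The sketch below is an equivalent rewrite under that assumption, not part of the original snippet; only a few of the options are repeated, and any of the remaining .set(...) pairs can be passed as additional .config(...) calls.

from pyspark.sql import SparkSession

# Minimal SparkSession-based equivalent (assumes Spark 2.x on the driver).
spark = SparkSession.builder \
    .appName('worker-01') \
    .master('mesos://zk://10.2.95.5:2181/qs-dmitry-dgqt') \
    .config('spark.mesos.coarse', 'true') \
    .config('spark.mesos.executor.docker.image', 'docker-dev.hli.io/ccm/hli-rspark-plink:2.0.1') \
    .config('spark.executor.memory', '50g') \
    .config('spark.cores.max', '32') \
    .getOrCreate()

# The underlying SparkContext is still available for RDD-style work.
res = spark.sparkContext.parallelize(range(10)).count()
print('res: {}'.format(res))

spark.stop()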
