Terms of Use

Privacy Policy

Featured snippets are MIT-licensed.

Gears & Masters

DevOpsnipp.com © 2020

medium.png

LoadXML.scala

Load XML in Scala

0

Votes

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Encoders
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
import com.databricks.spark.xml._
import spark.implicits._

/**
 * A single person observation at one timestep, as emitted by `LoadXML.personDF`.
 *
 * `movement`, `edge`, `edge_type` and `edge_lane` are not read verbatim from the XML;
 * they are derived from the raw edge string by `LoadXML.parseEdge`.
 */
case class Person(
    time: Double,
    id: Long,
    x: Double,
    y: Double,
    angle: Double,
    position: Double,
    speed: Double,
    slope: Double,
    movement: String,
    edge: String,
    edge_type: String,
    edge_lane: String
)


/**
 * A flat vehicle record for one timestep.
 *
 * NOTE(review): nothing in this snippet builds a `Vehicle` yet. The field set looks like it
 * mirrors the `vehicle` tuples in [[Source]]; confirm when a `vehicleDF` is added.
 */
case class Vehicle(
    time: Double,
    id: Long,
    x: Double,
    y: Double,
    angle: Double,
    `type`: String,
    speed: Double,
    pos: Double,
    lane: String,
    slope: Double
)

/**
 * One `<timestep>` row as returned by `LoadXML.loadXML`.
 *
 * `person` and `vehicle` carry the nested elements of the timestep as tuples.
 * NOTE(review): the tuple element types presumably have to line up positionally with
 * the nested struct columns of the loaded schema. Confirm against the inferred schema.
 *
 * NOTE(review): because the fields are `Array`s, the generated `equals`/`hashCode`
 * compare them by reference, not by content.
 */
case class Source(
    time: Double,
    person: Array[(String, Double, String, Long, Double, Double, Double, Double, String)],
    vehicle: Array[(String, Double, Long, String, Double, Double, Double, String, Double, Double)]
)

/** Parsed edge components, in order: (edge_type, movement, edge, edge_lane). */
type ParsedEdge = Tuple4[String, String, String, String]

object LoadXML extends Serializable {

  /**
   * Spark session used by this job, with a hard-coded local master (`local[9]`).
   *
   * `getOrCreate()` returns an already-active session if one exists (e.g. inside
   * spark-shell) instead of building a new one.
   */
  val spark: SparkSession = SparkSession.builder()
    .appName("LoadXML")
    .master("local[9]")
    // Side-effecting 0-arity call: keep the `()` (auto-application is deprecated).
    .getOrCreate()

  /** The SparkContext underlying `spark`. */
  val sc: org.apache.spark.SparkContext = spark.sparkContext

  /**
   * Legacy SQLContext view of `spark`.
   *
   * Taken from the session rather than via the `new SQLContext(sc)` constructor, which is
   * deprecated since Spark 2.0. Both wrap the same underlying session.
   */
  val sqlContext: SQLContext = spark.sqlContext

  /**
   * Turns the raw capture groups of an edge string into a `ParsedEdge`.
   *
   * @param t optional-prefix group; any non-null value means "intersection", null means "road"
   * @param m optional-sign group; any non-null value means "backward", null means "forward"
   * @param e edge id, passed through unchanged
   * @param l lane, passed through unchanged
   * @return (edge_type, movement, e, l)
   */
  def getEdgeData(t: String, m: String, e: String, l: String): ParsedEdge = {
    // Optional regex groups that did not participate in the match arrive as null.
    val edgeKind  = Option(t).fold("road")(_ => "intersection")
    val direction = Option(m).fold("forward")(_ => "backward")
    (edgeKind, direction, e, l)
  }

  /**
   * Edge-string pattern used by `parseEdge`, compiled once instead of on every call.
   *
   * Groups: optional ":" prefix, optional "-" prefix, digits, then one digit after "_".
   * NOTE(review): `(\d)` captures only a single lane digit. An edge like "12_10" yields
   * lane "1". Confirm lanes never reach 10 before relying on `edge_lane`.
   */
  private val EdgePattern = raw"^(:)?(-)?(\d+)_(\d).*$$".r

  /**
   * Parses a raw edge string into (edge_type, movement, edge, edge_lane) via `getEdgeData`.
   *
   * This runs once per row inside Spark transformations, which is why the regex is hoisted.
   *
   * @param edge raw edge string
   * @return the parsed components, or `null` when `edge` does not match the pattern.
   *         The `null` is kept for compatibility with existing callers, so check before
   *         destructuring.
   */
  def parseEdge(edge: String): ParsedEdge =
    edge match {
      case EdgePattern(t, m, e, l) => getEdgeData(t, m, e, l)
      case _                       => null
    }

  // Read the XML file.
  /**
   * Loads an XML file through the spark-xml data source, one row per `<timestep>` element.
   *
   * The `_time` column is renamed to `time` so the rows can be viewed as [[Source]].
   *
   * @param path location of the XML file
   * @return the timesteps as a typed dataset
   */
  def loadXML(path: String): Dataset[Source] = {
    val timesteps = spark.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "timestep")
      .load(path)
    timesteps.withColumnRenamed("_time", "time").as[Source]
  }

  /**
   * Flattens the `person` array of every timestep into one [[Person]] row per person.
   *
   * The per-person attributes come from the nested `person._*` columns. The raw `_edge`
   * string is split by `parseEdge` into edge type, movement, edge id and lane.
   *
   * `parseEdge` returns `null` for edge strings it cannot parse. Destructuring that `null`
   * used to throw a `MatchError` and fail the whole job. Such rows now keep their numeric
   * fields and get `null` for the four edge-derived fields.
   *
   * @param df timesteps as produced by `loadXML`
   * @return one row per (timestep, person)
   */
  def personDF(df: Dataset[Source]): Dataset[Person] = {
    // `explode` and `col` come from `functions`, which the file's top-level imports lack.
    import org.apache.spark.sql.functions.{col, explode}

    // Fallback for edge strings that `parseEdge` does not recognise.
    val unparsedEdge: ParsedEdge = (null, null, null, null)

    df.drop("vehicle")
      .withColumn("person", explode(col("person")))
      .select($"time", $"person._id", $"person._x", $"person._y", $"person._angle",
        $"person._pos", $"person._speed", $"person._slope", $"person._edge")
      .map { r =>
        val (edgeType, movement, edge, edgeLane) =
          Option(parseEdge(r.getString(8))).getOrElse(unparsedEdge)
        // Building the case class directly gives the right column names without renaming.
        Person(
          time = r.getDouble(0),
          id = r.getLong(1),
          x = r.getDouble(2),
          y = r.getDouble(3),
          angle = r.getDouble(4),
          position = r.getDouble(5),
          speed = r.getDouble(6),
          slope = r.getDouble(7),
          movement = movement,
          edge = edge,
          edge_type = edgeType,
          edge_lane = edgeLane)
      }
  }

Vote Here

You must earn at least one vote on your snippets before you can vote.