Accepted answer (score: -1)

If you don't want to write x(i) repeatedly, you can process the indexes in a loop, as below. Example 1:

import scala.collection.mutable.ArrayBuffer

val strpcols = mvnmdata.map(x => x.split('|'))
  .map(x => {
    val xbuffer = new ArrayBuffer[String]()
    // list the indexes of the columns you want to keep
    for (i <- Array(0, 1, 5, 6 /* ... */)) {
      xbuffer.append(x(i))
    }
    xbuffer
  })
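
If you prefer to avoid the mutable ArrayBuffer, the same extraction can be written as a single map over the index array. A minimal sketch, assuming mvnmdata is your RDD[String] and the index list is known:

val indexes = Array(0, 1, 5, 6) // your column indexes
val strpcols = mvnmdata
  .map(_.split('|'))
  .map(x => indexes.map(x(_))) // Array[String] with only the kept columns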

If you only want to define the index list by a start & end plus a set of numbers to exclude, see Example 2 below:

scala> (1 to 10).toSet
res8: scala.collection.immutable.Set[Int] = Set(5, 10, 1, 6, 9, 2, 7, 3, 8, 4)

scala> ((1 to 10).toSet -- Set(2,9)).toArray.sortBy(row=>row)
res9: Array[Int] = Array(1, 3, 4, 5, 6, 7, 8, 10)

The final code you want:

import scala.collection.mutable.ArrayBuffer

// define the function to build the index list: a start/end range minus the excluded values
def getSpecIndexes(start: Int, end: Int, removedValueSet: Set[Int]): Array[Int] = {
  ((start to end).toSet -- removedValueSet).toArray.sorted
}

val strpcols = mvnmdata.map(x => x.split('|'))
  .map(x => {
    val xbuffer = new ArrayBuffer[String]()
    // call the function to get the indexes
    for (i <- getSpecIndexes(0, 100, Set(3, 4, 5, 6))) {
      xbuffer.append(x(i))
    }
    xbuffer
  })
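
A quick sanity check of the helper in the REPL (illustrative output):

scala> getSpecIndexes(0, 10, Set(3, 4))
res0: Array[Int] = Array(0, 1, 2, 5, 6, 7, 8, 9, 10)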

Answer (score: 0)

The next solution provides an easy and scalable way to manage your column names and indices. It is based on a map that defines the column name/index relation, which lets us handle both the index of each extracted column and its name.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructType, StructField}

val rdd = spark.sparkContext.parallelize(Seq(
  "1|500|400|300",
  "1|34|67|89",
  "2|10|20|56",
  "3|2|5|56",
  "3|1|8|22"))

val dictColums = Map("c0" -> 0, "c2" -> 2)

// create schema from map keys
val schema = StructType(dictColums.keys.toSeq.map(StructField(_, StringType, true)))

val mappedRDD = rdd.map{line => line.split('|')}
                    .map{
                      cols => Row.fromSeq(dictColums.values.toSeq.map{cols(_)})
                    }

val df = spark.createDataFrame(mappedRDD, schema)
df.show

//output
+---+---+
| c0| c2|
+---+---+
|  1|400|
|  1| 67|
|  2| 20|
|  3|  5|
|  3|  8|
+---+---+
  • First we declare dictColums; in this example we will extract the cols "c0" -> 0 and "c2" -> 2
  • Next we create the schema from the keys of the map
  • The first map (which you already have) splits the line by |, the second one creates a Row containing the values that correspond to each item of dictColums.values; the key/value alignment this relies on is sketched below
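
This works because a given Scala Map instance iterates its keys and values in a consistent order, so the schema columns line up with the extracted values. A quick REPL check (illustrative output, assuming the same dictColums as above):

scala> dictColums.keys.toSeq
res0: Seq[String] = List(c0, c2)

scala> dictColums.values.toSeq
res1: Seq[Int] = List(0, 2)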

UPDATE:

You could also wrap the above functionality in a function so you can reuse it multiple times:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

def stringRddToDataFrame(colsMapping: Map[String, Int], rdd: RDD[String]): DataFrame = {
  val schema = StructType(colsMapping.keys.toSeq.map(StructField(_, StringType, true)))

  val mappedRDD = rdd.map{line => line.split('|')}
                     .map{
                       cols => Row.fromSeq(colsMapping.values.toSeq.map{cols(_)})
                     }

  spark.createDataFrame(mappedRDD, schema)
}

And then use it for your case:

val cols = Map("c0" -> 0, "c1" -> 1, "c5" -> 5, ... "c23" -> 23)

val df = stringRddToDataFrame(cols, rdd)
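
For example, reusing the sample rdd from above with a different mapping (the column names "id" and "c3" here are just illustrative):

val df2 = stringRddToDataFrame(Map("id" -> 0, "c3" -> 3), rdd)
df2.show

//output
+---+---+
| id| c3|
+---+---+
|  1|300|
|  1| 89|
|  2| 56|
|  3| 56|
|  3| 22|
+---+---+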
