One way is to cast all columns to string. Note that I'm changing the `r.get(idx)` in your code to `r.getString(idx)`. The below works:
scala> val df = Seq(("servicecent4","ap-1-ioo-ppp","241.206.155.172","06-12-18:17:42:34",162,53,1544098354885L)).toDF("col1","col2","col3","eventtime","col4","col5","col6")
df: org.apache.spark.sql.DataFrame = [col1: string, col2: string ... 5 more fields]
scala> df.show(1,false)
+------------+------------+---------------+-----------------+----+----+-------------+
|col1 |col2 |col3 |eventtime |col4|col5|col6 |
+------------+------------+---------------+-----------------+----+----+-------------+
|servicecent4|ap-1-ioo-ppp|241.206.155.172|06-12-18:17:42:34|162 |53 |1544098354885|
+------------+------------+---------------+-----------------+----+----+-------------+
only showing top 1 row
scala> df.printSchema
root
|-- col1: string (nullable = true)
|-- col2: string (nullable = true)
|-- col3: string (nullable = true)
|-- eventtime: string (nullable = true)
|-- col4: integer (nullable = false)
|-- col5: integer (nullable = false)
|-- col6: long (nullable = false)
scala> val schema = df.schema
schema: org.apache.spark.sql.types.StructType = StructType(StructField(col1,StringType,true), StructField(col2,StringType,true), StructField(col3,StringType,true), StructField(eventtime,StringType,true), StructField(col4,IntegerType,false), StructField(col5,IntegerType,false), StructField(col6,LongType,false))
scala> val df2 = df.columns.foldLeft(df){ (acc,r) => acc.withColumn(r,col(r).cast("string")) }
df2: org.apache.spark.sql.DataFrame = [col1: string, col2: string ... 5 more fields]
scala> df2.printSchema
root
|-- col1: string (nullable = true)
|-- col2: string (nullable = true)
|-- col3: string (nullable = true)
|-- eventtime: string (nullable = true)
|-- col4: string (nullable = false)
|-- col5: string (nullable = false)
|-- col6: string (nullable = false)
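The `foldLeft` above simply threads the DataFrame through one `withColumn` call per column name. The same accumulator pattern can be sketched in plain Scala without Spark (the `Frame` case class below is a made-up stand-in for a DataFrame, used only to illustrate the fold):

```scala
// Hypothetical stand-in for a DataFrame: column name -> column type.
case class Frame(cols: Map[String, String])

val start = Frame(Map("col4" -> "int", "col5" -> "int", "col6" -> "long"))

// Thread the accumulator through every column name, "casting" each
// column to string, just as foldLeft threads df through withColumn.
val result = start.cols.keys.foldLeft(start) { (acc, name) =>
  Frame(acc.cols.updated(name, "string"))
}
// every column type in `result` is now "string"
```

Each step receives the frame produced by the previous step, so all columns end up converted even though each step only touches one.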
scala> val x = df2.flatMap(r => (0 until schema.length).map { idx => ((idx, r.getString(idx)), 1L) } )
x: org.apache.spark.sql.Dataset[((Int, String), Long)] = [_1: struct<_1: int, _2: string>, _2: bigint]
scala> x.show(5,false)
+---------------------+---+
|_1 |_2 |
+---------------------+---+
|[0,servicecent4] |1 |
|[1,ap-1-ioo-ppp] |1 |
|[2,241.206.155.172] |1 |
|[3,06-12-18:17:42:34]|1 |
|[4,162] |1 |
+---------------------+---+
only showing top 5 rows
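The `flatMap` turns each row into one `((columnIndex, cellValue), 1L)` pair per cell, so a group-and-sum over those keys yields per-column value counts. A minimal sketch of the same logic on plain Scala collections (made-up two-column sample data, no Spark required):

```scala
// Two sample rows, each a sequence of string cells (hypothetical data).
val rows = Seq(
  Seq("servicecent4", "162"),
  Seq("servicecent4", "53")
)

// Pair each cell with its column index, then count occurrences per key,
// mirroring the flatMap over (0 until schema.length) above.
val counts: Map[(Int, String), Long] = rows
  .flatMap(r => r.indices.map(idx => ((idx, r(idx)), 1L)))
  .groupBy(_._1)
  .map { case (key, pairs) => (key, pairs.map(_._2).sum) }
// ((0, "servicecent4")) appears in both rows, so its count is 2
```

In Spark the equivalent aggregation would be a `reduceByKey`/`groupBy(...).count` over `x`; the pairing step shown here is the part the answer's `flatMap` performs.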
Source: stackoverflow.com