The example below reads a .csv file in the same format as presented in the question. There are some details I would like to explain first.
In the table schema, the field rec_value: Decimal(2,1) would have to be rec_value: Decimal(3,1), for the following reason:
The DECIMAL type represents numbers with fixed precision and scale. When you create a DECIMAL column, you specify the precision, p, and the scale, s. Precision is the total number of digits, regardless of the location of the decimal point. Scale is the number of digits after the decimal point. To represent the number 10.0 without a loss of precision, you need a DECIMAL type with a precision of at least 3 and a scale of at least 1.
So the Hive table would be:
CREATE TABLE tab_data (
  rec_id INT,
  rec_name STRING,
  rec_value DECIMAL(3,1),
  rec_created TIMESTAMP
) STORED AS PARQUET;
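To see concretely why a precision of 2 is not enough: by default Spark does not fail when a value overflows the declared precision during a cast, it simply produces null. A minimal sketch, assuming an existing SparkSession named spark (for example in spark-shell):

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.DataTypes

// DECIMAL(2,1) leaves only one digit before the decimal point (max 9.9),
// so 10.0 overflows and the cast yields null; DECIMAL(3,1) keeps the value.
spark.range(1)
  .select(
    lit(10.0).cast(DataTypes.createDecimalType(2, 1)).as("as_decimal_2_1"),
    lit(10.0).cast(DataTypes.createDecimalType(3, 1)).as("as_decimal_3_1"))
  .show()
// expected: null in the first column, 10.0 in the second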
The full Scala code:
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructField, StructType, TimestampType}

object CsvToParquet {

  val spark = SparkSession
    .builder()
    .appName("CsvToParquet")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "200") // Change to a more reasonable default number of partitions for our data
    .config("spark.sql.parquet.writeLegacyFormat", true) // To avoid issues with data types between Spark and Hive.
    // The convention used by Spark to write Parquet data is configurable.
    // This is determined by the property spark.sql.parquet.writeLegacyFormat.
    // The default value is false. If set to "true",
    // Spark will use the same convention as Hive for writing the Parquet data.
    .getOrCreate()

  val sc = spark.sparkContext

  val inputPath = "hdfs://host:port/user/...../..../tab_data.csv"
  val outputPath = "hdfs://host:port/user/hive/warehouse/test.db/tab_data"

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {

      val DecimalType = DataTypes.createDecimalType(3, 1)

      /**
        * the data schema
        */
      val schema = StructType(List(StructField("rec_id", IntegerType, true), StructField("rec_name", StringType, true),
        StructField("rec_value", DecimalType), StructField("rec_created", TimestampType, true)))

      /**
        * Reading the data from HDFS as .csv text file
        */
      val data = spark
        .read
        .option("sep", "|")
        .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.S")
        .option("inferSchema", false)
        .schema(schema)
        .csv(inputPath)

      data.show(truncate = false)
      data.schema.printTreeString()

      /**
        * Writing the data as Parquet file
        */
      data
        .write
        .mode(SaveMode.Append)
        .option("compression", "none") // Assuming no data compression
        .parquet(outputPath)

    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}
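As a possible alternative to writing straight into the warehouse directory, Spark can also register the table in the Hive metastore itself via saveAsTable, so the manual CREATE TABLE statement is not needed. This is only a sketch under the assumption that the spark-hive module is on the classpath, the metastore is reachable, and the test database already exists; whether Hive reads the resulting table cleanly still depends on your metastore setup:

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructField, StructType, TimestampType}

object CsvToHiveTable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("CsvToHiveTable")
      .master("local[*]")
      .config("spark.sql.parquet.writeLegacyFormat", true)
      .enableHiveSupport() // needs access to the Hive metastore
      .getOrCreate()

    val schema = StructType(List(
      StructField("rec_id", IntegerType, true),
      StructField("rec_name", StringType, true),
      StructField("rec_value", DataTypes.createDecimalType(3, 1), true),
      StructField("rec_created", TimestampType, true)))

    val data = spark.read
      .option("sep", "|")
      .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.S")
      .schema(schema)
      .csv("hdfs://host:port/user/...../..../tab_data.csv")

    // saveAsTable creates tab_data in the metastore on first run and
    // appends to it afterwards, writing Parquet files under the warehouse.
    data.write
      .mode(SaveMode.Append)
      .format("parquet")
      .saveAsTable("test.tab_data")

    spark.stop()
  }
}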
Input .csv file, with pipe-separated (|) fields:
10|customer1|10.0|2016-09-07 08:38:00.0
20|customer2|24.0|2016-09-08 10:45:00.0
30|customer3|35.0|2016-09-10 03:26:00.0
40|customer1|46.0|2016-09-11 08:38:00.0
........
reading from Spark
+------+---------+---------+-------------------+
|rec_id|rec_name |rec_value|rec_created |
+------+---------+---------+-------------------+
|10 |customer1|10.0 |2016-09-07 08:38:00|
|20 |customer2|24.0 |2016-09-08 10:45:00|
|30 |customer3|35.0 |2016-09-10 03:26:00|
|40 |customer1|46.0 |2016-09-11 08:38:00|
......
schema
root
|-- rec_id: integer (nullable = true)
|-- rec_name: string (nullable = true)
|-- rec_value: decimal(3,1) (nullable = true)
|-- rec_created: timestamp (nullable = true)
reading from Hive
SELECT *
FROM tab_data;
+------------------+--------------------+---------------------+------------------------+--+
| tab_data.rec_id | tab_data.rec_name | tab_data.rec_value | tab_data.rec_created |
+------------------+--------------------+---------------------+------------------------+--+
| 10 | customer1 | 10 | 2016-09-07 08:38:00.0 |
| 20 | customer2 | 24 | 2016-09-08 10:45:00.0 |
| 30 | customer3 | 35 | 2016-09-10 03:26:00.0 |
| 40 | customer1 | 46 | 2016-09-11 08:38:00.0 |
.....
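If you want to double-check the files independently of Hive, you can also read the Parquet output back with Spark. A short sketch, reusing the spark session and the outputPath value from the code above:

// Read the freshly written Parquet files back and confirm the schema
// (in particular decimal(3,1)) and the values survived the round trip.
val written = spark.read.parquet(outputPath)
written.printSchema()
written.show(truncate = false)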
Hope this helps.