score:1

Accepted answer

The example below read a .csv file as the same format as presented in the question.

There are some details that I would like to explain first.

In the table schema the field: rec_value: Decimal(2,1) would have to be rec_value: Decimal(3,1) for the following reason:

The DECIMAL type represents numbers with fixed precision and scale. When you create a DECIMAL column, you specify the precision, p, and scale, s. Precision is the total number of digits, regardless of the location of the decimal point. Scale is the number of digits after the decimal place. To represent the number 10.0 without a loss of precision, you would need a DECIMAL type with precision of at least 3, and scale of at least 1.

So the Hive table would be:

CREATE TABLE tab_data (
  rec_id INT,
  rec_name STRING,
  rec_value DECIMAL(3,1),
  rec_created TIMESTAMP
) STORED AS PARQUET;

The full scala code

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructField, StructType, TimestampType}

object CsvToParquet {

  val spark = SparkSession
    .builder()
    .appName("CsvToParquet")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","200") //Change to a more reasonable default number of partitions for our data
    .config("spark.sql.parquet.writeLegacyFormat", true) // To avoid issues with data type between Spark and Hive
                                                         // The convention used by Spark to write Parquet data is configurable.
                                                         // This is determined by the property spark.sql.parquet.writeLegacyFormat
                                                         // The default value is false. If set to "true",
                                                         // Spark will use the same convention as Hive for writing the Parquet data.
    .getOrCreate()

  val sc = spark.sparkContext

  val inputPath = "hdfs://host:port/user/...../..../tab_data.csv"
  val outputPath = "hdfs://host:port/user/hive/warehouse/test.db/tab_data"

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {

      val DecimalType = DataTypes.createDecimalType(3, 1)

      /**
        * the data schema
        */
      val schema = StructType(List(StructField("rec_id", IntegerType, true), StructField("rec_name",StringType, true),
                   StructField("rec_value",DecimalType),StructField("rec_created",TimestampType, true)))

      /**
        * Reading the data from HDFS as .csv text file
        */
      val data = spark
        .read
        .option("sep","|")
        .option("timestampFormat","yyyy-MM-dd HH:mm:ss.S")
        .option("inferSchema",false)
        .schema(schema)
        .csv(inputPath)

       data.show(truncate = false)
       data.schema.printTreeString()

      /**
        * Writing the data as Parquet file
        */
      data
        .write
        .mode(SaveMode.Append)
        .option("compression", "none") // Assuming no data compression
        .parquet(outputPath)

    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

Input file as .csv tab separated fields

10|customer1|10.0|2016-09-07  08:38:00.0
20|customer2|24.0|2016-09-08  10:45:00.0
30|customer3|35.0|2016-09-10  03:26:00.0
40|customer1|46.0|2016-09-11  08:38:00.0
........

reading from Spark

+------+---------+---------+-------------------+
|rec_id|rec_name |rec_value|rec_created        |
+------+---------+---------+-------------------+
|10    |customer1|10.0     |2016-09-07 08:38:00|
|20    |customer2|24.0     |2016-09-08 10:45:00|
|30    |customer3|35.0     |2016-09-10 03:26:00|
|40    |customer1|46.0     |2016-09-11 08:38:00|
......

schema

root
 |-- rec_id: integer (nullable = true)
 |-- rec_name: string (nullable = true)
 |-- rec_value: decimal(3,1) (nullable = true)
 |-- rec_created: timestamp (nullable = true)

reading from Hive

SELECT *
FROM tab_data;

+------------------+--------------------+---------------------+------------------------+--+
| tab_data.rec_id  | tab_data.rec_name  | tab_data.rec_value  |  tab_data.rec_created  |
+------------------+--------------------+---------------------+------------------------+--+
| 10               | customer1          | 10                  | 2016-09-07 08:38:00.0  |
| 20               | customer2          | 24                  | 2016-09-08 10:45:00.0  |
| 30               | customer3          | 35                  | 2016-09-10 03:26:00.0  |
| 40               | customer1          | 46                  | 2016-09-11 08:38:00.0  |
.....

Hope this helps.


Related Query

More Query from same tag