Assuming you do not want to do it after you have written the file to disk, you could:

1. Turn everything into strings in both your dataframes. However, the output will look like:

 | REQUEST_DATE | 2019-02-05 20:00:00 |                     |        |
 | USER         | Kate                |                     |        |
 | SEARCH_TYPE  | Global              |                     |        |
 |              |                     |                     |        |
 | NAME         | START_DATE          | END_DATE            | STATUS |
 | Alex         | 2018-01-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
 | Bob          | 2018-02-01 00:00:00 | 2018-02-05 00:00:00 | IN     |
 | Mark         | 2018-02-01 00:00:00 | 2018-03-01 00:00:00 | IN     |
 | Mark         | 2018-05-01 00:00:00 | 2018-08-01 00:00:00 | OUT    |
 | Meggy        | 2018-02-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
2. Build your own custom output writer that adds the header before saving; a minimal sketch follows this list. For the writing itself, look at the save/write part of the Spark documentation.
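
For option #2, here is a minimal sketch of what such a writer could look like. It is an assumption-laden illustration, not a definitive implementation: it collects the dataframe to the driver (so it only suits small data), writes one local CSV file, and hard-codes the header values from the example above.

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class HeaderCsvWriter {

  // Hypothetical helper: writes the custom header block first, then
  // appends every row of the dataframe as a CSV line. Only suitable
  // when dataDf is small enough for collectAsList().
  public static void write(Dataset<Row> dataDf, String path)
      throws IOException {
    try (BufferedWriter w = Files.newBufferedWriter(Paths.get(path))) {
      w.write("REQUEST_DATE,2019-02-05 20:00:00,,\n");
      w.write("USER,Kate,,\n");
      w.write("SEARCH_TYPE,Global,,\n");
      w.write(",,,\n");
      w.write("NAME,START_DATE,END_DATE,STATUS\n");
      for (Row r : dataDf.collectAsList()) {
        // Note: java.sql.Date.toString() drops the time part; format
        // explicitly with SimpleDateFormat if you need HH:mm:ss.
        w.write(r.getString(0) + "," + r.getDate(1) + ","
            + r.getDate(2) + "," + r.getString(3) + "\n");
      }
    }
  }
}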

Update

If you want to do #1, here is the code to transform the first dataframe (with the data):

Dataset<Row> transitionDf = dataDf
    .withColumn("_c1", dataDf.col("NAME"))
    .withColumn("_c2",
        dataDf.col("START_DATE").cast(DataTypes.StringType))
    .withColumn("_c3",
        dataDf.col("END_DATE").cast(DataTypes.StringType))
    .withColumn("_c4", dataDf.col("STATUS").cast(DataTypes.StringType))
    .drop("NAME")
    .drop("START_DATE")
    .drop("END_DATE")
    .drop("STATUS");

The key is to cast() your columns to strings so they match the header dataframe's schema. You can then use unionByName() to merge both dataframes. The whole code, in Java (I don't do Scala), would be something along these lines:

package net.jgp.labs.spark.l901Union;

import java.sql.Date;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Use of unionByName() to create a complex header on a dataframe.
 * 
 * @author jgp
 */
public class UnionApp {
  private SimpleDateFormat format = null;

  public UnionApp() {
    this.format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  }

  /**
   * main() is your entry point to the application.
   * 
   * @param args
   * @throws ParseException
   */
  public static void main(String[] args) throws ParseException {
    UnionApp app = new UnionApp();
    app.start();
  }

  /**
   * The processing code.
   * 
   * @throws ParseException
   */
  private void start() throws ParseException {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("UnionApp")
        .master("local")
        .getOrCreate();

    // Data
    StructType dataSchema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "NAME",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "START_DATE",
            DataTypes.DateType,
            false),
        DataTypes.createStructField(
            "END_DATE",
            DataTypes.DateType,
            false),
        DataTypes.createStructField(
            "STATUS",
            DataTypes.StringType,
            false) });
    List<Row> dataRows = new ArrayList<Row>();
    dataRows.add(RowFactory.create("Alex", toDate("2018-01-01 00:00:00"),
        toDate("2018-02-01 00:00:00"), "OUT"));
    dataRows.add(RowFactory.create("Bob", toDate("2018-02-01 00:00:00"),
        toDate("2018-02-05 00:00:00"), "IN"));
    dataRows.add(RowFactory.create("Mark", toDate("2018-02-01 00:00:00"),
        toDate("2018-03-01 00:00:00"), "IN"));
    dataRows.add(RowFactory.create("Mark", toDate("2018-05-01 00:00:00"),
        toDate("2018-08-01 00:00:00"), "OUT"));
    dataRows.add(RowFactory.create("Meggy", toDate("2018-02-01 00:00:00"),
        toDate("2018-02-01 00:00:00"), "OUT"));
    Dataset<Row> dataDf = spark.createDataFrame(dataRows, dataSchema);
    dataDf.show();
    dataDf.printSchema();

    // Header
    StructType headerSchema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "_c1",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "_c2",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "_c3",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "_c4",
            DataTypes.StringType,
            false) });
    List<Row> headerRows = new ArrayList<Row>();
    headerRows.add(RowFactory.create("REQUEST_DATE",
        format.format(new java.util.Date()), "", ""));
    headerRows.add(RowFactory.create("USER", "Kate", "", ""));
    headerRows.add(RowFactory.create("SEARCH_TYPE", "Global", "", ""));
    headerRows.add(RowFactory.create("", "", "", ""));
    headerRows
        .add(RowFactory.create("NAME", "START_DATE", "END_DATE", "STATUS"));
    Dataset<Row> headerDf = spark.createDataFrame(headerRows, headerSchema);
    headerDf.show(false);
    headerDf.printSchema();

    // Transition
    Dataset<Row> transitionDf = dataDf
        .withColumn("_c1", dataDf.col("NAME"))
        .withColumn("_c2",
            dataDf.col("START_DATE").cast(DataTypes.StringType))
        .withColumn("_c3",
            dataDf.col("END_DATE").cast(DataTypes.StringType))
        .withColumn("_c4", dataDf.col("STATUS").cast(DataTypes.StringType))
        .drop("NAME")
        .drop("START_DATE")
        .drop("END_DATE")
        .drop("STATUS");
    transitionDf.show(false);
    transitionDf.printSchema();

    // Union
    Dataset<Row> unionDf = headerDf.unionByName(transitionDf);
    unionDf.show(false);
    unionDf.printSchema();
  }

  private Date toDate(String dateAsText) throws ParseException {
    java.util.Date parsed;
    parsed = format.parse(dateAsText);
    return new Date(parsed.getTime());
  }
}
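
If you then want that union on disk, something along these lines should do it (a sketch, assuming the data is small enough to coalesce into one partition; the /tmp/report path is made up):

// Write the unioned dataframe without Spark's own header row - the
// header lines are already part of the data. Spark still produces a
// directory containing a single part-*.csv file.
unionDf
    .coalesce(1)
    .write()
    .format("csv")
    .option("header", false)
    .mode("overwrite")
    .save("/tmp/report");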

I saved that as part of my Spark and Java labs on GitHub. The equivalent Scala code would probably be a little more compact :).

