Accepted answer

Here is a possible solution for the specific case of LinearRegression and any other algorithm that supports an objective history (in this case, LinearRegressionTrainingSummary does the job).

Let's first create a minimal, complete, and verifiable example:

import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.mllib.util.{LinearDataGenerator, MLUtils}
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder().getOrCreate()

import org.apache.spark.ml.evaluation.RegressionEvaluator
import spark.implicits._

val data = {
  // Generate a synthetic linear regression dataset: 10000 rows, 4 features, gaussian noise.
  val tmp = LinearDataGenerator.generateLinearRDD(
    spark.sparkContext,
    nexamples = 10000,
    nfeatures = 4,
    eps = 0.05
  ).toDF

  // The generator produces mllib vectors; convert them to the ml vector type.
  MLUtils.convertVectorColumnsToML(tmp, "features")
}

As you've noticed, when you need data for testing spark-mllib or spark-ml, it's advisable to use the built-in data generators.
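
For instance, a quick look at the schema confirms what the generator produced; a minimal check (the exact nullability flags may vary across Spark versions):

// One double label column and one ml.linalg vector column, ready for spark-ml estimators.
data.printSchema()
// root
//  |-- label: double (nullable = false)
//  |-- features: vector (nullable = true)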

Now, let's train a linear regressor:

// Create the linear regression estimator.
val lr = new LinearRegression().setMaxIter(1000)

// The following grid expands to two parameter sets: addGrid(lr.fitIntercept)
// with no explicit values covers both true and false.
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.001))
  .addGrid(lr.fitIntercept)
  .addGrid(lr.elasticNetParam, Array(0.5))
  .build()

// Create the trainer: a train/validation split used to evaluate which set of
// parameters performs best. I'm using the regular RegressionEvaluator here.
val trainValidationSplit = new TrainValidationSplit()
  .setEstimator(lr)
  .setEvaluator(new RegressionEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8) // 80% of the data will be used for training and the remaining 20% for validation.

// To retrieve the sub-models, make sure to set collectSubModels to true before fitting.
trainValidationSplit.setCollectSubModels(true)
// Run the train/validation split, and choose the best set of parameters.
val model = trainValidationSplit.fit(data)
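
As a quick sanity check before digging into the sub-models, the fitted model also exposes the validation metric computed for each parameter combination; a minimal sketch (the printed values depend on your run):

// One metric per parameter map (RMSE by default for RegressionEvaluator),
// in the same order as model.getEstimatorParamMaps.
model.getEstimatorParamMaps.zip(model.validationMetrics).foreach {
  case (params, metric) => println(s"$params => $metric")
}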

Now that our model is trained, all we need to do is retrieve the objective history.

The following part requires a bit of gymnastics between the model and sub-model parameters.

In case you have a Pipeline or similar, this code needs to be modified, so use it carefully; it's just an example (a sketch of the Pipeline case follows the code below):

val objectiveHist = spark.sparkContext.parallelize(
  model.subModels.zip(model.getEstimatorParamMaps).map {
    case (m: LinearRegressionModel, pm: ParamMap) =>
      val history: Array[Double] = m.summary.objectiveHistory
      val idx: Seq[Int] = 1 until history.length
      // regParam, elasticNetParam, fitIntercept
      val parameters = pm.toSeq.map(pair => (pair.param.name, pair.value.toString)) match {
        case Seq(x, y, z) => (x._2, y._2, z._2)
      }
      (parameters._1, parameters._2, parameters._3, idx.zip(history).toMap)
  }).toDF("regParam", "elasticNetParam", "fitIntercept", "objectiveHistory")
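
As mentioned above, the pattern match would change if the estimator were a Pipeline: each sub-model is then a PipelineModel, and the summary has to be extracted from the fitted regression stage. A minimal sketch, assuming the regression is the last stage of the pipeline:

import org.apache.spark.ml.PipelineModel

model.subModels.zip(model.getEstimatorParamMaps).map {
  case (pm: PipelineModel, params: ParamMap) =>
    // Pull the fitted LinearRegressionModel out of the pipeline before reading its summary.
    val m = pm.stages.last.asInstanceOf[LinearRegressionModel]
    (params, m.summary.objectiveHistory)
}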

Back to our flat model, we can now examine those metrics:

objectiveHist.show(false)
// +--------+---------------+------------+-------------------------------------------------------------------------------------------------------+
// |regParam|elasticNetParam|fitIntercept|objectiveHistory                                                                                       |
// +--------+---------------+------------+-------------------------------------------------------------------------------------------------------+
// |0.001   |0.5            |true        |[1 -> 0.4999999999999999, 2 -> 0.4038796441909531, 3 -> 0.02659222058006269, 4 -> 0.026592220340980147]|
// |0.001   |0.5            |false       |[1 -> 0.5000637621421942, 2 -> 0.4039303922115196, 3 -> 0.026592220673025396, 4 -> 0.02659222039347222]|
// +--------+---------------+------------+-------------------------------------------------------------------------------------------------------+

You can notice that the training process actually stops after 4 iterations.
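
That early stop is governed by the solver's convergence tolerance rather than by setMaxIter. A minimal sketch of tightening it (1e-9 is an arbitrary illustrative value; the Spark default is 1e-6):

// A stricter tolerance typically makes the optimizer run more iterations
// before declaring convergence, lengthening the objective history.
val stricterLr = new LinearRegression()
  .setMaxIter(1000)
  .setTol(1e-9)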

If you want just the number of iterations, you can do the following instead:

val objectiveHist2 = spark.sparkContext.parallelize(
  model.subModels.zip(model.getEstimatorParamMaps).map {
    case (m: LinearRegressionModel, pm: ParamMap) =>
      val history: Array[Double] = m.summary.objectiveHistory
      // regParam, elasticNetParam, fitIntercept
      val parameters = pm.toSeq.map(pair => (pair.param.name, pair.value.toString)) match {
        case Seq(x, y, z) => (x._2, y._2, z._2)
      }
      (parameters._1, parameters._2, parameters._3, history.size)
  }).toDF("regParam", "elasticNetParam", "fitIntercept", "iterations")

I've changed the number of features in the generator (nfeatures = 100) for the sake of demonstration:

objectiveHist2.show
// +--------+---------------+------------+----------+
// |regParam|elasticNetParam|fitIntercept|iterations|
// +--------+---------------+------------+----------+
// |   0.001|            0.5|        true|        11|
// |   0.001|            0.5|       false|        11|
// +--------+---------------+------------+----------+
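
If you only care about the winning parameter combination, you can also read the iteration count straight off the best model; a minimal sketch:

// The sub-model selected by the evaluator, with its full training summary.
val best = model.bestModel.asInstanceOf[LinearRegressionModel]
println(best.summary.totalIterations)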
