score:0
Here is my solution:
I am using SLF4J (with the Log4j binding). In the base class of every Spark job I have something like this:
import org.slf4j.LoggerFactory
val LOG = LoggerFactory.getLogger(getClass)
Just before the place where I use LOG in distributed functional code, I copy the logger reference to a local constant:
val LOG = this.LOG
It worked for me!
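A minimal sketch of how these pieces fit together (the base class and job body here are hypothetical; only the logging pattern is from the answer above):

import org.slf4j.LoggerFactory
import org.apache.spark.rdd.RDD

abstract class BaseSparkJob {
  val LOG = LoggerFactory.getLogger(getClass)

  def run(rdd: RDD[Int]): Long = {
    // Copy the logger reference to a local constant so the closure
    // captures the local val instead of the whole job instance.
    val LOG = this.LOG
    rdd.map { x =>
      LOG.info(s"processing $x")
      x + 1
    }.count()
  }
}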
score:1
import org.apache.log4j.Logger
val log = Logger.getLogger(getClass.getName)
You can use log to write logs. Also, if you need to change logger properties, you need a log4j.properties file in the conf folder. By default there is a template in that location.
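For example, a minimal log4j.properties (a sketch using the standard Log4j 1.x console appender; adjust the level and pattern to taste) could look like:

# Log everything at INFO and above to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n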
score:1
Making the logger transient and lazy does the trick
@transient lazy val log = Logger.getLogger(getClass.getName)
@transient will tell Spark not to serialize it to the executors, and lazy will cause the instance to be created when it is first used. In other words, each executor will have its own instance of the logger. Serializing the logger is not a good idea anyway, even if you can.
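A minimal sketch of the pattern in context (the class and RDD here are illustrative, not part of the original answer):

import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD

class Mapper extends Serializable {
  // Never serialized; each executor creates its own logger on first use.
  @transient lazy val log = Logger.getLogger(getClass.getName)

  def process(rdd: RDD[Int]): RDD[Int] =
    rdd.map { x =>
      log.info(s"processing $x")
      x + 1
    }
}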
Of course, anything you put in the map() closure will run on the executors, so it will show up in the executor logs and not the driver logs. For custom log4j properties on the executors, you need to add log4j.properties to the executor classpath and ship the file to the executors.
This can be done by adding the following arguments to your spark-submit command: --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=./log4j.properties" --files ./log4j.properties
There are other ways to set these configs, but this one is the most common.
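Put together, a spark-submit invocation might look like this (the class and jar names are placeholders):

spark-submit \
  --class com.example.MyJob \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=./log4j.properties" \
  --files ./log4j.properties \
  my-job.jar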
score:2
This is an old post, but I want to provide my working solution, which I arrived at after struggling a lot; it may still be useful for others:
I wanted to print RDD contents inside an rdd.map function but kept getting a Task Not Serializable error. My solution for this problem is a Scala singleton object that extends java.io.Serializable:
import org.apache.log4j.{Level, LogManager}
import org.apache.spark.{SparkConf, SparkContext}

object MyClass extends Serializable {
  val log = LogManager.getLogger("name of my spark log")
  log.setLevel(Level.INFO)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("logging-demo"))
    val rdd = sc.parallelize(Seq(1, 2, 3))
    log.info("count: " + rdd.count)   // RDD actions like count belong on the driver
    rdd.map { t =>
      // Using the object's logger here; the singleton is serializable,
      // so the closure does not drag in a non-serializable outer class.
      val log = MyClass.log
      log.info("element: " + t)
      t
    }.count()
  }
}
score:4
If you need some code to be executed before and after a map, filter or other RDD function, try to use mapPartitions, where the underlying iterator is passed explicitly.
Example:
val log = ??? // this gets captured and produces serialization error
rdd.map { x =>
log.info(x)
x+1
}
Becomes:
rdd.mapPartitions { it =>
val log = ??? // this is freshly initialized in worker nodes
it.map { x =>
log.info(x)
x + 1
}
}
Every basic RDD function is implemented with mapPartitions.
Make sure to handle the partitioner explicitly and not lose it: see the Scaladoc for the preservesPartitioning parameter; this is critical for performance.
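For instance, a concrete version with a per-partition Log4j logger on a pair RDD (the logger name and RDD type are placeholders) could look like:

import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD

def logged(rdd: RDD[(String, Int)]): RDD[(String, Int)] =
  rdd.mapPartitions({ it =>
    // Initialized on the worker, once per partition, so nothing has to be serialized.
    val log = LogManager.getLogger("my-partition-logger")
    it.map { case (k, v) =>
      log.info(s"$k -> $v")
      (k, v + 1)   // keys are untouched, so keeping the partitioner is safe
    }
  }, preservesPartitioning = true)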
score:11
Use Log4j 2.x. The core logger has been made serializable. Problem solved.
Jira discussion: https://issues.apache.org/jira/browse/LOG4J2-801
"org.apache.logging.log4j" % "log4j-api" % "2.x.x"
"org.apache.logging.log4j" % "log4j-core" % "2.x.x"
"org.apache.logging.log4j" %% "log4j-api-scala" % "2.x.x"
score:54
You can use Akhil's solution proposed in https://www.mail-archive.com/user@spark.apache.org/msg29010.html. I have used it myself and it works.
Akhil Das Mon, 25 May 2015 08:20:40 -0700
Try this way:
object Holder extends Serializable {
  @transient lazy val log = Logger.getLogger(getClass.getName)
}

val someRdd = spark.parallelize(List(1, 2, 3)).foreach { element =>
  Holder.log.info(element)
}
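For reference, a self-contained version of that snippet (filling in the imports and a SparkContext, which the quoted mail omits) might look like:

import org.apache.log4j.Logger
import org.apache.spark.{SparkConf, SparkContext}

object Holder extends Serializable {
  // Transient + lazy: never serialized, re-created on each executor when first used.
  @transient lazy val log = Logger.getLogger(getClass.getName)
}

object LoggingExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("logging-example"))
    sc.parallelize(List(1, 2, 3)).foreach { element =>
      Holder.log.info(element)   // runs on the executors; check the executor logs
    }
  }
}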