score:78
Just use Source.fromFile(...).getLines, as you already stated. That returns an Iterator, which is already lazy. (You'd use Stream as a lazy collection only where you wanted previously retrieved values to be memoized, so you could read them again.)
If you're getting memory problems, then the problem lies in what you're doing after getLines. Any operation that forces a strict collection, like toList, will cause the problem.
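For illustration, here's a minimal sketch of that pattern (the file path is a placeholder): counting long lines streams through the file one line at a time, whereas calling toList on the same iterator is exactly the kind of strict operation that exhausts the heap.
import scala.io.Source

// Minimal sketch: process a large file line-by-line without
// materializing it; the path is a placeholder.
val source = Source.fromFile("path/to/large-file.csv")
try {
  // getLines returns an Iterator[String]: each line is read on demand,
  // so only the current line (plus the running count) is in memory.
  val longLineCount = source.getLines().count(_.length > 80)
  println(s"lines longer than 80 chars: $longLineCount")
} finally {
  source.close() // always release the underlying file handle
}

// By contrast, source.getLines().toList would force every line into a
// strict List at once -- that is the step that exhausts the heap.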
score:4
UPDATE 2020/08/30: Please use the Scala library kantan.csv for the most accurate and correct implementation of RFC 4180, which defines the .csv MIME type.
While I enjoyed the learning process I experienced creating the solution below, please refrain from using it, as I have found a number of issues with it, especially at scale. To avoid the obvious technical debt arising from my solution below, choose a well-maintained, RFC-driven, Scala-native solution to take care of your current and future clients.
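As a rough sketch of what that looks like (based on kantan.csv's asCsvReader syntax; the path is a placeholder, so verify the details against the version of the library you use), rows can be decoded lazily, one at a time:
import java.io.File
import kantan.csv._      // CsvReader, ReadResult, the rfc configuration
import kantan.csv.ops._  // adds the asCsvReader syntax to File

// Sketch only -- check against your kantan.csv version. Rows are
// decoded lazily as List[String], so the file is never fully loaded.
val reader = new File("path/to/file.csv")
  .asCsvReader[List[String]](rfc.withHeader)

reader.foreach {
  case Right(row)  => println(row)                // decoded row
  case Left(error) => println(s"bad row: $error") // RFC 4180 parse failure
}

reader.close() // release the underlying file handle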
If you are looking to process the large file line-by-line, without requiring the entire file's contents to be loaded into memory all at once, then you can use the Iterator returned by scala.io.Source.
I have a small function, tryProcessSource (containing two sub-functions), which I use for exactly these types of use cases. The function takes up to four parameters, of which only the first is required. The other parameters have sane default values provided.
Here's the function profile (the full implementation is at the bottom):
def tryProcessSource(
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  retainValues: (Int, List[String]) => Option[List[String]] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
  ???
}
The first parameter, file: File, is required. And it is just any valid instance of java.io.File which points to a line-oriented text file, like a CSV.
The second parameter, parseLine: (Int, String) => Option[List[String]], is optional. And if provided, it must be a function expecting to receive two input parameters, index: Int and unparsedLine: String, and then return an Option[List[String]]. The function may return a Some-wrapped List[String] consisting of the valid column values, or it may return a None, which indicates the entire streaming process is aborting early. If this parameter is not provided, a default value of (index, line) => Some(List(line)) is used. This default results in the entire line being returned as a single String value.
The third parameter, filterLine: (Int, List[String]) => Option[Boolean], is optional. And if provided, it must be a function expecting to receive two input parameters, index: Int and parsedValues: List[String], and then return an Option[Boolean]. The function may return a Some-wrapped Boolean indicating whether this particular line should be included in the output, or it may return a None, which indicates the entire streaming process is aborting early. If this parameter is not provided, a default value of (index, values) => Some(true) is used. This default results in all lines being included.
The fourth and final parameter, retainValues: (Int, List[String]) => Option[List[String]], is optional. And if provided, it must be a function expecting to receive two input parameters, index: Int and parsedValues: List[String], and then return an Option[List[String]]. The function may return a Some-wrapped List[String] consisting of some subset and/or alteration of the existing column values, or it may return a None, which indicates the entire streaming process is aborting early. If this parameter is not provided, a default value of (index, values) => Some(values) is used. This default results in the values parsed by the second parameter, parseLine, being passed through unaltered.
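Each of the three function parameters can abort the entire scan early by returning None, but none of the calling profiles below exercise that path. Here is a hedged sketch (reusing the tryProcessSource signature above, with a placeholder path) of a parseLine that stops as soon as it meets a line with an unexpected column count:
val tryLinesOrAbort =
  tryProcessSource(
    new File("path/to/file.csv")
    , parseLine =
        (index, unparsedLine) => {
          //split with limit -1 so trailing empty columns are kept
          val values = unparsedLine.split(",", -1).toList
          if (values.size == 5) Some(values) //expected column count
          else None //aborts the scan, returning the rows accumulated so far
        }
  )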
Consider a file with the following contents (4 lines):
street,street2,city,state,zip
100 Main Str,,Irving,TX,75039
231 Park Ave,,Irving,TX,75039
1400 Beltline Rd,Apt 312,Dallas,Tx,75240
The following calling profile...
val tryLinesDefaults =
  tryProcessSource(new File("path/to/file.csv"))
...results in this output for tryLinesDefaults (the unaltered contents of the file):
Success(
  List(
    List("street,street2,city,state,zip"),
    List("100 Main Str,,Irving,TX,75039"),
    List("231 Park Ave,,Irving,TX,75039"),
    List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240")
  )
)
The following calling profile...
val tryLinesParseOnly =
  tryProcessSource(
    new File("path/to/file.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
  )
...results in this output for tryLinesParseOnly (each line parsed into the individual column values):
Success(
  List(
    List("street","street2","city","state","zip"),
    List("100 Main Str","","Irving","TX","75039"),
    List("231 Park Ave","","Irving","TX","75039"),
    List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240")
  )
)
The following calling profile...
val tryLinesIrvingTxNoHeader =
  tryProcessSource(
    new File("path/to/file.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
    , filterLine =
        (index, parsedValues) =>
          Some(
            (index != 0) && //skip header line
            (parsedValues(2).toLowerCase == "irving") && //only Irving
            (parsedValues(3).toLowerCase == "tx") //only TX
          )
  )
...results in this output for tryLinesIrvingTxNoHeader (each line parsed into the individual column values, with no header and only the two rows in Irving, TX):
Success(
  List(
    List("100 Main Str","","Irving","TX","75039"),
    List("231 Park Ave","","Irving","TX","75039")
  )
)
Here's the entire tryProcessSource function implementation:
import scala.annotation.tailrec
import scala.io.Source
import scala.util.Try
import java.io.File

def tryProcessSource(
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  retainValues: (Int, List[String]) => Option[List[String]] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
  //ensures the Source is closed whether or not the transfer succeeds
  def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] =
    try Try(transfer(source)) finally source.close()

  //walks the Iterator one line at a time, accumulating retained rows
  @tailrec
  def recursive(
    remaining: Iterator[(String, Int)],
    accumulator: List[List[String]],
    isEarlyAbort: Boolean = false
  ): List[List[String]] = {
    if (isEarlyAbort || !remaining.hasNext)
      accumulator
    else {
      val (line, index) = remaining.next()
      parseLine(index, line) match {
        case Some(values) =>
          filterLine(index, values) match {
            case Some(keep) =>
              if (keep)
                retainValues(index, values) match {
                  case Some(valuesNew) =>
                    recursive(remaining, valuesNew :: accumulator) //capture values
                  case None =>
                    recursive(remaining, accumulator, isEarlyAbort = true) //early abort
                }
              else
                recursive(remaining, accumulator) //discard row
            case None =>
              recursive(remaining, accumulator, isEarlyAbort = true) //early abort
          }
        case None =>
          recursive(remaining, accumulator, isEarlyAbort = true) //early abort
      }
    }
  }

  Try(Source.fromFile(file)).flatMap(bufferedSource =>
    usingSource(bufferedSource)(source =>
      recursive(source.getLines().buffered.zipWithIndex, Nil).reverse
    )
  )
}
While this solution is relatively succinct, it took me considerable time and many refactoring passes before I was finally able to get here. Please let me know if you see any ways it might be improved.
UPDATE: I have just asked the issue below as its own StackOverflow question. And it now has an answer fixing the error mentioned below.
I had the idea to try to make this even more generic by changing the retainValues parameter to transformLine, with the new generic function definition below. However, I kept getting the highlight error in IntelliJ, "Expression of type Some[List[String]] doesn't conform to expected type Option[A]", and wasn't able to figure out how to change the default value so the error goes away.
def tryProcessSource2[A <: AnyRef](
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  transformLine: (Int, List[String]) => Option[A] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[A]] = {
  ???
}
Any assistance on how to make this work would be greatly appreciated.
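For reference, here is a hedged sketch of one common workaround (not necessarily the fix from the linked answer): a single default lambda cannot conform to Option[A] for every A, so drop the polymorphic defaults and recover the old behavior through an overload.
// Sketch of one possible workaround, not necessarily the linked
// answer's fix: defaults cannot be polymorphic in A, so the generic
// version takes explicit functions...
def tryProcessSource2[A](
  file: File,
  parseLine: (Int, String) => Option[List[String]],
  filterLine: (Int, List[String]) => Option[Boolean],
  transformLine: (Int, List[String]) => Option[A]
): Try[List[A]] = {
  ???
}

// ...and an overload fixes A = List[String] to supply the original
// pass-through defaults.
def tryProcessSource2(file: File): Try[List[List[String]]] =
  tryProcessSource2(
    file,
    (index, unparsedLine) => Some(List(unparsedLine)),
    (index, parsedValues) => Some(true),
    (index, parsedValues) => Some(parsedValues)
  )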
score:14
I hope you don't mean Scala's collection.immutable.Stream with Stream. This is not what you want. Stream is lazy, but does memoization.
I don't know what you plan to do, but just reading the file line-by-line should work very well without using high amounts of memory.
getLines should evaluate lazily and should not crash (as long as your file does not have more than 2³² lines, afaik). If it does, ask on #scala or file a bug ticket (or do both).
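To make the memoization point concrete, here is a rough sketch (placeholder path; note that Stream is deprecated in favor of LazyList since Scala 2.13, with the same retention behavior):
import scala.io.Source

// Holding a reference to the head of a Stream pins every element
// forced so far, because Stream memoizes computed values so they can
// be read again.
val lines: Stream[String] =
  Source.fromFile("path/to/large-file.csv").getLines().toStream
println(lines.head) //forces and memoizes the first line
println(lines.size) //forces EVERY line; all stay referenced via `lines`

// With a plain Iterator, each line becomes garbage-collectable as soon
// as it has been consumed:
val it = Source.fromFile("path/to/large-file.csv").getLines()
println(it.size) //streams through without retaining the lines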