score:0

Assuming each `id` doesn't have too many matching records, you can use the `collect_list` aggregation function to collect all matching arrays into an array of arrays, and then use a user-defined function (UDF) to sum over these nested arrays:

import org.apache.spark.sql.functions.{collect_list, udf}
import scala.collection.mutable

// Flattens a list of integer arrays (the output of collect_list below) and sums every element.
val flattenAndSum = udf[Int, mutable.Seq[mutable.Seq[Int]]] { seqOfArrays => seqOfArrays.flatten.sum }

// NOTE(review): assumes `import spark.implicits._` is in scope (for the $"..." syntax) and that
// `df` has an `id` column plus a struct column `event` whose fields x0..x3 are integer arrays.
// collect_list skips null arrays, so a null `event.xN` simply contributes nothing to the sum.
val sums = df.groupBy($"id").agg(
  collect_list($"event.x0") as "arr0",
  collect_list($"event.x1") as "arr1",
  collect_list($"event.x2") as "arr2",
  collect_list($"event.x3") as "arr3"
).select($"id",
  flattenAndSum($"arr0") as "0",
  flattenAndSum($"arr1") as "1",
  flattenAndSum($"arr2") as "2",
  flattenAndSum($"arr3") as "3"
)

// Attach the per-id sums back onto every original row.
df.join(sums, "id")

Alternatively, if that assumption cannot be made, you can create a user-defined aggregate function (UDAF) that performs the flatten-and-sum on the fly. This is safer, because it never materializes all of a group's arrays in memory at once, and potentially faster, but it requires a bit more work:

// Implement a UDAF that sums every element of every integer array in a group.
// Note: UserDefinedAggregateFunction is deprecated since Spark 3.0 in favor of
// org.apache.spark.sql.expressions.Aggregator registered via functions.udaf.
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{ArrayType, DataType, IntegerType, StructType}

class FlattenAndSum extends UserDefinedAggregateFunction {
  // One input column: an array of ints.
  override def inputSchema: StructType = new StructType().add("arr", ArrayType(IntegerType))

  // The running total is the only aggregation state.
  override def bufferSchema: StructType = new StructType().add("sum", IntegerType)

  override def dataType: DataType = IntegerType

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, 0)

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // A null array contributes nothing. This matches the collect_list approach, which
    // drops nulls, and avoids a NullPointerException when calling `.sum` on null.
    if (!input.isNullAt(0)) {
      val toAdd = input.getSeq[Int](0).sum
      buffer.update(0, buffer.getInt(0) + toAdd)
    }
  }

  // Combine two partial sums computed on different partitions.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0))

  override def evaluate(buffer: Row): Any = buffer.getInt(0)
}

// Use it in the aggregation.
// NOTE(review): assumes `import spark.implicits._` is in scope for the $"..." syntax.
val flattenAndSum = new FlattenAndSum()

val sums = df.groupBy($"id").agg(
  flattenAndSum($"event.x0") as "0",
  flattenAndSum($"event.x1") as "1",
  flattenAndSum($"event.x2") as "2",
  flattenAndSum($"event.x3") as "3"
)

// Attach the per-id sums back onto every original row.
df.join(sums, "id")

Related Query

More Query from same tag