apache spark - How to calculate the time difference between two records using Scala?
I want to calculate the time difference between the events of a session using Scala.
Given the source CSV file shown below:
header "session","events","timestamp","records" data "session_1","event_1","2015-01-01 10:10:00",100 "session_1","event_2","2015-01-01 11:00:00",500 "session_1","event_3","2015-01-01 11:30:00",300 "session_1","event_4","2015-01-01 11:45:00",300 "session_2","event_1","2015-01-01 10:10:00",100 "session_2","event_2","2015-01-01 11:00:00",500
Required output:

Header:
"session","events","time_spent_in_minutes","total_records"

Data:
"session_1","event_1","50",100
"session_1","event_2","30",600
"session_1","event_3","15",900
"session_1","event_4","0",1200
"session_2","event_1","50",100
"session_2","event_2","0",600
where time_spent_in_minutes is the difference between the current event and the next event within the same session; for example, in session_1, event_1 is at 10:10:00 and event_2 at 11:00:00, so event_1 gets 50 minutes, and the last event of each session gets 0. The header is not required in the target file.
I am new to Scala. Here is what I have so far:
$ cat test.csv
"session_1","event_1","2015-01-01 10:10:00",100
"session_1","event_2","2015-01-01 11:00:00",500
"session_1","event_3","2015-01-01 11:30:00",300
"session_1","event_4","2015-01-01 11:45:00",300
"session_2","event_1","2015-01-01 10:10:00",100
"session_2","event_2","2015-01-01 11:00:00",500

scala> val sessionfile = sc.textFile("test.csv").
         map(_.split(',')).
         map(e => (e(1).trim, Sessions(e(0).trim, e(1).trim, e(2).trim, e(3).trim.toInt))).
         foreach(println)
("event_1",Sessions("session_2","event_1","2015-01-01 10:10:00",100))
("event_1",Sessions("session_1","event_1","2015-01-01 10:10:00",100))
("event_2",Sessions("session_2","event_2","2015-01-01 11:00:00",500))
("event_2",Sessions("session_1","event_2","2015-01-01 11:00:00",500))
("event_3",Sessions("session_1","event_3","2015-01-01 11:30:00",300))
("event_4",Sessions("session_1","event_4","2015-01-01 11:45:00",300))
sessionfile: Unit = ()

scala>
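(The Sessions case class is not shown in the question; presumably it was defined along these lines, inferred from the constructor call above.)

// Assumed definition, matching Sessions(e(0).trim, e(1).trim, e(2).trim, e(3).trim.toInt)
case class Sessions(session: String, events: String, timestamp: String, records: Int)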
Here is a solution that uses the Joda-Time library.
val input = """"session_1","event_1","2015-01-01 10:10:00",100 "session_1","event_2","2015-01-01 11:00:00",500 "session_1","event_3","2015-01-01 11:30:00",300 "session_1","event_4","2015-01-01 11:45:00",300 "session_2","event_1","2015-01-01 10:10:00",100 "session_2","event_2","2015-01-01 11:00:00",500"""
This creates an RDD from the text input; to read the data from a file instead, you can use sc.textFile.
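For example, the equivalent starting point when reading from a file would be something like this (my sketch; "test.csv" is the file from the question):

// Read the same rows from a file instead of parallelizing an in-memory string
val lines = sc.textFile("test.csv")
// ...then continue with .map(_.split(",")) exactly as in the pipeline below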
import org.joda.time.format._
import org.joda.time._

def strToTime(s: String): Long = {
  DateTimeFormat.forPattern(""""yyyy-MM-dd HH:mm:ss"""")
    .parseDateTime(s).getMillis() / 1000
}

val r1 = sc.parallelize(input.split("\n"))
  .map(_.split(","))
  .map(x => (x(0), (x(1), x(2), x(3))))
  .groupBy(_._1)
  .map(_._2
    .map { case (s, (e, timeStr, r)) => (s, (e, strToTime(timeStr), r)) }
    .toArray
    .sortBy(z => z match { case (session, (event, time, records)) => time }))
converted time "2015-01-01 10:10:00" seconds epoch, , sorted time.
val r2 = r1.map(x => x :+ {
  val y = x.last
  y match { case (session, (event, time, records)) => (session, (event, time, "0")) }
})
This appends an extra event to each session with the same parameters as the session's last event, except that the record count is "0". It lets the time-duration calculation produce 0 minutes for the last real event.
Use sliding to form pairs of consecutive events; a toy illustration of sliding follows, then the actual step.
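(My own toy example, using made-up epoch offsets for session_1 plus the duplicated sentinel time: each window of two consecutive times yields one duration, and the sentinel pair yields 0.)

// Made-up epoch offsets for 10:10, 11:00, 11:30, 11:45, plus the appended sentinel
val times = Array(600L, 3600L, 5400L, 6300L, 6300L)
times.sliding(2).map { case Array(a, b) => (b - a) / 60 }.toArray
// Array(50, 30, 15, 0) -- matches time_spent_in_minutes for session_1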
val r3 = r2.map(x => x.sliding(2).toArray)

val r4 = r3.map(x => x.map {
  case Array((s1, (e1, t1, c1)), (s2, (e2, t2, c2))) => (s1, (e1, (t2 - t1) / 60, c1))
})
Use scan to accumulate the record counts incrementally; a quick illustration of scan follows, then the final steps.
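(My own toy example: the running sum starts with the seed 0, which is why the first element is dropped below.)

// Record counts of session_1
List(100, 500, 300, 300).scan(0)(_ + _)          // List(0, 100, 600, 900, 1200)
List(100, 500, 300, 300).scan(0)(_ + _).drop(1)  // List(100, 600, 900, 1200)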
val r5 = r4.map(x => x.zip(x.map { case (s, (e, t, r)) => r.toInt }
  .scan(0)(_ + _)
  .drop(1)))

val r6 = r5.map(x => x.map { case ((s, (e, t, r)), recordsTillNow) =>
  s"${s},${e},${t},${recordsTillNow}"
})

val r7 = r6.flatMap(x => x)

r7.collect.mkString("\n")
//"session_2","event_1",50,100
//"session_2","event_2",0,600
//"session_1","event_1",50,100
//"session_1","event_2",30,600
//"session_1","event_3",15,900
//"session_1","event_4",0,1200
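For completeness, the same result can be sketched with Spark SQL window functions instead of RDD transformations. This is my own alternative sketch, not part of the original answer; it assumes Spark 2.x with the built-in CSV reader, a SparkSession named spark, and the headerless test.csv from the question. The column and value names below (df, bySession, result, "output_dir") are mine.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Read the headerless CSV; the reader strips the quotes around fields
val df = spark.read
  .csv("test.csv")
  .toDF("session", "events", "timestamp", "records")

// Events of one session, ordered by timestamp
val bySession = Window.partitionBy("session").orderBy("timestamp")

val result = df
  .withColumn("ts", unix_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
  // minutes until the next event; coalesce makes the last event of a session yield 0
  .withColumn("time_spent_in_minutes",
    ((coalesce(lead(col("ts"), 1).over(bySession), col("ts")) - col("ts")) / 60).cast("long"))
  // running total of records within the session
  .withColumn("total_records",
    sum(col("records").cast("long"))
      .over(bySession.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
  .select("session", "events", "time_spent_in_minutes", "total_records")

result.write.csv("output_dir")  // no header is written by default

The coalesce with the event's own timestamp plays the same role as the appended sentinel event in the RDD version: it turns the missing "next event" of the last row into a 0-minute duration.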