apache spark - How to calculate the time difference between two records using Scala?


I want to calculate the time difference between the events of a session using Scala.

Given the source CSV file shown below:

header  "session","events","timestamp","records" data "session_1","event_1","2015-01-01 10:10:00",100 "session_1","event_2","2015-01-01 11:00:00",500 "session_1","event_3","2015-01-01 11:30:00",300 "session_1","event_4","2015-01-01 11:45:00",300 "session_2","event_1","2015-01-01 10:10:00",100 "session_2","event_2","2015-01-01 11:00:00",500 

Required output:

header  "session","events","time_spent_in_minutes","total_records" data "session_1","event_1","50",100 "session_1","event_2","30",600 "session_1","event_3","15",900 "session_1","event_4","0",1200 "session_2","event_1","50",100 "session_2","event_2","0",600 

where time_spent_in_minutes is the difference between the current event and the next event within a given session (for example, session_1's event_1 at 10:10 is followed by event_2 at 11:00, i.e. 50 minutes), and total_records is the running sum of records (100, then 100+500=600, and so on). The header is not required in the target file.

I am new to Scala; here is what I have so far:

$ cat test.csv
"session_1","event_1","2015-01-01 10:10:00",100
"session_1","event_2","2015-01-01 11:00:00",500
"session_1","event_3","2015-01-01 11:30:00",300
"session_1","event_4","2015-01-01 11:45:00",300
"session_2","event_1","2015-01-01 10:10:00",100
"session_2","event_2","2015-01-01 11:00:00",500

scala> val sessionFile = sc.textFile("test.csv").
         map(_.split(',')).
         map(e => (e(1).trim, Sessions(e(0).trim, e(1).trim, e(2).trim, e(3).trim.toInt))).
         foreach(println)
("event_1",Sessions("session_2","event_1","2015-01-01 10:10:00",100))
("event_1",Sessions("session_1","event_1","2015-01-01 10:10:00",100))
("event_2",Sessions("session_2","event_2","2015-01-01 11:00:00",500))
("event_2",Sessions("session_1","event_2","2015-01-01 11:00:00",500))
("event_3",Sessions("session_1","event_3","2015-01-01 11:30:00",300))
("event_4",Sessions("session_1","event_4","2015-01-01 11:45:00",300))
sessionFile: Unit = ()
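The snippet refers to a Sessions type that is not shown in the question; presumably it is a case class along these lines (a hypothetical reconstruction, field names taken from the CSV header):

// hypothetical case class matching the four fields used above
case class Sessions(session: String, events: String, timestamp: String, records: Int)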

Here is a solution that uses the Joda-Time library.
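If you run this outside the Spark shell, Joda-Time also needs to be on the classpath; in sbt that would be something like the following (the version number is only an example):

libraryDependencies += "joda-time" % "joda-time" % "2.9.9"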

val input = """"session_1","event_1","2015-01-01 10:10:00",100
"session_1","event_2","2015-01-01 11:00:00",500
"session_1","event_3","2015-01-01 11:30:00",300
"session_1","event_4","2015-01-01 11:45:00",300
"session_2","event_1","2015-01-01 10:10:00",100
"session_2","event_2","2015-01-01 11:00:00",500"""

This creates an RDD from the text input; with a real file you can use sc.textFile instead:

import org.joda.time.format._
import org.joda.time._

def strToTime(s: String): Long = {
  // the split fields keep their surrounding double quotes,
  // so the pattern is quoted as well
  DateTimeFormat.forPattern(""""yyyy-MM-dd HH:mm:ss"""")
                .parseDateTime(s).getMillis() / 1000
}

val r1 = sc.parallelize(input.split("\n"))
           .map(_.split(","))
           .map(x => (x(0), (x(1), x(2), x(3))))
           .groupBy(_._1)
           .map(_._2.map { case (s, (e, timeStr, r)) =>
                  (s, (e, strToTime(timeStr), r)) }
                .toArray
                .sortBy { case (session, (event, time, records)) => time })

This converts each time such as "2015-01-01 10:10:00" to seconds since the epoch, and sorts the events of each session by time.
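As a quick check of the conversion (a minimal sketch; the quadruple quotes produce a string that still carries the surrounding double quotes, matching the split CSV fields):

strToTime(""""2015-01-01 11:00:00"""") - strToTime(""""2015-01-01 10:10:00"""")
// res: Long = 3000   (3000 seconds = 50 minutes, the expected gap for event_1)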

val r2 = r1.map(x => x :+ {
  val y = x.last
  y match {
    case (session, (event, time, records)) =>
      (session, (event, time, "0"))
  }
})

This appends one extra event to each session, with the same parameters as the session's last event except for a record count of "0". It lets the time-duration calculation below produce "0" minutes for the last real event of each session.
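To make the step concrete, here is the same padding logic on a plain local array (a standalone sketch abbreviated to two events; the epoch values are the UTC seconds for the corresponding timestamps):

val session1 = Array(
  ("session_1", ("event_1", 1420107000L, "100")),
  ("session_1", ("event_4", 1420112700L, "300")))
// duplicate the last element with its record count zeroed out
val padded = session1 :+ {
  val (s, (e, t, _)) = session1.last
  (s, (e, t, "0"))
}
// padded now ends with ("session_1", ("event_4", 1420112700L, "0"))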

Use sliding to form pairs of consecutive events:

val r3 = r2.map(x => x.sliding(2).toArray)

val r4 = r3.map(x => x.map {
  case Array((s1, (e1, t1, c1)), (s2, (e2, t2, c2))) =>
    (s1, (e1, (t2 - t1) / 60, c1))
})
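For reference, this is what sliding(2) does on a plain collection (a minimal REPL illustration):

scala> Array(1, 2, 3, 4).sliding(2).toArray
res0: Array[Array[Int]] = Array(Array(1, 2), Array(2, 3), Array(3, 4))

Because each padded session has one extra element, sliding(2) yields exactly one pair per real event.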

Use scan to add up the record counts incrementally:

val r5 = r4.map(x => x.zip(x.map { case (s, (e, t, r)) => r.toInt }
                            .scan(0)(_ + _)
                            .drop(1)))

val r6 = r5.map(x => x.map { case ((s, (e, t, r)), recordsTillNow) =>
  s"${s},${e},${t},${recordsTillNow}"
})

val r7 = r6.flatMap(x => x)

r7.collect.mkString("\n")
//"session_2","event_1",50,100
//"session_2","event_2",0,600
//"session_1","event_1",50,100
//"session_1","event_2",30,600
//"session_1","event_3",15,900
//"session_1","event_4",0,1200
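The scan/drop combination turns the per-event counts into running totals; on session_1's counts it behaves like this (a standalone illustration):

scala> List(100, 500, 300, 300).scan(0)(_ + _)
res0: List[Int] = List(0, 100, 600, 900, 1200)

scala> List(100, 500, 300, 300).scan(0)(_ + _).drop(1)
res1: List[Int] = List(100, 600, 900, 1200)

These match the total_records column for session_1 in the required output.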
