Debugging - Spark Scala - replace text if it exists in a list


I have 2 datasets. One is a DataFrame with a bunch of data, in which 1 column holds comments (a string). The other is a list of words.

If a comment contains a word from the list, I want to replace that word in the comment with @@@@@, and return the full comment with the words replaced.

Here's some sample data:

commentsample.txt

1   badword small town
2   "love truck, though rattle annoying."
3   love paint!
4   
5   "like added ""oh badword2"" handle passenger side."
6   "badword you. specific enough you, badword3?"
7   car piece if badword2

profanitysample.txt

badword
badword2
badword3

Here's my code so far:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    case class Response(uniqueId: Int, comment: String)

    val response = sc.textFile("file:/data/commentsample.txt").map(_.split("\t")).filter(_.size == 2).map(r => Response(r(0).trim.toInt, r(1).trim.toString, r(10).trim.toInt)).toDF()

    var profanity = sc.textFile("file:/data/profanitysample.txt").map(x => (x.toLowerCase())).toArray();

    def replaceProfanity(s: String): String = {
        val l = s.toLowerCase()
        val r = "@@@@@"
        if (profanity.contains(s))
            r
        else
            s
    }

    def processComment(s: String): String = {
        val commentWords = sc.parallelize(s.split(' '))
        commentWords.foreach(replaceProfanity)
        commentWords.collect().mkString(" ")
    }

    response.select(processComment("comment")).show(100)

It compiles and runs, but the words are not replaced. I don't know how to debug in Scala. I'm totally new! This is my first project ever!

Many thanks for any pointers. -M

First, I think the use case you describe here won't benefit from the use of DataFrames - it's simpler to implement using RDDs (DataFrames are mostly convenient when your transformations can be described using SQL, which isn't the case here).

So - here's a possible implementation using RDDs. This assumes the list of profanities isn't too large (i.e. up to ~thousands), so that it can be collected into non-distributed memory. If that's not the case, a different approach (involving a join) might be needed.

    import org.apache.spark.rdd.RDD

    case class Response(uniqueId: Int, comment: String)

    val mask = "@@@@@"

    val responses: RDD[Response] = sc.textFile("file:/data/commentsample.txt")
      .map(_.split("\t"))
      .filter(_.size == 2)
      .map(r => Response(r(0).trim.toInt, r(1).trim))
    val profanities: Array[String] = sc.textFile("file:/data/profanitysample.txt").collect()

    val result = responses.map(r => {
      // using foldLeft here means we'll replace profanities one by one,
      // with the result of each replace being the input of the next,
      // starting with the original comment
      profanities.foldLeft(r.comment)({
        case (updatedComment, profanity) => updatedComment.replaceAll(s"(?i)\\b$profanity\\b", mask)
      })
    })

    result.take(10).foreach(println) // just printing some examples...

Note that the case-insensitivity and the "whole words only" limitation are both implemented in the regex itself: "(?i)\\bsomeword\\b".
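
To see what that regex does without starting Spark, here is a minimal plain-Scala sketch of the same fold over the word list. The names ProfanityMask and maskProfanities are made up for illustration, and the Pattern.quote call is an addition that is not in the code above: it is there on the assumption that some words in the list might contain regex metacharacters.

```scala
import java.util.regex.{Matcher, Pattern}

// Hypothetical helper, not part of Spark: the same foldLeft idea on a plain String.
object ProfanityMask {
  val DefaultMask = "@@@@@"

  // Replaces each whole-word, case-insensitive occurrence of every listed word with the mask.
  def maskProfanities(comment: String,
                      profanities: Seq[String],
                      mask: String = DefaultMask): String =
    profanities.foldLeft(comment) { (updated, word) =>
      // \b on both sides keeps "badword" from matching inside "badword2".
      // Pattern.quote (an assumption/addition) treats the word as literal text.
      val pattern = Pattern.compile("\\b" + Pattern.quote(word) + "\\b", Pattern.CASE_INSENSITIVE)
      // quoteReplacement keeps the mask literal even if it contained '$' or '\'.
      pattern.matcher(updated).replaceAll(Matcher.quoteReplacement(mask))
    }
}
```

With the sample word list, ProfanityMask.maskProfanities("badword you. specific enough you, badword3?", Seq("badword", "badword2", "badword3")) gives "@@@@@ you. specific enough you, @@@@@?". The word boundary is why "badword" alone does not eat the front of "badword3". Inside the Spark job, a function like this could be called from responses.map in place of the inline foldLeft.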

