I have two datasets. One is a DataFrame with a bunch of data, where one column holds comments (a string). The other is a list of words.
If a comment contains a word from the list, I want to replace that word in the comment with @@@@@ and return the full comment with the words replaced.
Here's some sample data:
commentsample.txt
1 badword small town
2 "love truck, though rattle annoying."
3 love paint!
4
5 "like added ""oh badword2"" handle passenger side."
6 "badword you. specific enough you, badword3?"
7 car piece if badword2

profanitysample.txt
badword
badword2
badword3
Here's my code so far:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

case class Response(uniqueID: Int, comment: String)

val response = sc.textFile("file:/data/commentsample.txt")
  .map(_.split("\t"))
  .filter(_.size == 2)
  .map(r => Response(r(0).trim.toInt, r(1).trim.toString, r(10).trim.toInt))
  .toDF()

var profanity = sc.textFile("file:/data/profanitysample.txt")
  .map(x => (x.toLowerCase()))
  .toArray();

def replaceProfanity(s: String): String = {
  val l = s.toLowerCase()
  val r = "@@@@@"
  if (profanity.contains(s)) r else s
}

def processComment(s: String): String = {
  val commentWords = sc.parallelize(s.split(' '))
  commentWords.foreach(replaceProfanity)
  commentWords.collect().mkString(" ")
}

response.select(processComment("comment")).show(100)
It compiles and runs, but the words are not replaced. I don't know how to debug in Scala. I'm totally new; this is my first project ever!
Many thanks for any pointers. -M
First, I think the use case you describe here won't benefit much from using DataFrames; it's simpler to implement with RDDs. (DataFrames are most convenient when your transformations can be described in SQL, which isn't the case here.)
So, here's a possible implementation using RDDs. It assumes the list of profanities isn't too large (i.e. ~thousands of words), so it can be collected into non-distributed memory on the driver. If that's not the case, a different approach (involving a join) might be needed.
import org.apache.spark.rdd.RDD

case class Response(uniqueID: Int, comment: String)

val mask = "@@@@@"

val responses: RDD[Response] = sc.textFile("file:/data/commentsample.txt")
  .map(_.split("\t"))
  .filter(_.size == 2)
  .map(r => Response(r(0).trim.toInt, r(1).trim))

val profanities: Array[String] = sc.textFile("file:/data/profanitysample.txt").collect()

val result = responses.map(r => {
  // using foldLeft here means we'll replace profanities one by one,
  // with the result of each replace being the input of the next,
  // starting from the original comment
  profanities.foldLeft(r.comment)({
    case (updatedComment, profanity) => updatedComment.replaceAll(s"(?i)\\b$profanity\\b", mask)
  })
})

result.take(10).foreach(println) // printing some examples...
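Because the work inside `responses.map` is plain Scala, it can be checked in a REPL without a SparkContext, which is also a handy way to debug this kind of logic. A minimal sketch, assuming the profanity list is already in memory as a `Seq[String]`; `maskProfanities` and `ProfanityMaskDemo` are hypothetical names, and wrapping each word in `Pattern.quote` is my addition (it guards against list entries that contain regex metacharacters), not part of the answer above.

```scala
import java.util.regex.Pattern

object ProfanityMaskDemo {
  val mask = "@@@@@"

  // Apply the replacements one by one: the output of each replaceAll is the
  // input of the next, starting from the original comment.
  def maskProfanities(comment: String, profanities: Seq[String]): String =
    profanities.foldLeft(comment) { (updated, word) =>
      // (?i) = case-insensitive, \b = word boundary; Pattern.quote escapes
      // any regex metacharacters in the word itself (assumed addition).
      updated.replaceAll(s"(?i)\\b${Pattern.quote(word)}\\b", mask)
    }

  def main(args: Array[String]): Unit = {
    val profanities = Seq("badword", "badword2", "badword3")
    val comments = Seq(
      "badword small town",
      "badword you. specific enough you, badword3?",
      "car piece if badword2"
    )
    // Prints each comment with the listed words masked.
    comments.map(maskProfanities(_, profanities)).foreach(println)
  }
}
```

Running `ProfanityMaskDemo.main(Array())` prints `@@@@@ small town`, `@@@@@ you. specific enough you, @@@@@?` and `car piece if @@@@@`.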
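Note that the case-insensitivity and the "whole words only" limitation are implemented in the regex itself: "(?i)\\bsomeword\\b". The (?i) flag makes the match case-insensitive, and each \b anchors the match at a word boundary, so "someword" inside a longer word is left alone.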
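A quick way to see both properties of that pattern; a sketch that plugs the concrete word `badword` in place of `someword`, with made-up input strings. `WholeWordRegexDemo` is a hypothetical name.

```scala
object WholeWordRegexDemo {
  // Same pattern shape as above, with a concrete word in place of "someword".
  val pattern = "(?i)\\bbadword\\b"

  def mask(s: String): String = s.replaceAll(pattern, "@@@@@")

  def main(args: Array[String]): Unit = {
    println(mask("BadWord!"))       // case-insensitive match: @@@@@!
    println(mask("\"oh badword\"")) // quotes are not word characters: "oh @@@@@"
    println(mask("badwords"))       // no boundary after 'd': unchanged
    println(mask("badword2"))       // '2' is a word character: unchanged
  }
}
```

The last case is why folding over the list in order is safe: masking `badword` does not touch the first seven letters of `badword2`, so `badword2` is still intact when its own pattern is applied.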