Spark UDF Thread Safety

I use Spark to load data containing a date column and create three new columns holding the number of days, weeks and months between the date in that column and today.

My concern is the use of SimpleDateFormat, which is not thread-safe. Without Spark this would be fine, since it is a local variable. With lazy evaluation, however, Spark may share one SimpleDateFormat instance across several UDFs. Could that cause problems?

    import java.text.SimpleDateFormat
    import org.apache.spark.sql.functions.udf
    import org.joda.time.{DateTime, Days, Months, Weeks}

    def calcTimeDifference(...) {
      val sdf = new SimpleDateFormat(dateFormat)

      val dayDifference = udf { (x: String) =>
        math.abs(Days.daysBetween(new DateTime(sdf.parse(x)), presentDate).getDays)
      }
      output = output.withColumn("days", dayDifference(myCol))

      val weekDifference = udf { (x: String) =>
        math.abs(Weeks.weeksBetween(new DateTime(sdf.parse(x)), presentDate).getWeeks)
      }
      output = output.withColumn("weeks", weekDifference(myCol))

      val monthDifference = udf { (x: String) =>
        math.abs(Months.monthsBetween(new DateTime(sdf.parse(x)), presentDate).getMonths)
      }
      output = output.withColumn("months", monthDifference(myCol))
    }
1 answer

I do not think it is safe: as we know, SimpleDateFormat is not thread-safe.
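To make the hazard concrete, here is a small JDK-only sketch (no Spark involved) in which one SimpleDateFormat is shared by several threads. The object name, the sample dates and the thread and round counts are illustrative assumptions. The number of wrong parses or exceptions it reports varies from run to run and can be zero on a lucky run; with a single thread it is always zero.

```scala
import java.text.SimpleDateFormat
import java.util.concurrent.{Callable, Executors, TimeUnit}
import scala.util.Try

// Illustrative only: ONE SimpleDateFormat shared by many threads.
object SharedFormatterRace {
  // Hypothetical inputs: every day of February 2016.
  val inputs: Seq[String] = (1 to 28).map(d => f"2016-02-$d%02d")

  // Parses every input `rounds` times on each of `threads` threads using a
  // single shared formatter, and returns how many parses gave a wrong date or
  // threw. The count is nondeterministic when threads > 1.
  def countBadParses(threads: Int = 8, rounds: Int = 200): Int = {
    val shared = new SimpleDateFormat("yyyy-MM-dd")

    // Reference values computed on one thread, where SimpleDateFormat is fine.
    val expected: Map[String, Long] = {
      val f = new SimpleDateFormat("yyyy-MM-dd")
      inputs.map(s => s -> f.parse(s).getTime).toMap
    }

    val pool = Executors.newFixedThreadPool(threads)
    try {
      val tasks = (1 to threads).map { _ =>
        pool.submit(new Callable[Int] {
          def call(): Int = {
            var bad = 0
            for (_ <- 1 to rounds; s <- inputs) {
              // Try catches the exceptions a corrupted internal state can cause.
              val ok = Try(shared.parse(s).getTime).toOption.contains(expected(s))
              if (!ok) bad += 1
            }
            bad
          }
        })
      }
      tasks.map(_.get()).sum
    } finally {
      pool.shutdown()
      pool.awaitTermination(1, TimeUnit.MINUTES)
    }
  }

  def main(args: Array[String]): Unit =
    println(s"bad parses with a shared formatter: ${countBadParses()}")
}
```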

Therefore, when I need SimpleDateFormat in Spark, I prefer the following approach:

    import java.text.SimpleDateFormat
    import java.util.TimeZone

    /**
     * Thread-safe SimpleDateFormat for Spark: each thread gets its own instance.
     */
    object ThreadSafeFormat extends ThreadLocal[SimpleDateFormat] {
      override def initialValue(): SimpleDateFormat = {
        val dateFormat = new SimpleDateFormat("yyyy-MM-dd:H")
        // If you need UTC time, set the UTC time zone.
        // Note: new SimpleTimeZone(SimpleTimeZone.UTC_TIME, "UTC") would be wrong here,
        // because UTC_TIME is a time-mode constant (2), which that constructor
        // would treat as a raw offset of 2 ms.
        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
        dateFormat
      }
    }

Then call ThreadSafeFormat.get() inside your UDFs to obtain a SimpleDateFormat instance that belongs to the current thread only.
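A minimal sketch of that usage, assuming a JDK-only setting so it runs without Spark or Joda-Time: it repeats the ThreadSafeFormat object (setting UTC via TimeZone.getTimeZone("UTC")) so the block compiles on its own, and replaces the question's Days.daysBetween with plain millisecond arithmetic. ThreadLocalUsage, daysFromPresent, resultsPerThread and the fixed present date are hypothetical names and values chosen for illustration.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone
import java.util.concurrent.{Callable, Executors, TimeUnit}

// Per-thread formatter, as in the answer above.
object ThreadSafeFormat extends ThreadLocal[SimpleDateFormat] {
  override def initialValue(): SimpleDateFormat = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd:H")
    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
    dateFormat
  }
}

object ThreadLocalUsage {
  // Hypothetical fixed "present" (2016-03-01 00:00 UTC) so results are
  // reproducible; in the question this role is played by presentDate.
  val presentMillis: Long = ThreadSafeFormat.get().parse("2016-03-01:0").getTime

  // Same shape as the body of the question's dayDifference UDF, but the
  // formatter is fetched from the ThreadLocal instead of a shared sdf.
  def daysFromPresent(x: String): Long = {
    val parsed = ThreadSafeFormat.get().parse(x) // owned by the calling thread
    math.abs(TimeUnit.MILLISECONDS.toDays(presentMillis - parsed.getTime))
  }

  // Evaluates daysFromPresent over all inputs on several threads at once and
  // returns the distinct result sequences; per-thread formatters mean every
  // thread should produce the same sequence.
  def resultsPerThread(inputs: Seq[String], threads: Int = 8): Set[Seq[Long]] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val futures = (1 to threads).map { _ =>
        pool.submit(new Callable[Seq[Long]] {
          def call(): Seq[Long] = inputs.map(daysFromPresent)
        })
      }
      futures.map(_.get()).toSet
    } finally {
      pool.shutdown()
      pool.awaitTermination(1, TimeUnit.MINUTES)
    }
  }
}
```

Inside Spark, a function like daysFromPresent could be wrapped with udf from org.apache.spark.sql.functions. Because ThreadSafeFormat is a Scala object, each executor JVM initializes its own copy, and the ThreadLocal then gives every task thread a separate formatter.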


Source: https://habr.com/ru/post/1247017/
