Scala Symposium: Big Data Pipeline Powered by Scala
val pipelineStages = List(
  new AddRowKeyStage(EvergreenSchema),
  new WriteToHBaseForLanding(hBaseCatalog),
  new ReplaceCharDataStage(DoubleColsReplaceMap, EvergreenSchema, DoubleCols),
  new ReplaceCharDataStage(SpecialCharMap, EvergreenSchema, StringCols),
  new DataTypeValidatorStage(EvergreenSchema),
  new DataTypeCastStage(sourceRawDf.schema, EvergreenSchema)
)
import com.thoughtworks.awayday.ingest.models.ErrorModels.DataError
import org.apache.spark.sql.{DataFrame, Dataset}
trait DataStage[T <: Dataset[_]] extends Serializable {
  def apply(errors: Dataset[DataError], dataRecords: T): (Dataset[DataError], DataFrame)
  def stage: String
}
case class DataError(
  rowKey: String,
  stage: String,
  fieldName: String,
  fieldValue: String,
  error: String,
  severity: String,
  addlInfo: String = ""
)
import com.thoughtworks.awayday.ingest.DataFrameOps
import com.thoughtworks.awayday.ingest.UDFs.generateUUID
import com.thoughtworks.awayday.ingest.models.ErrorModels.DataError
import com.thoughtworks.awayday.ingest.stages.StageConstants.RowKey
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, SparkSession}
class AddRowKeyStage(schemaWithRowKey: StructType)
                    (implicit spark: SparkSession, encoder: Encoder[DataError]) extends DataStage[DataFrame] {

  override val stage: String = getClass.getSimpleName

  def apply(errors: Dataset[DataError], dataRecords: DataFrame): (Dataset[DataError], DataFrame) =
    addRowKeys(errors, dataRecords)

  def addRowKeys(errors: Dataset[DataError], data: DataFrame): (Dataset[DataError], DataFrame) = {
    val colOrder = schemaWithRowKey.fields.map(_.name)
    val returnDf = data.withColumn(RowKey, lit(generateUUID())).select(colOrder.map(col): _*)
    (errors.union(DataFrameOps.emptyErrorStream(spark)), returnDf)
  }
}
val (initErr, initDf) = (DataFrameOps.emptyErrorStream(spark), sourceRawDf)

val validRecords = pipelineStages.foldLeft((initErr, initDf)) { case ((err, df), stage) =>
  stage(err, df)
}

def getCurrentTemperature(): Future[Double] = ??? // 1
def getTomorrowsTempFromPredictionAPI(curr: Double): Future[Double] = ??? // 2
def publishItInOurWebsite(pred: Double): Future[Double] = ??? // 3
val published2: Future[Double] =
  for {
    curr <- getCurrentTemperature()
    pred <- getTomorrowsTempFromPredictionAPI(curr)
    pubw <- publishItInOurWebsite(pred)
  } yield pubw

def getCurrentTemperatureW(): Writer[List[String], Double] = {
  Writer(List("Thermometer isn't broken yet"), 10.0)
}

def getTomorrowsTempFromPredictionAPIW(curr: Double): Writer[List[String], Double] = {
  Writer(List("Yay, the Prediction API works too"), 20.0)
}

def publishItInOurWebsiteW(pred: Double): Writer[List[String], Double] = {
  Writer(List("Published to our website"), 20.0)
}
val publishedWriter: Writer[List[String], Double] =
  for {
    curr <- getCurrentTemperatureW()
    pred <- getTomorrowsTempFromPredictionAPIW(curr)
    pubw <- publishItInOurWebsiteW(pred)
  } yield pubw
val (logs, value) = publishedWriter.run
logs.foreach(println)
println(value)
Thermometer isn't broken yet
Yay, the Prediction API works too
Published to our website
20.0
def flatMap[U](f: V => WriterT[F, L, U])(implicit flatMapF: FlatMap[F], semigroupL: Semigroup[L]): WriterT[F, L, U] =
  WriterT {
    flatMapF.flatMap(run) { lv =>
      flatMapF.map(f(lv._2).run) { lv2 =>
        (semigroupL.combine(lv._1, lv2._1), lv2._2)
      }
    }
  }
object DataFrameOps {
  ...
  ...
  implicit val dataFrameSemigroup: Semigroup[Dataset[_]] = new Semigroup[Dataset[_]] {
    override def combine(x: Dataset[_], y: Dataset[_]): Dataset[_] = x.union(y)
  }
}
type DataSetWithErrors[A] = Writer[Dataset[DataError], A]

trait DataStage[T <: Dataset[_]] extends Serializable {
  def apply(data: T): DataSetWithErrors[T]
  def stage: String
}
import cats.data.Writer
import com.thoughtworks.awayday.ingest.UDFs._
import com.thoughtworks.awayday.ingest.models.ErrorModels.{DataError, DataSetWithErrors}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import StageConstants._
class DataTypeValidatorStage(schema: StructType)(implicit val spark: SparkSession) extends DataStage[DataFrame] {

  override val stage = getClass.getSimpleName

  def apply(dataRecords: DataFrame): DataSetWithErrors[DataFrame] = validateTypes(dataRecords)

  def validateTypes(data: DataFrame): DataSetWithErrors[DataFrame] = {
    val withErrorsDF = data.withColumn(
      RowLevelErrorListCol,
      validateRowUDF(schema, stage)(struct(data.columns.map(data(_)): _*))
    )

    import spark.implicits._

    val errorRecords =
      withErrorsDF
        .select(RowLevelErrorListCol)
        .select(explode(col(RowLevelErrorListCol)))
        .select("col.*")
        .map(row => DataError(row))

    Writer(errorRecords, withErrorsDF.drop(RowLevelErrorListCol))
  }
}
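The .map(row => DataError(row)) call above relies on a helper on the DataError companion that builds the case class from the exploded error struct; that companion is not shown here. A minimal sketch of such an apply, assuming the struct columns carry the same names as the case class fields, could look like this:

import org.apache.spark.sql.Row

object DataError {
  // Hypothetical helper: maps the exploded error struct onto the case class.
  // Assumes the struct columns are named exactly like the case class fields.
  def apply(row: Row): DataError =
    DataError(
      rowKey = row.getAs[String]("rowKey"),
      stage = row.getAs[String]("stage"),
      fieldName = row.getAs[String]("fieldName"),
      fieldValue = row.getAs[String]("fieldValue"),
      error = row.getAs[String]("error"),
      severity = row.getAs[String]("severity"),
      addlInfo = row.getAs[String]("addlInfo")
    )
}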
import DataFrameOps._

val initDf = Writer(DataFrameOps.emptyErrorStream(spark), sourceRawDf)

val validRecords = pipelineStages.foldLeft(initDf) { case (dfWithErrors, stage) =>
  for {
    df <- dfWithErrors
    applied <- stage.apply(df)
  } yield applied
}
val (errors, processedDf) = validRecords.run
val query = processedDf
  .writeStream
  .format("console")
  .outputMode(OutputMode.Append())
  .start()

org.apache.spark.sql.AnalysisException: Union between streaming and batch DataFrames/Datasets is not supported;;
val emptyErrorStream = (spark: SparkSession) => {
  implicit val sqlC = spark.sqlContext
  MemoryStream[DataError].toDS()
}
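With the error log now seeded from a streaming source, the errors side of the Writer can be sunk the same way as the processed records. A minimal sketch, reusing the console sink from the query above:

// Sketch: write the accumulated error Dataset to the console sink,
// mirroring the query used for the processed records.
val errorQuery = errors
  .writeStream
  .format("console")
  .outputMode(OutputMode.Append())
  .start()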
Note: for data type-specific, row-level error handling of CSV and JSON sources, you could optionally consider the "mode" option of the DataFrameReader.
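For illustration, a minimal sketch of that reader option (the path here is a placeholder, and reusing EvergreenSchema is an assumption, not part of the pipeline above):

// Sketch: the DataFrameReader "mode" option for malformed CSV/JSON records.
// "PERMISSIVE" (the default) keeps the row and nulls out fields it cannot parse,
// "DROPMALFORMED" drops malformed rows, "FAILFAST" fails the read on the first one.
val csvDf = spark.read
  .schema(EvergreenSchema)            // assumed: reusing the schema name from above
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .csv("/path/to/landing/*.csv")      // placeholder path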
Disclaimer: The statements and opinions expressed in this article are those of the author(s) and do not necessarily reflect the positions of Thoughtworks.