by Rahul


Spark with Java

How to use non-serializable classes in Spark closures-

In Spark closures, objects must be serializable; otherwise the Spark engine throws a `NotSerializableException`. You will often come across situations where you can't change the actual class implementation. You can resolve this error using Kryo serialization.
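
For context, here is a minimal sketch of the kind of code that triggers the exception; `NonSerializableBean` is a hypothetical class that does not implement `java.io.Serializable`:

//Hypothetical example: the closure captures 'bean', so Spark must serialize it
NonSerializableBean bean = new NonSerializableBean();
JavaRDD<String> rdd = jsc.parallelize(Arrays.asList("x", "y"));
rdd.map(s -> s + bean.toString()).collect(); //throws NotSerializableException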

Register classes as serializable in SparkConf-

//Exact exception Spark throws when a class is not serializable:
//Serialization stack:
//  - object not serializable (class: ..., value: ...)

import com.esotericsoftware.kryo.Kryo;

/**
 * Custom KryoRegistrator implementation
 */
public class CustomKryoRegistrator implements org.apache.spark.serializer.KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(ABean.class); //Register the non-serializable classes here
    }
}

//Register Kryo in SparkConf; registration only takes effect with the Kryo serializer enabled
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", CustomKryoRegistrator.class.getName());

Create a Dataset using an Encoder-

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public class Employee {
    private Integer id;
    private String name;
    //Encoders.bean requires standard getters and setters
    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
//Assuming a SparkSession named 'spark'; reads the employee.json shown below
Dataset<Employee> dataset = spark.read().json("employee.json").as(employeeEncoder);

cat employee.json
{ "id" : 100, "name" : "xyz" }
{ "id" : 200, "name" : "prq" }
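
A small usage sketch, assuming the dataset and Employee bean defined above: the bean encoder enables typed operations such as map.

import org.apache.spark.api.java.function.MapFunction;

Dataset<String> names = dataset.map(
        (MapFunction<Employee, String>) Employee::getName,
        Encoders.STRING());
names.show();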

Find the max value of a Dataset column-

Row max = dataset.agg(org.apache.spark.sql.functions.max(dataset.col("id")).as("max")).head();
System.out.println(max);
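
If you need the scalar rather than the whole Row, you can index into it; getInt assumes the Integer id column from the bean schema above:

int maxId = max.getInt(0); //the aggregated Row has a single column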

Define a custom UDF with SparkSession-

//SessionRegistry.session holds the SparkSession created during init
Dataset<Long> ds = SessionRegistry.session.range(1, 20);

//Register the Dataset as a temp view so the SQL below can refer to it as 'allnum'
ds.createOrReplaceTempView("allnum");

//Register the UDF; the explicit UDF1 cast is needed for Java lambda type inference
ds.sparkSession().udf().register("add100",
        (org.apache.spark.sql.api.java.UDF1<Long, Long>) l -> l + 100,
        org.apache.spark.sql.types.DataTypes.LongType);

ds.sparkSession().sql("select add100(id) from allnum").show();
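
The same UDF can also be invoked through the DataFrame API instead of SQL, via callUDF:

ds.select(org.apache.spark.sql.functions.callUDF("add100", ds.col("id"))).show();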

Custom property file in Spark-

Create a property file; a minimal example is sketched below.
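
A minimal sketch of such a file, with hypothetical keys and the file name app.properties assumed in the spark-submit command below:

app.name=spark-demo
app.input.path=/data/input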


//Supply the property file to Spark using spark-submit (app.properties is the assumed file name)
${SPARK_HOME}/bin/spark-submit --files app.properties <other-options> <application-jar>
//Read the file in the driver

import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkFiles;

//Resolve the local path of the file shipped with --files (app.properties is the assumed name)
String fileName = SparkFiles.get("app.properties");

//Load the file into a Properties object using the Hadoop FileSystem API
Configuration hdfsConf = new Configuration();
FileSystem fs = FileSystem.get(hdfsConf);

//fileName contains the absolute path of the file
FSDataInputStream is = fs.open(new Path(fileName));

Properties prop = new Properties();
//Load properties from the stream
prop.load(is);
//Retrieve a property ('some.key' is a hypothetical key)
String value = prop.getProperty("some.key");

Accumulator implementation-

import java.util.HashSet;
import java.util.Set;
import org.apache.spark.util.AccumulatorV2;

/**
 * A sample accumulator that stores a set of string values
 */
class CustomAccumulator extends AccumulatorV2<String, Set<String>> {
    Set<String> myval = new HashSet<>();

    @Override
    public void merge(AccumulatorV2<String, Set<String>> other) {
        other.value().forEach(val -> myval.add(val));
    }

    @Override
    public boolean isZero() {
        return myval.size() == 0;
    }

    @Override
    public AccumulatorV2<String, Set<String>> copy() {
        //Return an independent copy; returning 'this' would let reset() clear the original
        CustomAccumulator copy = new CustomAccumulator();
        copy.myval.addAll(myval);
        return copy;
    }

    @Override
    public void reset() {
        myval.clear();
    }

    @Override
    public void add(String v) {
        myval.add(v);
    }

    @Override
    public Set<String> value() {
        return myval;
    }
}

//Register the accumulator with the SparkContext; the jsc object is created during the init section
AccumulatorV2<String, Set<String>> accumulatorV2 = new CustomAccumulator();
jsc.sc().register(accumulatorV2, "customAccumulator");
//Use accumulatorV2 like a normal accumulator
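
A short usage sketch under the same assumptions (jsc from init, accumulator registered as above): executors add values, and the driver reads the merged result.

JavaRDD<String> words = jsc.parallelize(Arrays.asList("a", "b", "a"));
words.foreach(word -> accumulatorV2.add(word));
System.out.println(accumulatorV2.value()); //e.g. [a, b]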

Custom Comparator implementation for compare operations-

import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;

/**
 * Comparator for Integers; it must be Serializable because Spark ships it to executors.
 * The original compare body was empty, so the digit-length comparison below is an
 * assumed implementation matching the class name.
 */
public class LengthComparator implements Comparator<Integer>, Serializable {
    @Override
    public int compare(Integer o1, Integer o2) {
        return Integer.compare(String.valueOf(o1).length(), String.valueOf(o2).length());
    }
}

//jsc is the JavaSparkContext defined in the beginning during init
JavaRDD<Integer> javaRDD = jsc.parallelize(Arrays.asList(100, 20, 10, 1020, 100));
//Find the max value using the custom comparator
Integer maxVal = javaRDD.max(new LengthComparator());
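
The same comparator works with other comparison-based RDD actions as well, e.g. min:

Integer minVal = javaRDD.min(new LengthComparator());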