Spark socket streaming example on Windows:
Windows doesn't ship with a netcat utility, so if you want to test your Spark Streaming socket program on Windows you must either download an external netcat build or write a socket program equivalent to netcat.

The standard Spark Streaming socket word count example is usually demonstrated with the netcat command.

Instead, we will create a socket utility in Java and integrate it with Spark Streaming through a custom receiver (JavaCustomSocketReceiver).

Create a socket server utility that opens a port and writes data to the socket:

package com.ts.spark.streaming;

import java.io.DataOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Scanner;

/**
 * This class creates a server socket, opens an OutputStream through it
 * and writes data until it finds "close" on a separate line.
 */
public class SocketWriter {
    public static void main(String[] args) throws Exception {
        System.out.println("Begin");

        // Create the socket server and wait for a client to connect
        ServerSocket server = new ServerSocket(9999);
        Socket socket = server.accept();
        DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream());

        System.out.println("Start writing data. Enter close when finished");
        Scanner sc = new Scanner(System.in);
        String str;

        // Read lines from the console and write each one to the socket
        while (!(str = sc.nextLine()).equals("close")) {
            outputStream.writeUTF(str);
        }

        // Close the connection now
        server.close();
    }
}
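To see why the reading side must match the writing side, here is a minimal stdlib-only sketch (not part of the original example) of the `writeUTF`/`readUTF` round trip, using in-memory streams instead of a real socket: `writeUTF` prefixes each string with a length header, so only `readUTF` can decode it back.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class UtfRoundTrip {
    public static void main(String[] args) throws Exception {
        // writeUTF frames each string with a 2-byte length prefix,
        // so the reader must use readUTF to decode the same framing.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeUTF("hello streaming");
        out.writeUTF("close");

        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buffer.toByteArray()));
        System.out.println(in.readUTF()); // hello streaming
        System.out.println(in.readUTF()); // close
    }
}
```

The same framing travels over the socket between SocketWriter and the receiver; only the transport differs.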
  • The default socket receiver will not work here, because the server writes data with DataOutputStream.writeUTF, which must be read back with a DataInputStream. Use the receiver below:
package com.ts.spark.streaming;

import java.io.DataInputStream;
import java.net.ConnectException;
import java.net.Socket;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

/**
 * Custom Java socket receiver.
 * Replaces the usual BufferedReader with a DataInputStream.
 */
public class JavaCustomSocketReceiver extends Receiver<String> {

    String host = null;
    int port = -1;

    public JavaCustomSocketReceiver(String host_, int port_) {
        super(StorageLevel.MEMORY_AND_DISK_2());
        host = host_;
        port = port_;
    }

    public void onStart() {
        // Start the thread that receives data over a connection
        new Thread() {
            @Override public void run() {
                receive();
            }
        }.start();
    }

    public void onStop() {
        // There is nothing much to do as the thread calling receive()
        // is designed to stop by itself once isStopped() returns true
    }

    /** Create a socket connection and receive data until receiver is stopped */
    private void receive() {
        Socket socket = null;
        String userInput = null;

        try {
            // Connect to the server
            socket = new Socket(host, port);

            DataInputStream reader = new DataInputStream(socket.getInputStream());
            // Until stopped or connection broken continue reading
            while (!isStopped() && (userInput = reader.readUTF()) != null) {
                store(userInput);
            }
            reader.close();
            socket.close();

            // Restart in an attempt to connect again when server is active again
            restart("Trying to connect again");
        } catch (ConnectException ce) {
            // Restart if we could not connect to the server
            restart("Could not connect", ce);
        } catch (Throwable t) {
            // Restart if there is any other error
            restart("Error receiving data", t);
        }
    }
}
  • Finally, use the new receiver in the word count job. Here each RDD is saved to disk:
package com.ts.spark.streaming;

import java.util.Arrays;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

/**
 * Socket stream word count example.
 */
public class SocketStreamingExample {
    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("word count example").setMaster("local[*]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));
        Accumulator<Integer> count = ssc.sparkContext().accumulator(0, "Ts");

        JavaReceiverInputDStream<String> lines = ssc.receiverStream(new JavaCustomSocketReceiver("localhost", 9999));

        JavaDStream<String> words = lines.flatMap(x -> { count.add(1); return Arrays.asList(x.split(" ")).iterator(); });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
                .reduceByKey((i1, i2) -> i1 + i2);

        wordCounts.foreachRDD(rdd -> rdd.saveAsTextFile("<path to save data>"));

        // Start the streaming job and wait for it to terminate
        ssc.start();
        ssc.awaitTermination();
    }
}
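The `flatMap`/`mapToPair`/`reduceByKey` pipeline above performs a per-batch word count. The same aggregation can be sketched on plain Java collections (an illustration only, not Spark code) to make the `reduceByKey((i1, i2) -> i1 + i2)` step concrete:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {
    public static void main(String[] args) {
        String[] lines = { "spark streaming word count", "spark socket example" };

        // Equivalent of flatMap(split by space) followed by
        // mapToPair(word -> (word, 1)) and reduceByKey(sum)
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }

        System.out.println(counts.get("spark")); // 2
        System.out.println(counts.get("count")); // 1
    }
}
```

In the streaming job, Spark performs this reduction in parallel across partitions for every micro-batch interval.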



Thanks for reading this blog.