How to create a Twitter custom receiver for Apache Spark streaming, an example

Apache Spark offers streaming capabilities with several predefined connectors (Kafka, sockets, file system, etc.). Additional sources can be connected by extending the Receiver class. This example implements a custom Twitter receiver for Apache Spark streaming.

The complete example can be downloaded here

Maven dependencies

The project includes the Twitter4j library and Spark streaming dependencies.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="">
    <dependencies>
        <!-- Spark -->

        <!-- Twitter client -->

        <!-- Logging -->

        <!-- Testing -->
    </dependencies>
</project>

Spark Twitter custom receiver

The Twitter stream connector extends the Receiver class. In this example, the custom Twitter receiver listens for status messages containing the word "twitter".

The source code can be found here.

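import com.google.common.base.Preconditions;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

public final class TwitterReceiver extends Receiver<Status> {
  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterReceiver.class);

  /** The keywords to be tracked. */
  private static final String KEYWORDS = "twitter";

  private final TwitterStream twitterStream;

  private StatusListener listener;

  public TwitterReceiver(StorageLevel storageLevel) {
    super(storageLevel);
    // The guard condition is elided in the original listing; this check is an assumption.
    Preconditions.checkArgument(StorageLevel.MEMORY_ONLY().equals(storageLevel),
        String.format("Only [%s] supported.", StorageLevel.MEMORY_ONLY().toString()));
    twitterStream = new TwitterStreamFactory().getInstance();
  }

  @Override
  public void onStart() {
    if (listener == null) {
      listener = new StreamListener();
    }
    // Assumed wiring (elided in the original): register the listener and start filtering.
    twitterStream.addListener(listener);
    twitterStream.filter(createFilter());
  }

  private FilterQuery createFilter() {
    FilterQuery filterQuery = new FilterQuery();
    try {
      // Assumed (elided in the original): track the configured keywords.
      filterQuery.track(new String[] {KEYWORDS});
    } catch (Exception e) {
      LOGGER.error(e.getMessage(), e);
      throw new IllegalArgumentException(e);
    }
    return filterQuery;
  }

  @Override
  public void onStop() {
    // Assumed cleanup (elided in the original): detach listeners and close the stream.
    twitterStream.clearListeners();
    twitterStream.cleanUp();
    listener = null;
  }

  private class StreamListener implements StatusListener {
    public void onStatus(Status status) {
      // Assumed (elided in the original): hand each status to Spark for storage.
      store(status);
    }

    public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
      // Intentionally empty.
    }

    public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
      // Intentionally empty.
    }

    public void onScrubGeo(long userId, long upToStatusId) {
      // Intentionally empty.
    }

    public void onStallWarning(StallWarning warning) {
      // Intentionally empty.
    }

    public void onException(Exception ex) {
      LOGGER.warn(ex.getMessage(), ex);
    }
  }
}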
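
The receiver above is callback-driven: starting it attaches a listener to the stream, each incoming message is handed to storage, and stopping it detaches the listener. The sketch below models that lifecycle in plain Java so it can be run without Spark or Twitter4j. FakeStream and SketchReceiver are hypothetical stand-ins invented for illustration; they are not part of either library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Dependency-free model of the receiver lifecycle; these are not Spark's classes. */
final class ReceiverLifecycleSketch {

  /** Hypothetical stand-in for a push-based source such as a Twitter stream. */
  static final class FakeStream {
    private final List<Consumer<String>> listeners = new ArrayList<>();

    void addListener(Consumer<String> listener) {
      listeners.add(listener);
    }

    void clearListeners() {
      listeners.clear();
    }

    /** Delivers a message to every listener registered at this moment. */
    void emit(String message) {
      for (Consumer<String> listener : new ArrayList<>(listeners)) {
        listener.accept(message);
      }
    }
  }

  /** Hypothetical stand-in for the receiver: it only stores messages while started. */
  static final class SketchReceiver {
    private final FakeStream stream;
    private final List<String> stored = new ArrayList<>();

    SketchReceiver(FakeStream stream) {
      this.stream = stream;
    }

    /** Returns immediately; data arrives later through the listener callback. */
    void onStart() {
      stream.addListener(this::store);
    }

    void onStop() {
      stream.clearListeners();
    }

    private void store(String message) {
      stored.add(message);
    }

    List<String> stored() {
      return stored;
    }
  }

  /** Runs one emit/start/emit/stop/emit cycle and returns what was stored. */
  static List<String> demo() {
    FakeStream stream = new FakeStream();
    SketchReceiver receiver = new SketchReceiver(stream);
    stream.emit("before start");
    receiver.onStart();
    stream.emit("hello twitter");
    receiver.onStop();
    stream.emit("after stop");
    return receiver.stored();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [hello twitter]
  }
}
```

Only the message emitted between onStart and onStop is stored, which mirrors why a receiver should register its listener in onStart and remove it in onStop.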

Twitter4j requires a file in the /main/java/resources folder containing the Twitter authentication parameters. See the Twitter4j documentation for more information.
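
A missing credential usually surfaces only as an authentication error once the stream connects, so it can help to check the configuration up front. The sketch below assumes the file follows Twitter4j's standard properties format with the four OAuth keys oauth.consumerKey, oauth.consumerSecret, oauth.accessToken and oauth.accessTokenSecret. The class TwitterCredentialsCheck and the sample values are hypothetical, and only the JDK is used.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper that reports which OAuth settings are absent or blank. */
final class TwitterCredentialsCheck {

  /** Property names assumed to follow Twitter4j's properties-file convention. */
  static final String[] REQUIRED_KEYS = {
      "oauth.consumerKey",
      "oauth.consumerSecret",
      "oauth.accessToken",
      "oauth.accessTokenSecret"
  };

  /** Returns the required keys that are missing or blank, in declaration order. */
  static List<String> missingKeys(Properties props) {
    List<String> missing = new ArrayList<>();
    for (String key : REQUIRED_KEYS) {
      String value = props.getProperty(key);
      if (value == null || value.trim().isEmpty()) {
        missing.add(key);
      }
    }
    return missing;
  }

  /** Parses properties-file text; a real task would load the file from the classpath. */
  static Properties parse(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props;
  }

  public static void main(String[] args) {
    // Placeholder values only; never commit real credentials.
    Properties props = parse("oauth.consumerKey=abc\noauth.consumerSecret=def\n");
    System.out.println(missingKeys(props));
    // prints [oauth.accessToken, oauth.accessTokenSecret]
  }
}
```

Running a check like this before creating the receiver turns a late connection failure into an immediate, readable message.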

Spark streaming twitter task

The new Twitter custom receiver can be used in a Spark streaming task, as the following example shows. The complete source code is here.

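import com.google.common.collect.ImmutableList;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TwitterStreamTask {
  // The classes registered with Kryo are elided in the original listing.
  private static final Class[] KRYO_CLASSES = ImmutableList.builder()
      .build().toArray(new Class[] {});

  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamTask.class);

  public static void main(String[] args) throws InterruptedException {
    new TwitterStreamTask().run();
  }

  public void run() throws InterruptedException {
    // Assumed (elided in the original): the application name and the Kryo registration call.
    SparkConf conf = new SparkConf().setMaster("local[*]")
        .setAppName("TwitterStreamTask")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .registerKryoClasses(KRYO_CLASSES);
    JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(10));
    // The lambda body is elided in the original; logging the status text is an assumption.
    streamingContext.receiverStream(new TwitterReceiver(StorageLevel.MEMORY_ONLY()))
        .foreachRDD(rdd -> rdd.coalesce(10)
            .foreach(message -> LOGGER.info(message.getText())));
    // Assumed (elided in the original): start the context and block until it is stopped.
    streamingContext.start();
    streamingContext.awaitTermination();
  }
}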


Running the example

The example can be executed by running mvn test in the project directory. Twitter4j offers many more features. For example, it is possible to listen to status messages from specific users or in specific languages.