Apache Spark Streaming ships with several predefined connectors (Kafka, sockets, file systems, etc.). Additional sources can be connected by extending the Receiver class. This example implements a custom Twitter receiver for Apache Spark Streaming.
The complete example can be downloaded here: https://github.com/melphi/spark-examples/tree/master/streaming-twitter-custom-receiver
Maven dependencies
The project includes the Twitter4j library and Spark streaming dependencies.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.sparkexamples</groupId>
  <artifactId>streaming-twitter-custom-receiver</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <!-- Spark -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>

    <!-- Twitter client -->
    <dependency>
      <groupId>org.twitter4j</groupId>
      <artifactId>twitter4j-stream</artifactId>
      <version>4.0.6</version>
    </dependency>

    <!-- Logging -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.22</version>
    </dependency>

    <!-- Testing -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
Spark Twitter custom receiver
The Twitter stream connector extends the Receiver class. In this example, the custom Twitter receiver listens to status messages containing the word "twitter".
The source code can be found here: https://github.com/melphi/spark-examples/blob/master/streaming-twitter-custom-receiver/src/main/java/org/sparkexample/TwitterReceiver.java
package org.sparkexample;

import static com.google.common.base.Preconditions.checkArgument;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

public final class TwitterReceiver extends Receiver<Status> {

  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterReceiver.class);

  /**
   * The keywords to be tracked.
   */
  private static final String KEYWORDS = "twitter";

  private final TwitterStream twitterStream;
  private StatusListener listener;

  public TwitterReceiver(StorageLevel storageLevel) {
    super(storageLevel);
    checkArgument(StorageLevel.MEMORY_ONLY().equals(storageLevel),
        String.format("Only [%s] supported.", StorageLevel.MEMORY_ONLY().toString()));
    twitterStream = new TwitterStreamFactory().getInstance();
  }

  @Override
  public void onStart() {
    // onStart() must not block: Twitter4j delivers statuses on its own thread.
    if (listener == null) {
      listener = new StreamListener();
    }
    twitterStream.addListener(listener);
    twitterStream.filter(createFilter());
  }

  private FilterQuery createFilter() {
    FilterQuery filterQuery = new FilterQuery();
    try {
      filterQuery.track(KEYWORDS);
    } catch (Exception e) {
      LOGGER.error(e.getMessage(), e);
      throw new IllegalArgumentException(e);
    }
    return filterQuery;
  }

  @Override
  public void onStop() {
    // Detach the listener and release the Twitter4j stream resources.
    twitterStream.clearListeners();
    twitterStream.cleanUp();
    listener = null;
  }

  private class StreamListener implements StatusListener {

    @Override
    public void onStatus(Status status) {
      // Hand each received status over to Spark.
      store(status);
    }

    @Override
    public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
      // Intentionally empty.
    }

    @Override
    public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
      // Intentionally empty.
    }

    @Override
    public void onScrubGeo(long userId, long upToStatusId) {
      // Intentionally empty.
    }

    @Override
    public void onStallWarning(StallWarning warning) {
      // Intentionally empty.
    }

    @Override
    public void onException(Exception ex) {
      LOGGER.warn(ex.getMessage(), ex);
    }
  }
}
This example relies on a twitter4j.properties file containing the Twitter authentication parameters. The file must be on the classpath (in a standard Maven layout, the src/main/resources folder). See http://twitter4j.org/en/configuration.html for more information.
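A missing or incomplete properties file usually only shows up as an authentication error once the stream starts, so a small check before starting can save some debugging. The following is a minimal sketch that uses only the JDK. The class `TwitterConfigCheck` and its methods are hypothetical helpers that are not part of the original project. The four key names are the OAuth properties described on the Twitter4j configuration page. The sketch only inspects the properties file, so it does not cover credentials supplied through system properties or environment variables.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper, not part of the original project. */
final class TwitterConfigCheck {

    // OAuth keys that Twitter4j reads from twitter4j.properties.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "oauth.consumerKey",
            "oauth.consumerSecret",
            "oauth.accessToken",
            "oauth.accessTokenSecret");

    private TwitterConfigCheck() {
    }

    /** Returns the required keys that are absent or blank in the given properties. */
    static List<String> missingKeys(Properties properties) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = properties.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Loads twitter4j.properties from the classpath root; the result is empty if the file is not found. */
    static Properties loadFromClasspath() throws IOException {
        Properties properties = new Properties();
        try (InputStream in = TwitterConfigCheck.class.getResourceAsStream("/twitter4j.properties")) {
            if (in != null) {
                properties.load(in);
            }
        }
        return properties;
    }
}
```

Calling `TwitterConfigCheck.missingKeys(TwitterConfigCheck.loadFromClasspath())` before creating the streaming context returns an empty list when all four keys are present, and the names of the missing keys otherwise.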
Spark streaming Twitter task
The new custom Twitter receiver can be used in a Spark streaming task, as in the following example. The complete source code is here: https://github.com/melphi/spark-examples/blob/master/streaming-twitter-custom-receiver/src/main/java/org/sparkexample/TwitterStreamTask.java
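package org.sparkexample;

import com.google.common.collect.ImmutableList;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.GeoLocation;
import twitter4j.Status;
import twitter4j.User;

public class TwitterStreamTask {

  // Twitter4j types registered with Kryo for serialization.
  private static final Class[] KRYO_CLASSES = ImmutableList.builder()
      .add(GeoLocation.class)
      .add(Status.class)
      .add(User.class)
      .build()
      .toArray(new Class[] {});

  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamTask.class);

  public static void main(String[] args) throws InterruptedException {
    new TwitterStreamTask().run();
  }

  public void run() throws InterruptedException {
    SparkConf conf = new SparkConf().setMaster("local[*]")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .registerKryoClasses(KRYO_CLASSES)
        .setAppName("sparkTask");

    // Received statuses are grouped into 10-second batches.
    JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(10));

    // Log the text of every status in each batch.
    streamingContext.receiverStream(new TwitterReceiver(StorageLevel.MEMORY_ONLY()))
        .foreachRDD(
            rdd -> rdd.coalesce(10)
                .foreach(message -> LOGGER.info(message.getText())));

    streamingContext.start();
    streamingContext.awaitTermination();
  }
}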
Running the example
The example can be executed by running mvn test in the project directory. Twitter4j offers many more features; for example, it is possible to listen to status messages from specific users or in specific languages.
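As an illustration of filtering by user and language, here is a minimal sketch of a richer filter built with Twitter4j's `FilterQuery`, whose `track`, `follow` and `language` methods each return the query so that calls can be chained. The class `FilterQueryExamples` and its method are hypothetical and not part of the original project, and the user ids and language codes are supplied by the caller. It assumes the twitter4j-stream dependency from the POM is on the classpath.

```java
import twitter4j.FilterQuery;

/** Hypothetical helper, not part of the original project. */
final class FilterQueryExamples {

    private FilterQueryExamples() {
    }

    /**
     * Builds a filter that tracks the given keyword, follows the given user ids
     * and restricts the results to the given language codes (for example "en").
     */
    static FilterQuery keywordUsersAndLanguage(String keyword, long[] userIds, String... languages) {
        return new FilterQuery()
                .track(keyword)       // statuses containing the keyword
                .follow(userIds)      // statuses related to these user ids
                .language(languages); // only statuses in these languages
    }
}
```

A query built this way could be returned from the receiver's `createFilter()` method in place of the keyword-only filter, for instance `FilterQueryExamples.keywordUsersAndLanguage("twitter", new long[] {12345L}, "en")`, where 12345 is a placeholder user id.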