
Commit 15cd165

Include KinesisWordCount example
1 parent bfa7fba commit 15cd165

17 files changed: +157 -56 lines

Vagrantfile

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+# -*- mode: ruby -*-
+# vi: set ft=ruby :
+
+$script = <<SCRIPT
+cat << EOF >> /etc/hosts
+192.168.34.100 sparkmaster
+192.168.34.101 sparkworker1
+192.168.34.102 sparkworker2
+EOF
+
+sudo service iptables stop
+sudo chkconfig iptables off
+sudo /usr/sbin/setenforce 0
+sudo sed -i.old s/SELINUX=enforcing/SELINUX=disabled/ /etc/selinux/config
+SCRIPT
+
+boxes = [
+  { :name => :sparkmaster, :ip => '192.168.34.100', :cpus => 2, :memory => 1024 },
+  { :name => :sparkworker1, :ip => '192.168.34.101', :cpus => 2, :memory => 2048 },
+  { :name => :sparkworker2, :ip => '192.168.34.102', :cpus => 2, :memory => 2048 },
+]
+
+VAGRANT_API_VERSION = "2"
+
+Vagrant.configure(VAGRANT_API_VERSION) do |conf|
+  conf.vm.box = "chef/centos-6.5"
+
+  boxes.each do |box|
+    conf.vm.define box[:name] do |config|
+      config.vm.network 'private_network', ip: box[:ip]
+      config.vm.hostname = box[:name].to_s
+      config.vm.provider "virtualbox" do |v|
+        v.customize ["modifyvm", :id, "--memory", box[:memory]]
+        v.customize ["modifyvm", :id, "--cpus", box[:cpus]]
+      end
+
+      # provisioning
+      config.vm.provision :shell, inline: $script
+    end
+  end
+end

build.sbt

Lines changed: 17 additions & 2 deletions
@@ -1,3 +1,4 @@
+import sbt.ExclusionRule
 import sbt.Keys._

 name := "spark_codebase"
@@ -18,7 +19,7 @@ resolvers ++= Seq(
   "Websudos releases" at "http://maven.websudos.co.uk/ext-release-local"
 )

-val sparkVersion = "1.2.1"
+val sparkVersion = "1.3.0"
 val PhantomVersion = "1.6.0"

 libraryDependencies ++= Seq(
@@ -29,6 +30,8 @@ libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
     exclude("org.apache.zookeeper", "zookeeper"),
   "org.apache.spark" %% "spark-streaming-twitter" % sparkVersion,
+  "org.apache.spark" %% "spark-streaming-kinesis-asl" % sparkVersion
+    excludeAll ExclusionRule(organization = "org.apache.spark", name = "spark-streaming_2.10"),
   "org.slf4j" % "slf4j-api" % "1.7.12",
   "org.apache.kafka" %% "kafka" % "0.8.2.1"
     exclude("javax.jms", "jms")
@@ -45,7 +48,8 @@ libraryDependencies ++= Seq(
   "com.101tec" % "zkclient" % "0.4"
     exclude("org.apache.zookeeper", "zookeeper"),
   "joda-time" % "joda-time" % "2.7",
-  "com.maxmind.geoip2" % "geoip2" % "2.1.0",
+  "com.maxmind.geoip2" % "geoip2" % "2.1.0"
+    exclude("org.apache.httpcomponents", "httpclient"),
   "com.websudos" %% "phantom-dsl" % PhantomVersion
     exclude("com.google.guava", "guava"),
   "com.websudos" %% "phantom-zookeeper" % PhantomVersion
@@ -73,9 +77,20 @@ test in assembly := {}
 assemblyMergeStrategy in assembly := {
   case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
   case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
+  case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
   case "application.conf" => MergeStrategy.concat
   case "com/twitter/common/args/apt/cmdline.arg.info.txt.1" => MergeStrategy.first
+  case "org/apache/spark/unused/UnusedStubClass.class" => MergeStrategy.first
+  case "log4j.properties" => MergeStrategy.first
+  case "reference.conf" => MergeStrategy.concat
   case x =>
     val oldStrategy = (assemblyMergeStrategy in assembly).value
     oldStrategy(x)
+}
+
+assemblyExcludedJars in assembly <<= (fullClasspath in assembly) map { cp =>
+  val excludes = Set(
+    "commons-httpclient-3.1.jar"
+  )
+  cp filter { jar => excludes(jar.data.getName) }
 }

src/main/resources/log4j.properties

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 logs.dir=logs

-log4j.rootLogger=INFO, stdout
+log4j.rootLogger=WARN, stdout

 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

src/main/scala/com/cloudwick/cassandra/CassandraLocationVisitServiceModule.scala

Lines changed: 2 additions & 2 deletions
@@ -2,7 +2,7 @@ package com.cloudwick.cassandra

 import com.cloudwick.cassandra.schema.{LocationVisit, LocationVisitRecord}
 import com.cloudwick.cassandra.service.LocationVisitServiceModule
-import com.cloudwick.logging.Logging
+import com.cloudwick.logging.LazyLogging
 import com.websudos.phantom.Implicits._

 import scala.concurrent.Future
@@ -13,7 +13,7 @@ import scala.concurrent.Future
   */
 trait CassandraLocationVisitServiceModule extends LocationVisitServiceModule with CassandraService {

-  object locationVisitService extends LocationVisitService with Logging {
+  object locationVisitService extends LocationVisitService with LazyLogging {

     override def update(locationVisit: LocationVisit) = {
       logger.trace(

src/main/scala/com/cloudwick/cassandra/CassandraLogVolumeServiceModule.scala

Lines changed: 2 additions & 2 deletions
@@ -2,12 +2,12 @@ package com.cloudwick.cassandra

 import com.cloudwick.cassandra.schema.{LogVolume, LogVolumeRecord}
 import com.cloudwick.cassandra.service.LogVolumeServiceModule
-import com.cloudwick.logging.Logging
+import com.cloudwick.logging.LazyLogging
 import com.websudos.phantom.Implicits._

 trait CassandraLogVolumeServiceModule extends LogVolumeServiceModule with CassandraService {

-  object logVolumeService extends LogVolumeService with Logging {
+  object logVolumeService extends LogVolumeService with LazyLogging {
     override def update(logVolume: LogVolume) = {
       logger.trace(
         s"Update volume per minute count. Minute: ${logVolume.timeStamp} " +

src/main/scala/com/cloudwick/cassandra/CassandraStatusCountServiceModule.scala

Lines changed: 2 additions & 4 deletions
@@ -2,14 +2,12 @@ package com.cloudwick.cassandra

 import com.cloudwick.cassandra.schema.{StatusCount, StatusCountRecord}
 import com.cloudwick.cassandra.service.StatusCountServiceModule
-import com.cloudwick.logging.Logging
+import com.cloudwick.logging.LazyLogging
 import com.websudos.phantom.Implicits._

-import scala.concurrent.Future
-
 trait CassandraStatusCountServiceModule extends StatusCountServiceModule with CassandraService {

-  object statusCountService extends StatusCountService with Logging {
+  object statusCountService extends StatusCountService with LazyLogging {

     override def update(statusCount: StatusCount) = {
       logger.trace(

src/main/scala/com/cloudwick/logging/Logging.scala

Lines changed: 1 addition & 1 deletion
@@ -2,6 +2,6 @@ package com.cloudwick.logging

 import org.slf4j.{LoggerFactory, Logger}

-trait Logging {
+trait LazyLogging {
   protected lazy val logger: Logger = LoggerFactory.getLogger(getClass.getName)
 }
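
Renaming the in-house trait from Logging to LazyLogging sidesteps the name clash with Spark's own org.apache.spark.Logging trait, which several runners in this commit still import alongside it. A minimal sketch of how a class mixes in the renamed trait (the class below is hypothetical, for illustration only):

import com.cloudwick.logging.LazyLogging

// Hypothetical consumer of the renamed trait; not part of this commit.
class RecordProcessor extends LazyLogging {
  def process(record: String): Unit = {
    // `logger` is the lazily initialised SLF4J logger supplied by LazyLogging.
    logger.debug(s"processing record of length ${record.length}")
  }
}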

src/main/scala/com/cloudwick/spark/embedded/KafkaServer.scala

Lines changed: 2 additions & 2 deletions
@@ -3,7 +3,7 @@ package com.cloudwick.spark.embedded
 import java.nio.file.Files
 import java.util.Properties

-import com.cloudwick.logging.Logging
+import com.cloudwick.logging.LazyLogging
 import kafka.admin.AdminUtils
 import kafka.server.{KafkaConfig, KafkaServerStartable}
 import kafka.utils.ZKStringSerializer
@@ -21,7 +21,7 @@ import scala.concurrent.duration._
  * @param config Broker configuration settings. Used to modify, for example, on which port the
  *               broker should listen to.
  */
-class KafkaServer(config: Properties = new Properties) extends Logging {
+class KafkaServer(config: Properties = new Properties) extends LazyLogging {
   private val defaultZkConnect = "127.0.0.1:2181"
   private val logDir = Files.createTempDirectory(this.getClass.getSimpleName)

src/main/scala/com/cloudwick/spark/embedded/ZookeeperServer.scala

Lines changed: 2 additions & 2 deletions
@@ -1,12 +1,12 @@
 package com.cloudwick.spark.embedded

-import com.cloudwick.logging.Logging
+import com.cloudwick.logging.LazyLogging
 import org.apache.curator.test.TestingServer

 /**
  * Runs an in-memory, "embedded" instance of a ZooKeeper server.
  */
-class ZookeeperServer(val port: Int = 2181) extends Logging {
+class ZookeeperServer(val port: Int = 2181) extends LazyLogging {
   logger.debug(s"Starting embedded ZooKeeper server on port $port...")

   // Creates a new instance of zookeeper server when an instance of this class is created

src/main/scala/com/cloudwick/spark/examples/core/WordCountRunner.scala

Lines changed: 3 additions & 2 deletions
@@ -1,11 +1,12 @@
 package com.cloudwick.spark.examples.core

+import com.cloudwick.logging.LazyLogging
 import org.apache.spark.{Logging, SparkConf, SparkContext}

 /**
  * Simple word count program to illustrate spark standalone applications usage
  */
-object WordCountRunner extends App with Logging {
+object WordCountRunner extends App with LazyLogging {
   if (args.length < 2) {
     System.err.println("Usage: WordCountRunner input_path output_path")
     System.exit(1)
@@ -20,7 +21,7 @@ object WordCountRunner extends App with Logging {
   val lines = sc.textFile(inputPath)
   val counts = WordCount.count(lines, stopWords)

-  // log.info(counts.collect().mkString("[", ", ", "]"))
+  // logger.info(counts.collect().mkString("[", ", ", "]"))

   counts.saveAsTextFile(outputPath)
 }
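
WordCount.count itself is untouched by this commit, so its body does not appear above. As orientation only, a helper matching the call sites here might look like the sketch below; the object name and implementation are assumptions, not the repository's actual code:

package com.cloudwick.spark.examples.core

import org.apache.spark.rdd.RDD

// Hypothetical sketch of the helper invoked by WordCountRunner; the real
// implementation lives elsewhere in the repository and may differ.
object WordCountSketch {
  def count(lines: RDD[String], stopWords: Set[String]): RDD[(String, Int)] = {
    lines
      .flatMap(_.split("""\s+"""))                                  // tokenize on whitespace
      .filter(word => word.nonEmpty && !stopWords.contains(word))   // drop stop words
      .map(word => (word, 1))                                       // pair each word with a count of one
      .reduceByKey(_ + _)                                           // sum counts per word
  }
}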

src/main/scala/com/cloudwick/spark/examples/streaming/kafka/KafkaWordCount.scala

Lines changed: 4 additions & 6 deletions
@@ -2,13 +2,13 @@ package com.cloudwick.spark.examples.streaming.kafka

 import java.nio.file.Files

+import com.cloudwick.logging.LazyLogging
 import com.cloudwick.spark.examples.core.WordCount
 import com.cloudwick.spark.examples.streaming.local.NetworkWordCountWindowed
-import com.cloudwick.spark.examples.streaming.util.Utils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.kafka.KafkaUtils
 import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.SparkConf

 /**
  * Consumes messages from one or more topics in Kafka and does word-count.
@@ -30,9 +30,9 @@ import org.apache.spark.{Logging, SparkConf}
  * 7. Check the offset consumption of the topic
  *    `bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic test-wc --group stcg`
  */
-object KafkaWordCount extends App with Logging {
+object KafkaWordCount extends App with LazyLogging {
   if (args.length < 4) {
-    log.error(
+    logger.error(
       """
         |Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
         |  zkQuorum - Zookeeper quorum (hostname:port,hostname:port,..)
@@ -45,8 +45,6 @@ object KafkaWordCount extends App with Logging {
     System.exit(1)
   }

-  Utils.setSparkLogLevels()
-
   val Array(zkQuorum, group, topics, numThreads) = args
   val batchDuration = Seconds(5)
   val windowDuration = Seconds(30)

src/main/scala/com/cloudwick/spark/examples/streaming/kinesis/KinesisWordCount.scala

Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
+package com.cloudwick.spark.examples.streaming.kinesis
+
+import com.amazonaws.auth.AWSCredentials
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.cloudwick.logging.LazyLogging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.kinesis.KinesisUtils
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.SparkConf
+
+/**
+ * Kinesis word count example
+ *
+ * Running this example locally:
+ *
+ * `spark-submit --class com.cloudwick.spark.examples.streaming.kinesis.KinesisWordCount
+ *   --master "local[*]" target/scala-2.10/spark_codebase-assembly-1.0.jar
+ *   <stream-name> <aws-access-key> <aws-secret-key> <endpoint-url>`
+ */
+object KinesisWordCount extends App with LazyLogging {
+  if (args.length < 4) {
+    logger.error(
+      """
+        |Usage: KinesisWordCount <stream-name> <aws-access-key> <aws-secret-key> <endpoint-url>
+        |  stream-name    - is the name of the kinesis stream
+        |  aws-access-key - is the aws access key
+        |  aws-secret-key - is the aws secret access key
+        |  endpoint-url   - is the endpoint of the kinesis service
+      """.stripMargin
+    )
+    System.exit(1)
+  }
+
+  def fromCredentials(awsAccessKey: String,
+                      awsSecretKey: String,
+                      awsEndPoint: String): AmazonKinesisClient = {
+    val credentials = new AWSCredentials {
+      override def getAWSAccessKeyId: String = awsAccessKey
+
+      override def getAWSSecretKey: String = awsSecretKey
+    }
+    val client = new AmazonKinesisClient(credentials)
+    client.setEndpoint(awsEndPoint)
+    client
+  }
+
+  val Array(streamName, awsAccessKey, awsSecretKey, endPointUrl) = args
+
+  // Determine the number of shards for the specified stream, so that we can create one kinesis
+  // receiver for each shard
+  val kinesisClient = fromCredentials(awsAccessKey, awsSecretKey, endPointUrl)
+  val numShards = kinesisClient.describeStream(streamName).getStreamDescription.getShards.size
+
+  val batchDuration = Seconds(2)
+  val sparkConf = new SparkConf().setAppName("KinesisWordCount").setMaster("local[*]")
+  val ssc = new StreamingContext(sparkConf, batchDuration)
+
+  // create receivers
+  // set aws.accessKeyId and aws.secretKey as system properties
+  System.setProperty("aws.accessKeyId", awsAccessKey)
+  System.setProperty("aws.secretKey", awsSecretKey)
+  val kinesisStreams = (0 until numShards).map { i =>
+    KinesisUtils.createStream(ssc, streamName, endPointUrl, batchDuration,
+      InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_AND_DISK_2)
+  }
+
+  // union all the streams
+  val unionStream = ssc.union(kinesisStreams)
+
+  // convert each record (Array[Byte]) to a string and split it into words
+  val words = unionStream.flatMap(new String(_).split("\\s+"))
+  val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
+
+  wordCounts.print()
+
+  ssc.start()
+  ssc.awaitTermination()
+}
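
The commit does not include a producer, so records have to be pushed into the Kinesis stream separately when testing this example locally. A minimal sketch of such a producer using the same AWS SDK v1 client (the object name and sample sentences are illustrative assumptions, not part of this commit):

package com.cloudwick.spark.examples.streaming.kinesis

import java.nio.ByteBuffer

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest

// Hypothetical helper that pushes a few test records into the stream; not part of this commit.
object KinesisTestProducer extends App {
  val Array(streamName, awsAccessKey, awsSecretKey, endPointUrl) = args

  val client = new AmazonKinesisClient(new BasicAWSCredentials(awsAccessKey, awsSecretKey))
  client.setEndpoint(endPointUrl)

  val sentences = Seq("hello kinesis", "hello spark streaming", "spark streaming word count")

  sentences.zipWithIndex.foreach { case (sentence, i) =>
    val request = new PutRecordRequest()
      .withStreamName(streamName)
      .withPartitionKey(s"partitionKey-$i")   // spreads records across shards
      .withData(ByteBuffer.wrap(sentence.getBytes("UTF-8")))
    client.putRecord(request)
  }
}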

src/main/scala/com/cloudwick/spark/examples/streaming/local/NetworkWordCountRunner.scala

Lines changed: 0 additions & 3 deletions
@@ -1,7 +1,6 @@
 package com.cloudwick.spark.examples.streaming.local

 import com.cloudwick.spark.examples.core.WordCount
-import com.cloudwick.spark.examples.streaming.util.Utils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
@@ -31,8 +30,6 @@ object NetworkWordCountRunner extends App with Logging {
   val Array(host, port, batchInterval) = args
   val stopWords = Set("a", "an", "the")

-  Utils.setSparkLogLevels()
-
   val conf = new SparkConf().setAppName("NetworkWordCount")
   val ssc = new StreamingContext(conf, Seconds(batchInterval.toInt))

src/main/scala/com/cloudwick/spark/examples/streaming/local/NetworkWordCountWindowedRunner.scala

Lines changed: 0 additions & 3 deletions
@@ -3,7 +3,6 @@ package com.cloudwick.spark.examples.streaming.local
 import java.nio.file.Files
 import com.cloudwick.spark.examples.core.WordCount
 import com.cloudwick.spark.examples.streaming.local.NetworkWordCountWindowed._
-import com.cloudwick.spark.examples.streaming.util.Utils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.storage.StorageLevel
@@ -40,8 +39,6 @@ object NetworkWordCountWindowedRunner extends App with Logging {

   log.info(s"Connecting to host: $hostname port: $port")

-  Utils.setSparkLogLevels()
-
   // Create a local StreamingContext with master & specified batch interval
   val conf = new SparkConf().setAppName("NetworkWordCount")
   val ssc = new StreamingContext(conf, Seconds(5))

src/main/scala/com/cloudwick/spark/examples/streaming/twitter/TwitterPopularTags.scala

Lines changed: 0 additions & 3 deletions
@@ -1,6 +1,5 @@
 package com.cloudwick.spark.examples.streaming.twitter

-import com.cloudwick.spark.examples.streaming.util.Utils
 import org.apache.spark.streaming.twitter.TwitterUtils
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.{SparkConf, Logging}
@@ -23,8 +22,6 @@ object TwitterPopularTags extends App with Logging {
     System.exit(1)
   }

-  Utils.setSparkLogLevels()
-
   val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
   val filters = args.takeRight(args.length - 4)
