Commit 9dac2a3

[loganalysis] add cassandra integration
1 parent 3adb97d commit 9dac2a3

24 files changed (+555, -54 lines)

.gitignore

Lines changed: 1 addition & 0 deletions
```diff
@@ -2,6 +2,7 @@
 *.log
 wiki
 *.sc
+src/main/scala/com/cloudwick/Random.scala

 # sbt specific
 dist/*
```

README.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Cloudwick Spark CodeBase
2+
3+
This repository is a collection of Spark examples & use-case implementations for various components of the Spark eco-system including Spark-Core, Spark-Streaming, Spark-SQL, Spark-MLLib.
4+
5+
## What does this repository contains ?
6+
7+
* Spark core examples
8+
* WordCount
9+
* Spark streaming examples
10+
* NetworkWordCount
11+
* NetworkWordCountWindowed
12+
* RecoverableNetworkWordCount
13+
* TwitterPopularTags
14+
* KafkaWordCount
15+
* Spark core use-cases
16+
* Spark streaming use-cases
17+
* LogAnalytics
18+
* Testing
19+
* ScalaTest spec traits for Spark core, streaming and SQL API(s)
20+
* Embedded Kafka and Zookeeper embedded server instances for testing
21+
22+
## How to download ?
23+
24+
Simplest way is to clone the repository:
25+
26+
```
27+
git clone https://github.com/cloudwicklabs/spark_codebase.git
28+
```
29+
30+
## How to run these ?
31+
32+
To run any of these examples or use-cases you have to package them using a uber-jar (most of the examples depend of external dependencies, hence have to be packaged as a assembly jar).
33+
34+
### Building an assembly jar
35+
36+
From the project's home directory
37+
38+
```
39+
sbt assembly
40+
```
41+
42+
### Running using `spark-submit`
43+
44+
[`spark-submit`](https://spark.apache.org/docs/latest/submitting-applications.html) is the simplest way to submit a spark application to the cluster and supports all the cluster manager's like stand-alone, yarn and mesos.
45+
46+
Each of the main class has documentation on how to run it.
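For example, a submission might look like the following sketch; the main class shown is hypothetical (substitute one of the classes listed in the README), and the jar path assumes sbt-assembly's default output for this build:

```
spark-submit \
  --class com.cloudwick.spark.examples.WordCount \
  --master local[2] \
  target/scala-2.10/spark_codebase-assembly-1.0.jar
```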

ReadMe.md

Whitespace-only changes.

build.sbt

Lines changed: 35 additions & 5 deletions
```diff
@@ -1,14 +1,25 @@
+import sbt.Keys._
+
 name := "spark_codebase"

 version := "1.0"

 scalaVersion := "2.10.5"

 resolvers ++= Seq(
-  "typesafe-repository" at "http://repo.typesafe.com/typesafe/releases"
+  "Typesafe repository snapshots" at "http://repo.typesafe.com/typesafe/snapshots/",
+  "Typesafe repository releases" at "http://repo.typesafe.com/typesafe/releases/",
+  "Sonatype repo" at "https://oss.sonatype.org/content/groups/scala-tools/",
+  "Sonatype releases" at "https://oss.sonatype.org/content/repositories/releases",
+  "Sonatype snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
+  "Sonatype staging" at "http://oss.sonatype.org/content/repositories/staging",
+  "Java.net Maven2 Repository" at "http://download.java.net/maven/2/",
+  "Twitter Repository" at "http://maven.twttr.com",
+  "Websudos releases" at "http://maven.websudos.co.uk/ext-release-local"
 )

 val sparkVersion = "1.2.1"
+val PhantomVersion = "1.6.0"

 libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
@@ -18,19 +29,28 @@ libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
     exclude("org.apache.zookeeper", "zookeeper"),
   "org.apache.spark" %% "spark-streaming-twitter" % sparkVersion,
+  "org.slf4j" % "slf4j-api" % "1.7.12",
   "org.apache.kafka" %% "kafka" % "0.8.2.1"
     exclude("javax.jms", "jms")
     exclude("com.sun.jdmk", "jmxtools")
     exclude("com.sun.jmx", "jmxri")
-    exclude("org.slf4j", "slf4j-simple")
     exclude("log4j", "log4j")
     exclude("org.apache.zookeeper", "zookeeper")
-    exclude("com.101tec", "zkclient"),
-  "org.apache.curator" % "curator-test" % "2.4.0",
+    exclude("com.101tec", "zkclient")
+    excludeAll ExclusionRule(organization = "org.slf4j"),
+  "org.apache.curator" % "curator-test" % "2.4.0"
+    excludeAll ExclusionRule(organization = "io.netty")
+    excludeAll ExclusionRule(organization = "org.jboss.netty"),
   "com.101tec" % "zkclient" % "0.4"
     exclude("org.apache.zookeeper", "zookeeper"),
   "joda-time" % "joda-time" % "2.7",
   "com.maxmind.geoip2" % "geoip2" % "2.1.0",
+  "com.websudos" %% "phantom-dsl" % PhantomVersion,
+  "com.websudos" %% "phantom-zookeeper" % PhantomVersion
+    excludeAll ExclusionRule(organization = "io.netty")
+    excludeAll ExclusionRule(organization = "org.jboss.netty")
+    excludeAll ExclusionRule(organization = "org.slf4j"),
+  "com.typesafe" % "config" % "1.2.1",
   // Test dependencies
   "org.scalatest" %% "scalatest" % "2.2.4" % "test",
   "org.xerial.snappy" % "snappy-java" % "1.1.1.7"
@@ -44,4 +64,14 @@ parallelExecution in Test := false
 fork in Test := true

 // Skip running tests during assembly
-test in assembly := {}
+test in assembly := {}
+
+assemblyMergeStrategy in assembly := {
+  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
+  case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
+  case "application.conf" => MergeStrategy.concat
+  case "com/twitter/common/args/apt/cmdline.arg.info.txt.1" => MergeStrategy.first
+  case x =>
+    val oldStrategy = (assemblyMergeStrategy in assembly).value
+    oldStrategy(x)
+}
```

project/plugins.sbt

Lines changed: 3 additions & 1 deletion
```diff
@@ -1 +1,3 @@
-logLevel := Level.Warn
+logLevel := Level.Warn
+
+addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.5")
```

src/main/resources/log4j.properties

Lines changed: 37 additions & 0 deletions
New file (note: the cleaner appender's `File` is prefixed with `${logs.dir}` for consistency with the other appenders):

```properties
logs.dir=logs

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.kafkaAppender.File=${logs.dir}/server.log
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.stateChangeAppender.File=${logs.dir}/state-change.log
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.requestAppender.File=${logs.dir}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.cleanerAppender.File=${logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File=${logs.dir}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
```
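Log4j 1.x resolves `${logs.dir}` from JVM system properties before falling back to the `logs.dir=logs` default at the top of this file, so the log directory can be redirected at launch time without editing the file. A hypothetical invocation (main class is a placeholder):

```
spark-submit \
  --driver-java-options "-Dlogs.dir=/var/log/loganalytics" \
  --class <MainClass> --master local[2] \
  target/scala-2.10/spark_codebase-assembly-1.0.jar
```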

src/main/resources/reference.conf

Lines changed: 19 additions & 0 deletions
New file:

```hocon
cassandra {
  keyspace = "loganalytics"

  host = "localhost"

  # native_transport_port
  # port for the CQL native transport to listen for clients on
  nativePort = 9042

  # rpc_port
  # port for Thrift to listen for clients on
  rpcPort = 9160

  replication.strategy = "SimpleStrategy"
  replication.factor = 3

  # size of the thread pool used for Cassandra operations
  concurrency = 10
}
```
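`reference.conf` carries library defaults in Typesafe Config, so a deployment can override any of these keys with an `application.conf` on the classpath; note that the assembly merge strategy above concatenates `application.conf` files for exactly this reason. A hypothetical override:

```hocon
# application.conf (hypothetical deployment override; the hostname is made up)
cassandra {
  host = "cassandra-01.example.internal"
  concurrency = 32
}
```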

src/main/scala/com/cloudwick/cassandra/Cassandra.scala

Lines changed: 26 additions & 0 deletions

New file:

```scala
package com.cloudwick.cassandra

import com.cloudwick.cassandra.schema.{LocationVisitRecord, StatusCountRecord, LogVolumeRecord}
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.duration._

/**
 * Loads Cassandra connection settings from configuration and installs the
 * log-analytics schema (log-volume, status-count and location-visit tables).
 * @author ashrith
 */
trait Cassandra extends CassandraService {
  private[this] val config = ConfigFactory.load()

  protected val cassandraHost = config.getString("cassandra.host")
  protected val nativePort = config.getInt("cassandra.nativePort")
  protected val rpcPort = config.getInt("cassandra.rpcPort")

  def installSchema(): Unit = {
    Await.result(LogVolumeRecord.create.future(), 10.seconds)
    Await.result(StatusCountRecord.create.future(), 10.seconds)
    Await.result(LocationVisitRecord.create.future(), 10.seconds)
  }
}
```

src/main/scala/com/cloudwick/cassandra/CassandraExecutionContext.scala

Lines changed: 27 additions & 0 deletions

New file:

```scala
package com.cloudwick.cassandra

import java.util.concurrent.Executors

import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.typesafe.config.ConfigFactory

import scala.concurrent.ExecutionContext

/**
 * Provides a dedicated fixed-size thread pool for Cassandra operations, sized
 * by the `cassandra.concurrency` setting and exposed as an implicit
 * ExecutionContext.
 * @author ashrith
 */
trait CassandraExecutionContext {
  implicit val cassandraExecutionContext: ExecutionContext = CassandraExecutionContext.executionContext
}

object CassandraExecutionContext {
  implicit val executionContext: ExecutionContext = {
    val executor = {
      val threadFactory = new ThreadFactoryBuilder().setNameFormat("cassandra-pool-%d").build()
      val cassandraConcurrency = ConfigFactory.load().getInt("cassandra.concurrency")
      Executors.newFixedThreadPool(cassandraConcurrency, threadFactory)
    }
    ExecutionContext.fromExecutor(executor)
  }
}
```
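Keeping Cassandra calls on their own bounded pool stops slow I/O from starving Scala's global execution context. Any class can opt in by mixing in the trait; a minimal hypothetical sketch:

```scala
import com.cloudwick.cassandra.CassandraExecutionContext

import scala.concurrent.Future

// Hypothetical consumer: the Future body runs on the "cassandra-pool-N"
// threads via the implicit context the trait provides.
class CounterReader extends CassandraExecutionContext {
  def read(): Future[Long] = Future(42L) // placeholder work
}
```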

src/main/scala/com/cloudwick/cassandra/CassandraLocationVisitServiceModule.scala

Lines changed: 40 additions & 0 deletions

New file:

```scala
package com.cloudwick.cassandra

import com.cloudwick.cassandra.schema.{LocationVisit, LocationVisitRecord}
import com.cloudwick.cassandra.service.LocationVisitServiceModule
import com.cloudwick.logging.Logging
import com.websudos.phantom.Implicits._

import scala.concurrent.Future

/**
 * Cassandra-backed implementation of the location-visit service: increments
 * and reads the per-country/per-city visit counters.
 * @author ashrith
 */
trait CassandraLocationVisitServiceModule extends LocationVisitServiceModule with CassandraService {

  object locationVisitService extends LocationVisitService with Logging {

    override def update(locationVisit: LocationVisit) = {
      logger.trace(
        s"Update location visit counter. Country: ${locationVisit.country} " +
        s"City: ${locationVisit.city} " +
        s"Count: ${locationVisit.totalCount}"
      )

      LocationVisitRecord.update
        .where(_.country eqs locationVisit.country)
        .and(_.city eqs locationVisit.city)
        .modify(_.total_count increment locationVisit.totalCount)
        .future()
    }

    def getCount(country: String, city: String): Future[Option[Long]] = {
      LocationVisitRecord
        .select(_.total_count)
        .where(_.country eqs country)
        .and(_.city eqs city)
        .one()(session, cassandraExecutionContext)
    }
  }
}
```

src/main/scala/com/cloudwick/cassandra/CassandraLogVolumeServiceModule.scala

Lines changed: 30 additions & 0 deletions

New file:

```scala
package com.cloudwick.cassandra

import com.cloudwick.cassandra.schema.{LogVolume, LogVolumeRecord}
import com.cloudwick.cassandra.service.LogVolumeServiceModule
import com.cloudwick.logging.Logging
import com.websudos.phantom.Implicits._

/**
 * Cassandra-backed implementation of the log-volume service: increments and
 * reads the per-minute log volume counters.
 */
trait CassandraLogVolumeServiceModule extends LogVolumeServiceModule with CassandraService {

  object logVolumeService extends LogVolumeService with Logging {
    override def update(logVolume: LogVolume) = {
      logger.trace(
        s"Update volume per minute count. Minute: ${logVolume.timeStamp} " +
        s"Count: ${logVolume.totalCount}"
      )

      LogVolumeRecord.update
        .where(_.timeStamp eqs logVolume.timeStamp)
        .modify(_.total_count increment logVolume.totalCount)
        .future()
    }

    override def getCount(id: Long) = {
      LogVolumeRecord
        .select(_.total_count)
        .where(_.timeStamp eqs id)
        .one()(session, cassandraExecutionContext)
    }
  }
}
```

src/main/scala/com/cloudwick/cassandra/CassandraService.scala

Lines changed: 17 additions & 0 deletions

New file:

```scala
package com.cloudwick.cassandra

import com.typesafe.config.ConfigFactory
import com.websudos.phantom.zookeeper.{CassandraManager, SimpleCassandraConnector}

/**
 * Base connector trait for all Cassandra-backed modules: resolves the
 * keyspace from configuration and plugs in a configurable connection manager.
 * @author ashrith
 */
trait CassandraService extends SimpleCassandraConnector with CassandraExecutionContext {
  override def keySpace: String = {
    val config = ConfigFactory.load()
    config.getString("cassandra.keyspace")
  }

  override def manager: CassandraManager = ConfigurableCassandraManager
}
```

src/main/scala/com/cloudwick/cassandra/CassandraStatusCountServiceModule.scala

Lines changed: 34 additions & 0 deletions

New file:

```scala
package com.cloudwick.cassandra

import com.cloudwick.cassandra.schema.{StatusCount, StatusCountRecord}
import com.cloudwick.cassandra.service.StatusCountServiceModule
import com.cloudwick.logging.Logging
import com.websudos.phantom.Implicits._

import scala.concurrent.Future

/**
 * Cassandra-backed implementation of the status-count service: increments and
 * reads the per-HTTP-status-code counters.
 */
trait CassandraStatusCountServiceModule extends StatusCountServiceModule with CassandraService {

  object statusCountService extends StatusCountService with Logging {

    override def update(statusCount: StatusCount) = {
      logger.trace(
        s"Update status count counter. StatusCode: ${statusCount.statusCode} " +
        s"Count: ${statusCount.totalCount}"
      )

      StatusCountRecord.update
        .where(_.statusCode eqs statusCount.statusCode)
        .modify(_.total_count increment statusCount.totalCount)
        .future()
    }

    override def getCount(id: Int) = {
      StatusCountRecord
        .select(_.total_count)
        .where(_.statusCode eqs id)
        .one()(session, cassandraExecutionContext)
    }
  }
}
```
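Taken together, these cake-pattern modules compose into a single object that the streaming job can use for all Cassandra writes. A hypothetical composition root (the object name is made up; only `installSchema` and the service members shown above are assumed):

```scala
import com.cloudwick.cassandra._

// Hypothetical composition root mixing in the schema installer and all
// three Cassandra-backed service modules added by this commit.
object LogAnalyticsCassandra extends Cassandra
  with CassandraLogVolumeServiceModule
  with CassandraStatusCountServiceModule
  with CassandraLocationVisitServiceModule

// At job start-up, something like:
//   LogAnalyticsCassandra.installSchema()
//   LogAnalyticsCassandra.statusCountService.getCount(200)
```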
