Skip to content

Commit aea88f0

Browse files
committed
Initial commit with all code files
0 parents  commit aea88f0

13 files changed

+804
-0
lines changed

ETLJobExecutionResult.png

145 KB
Loading

FetchStockDataFunction

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import json
2+
import requests
3+
4+
## Rapid API stock analysis api related info and data is collected for every min for first 100 minutes
5+
6+
url = "https://alpha-vantage.p.rapidapi.com/query"
7+
querystring = {"interval":"1min","function":"TIME_SERIES_INTRADAY","symbol":"MSFT","datatype":"json","output_size":"compact"}
8+
headers = {
9+
"X-RapidAPI-Key": "bd675a2e56msh75474a42f520a18p180a34jsn3f06bc445ce3",
10+
"X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com"
11+
}
12+
13+
def lambda_handler(event, context):
14+
# TODO implement
15+
16+
response = requests.get(url, headers=headers, params=querystring)
17+
response_json = response.json()
18+
19+
time_series_data = response_json["Time Series (1min)"]
20+
21+
time_series_data_json = json.dumps(time_series_data)
22+
23+
return time_series_data_json

FetchSubscriptionDataFunction

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import json
2+
import requests
3+
4+
def lambda_handler(event, context):
5+
# TODO implement
6+
7+
url = "https://streaming-availability.p.rapidapi.com/countries"
8+
headers = {
9+
"X-RapidAPI-Key": "bd675a2e56msh75474a42f520a18p180a34jsn3f06bc445ce3",
10+
"X-RapidAPI-Host": "streaming-availability.p.rapidapi.com"
11+
}
12+
13+
response = requests.get(url, headers=headers)
14+
15+
countrycode_details = {}
16+
17+
response_dict = json.loads((response.content).decode("utf-8"))
18+
19+
20+
return json.dumps(response_dict['result']["ae"])
21+
22+

GlueRoleToAccessS3.txt

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
{
2+
"Version": "2012-10-17",
3+
"Statement": [
4+
{
5+
"Effect": "Allow",
6+
"Action": [
7+
"s3:*",
8+
"s3-object-lambda:*"
9+
],
10+
"Resource": "*"
11+
}
12+
]
13+
}

InputDataToStepFunction.txt

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"start" : 1,
3+
"end" : 3,
4+
"streamName" : "subscriptionStream",
5+
"topicName" : "serviceSubscriptionptrTopic",
6+
"email" : "krishphaniteja9@gmail.com",
7+
"batchBucketName" : "stockdatabucket2810202312",
8+
"glueJobName" : "StockDataETLJOB"
9+
}

StateMachine.json

+323
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,323 @@
1+
{
2+
"Comment": "A description of my state machine",
3+
"StartAt": "ChooseBatchOrStream",
4+
"States": {
5+
"ChooseBatchOrStream": {
6+
"Type": "Pass",
7+
"Parameters": {
8+
"batchOrStreamValue.$": "States.MathRandom($.start,$.end)"
9+
},
10+
"Next": "CheckBatchOrStream",
11+
"ResultPath": "$.batchOrStreamValue"
12+
},
13+
"CheckBatchOrStream": {
14+
"Type": "Choice",
15+
"Choices": [
16+
{
17+
"Variable": "$.batchOrStreamValue.batchOrStreamValue",
18+
"NumericEquals": 1,
19+
"Next": "BatchJobExecutionStarts"
20+
}
21+
],
22+
"Default": "StreamJobExecutionStarts"
23+
},
24+
"BatchJobExecutionStarts": {
25+
"Type": "Pass",
26+
"Next": "FetchStockMarketData"
27+
},
28+
"FetchStockMarketData": {
29+
"Type": "Task",
30+
"Resource": "arn:aws:states:::lambda:invoke",
31+
"Parameters": {
32+
"Payload.$": "$",
33+
"FunctionName": "arn:aws:lambda:us-east-1:154142252146:function:FetchStockMarketDataFunction:$LATEST"
34+
},
35+
"Retry": [
36+
{
37+
"ErrorEquals": [
38+
"Lambda.ServiceException",
39+
"Lambda.AWSLambdaException",
40+
"Lambda.SdkClientException",
41+
"Lambda.TooManyRequestsException"
42+
],
43+
"IntervalSeconds": 1,
44+
"MaxAttempts": 3,
45+
"BackoffRate": 2
46+
}
47+
],
48+
"Next": "StockMarketDataFromJsonToString",
49+
"ResultPath": "$.taskResult"
50+
},
51+
"StockMarketDataFromJsonToString": {
52+
"Type": "Pass",
53+
"Parameters": {
54+
"taskResult.$": "States.StringToJson($.taskResult.Payload)"
55+
},
56+
"Next": "ListBuckets",
57+
"ResultPath": "$.taskResult"
58+
},
59+
"ListBuckets": {
60+
"Type": "Task",
61+
"Parameters": {},
62+
"Resource": "arn:aws:states:::aws-sdk:s3:listBuckets",
63+
"Next": "CheckBucketExistsOrNot",
64+
"ResultPath": "$.Buckets"
65+
},
66+
"CheckBucketExistsOrNot": {
67+
"Type": "Pass",
68+
"Parameters": {
69+
"ValidateBucket.$": "States.ArrayContains($.Buckets.Buckets,$.batchBucketName)"
70+
},
71+
"ResultPath": "$.ValidateBucket",
72+
"Next": "ValidateBucketExistsOrNot"
73+
},
74+
"ValidateBucketExistsOrNot": {
75+
"Type": "Choice",
76+
"Choices": [
77+
{
78+
"Variable": "$.ValidateBucket.ValidateBucket",
79+
"BooleanEquals": false,
80+
"Next": "BucketNotExists"
81+
}
82+
],
83+
"Default": "BucketAlreadyExists"
84+
},
85+
"BucketNotExists": {
86+
"Type": "Pass",
87+
"Next": "WaitFor1Min"
88+
},
89+
"WaitFor1Min": {
90+
"Type": "Wait",
91+
"Seconds": 60,
92+
"Next": "CreateBucket"
93+
},
94+
"CreateBucket": {
95+
"Type": "Task",
96+
"Parameters": {
97+
"Bucket.$": "$.batchBucketName"
98+
},
99+
"Resource": "arn:aws:states:::aws-sdk:s3:createBucket",
100+
"Next": "BucketCreatedInsertDataIntoBucket",
101+
"ResultPath": null
102+
},
103+
"BucketCreatedInsertDataIntoBucket": {
104+
"Type": "Pass",
105+
"Next": "WaitForSomeTimeToLoadDataIntoS3"
106+
},
107+
"WaitForSomeTimeToLoadDataIntoS3": {
108+
"Type": "Wait",
109+
"Seconds": 60,
110+
"Next": "PutObject"
111+
},
112+
"PutObject": {
113+
"Type": "Task",
114+
"Parameters": {
115+
"Body.$": "$.taskResult.taskResult",
116+
"Bucket.$": "$.batchBucketName",
117+
"Key": "StockData"
118+
},
119+
"Resource": "arn:aws:states:::aws-sdk:s3:putObject",
120+
"Next": "WaitForSomeTimeToStartGlueJob",
121+
"ResultPath": null
122+
},
123+
"WaitForSomeTimeToStartGlueJob": {
124+
"Type": "Wait",
125+
"Seconds": 60,
126+
"Next": "Glue StartJobRun"
127+
},
128+
"Glue StartJobRun": {
129+
"Type": "Task",
130+
"Resource": "arn:aws:states:::glue:startJobRun",
131+
"Parameters": {
132+
"JobName.$": "$.glueJobName"
133+
},
134+
"Next": "ETLJobSuccess"
135+
},
136+
"ETLJobSuccess": {
137+
"Type": "Succeed"
138+
},
139+
"BucketAlreadyExists": {
140+
"Type": "Pass",
141+
"Next": "PutObject"
142+
},
143+
"StreamJobExecutionStarts": {
144+
"Type": "Pass",
145+
"Next": "FetchSubscriptionData"
146+
},
147+
"FetchSubscriptionData": {
148+
"Type": "Task",
149+
"Resource": "arn:aws:states:::lambda:invoke",
150+
"Parameters": {
151+
"Payload.$": "$",
152+
"FunctionName": "arn:aws:lambda:us-east-1:154142252146:function:FetchSubscriptionDataFunction:$LATEST"
153+
},
154+
"Retry": [
155+
{
156+
"ErrorEquals": [
157+
"Lambda.ServiceException",
158+
"Lambda.AWSLambdaException",
159+
"Lambda.SdkClientException",
160+
"Lambda.TooManyRequestsException"
161+
],
162+
"IntervalSeconds": 1,
163+
"MaxAttempts": 3,
164+
"BackoffRate": 2
165+
}
166+
],
167+
"ResultPath": "$.taskResult",
168+
"Next": "StringToJsonTransformation"
169+
},
170+
"StringToJsonTransformation": {
171+
"Type": "Pass",
172+
"Parameters": {
173+
"PayloadData.$": "States.StringToJson($.taskResult.Payload)"
174+
},
175+
"ResultPath": "$.PayloadJsonData",
176+
"Next": "ListStreams"
177+
},
178+
"ListStreams": {
179+
"Type": "Task",
180+
"Parameters": {},
181+
"Resource": "arn:aws:states:::aws-sdk:kinesis:listStreams",
182+
"ResultPath": "$.taskResult",
183+
"Next": "CheckStreamIsPresentOrNot"
184+
},
185+
"CheckStreamIsPresentOrNot": {
186+
"Type": "Pass",
187+
"Parameters": {
188+
"ContainsStreamOrNot.$": "States.ArrayContains($.taskResult.StreamNames,$.streamName)"
189+
},
190+
"ResultPath": "$.ContainsStreamOrNot",
191+
"Next": "BasedOnPresenceOfStream"
192+
},
193+
"BasedOnPresenceOfStream": {
194+
"Type": "Choice",
195+
"Choices": [
196+
{
197+
"Variable": "$.ContainsStreamOrNot.ContainsStreamOrNot",
198+
"BooleanEquals": true,
199+
"Next": "StreamIsAlreadyPresent"
200+
}
201+
],
202+
"Default": "StreamIsNotPresentOrCreating"
203+
},
204+
"StreamIsAlreadyPresent": {
205+
"Type": "Pass",
206+
"Next": "StreamExists"
207+
},
208+
"StreamIsNotPresentOrCreating": {
209+
"Type": "Pass",
210+
"Next": "StreamCreation"
211+
},
212+
"StreamCreation": {
213+
"Type": "Task",
214+
"Parameters": {
215+
"StreamName.$": "$.streamName"
216+
},
217+
"Resource": "arn:aws:states:::aws-sdk:kinesis:createStream",
218+
"ResultPath": null,
219+
"Next": "WaitForSomeTime"
220+
},
221+
"WaitForSomeTime": {
222+
"Type": "Wait",
223+
"Seconds": 60,
224+
"Next": "StreamExists"
225+
},
226+
"StreamExists": {
227+
"Type": "Pass",
228+
"Next": "InsertRecordInStream"
229+
},
230+
"InsertRecordInStream": {
231+
"Type": "Task",
232+
"Parameters": {
233+
"StreamName.$": "$.streamName",
234+
"Data.$": "$.PayloadJsonData.PayloadData.services",
235+
"PartitionKey.$": "$.PayloadJsonData.PayloadData.countryCode"
236+
},
237+
"Resource": "arn:aws:states:::aws-sdk:kinesis:putRecord",
238+
"ResultPath": "$.taskResult",
239+
"Next": "DataEnteredIntoStream"
240+
},
241+
"DataEnteredIntoStream": {
242+
"Type": "Pass",
243+
"Next": "ListTopics"
244+
},
245+
"ListTopics": {
246+
"Type": "Task",
247+
"Parameters": {},
248+
"Resource": "arn:aws:states:::aws-sdk:sns:listTopics",
249+
"ResultPath": "$.TopicsList",
250+
"Next": "CheckTopicIsPresentOrNot"
251+
},
252+
"CheckTopicIsPresentOrNot": {
253+
"Type": "Pass",
254+
"Parameters": {
255+
"IsTopicExists.$": "States.ArrayContains($.TopicsList.Topics,$.topicName)"
256+
},
257+
"Next": "CheckIfTopicExistsOrNot",
258+
"ResultPath": "$.IsTopicExists"
259+
},
260+
"CheckIfTopicExistsOrNot": {
261+
"Type": "Choice",
262+
"Choices": [
263+
{
264+
"Variable": "$.IsTopicExists.IsTopicExists",
265+
"BooleanEquals": true,
266+
"Next": "TopicAlreadyExists"
267+
}
268+
],
269+
"Default": "CreateTopicAndSubscription"
270+
},
271+
"TopicAlreadyExists": {
272+
"Type": "Pass",
273+
"Next": "PublishDataToTopic"
274+
},
275+
"CreateTopicAndSubscription": {
276+
"Type": "Pass",
277+
"Next": "CreateTopic"
278+
},
279+
"CreateTopic": {
280+
"Type": "Task",
281+
"Parameters": {
282+
"Name.$": "$.topicName"
283+
},
284+
"Resource": "arn:aws:states:::aws-sdk:sns:createTopic",
285+
"ResultPath": "$.topicArn",
286+
"Next": "Subscribe"
287+
},
288+
"Subscribe": {
289+
"Type": "Task",
290+
"Parameters": {
291+
"Protocol": "email",
292+
"TopicArn.$": "$.topicArn.TopicArn",
293+
"Endpoint": "krishphaniteja9@gmail.com"
294+
},
295+
"Resource": "arn:aws:states:::aws-sdk:sns:subscribe",
296+
"Next": "Wait",
297+
"ResultPath": null
298+
},
299+
"Wait": {
300+
"Type": "Wait",
301+
"Seconds": 120,
302+
"Next": "PublishDataToTopic"
303+
},
304+
"PublishDataToTopic": {
305+
"Type": "Task",
306+
"Resource": "arn:aws:states:::sns:publish",
307+
"Parameters": {
308+
"TopicArn.$": "$.topicArn.TopicArn",
309+
"Message": {
310+
"KinesisShardDetails": "Data is present at",
311+
"SequenceNumber.$": "$.taskResult.SequenceNumber",
312+
"ShardId.$": "$.taskResult.ShardId",
313+
"topicName.$": "$.topicName",
314+
"email.$": "$.email"
315+
}
316+
},
317+
"Next": "Success"
318+
},
319+
"Success": {
320+
"Type": "Succeed"
321+
}
322+
}
323+
}

0 commit comments

Comments
 (0)