Skip to content
This repository was archived by the owner on Jul 28, 2020. It is now read-only.

Commit e61adf4

Browse files
authored
Merge pull request #47 from zenaton/feature/scheduling
Add scheduling
2 parents 19bb807 + e0af609 commit e61adf4

File tree

5 files changed

+162
-5
lines changed

5 files changed

+162
-5
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
- Added a `intent_id` property when dispatching workflows and tasks, sending events to workflows, and
77
pausing/resuming/killing workflows.
88

9+
- Added scheduling: `schedule(cron)`
10+
911
### Changed
1012

1113
### Deprecated

zenaton/client.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from .abstracts.workflow import Workflow
88
from .exceptions import InvalidArgumentError
99
from .services.http_service import HttpService
10+
from .services.graphql_service import GraphQLService
1011
from .services.properties import Properties
1112
from .services.serializer import Serializer
1213
from .singleton import Singleton
@@ -16,6 +17,7 @@
1617
class Client(metaclass=Singleton):
1718
ZENATON_API_URL = 'https://api.zenaton.com/v1' # Zenaton api url
1819
ZENATON_WORKER_URL = 'http://localhost' # Default worker url
20+
ZENATON_GATEWAY_URL = "https://gateway.zenaton.com/api"; # Zenaton gateway url
1921
DEFAULT_WORKER_PORT = 4001 # Default worker port
2022
WORKER_API_VERSION = 'v_newton' # Default worker api version
2123

@@ -49,6 +51,7 @@ def __init__(self, app_id='', api_token='', app_env=''):
4951
self.api_token = api_token
5052
self.app_env = app_env
5153
self.http = HttpService()
54+
self.graphql = GraphQLService()
5255
self.serializer = Serializer()
5356
self.properties = Properties()
5457

@@ -57,6 +60,14 @@ def __lazy_init__(self, app_id, api_token, app_env):
5760
self.api_token = self.api_token or api_token
5861
self.app_env = self.app_env or app_env
5962

63+
"""
64+
Gets the gateway url (GraphQL API)
65+
:returns String the gateway url
66+
"""
67+
def gateway_url(self):
68+
url = os.environ.get('ZENATON_GATEWAY_URL') or self.ZENATON_GATEWAY_URL
69+
return url
70+
6071
"""
6172
Gets the url for the workers
6273
:param String resource the endpoint for the worker
@@ -114,6 +125,41 @@ def start_task(self, task):
114125
self.ATTR_MAX_PROCESSING_TIME: task.max_processing_time() if hasattr(task, 'max_processing_time') else None
115126
}))
116127

128+
def start_scheduled_workflow(self, flow, cron):
129+
url = self.gateway_url()
130+
headers = self.gateway_headers()
131+
query = self.graphql.CREATE_WORKFLOW_SCHEDULE
132+
variables = {
133+
'createWorkflowScheduleInput': {
134+
'intentId': self.uuid(),
135+
'environmentName': self.app_env,
136+
'cron': cron,
137+
'workflowName': self.class_name(flow),
138+
'canonicalName': self.canonical_name(flow) or self.class_name(flow),
139+
'programmingLanguage': self.PROG.upper(),
140+
'properties': self.serializer.encode(self.properties.from_(flow))
141+
}
142+
}
143+
res = self.graphql.request(url, query, variables=variables, headers=headers)
144+
return res['data']['createWorkflowSchedule']
145+
146+
def start_scheduled_task(self, task, cron):
147+
url = self.gateway_url()
148+
headers = self.gateway_headers()
149+
query = self.graphql.CREATE_TASK_SCHEDULE
150+
variables = {
151+
'createTaskScheduleInput': {
152+
'intentId': self.uuid(),
153+
'environmentName': self.app_env,
154+
'cron': cron,
155+
'taskName': self.class_name(task),
156+
'programmingLanguage': self.PROG.upper(),
157+
'properties': self.serializer.encode(self.properties.from_(task))
158+
}
159+
}
160+
res = self.graphql.request(url, query, variables=variables, headers=headers)
161+
return res['data']['createTaskSchedule']
162+
117163
def update_instance(self, workflow, custom_id, mode):
118164
params = '{}={}'.format(self.ATTR_ID, custom_id)
119165
url = self.instance_worker_url(params)
@@ -212,6 +258,12 @@ def add_app_env(self, url, params):
212258
app_id = '{}={}&'.format(self.APP_ID, self.app_id) if self.app_id else ''
213259
return '{}{}{}{}'.format(url, app_env, app_id, urllib.parse.quote_plus(params, safe='=&'))
214260

261+
def gateway_headers(self):
262+
return {'Accept': 'application/json',
263+
'Content-type': 'application/json',
264+
'app-id': self.app_id,
265+
'api-token': self.api_token}
266+
215267
def parse_custom_id_from(self, flow):
216268
custom_id = flow.id()
217269
if custom_id is not None:

zenaton/engine.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,18 @@ def execute(self, jobs):
2929
return [job.handle() for job in jobs]
3030
return self.processor.process(jobs, True)
3131

32+
"""
33+
Executes schedule jobs synchronously
34+
@param jobs [Array<Zenaton::Interfaces::Job>]
35+
@param cron String
36+
@return [Array<String>, nil] the results if executed locally, or nil
37+
"""
38+
def schedule(self, jobs, cron):
39+
for job in jobs:
40+
Engine._check_argument(job)
41+
42+
[self.local_schedule(job, cron) for job in jobs]
43+
3244
"""
3345
Executes jobs asynchronously
3446
@param jobs [Array<Zenaton::Interfaces::Job>]
@@ -37,15 +49,21 @@ def execute(self, jobs):
3749
def dispatch(self, jobs):
3850
map(Engine._check_argument, jobs)
3951
if len(jobs) == 0 or self.processor is None:
40-
[self.local_dispatch(job) for job in jobs]
52+
return [self.local_dispatch(job) for job in jobs]
4153
if self.processor and len(jobs) > 0:
4254
self.processor.process(jobs, False)
4355

4456
def local_dispatch(self, job):
4557
if isinstance(job, Workflow):
46-
self.client.start_workflow(job)
58+
return self.client.start_workflow(job)
59+
else:
60+
return self.client.start_task(job)
61+
62+
def local_schedule(self, job, cron):
63+
if isinstance(job, Workflow):
64+
return self.client.start_scheduled_workflow(job, cron)
4765
else:
48-
self.client.start_task(job)
66+
return self.client.start_scheduled_task(job, cron)
4967

5068
@staticmethod
5169
def _check_argument(job):

zenaton/services/graphql_service.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
from ..exceptions import ExternalError, InternalError
2+
import json
3+
import requests
4+
5+
6+
class GraphQLService:
7+
CREATE_WORKFLOW_SCHEDULE = """
8+
mutation ($createWorkflowScheduleInput: CreateWorkflowScheduleInput!) {
9+
createWorkflowSchedule(input: $createWorkflowScheduleInput) {
10+
schedule {
11+
id
12+
name
13+
cron
14+
insertedAt
15+
updatedAt
16+
target {
17+
... on WorkflowTarget {
18+
name
19+
type
20+
canonicalName
21+
programmingLanguage
22+
properties
23+
}
24+
}
25+
}
26+
}
27+
}
28+
"""
29+
30+
CREATE_TASK_SCHEDULE = """
31+
mutation ($createTaskScheduleInput: CreateTaskScheduleInput!) {
32+
createTaskSchedule(input: $createTaskScheduleInput) {
33+
schedule {
34+
id
35+
name
36+
cron
37+
insertedAt
38+
updatedAt
39+
target {
40+
... on TaskTarget {
41+
name
42+
type
43+
programmingLanguage
44+
properties
45+
}
46+
}
47+
}
48+
}
49+
}
50+
"""
51+
52+
def request(self, url, query, variables=None, headers={}):
53+
try:
54+
data = {'query': query}
55+
if variables:
56+
data['variables'] = variables
57+
r = requests.request(method='POST', url=url,
58+
headers=headers, data=json.dumps(data))
59+
if r.status_code >= 400:
60+
raise InternalError(r.content)
61+
content = r.json()
62+
content['status_code'] = r.status_code
63+
64+
if 'errors' in content and isinstance(content['errors'], list) and len(content['errors']) > 0:
65+
errors = content['errors']
66+
for error in errors:
67+
if 'locations' in error:
68+
del error['locations']
69+
raise ExternalError(errors)
70+
except json.decoder.JSONDecodeError:
71+
raise InternalError
72+
except requests.exceptions.ConnectionError:
73+
raise ConnectionError
74+
return content

zenaton/traits/zenatonable.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,28 @@
11
from ..engine import Engine
22
from ..query.builder import Builder
3+
from zenaton.exceptions import InvalidArgumentError
34

45

56
class Zenatonable:
67
"""
7-
Sends self as the single job to be executed
8+
Sends self as the single job to be dispatched
89
to the engine and returns the result
910
"""
1011
def dispatch(self):
1112
return Engine().dispatch([self])
1213

1314
"""
14-
Sends self as the single job to be dispatched
15+
Sends self as the single job to be scheduled
16+
to the engine and returns the result
17+
"""
18+
def schedule(self, cron):
19+
if not isinstance(cron, str) or cron == "":
20+
raise InvalidArgumentError("Param passed to 'schedule' function must be a non empty cron string")
21+
22+
return Engine().schedule([self], cron)
23+
24+
"""
25+
Sends self as the single job to be executed
1526
to the engine and returns the result
1627
"""
1728
def execute(self):

0 commit comments

Comments
 (0)