Skip to content

Commit 428eb55

Browse files
jorwoodsJordan Woods
and
Jordan Woods
authored
Add FlowRun Item and Endpoints. (#884)
* Add tests for fetching flow runs * Implement basics of FlowRuns * Add tests for cancel flow run * Make FlowRuns a Queryset endpoint for easier filtering * Add test for flow refresh endpoint * Align to naming conventions * Apply name change consistently * Change flowrun_id into flow_run_id * Add wait_for_job to FlowRun * Tag wait_for_job with version number * Rewrite flow_run to use ExponentialBackoffTimer * Test flow run wait with backoff * Remove 3.5 from test matrix * Standardize spelling of cancelled Co-authored-by: Jordan Woods <Jordan.Woods@mkcorp.com>
1 parent a8b3424 commit 428eb55

18 files changed

+410
-4
lines changed

tableauserverclient/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
FlowItem,
3535
WebhookItem,
3636
PersonalAccessTokenAuth,
37+
FlowRunItem
3738
)
3839
from .server import (
3940
RequestOptions,

tableauserverclient/models/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from .favorites_item import FavoriteItem
1111
from .group_item import GroupItem
1212
from .flow_item import FlowItem
13+
from .flow_run_item import FlowRunItem
1314
from .interval_item import (
1415
IntervalItem,
1516
DailyInterval,
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import xml.etree.ElementTree as ET
2+
from ..datetime_helpers import parse_datetime
3+
import itertools
4+
5+
6+
class FlowRunItem(object):
7+
def __init__(self) -> None:
8+
self._id=None
9+
self._flow_id=None
10+
self._status=None
11+
self._started_at=None
12+
self._completed_at=None
13+
self._progress=None
14+
self._background_job_id=None
15+
16+
17+
@property
18+
def id(self):
19+
return self._id
20+
21+
22+
@property
23+
def flow_id(self):
24+
return self._flow_id
25+
26+
27+
@property
28+
def status(self):
29+
return self._status
30+
31+
32+
@property
33+
def started_at(self):
34+
return self._started_at
35+
36+
37+
@property
38+
def completed_at(self):
39+
return self._completed_at
40+
41+
42+
@property
43+
def progress(self):
44+
return self._progress
45+
46+
47+
@property
48+
def background_job_id(self):
49+
return self._background_job_id
50+
51+
52+
def _set_values(
53+
self,
54+
id,
55+
flow_id,
56+
status,
57+
started_at,
58+
completed_at,
59+
progress,
60+
background_job_id,
61+
):
62+
if id is not None:
63+
self._id = id
64+
if flow_id is not None:
65+
self._flow_id = flow_id
66+
if status is not None:
67+
self._status = status
68+
if started_at is not None:
69+
self._started_at = started_at
70+
if completed_at is not None:
71+
self._completed_at = completed_at
72+
if progress is not None:
73+
self._progress = progress
74+
if background_job_id is not None:
75+
self._background_job_id = background_job_id
76+
77+
78+
@classmethod
79+
def from_response(cls, resp, ns):
80+
all_flowrun_items = list()
81+
parsed_response = ET.fromstring(resp)
82+
all_flowrun_xml = itertools.chain(
83+
parsed_response.findall(".//t:flowRun[@id]", namespaces=ns),
84+
parsed_response.findall(".//t:flowRuns[@id]", namespaces=ns)
85+
)
86+
87+
for flowrun_xml in all_flowrun_xml:
88+
parsed = cls._parse_element(flowrun_xml, ns)
89+
flowrun_item = cls()
90+
flowrun_item._set_values(**parsed)
91+
all_flowrun_items.append(flowrun_item)
92+
return all_flowrun_items
93+
94+
95+
@staticmethod
96+
def _parse_element(flowrun_xml, ns):
97+
result = {}
98+
result['id'] = flowrun_xml.get("id", None)
99+
result['flow_id'] = flowrun_xml.get("flowId", None)
100+
result['status'] = flowrun_xml.get("status", None)
101+
result['started_at'] = parse_datetime(flowrun_xml.get("startedAt", None))
102+
result['completed_at'] = parse_datetime(flowrun_xml.get("completedAt", None))
103+
result['progress'] = flowrun_xml.get("progress", None)
104+
result['background_job_id'] = flowrun_xml.get("backgroundJobId", None)
105+
106+
return result

tableauserverclient/models/job_item.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import xml.etree.ElementTree as ET
2+
from .flow_run_item import FlowRunItem
23
from ..datetime_helpers import parse_datetime
34

45

@@ -24,6 +25,7 @@ def __init__(
2425
finish_code=0,
2526
notes=None,
2627
mode=None,
28+
flow_run=None,
2729
):
2830
self._id = id_
2931
self._type = job_type
@@ -34,6 +36,7 @@ def __init__(
3436
self._finish_code = finish_code
3537
self._notes = notes or []
3638
self._mode = mode
39+
self._flow_run = flow_run
3740

3841
@property
3942
def id(self):
@@ -76,6 +79,14 @@ def mode(self, value):
7679
# check for valid data here
7780
self._mode = value
7881

82+
@property
83+
def flow_run(self):
84+
return self._flow_run
85+
86+
@flow_run.setter
87+
def flow_run(self, value):
88+
self._flow_run = value
89+
7990
def __repr__(self):
8091
return (
8192
"<Job#{_id} {_type} created_at({_created_at}) started_at({_started_at}) completed_at({_completed_at})"
@@ -102,6 +113,13 @@ def _parse_element(cls, element, ns):
102113
finish_code = int(element.get("finishCode", -1))
103114
notes = [note.text for note in element.findall(".//t:notes", namespaces=ns)] or None
104115
mode = element.get("mode", None)
116+
flow_run = None
117+
for flow_job in element.findall(".//t:runFlowJobType", namespaces=ns):
118+
flow_run = FlowRunItem()
119+
flow_run._id = flow_job.get("flowRunId", None)
120+
for flow in flow_job.findall(".//t:flow", namespaces=ns):
121+
flow_run._flow_id = flow.get("id", None)
122+
flow_run._started_at = created_at or started_at
105123
return cls(
106124
id_,
107125
type_,
@@ -112,6 +130,7 @@ def _parse_element(cls, element, ns):
112130
finish_code,
113131
notes,
114132
mode,
133+
flow_run,
115134
)
116135

117136

tableauserverclient/server/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
ColumnItem,
3333
FlowItem,
3434
WebhookItem,
35+
FlowRunItem
3536
)
3637
from .endpoint import (
3738
Auth,

tableauserverclient/server/endpoint/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from .favorites_endpoint import Favorites
88
from .fileuploads_endpoint import Fileuploads
99
from .flows_endpoint import Flows
10+
from .flow_runs_endpoint import FlowRuns
1011
from .exceptions import (
1112
ServerResponseError,
1213
MissingRequiredFieldError,

tableauserverclient/server/endpoint/exceptions.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,16 @@ def __str__(self):
7575
return f"Job {self.job.id} failed with notes {self.notes}"
7676

7777

78-
class JobCanceledException(JobFailedException):
78+
class JobCancelledException(JobFailedException):
7979
pass
80+
class FlowRunFailedException(Exception):
81+
def __init__(self, flow_run):
82+
self.background_job_id = flow_run.background_job_id
83+
self.flow_run = flow_run
84+
85+
def __str__(self):
86+
return f"FlowRun {self.flow_run.id} failed with job id {self.background_job_id}"
87+
88+
89+
class FlowRunCancelledException(FlowRunFailedException):
90+
pass
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
from .endpoint import Endpoint, QuerysetEndpoint, api
2+
from .exceptions import FlowRunFailedException, FlowRunCancelledException
3+
from .. import FlowRunItem, PaginationItem
4+
from ...exponential_backoff import ExponentialBackoffTimer
5+
6+
import logging
7+
8+
logger = logging.getLogger("tableau.endpoint.flowruns")
9+
10+
11+
class FlowRuns(QuerysetEndpoint):
12+
def __init__(self, parent_srv):
13+
super(FlowRuns, self).__init__(parent_srv)
14+
15+
@property
16+
def baseurl(self):
17+
return "{0}/sites/{1}/flows/runs".format(self.parent_srv.baseurl, self.parent_srv.site_id)
18+
19+
# Get all flows
20+
@api(version="3.10")
21+
def get(self, req_options=None):
22+
logger.info("Querying all flow runs on site")
23+
url = self.baseurl
24+
server_response = self.get_request(url, req_options)
25+
pagination_item = PaginationItem.from_response(server_response.content, self.parent_srv.namespace)
26+
all_flow_run_items = FlowRunItem.from_response(server_response.content, self.parent_srv.namespace)
27+
return all_flow_run_items, pagination_item
28+
29+
# Get 1 flow by id
30+
@api(version="3.10")
31+
def get_by_id(self, flow_run_id):
32+
if not flow_run_id:
33+
error = "Flow ID undefined."
34+
raise ValueError(error)
35+
logger.info("Querying single flow (ID: {0})".format(flow_run_id))
36+
url = "{0}/{1}".format(self.baseurl, flow_run_id)
37+
server_response = self.get_request(url)
38+
return FlowRunItem.from_response(server_response.content, self.parent_srv.namespace)[0]
39+
40+
41+
# Cancel 1 flow run by id
42+
@api(version="3.10")
43+
def cancel(self, flow_run_id):
44+
if not flow_run_id:
45+
error = "Flow ID undefined."
46+
raise ValueError(error)
47+
id_ = getattr(flow_run_id, 'id', flow_run_id)
48+
url = "{0}/{1}".format(self.baseurl, id_)
49+
self.put_request(url)
50+
logger.info("Deleted single flow (ID: {0})".format(id_))
51+
52+
53+
@api(version="3.10")
54+
def wait_for_job(self, flow_run_id, *, timeout=None):
55+
if isinstance(flow_run_id, FlowRunItem):
56+
flow_run_id = flow_run_id.id
57+
assert isinstance(flow_run_id, str)
58+
logger.debug(f"Waiting for flow run {flow_run_id}")
59+
60+
backoffTimer = ExponentialBackoffTimer(timeout=timeout)
61+
flow_run = self.get_by_id(flow_run_id)
62+
while flow_run.completed_at is None:
63+
backoffTimer.sleep()
64+
flow_run = self.get_by_id(flow_run_id)
65+
logger.debug(f"\tFlowRun {flow_run_id} progress={flow_run.progress}")
66+
67+
logger.info("FlowRun {} Completed: Status: {}".format(flow_run_id, flow_run.status))
68+
69+
if flow_run.status == "Success":
70+
return flow_run
71+
elif flow_run.status == "Failed":
72+
raise FlowRunFailedException(flow_run)
73+
elif flow_run.status == "Cancelled":
74+
raise FlowRunCancelledException(flow_run)
75+
else:
76+
raise AssertionError("Unexpected status in flow_run", flow_run)

tableauserverclient/server/endpoint/jobs_endpoint.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from .endpoint import Endpoint, api
2-
from .exceptions import JobCanceledException, JobFailedException
2+
from .exceptions import JobCancelledException, JobFailedException
33
from .. import JobItem, BackgroundJobItem, PaginationItem
44
from ..request_options import RequestOptionsBase
55
from ...exponential_backoff import ExponentialBackoffTimer
@@ -44,6 +44,7 @@ def get_by_id(self, job_id):
4444
new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0]
4545
return new_job
4646

47+
@api(version="2.6")
4748
def wait_for_job(self, job_id, *, timeout=None):
4849
if isinstance(job_id, JobItem):
4950
job_id = job_id.id
@@ -64,6 +65,6 @@ def wait_for_job(self, job_id, *, timeout=None):
6465
elif job.finish_code == JobItem.FinishCode.Failed:
6566
raise JobFailedException(job)
6667
elif job.finish_code == JobItem.FinishCode.Cancelled:
67-
raise JobCanceledException(job)
68+
raise JobCancelledException(job)
6869
else:
6970
raise AssertionError("Unexpected finish_code in job", job)

tableauserverclient/server/server.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
Favorites,
2626
DataAlerts,
2727
Fileuploads,
28+
FlowRuns
2829
)
2930
from .endpoint.exceptions import (
3031
EndpointUnavailableError,
@@ -85,6 +86,7 @@ def __init__(self, server_address, use_server_version=False):
8586
self.data_alerts = DataAlerts(self)
8687
self.fileuploads = Fileuploads(self)
8788
self._namespace = Namespace()
89+
self.flow_runs = FlowRuns(self)
8890

8991
if use_server_version:
9092
self.use_server_version()

test/_utils.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ def sleep_mock(interval):
2929
def get_time():
3030
return mock_time
3131

32-
patch = unittest.mock.patch
32+
try:
33+
patch = unittest.mock.patch
34+
except AttributeError:
35+
from unittest.mock import patch
3336
with patch("time.sleep", sleep_mock), patch("time.time", get_time):
3437
yield get_time

test/assets/flow_refresh.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<?xml version='1.0' encoding='UTF-8'?>
2+
<tsResponse xmlns="http://tableau.com/api" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tableau.com/api http://tableau.com/api/ts-api-3.5.xsd">
3+
<job id="d1b2ccd0-6dfa-444a-aee4-723dbd6b7c9d"
4+
mode="Asynchronous"
5+
type="RunFlow"
6+
createdAt="2018-05-22T13:00:29Z">
7+
<runFlowJobType flowRunId="e0c3067f-2333-4eee-8028-e0a56ca496f6">
8+
<flow id="92967d2d-c7e2-46d0-8847-4802df58f484" name="FlowOne"/>
9+
</runFlowJobType>
10+
</job>
11+
</tsResponse>

test/assets/flow_runs_get.xml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<tsResponse xmlns="http://tableau.com/api" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tableau.com/api http://tableau.com/api/ts-api-3.10.xsd">
2+
<pagination pageNumber="1" pageSize="100" totalAvailable="2"/>
3+
<flowRuns>
4+
<flowRuns id="cc2e652d-4a9b-4476-8c93-b238c45db968"
5+
flowId="587daa37-b84d-4400-a9a2-aa90e0be7837"
6+
status="Success"
7+
startedAt="2021-02-11T01:42:55Z"
8+
completedAt="2021-02-11T01:57:38Z"
9+
progress="100"
10+
backgroundJobId="aa23f4ac-906f-11e9-86fb-3f0f71412e77"/>
11+
<flowRuns id="a3104526-c0c6-4ea5-8362-e03fc7cbd7ee"
12+
flowId="5c36be69-eb30-461b-b66e-3e2a8e27cc35"
13+
status="Failed"
14+
startedAt="2021-02-13T04:05:30Z"
15+
completedAt="2021-02-13T04:05:35Z"
16+
progress="100"
17+
backgroundJobId="1ad21a9d-2530-4fbf-9064-efd3c736e023"/>
18+
</flowRuns>
19+
</tsResponse>

test/assets/flow_runs_get_by_id.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<tsResponse xmlns="http://tableau.com/api" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tableau.com/api http://tableau.com/api/ts-api-3.10.xsd">
2+
<flowRun id="cc2e652d-4a9b-4476-8c93-b238c45db968"
3+
flowId="587daa37-b84d-4400-a9a2-aa90e0be7837"
4+
status="Success"
5+
startedAt="2021-02-11T01:42:55Z"
6+
completedAt="2021-02-11T01:57:38Z"
7+
progress="100"
8+
backgroundJobId="1ad21a9d-2530-4fbf-9064-efd3c736e023">
9+
</flowRun>
10+
</tsResponse>

0 commit comments

Comments
 (0)