Skip to content
This repository was archived by the owner on Sep 18, 2023. It is now read-only.

taking changes from upstream + trying poll activity error fix #23

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
3 changes: 2 additions & 1 deletion cadence/activity_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ def activity_task_loop(worker: Worker):
return
except Exception as ex:
logger.error("PollForActivityTask error: %s", ex)
continue
logger.error(f"Exiting")
break
if err:
logger.error("PollForActivityTask failed: %s", err)
continue
Expand Down
2 changes: 1 addition & 1 deletion cadence/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,4 +356,4 @@ def call_function(self, call: ThriftFunctionCall) -> ThriftFunctionResponse:
raise Exception("Unexpected type: " + Frame.TYPE)
assert isinstance(frame, FrameWithArgs)
response.process_frame(frame)
return response
return response
5 changes: 5 additions & 0 deletions cadence/ioutils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from select import select
from socket import socket
from typing import IO, Callable
import logging


logger = logging.getLogger(__name__)
class IOWrapper:
def __init__(self, io_stream: IO, socket_: socket = None):
self.io_stream = io_stream
Expand All @@ -27,6 +29,9 @@ def read_or_eof(self, size, field):
self.socket.settimeout(timeout)
buf: bytes = self.io_stream.read(size)
if len(buf) != size:
logger.info(f"buffer content bytes: {buf}")
logger.info(f"buffer size: {len(buf)}")
logger.info(f"size: {size}")
raise EOFError(field)
return buf

Expand Down
2 changes: 1 addition & 1 deletion cadence/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def exec_workflow(workflow_client, wm: WorkflowMethod, args, workflow_options: W
start_request = create_start_workflow_request(workflow_client, wm, args)
start_response, err = workflow_client.service.start_workflow(start_request)
if err:
raise Exception(err)
raise Exception(repr(err))
execution = WorkflowExecution(workflow_id=start_request.workflow_id, run_id=start_response.run_id)
stub_instance._execution = execution
return WorkflowExecutionContext(workflow_type=wm._name, workflow_execution=execution)
Expand Down