9
9
from fastapi .responses import Response , StreamingResponse
10
10
from fastapi .staticfiles import StaticFiles
11
11
from streamer import Streamer
12
+ from concurrent .futures import ThreadPoolExecutor
12
13
from prometheus_fastapi_instrumentator import Instrumentator
13
14
14
15
import tracemalloc
@@ -55,11 +56,10 @@ async def detect(detect_request: odrpc.DetectRequest, response: Response):
55
56
async def detect_stream (websocket : WebSocket ):
56
57
await websocket .accept ()
57
58
detect_responses = asyncio .Queue ()
59
+ executor = ThreadPoolExecutor ()
58
60
async def detect_handle (detect_request : odrpc .DetectRequest ):
59
- loop = asyncio .get_event_loop ()
60
- fut = loop .run_in_executor (None , self .doods .detect , detect_request )
61
61
try :
62
- detect_response = await asyncio . wait_for ( fut , 300 ) # Kill it and exit after 5 minutes
62
+ detect_response = self . doods . detect ( detect_request )
63
63
if detect_request .image :
64
64
detect_response .image = base64 .b64encode (detect_response .image )
65
65
await detect_responses .put (detect_response )
@@ -68,9 +68,10 @@ async def detect_handle(detect_request: odrpc.DetectRequest):
68
68
except Exception as e :
69
69
self .logger .error ("Exception({0}):{1!r}" .format (type (e ).__name__ , e .args ))
70
70
71
- def detect_thread (loop , detect_request : odrpc .DetectRequest ):
71
+ def detect_thread (detect_request : odrpc .DetectRequest ):
72
+ loop = asyncio .new_event_loop ()
73
+ asyncio .set_event_loop (loop )
72
74
try :
73
- asyncio .set_event_loop (loop )
74
75
loop .run_until_complete (detect_handle (detect_request ))
75
76
loop .close ()
76
77
except Exception as e :
@@ -91,16 +92,17 @@ async def send_detect_responses():
91
92
try :
92
93
detect_config = await websocket .receive_json ()
93
94
detect_request = odrpc .DetectRequest (** detect_config )
94
- loop = asyncio .new_event_loop ()
95
- threading .Thread (target = detect_thread , args = (loop , detect_request ,)).start ()
95
+ executor .submit (detect_thread , detect_request )
96
96
except TypeError :
97
97
await detect_responses .put (odrpc .DetectResponse (error = 'could not parse request body' ))
98
98
except WebSocketDisconnect :
99
99
send_detect_responses_task .cancel ()
100
+ executor .shutdown ()
100
101
break
101
102
except Exception as e :
102
103
self .logger .error ("Exception({0}):{1!r}" .format (type (e ).__name__ , e .args ))
103
104
send_detect_responses_task .cancel ()
105
+ executor .shutdown ()
104
106
break
105
107
106
108
@self .api .post ("/image" )
0 commit comments