Skip to content

Commit 84dc24c

Browse files
committed
Poll enet in a separate process to avoid timeouts.
1 parent 1c42641 commit 84dc24c

File tree

2 files changed

+144
-67
lines changed

2 files changed

+144
-67
lines changed

melee/console.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,8 @@ def __handle_slippstream_events(self, event_bytes, gamestate):
485485
print("WARNING: Something went wrong unpacking events. Data is probably missing")
486486
print("\tDidn't have enough data for event")
487487
return False
488-
if EventType(event_bytes[0]) == EventType.PAYLOADS:
488+
event_type = EventType(event_bytes[0])
489+
if event_type == EventType.PAYLOADS:
489490
cursor = 0x2
490491
payload_size = event_bytes[1]
491492
num_commands = (payload_size - 1) // 3
@@ -496,10 +497,10 @@ def __handle_slippstream_events(self, event_bytes, gamestate):
496497
cursor += 3
497498
event_bytes = event_bytes[payload_size + 1:]
498499

499-
elif EventType(event_bytes[0]) == EventType.FRAME_START:
500+
elif event_type == EventType.FRAME_START:
500501
event_bytes = event_bytes[event_size:]
501502

502-
elif EventType(event_bytes[0]) == EventType.GAME_START:
503+
elif event_type == EventType.GAME_START:
503504
self.__game_start(gamestate, event_bytes)
504505
event_bytes = event_bytes[event_size:]
505506
# The game needs to know what to press on the first frame of the game
@@ -508,22 +509,22 @@ def __handle_slippstream_events(self, event_bytes, gamestate):
508509
controller.release_all()
509510
controller.flush()
510511

511-
elif EventType(event_bytes[0]) == EventType.GAME_END:
512+
elif event_type == EventType.GAME_END:
512513
event_bytes = event_bytes[event_size:]
513514
return self._use_manual_bookends
514515

515-
elif EventType(event_bytes[0]) == EventType.PRE_FRAME:
516+
elif event_type == EventType.PRE_FRAME:
516517
self.__pre_frame(gamestate, event_bytes)
517518
event_bytes = event_bytes[event_size:]
518519

519-
elif EventType(event_bytes[0]) == EventType.POST_FRAME:
520+
elif event_type == EventType.POST_FRAME:
520521
self.__post_frame(gamestate, event_bytes)
521522
event_bytes = event_bytes[event_size:]
522523

523-
elif EventType(event_bytes[0]) == EventType.GECKO_CODES:
524+
elif event_type == EventType.GECKO_CODES:
524525
event_bytes = event_bytes[event_size:]
525526

526-
elif EventType(event_bytes[0]) == EventType.FRAME_BOOKEND:
527+
elif event_type == EventType.FRAME_BOOKEND:
527528
self.__frame_bookend(gamestate, event_bytes)
528529
event_bytes = event_bytes[event_size:]
529530
# If this is an old frame, then don't return it.
@@ -532,7 +533,7 @@ def __handle_slippstream_events(self, event_bytes, gamestate):
532533
self._frame = gamestate.frame
533534
return True
534535

535-
elif EventType(event_bytes[0]) == EventType.ITEM_UPDATE:
536+
elif event_type == EventType.ITEM_UPDATE:
536537
self.__item_update(gamestate, event_bytes)
537538
event_bytes = event_bytes[event_size:]
538539

melee/slippstream.py

Lines changed: 134 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@
55
(i.e. the Project Slippi fork of Nintendont or Slippi Ishiiruka).
66
"""
77

8-
import socket
8+
from audioop import add
99
from enum import Enum
1010
import enet
1111
import json
12+
import multiprocessing as mp
13+
from multiprocessing.connection import Connection
14+
from multiprocessing.synchronize import Event
1215

1316
# pylint: disable=too-few-public-methods
1417
class EventType(Enum):
@@ -30,88 +33,161 @@ class CommType(Enum):
3033
KEEPALIVE = 0x03
3134
MENU = 0x04
3235

33-
class SlippstreamClient():
34-
""" Container representing a client to some SlippiComm server """
3536

36-
def __init__(self, address="127.0.0.1", port=51441, realtime=True):
37-
""" Constructor for this object """
37+
class SlippstreamWorker:
38+
def __init__(
39+
self,
40+
address: str,
41+
port: int,
42+
buffer: Connection,
43+
shutdown: Event,
44+
):
45+
self.address = address
46+
self.port = port
47+
self._buffer = buffer
48+
self._shutdown = shutdown
49+
3850
self._host = enet.Host(None, 1, 0, 0)
3951
self._peer = None
40-
self.buf = bytearray()
41-
self.realtime = realtime
52+
53+
self._handshake_data = json.dumps({
54+
"type" : "connect_request",
55+
"cursor" : 0,
56+
}).encode()
57+
58+
def _send_handshake(self):
59+
self._peer.send(0, enet.Packet(self._handshake_data))
60+
61+
def connect(self) -> bool:
62+
"""Connect to the server
63+
64+
Returns True on success, False on failure
65+
"""
66+
# Try to connect to the server and send a handshake
67+
try:
68+
self._peer = self._host.connect(
69+
enet.Address(bytes(self.address, 'utf-8'), self.port), 1)
70+
except OSError:
71+
return False
72+
try:
73+
for _ in range(4):
74+
event = self._host.service(1000)
75+
if event.type == enet.EVENT_TYPE_CONNECT:
76+
self._send_handshake()
77+
return True
78+
return False
79+
except OSError:
80+
return False
81+
82+
def run(self):
83+
connected = self.connect()
84+
self._buffer.send(connected)
85+
if not connected:
86+
return
87+
88+
while not self._shutdown.is_set():
89+
event = self._host.service(1000)
90+
91+
if event.type == enet.EVENT_TYPE_NONE:
92+
continue # timeout
93+
elif event.type == enet.EVENT_TYPE_RECEIVE:
94+
self._buffer.send_bytes(event.packet.data)
95+
elif event.type == enet.EVENT_TYPE_CONNECT:
96+
# should this happen during the run loop?
97+
self._send_handshake()
98+
elif event.type == enet.EVENT_TYPE_DISCONNECT:
99+
self._buffer.close()
100+
return
101+
102+
def _run_worker(**kwargs):
103+
SlippstreamWorker(**kwargs).run()
104+
105+
class EnetDisconnected(Exception):
106+
"""Raised when we get an enet disconnection."""
107+
108+
class SlippstreamClient:
109+
""" Container representing a client to some SlippiComm server """
110+
111+
def __init__(
112+
self,
113+
address="127.0.0.1",
114+
port=51441,
115+
):
42116
self.address = address
43117
self.port = port
118+
self.running = False
119+
120+
# set up worker process
121+
self._buffer, worker_buffer = mp.Pipe(False)
122+
self._shutdown = mp.Event()
123+
self._worker = mp.Process(
124+
target=_run_worker,
125+
kwargs=dict(
126+
address=address,
127+
port=port,
128+
buffer=worker_buffer,
129+
shutdown=self._shutdown,
130+
)
131+
)
132+
44133
# Not yet supported
45134
self.playedOn = "dolphin"
46135
self.timestamp = ""
47136
self.consoleNick = ""
48137
self.players = {}
49138

50-
def shutdown(self):
51-
""" Close down the socket and connection to the console """
52-
if self._peer:
53-
self._peer.send(0, enet.Packet())
54-
self._host.service(100)
55-
self._peer.disconnect()
56-
self._peer = None
57-
58-
if self._host:
59-
self._host = None
60-
return False
61-
62-
def dispatch(self, polling_mode):
63-
"""Dispatch messages with the peer (read and write packets)"""
64-
event = None
65-
event_type = 1000
66-
while event_type not in [enet.EVENT_TYPE_RECEIVE]:
67-
wait_time = 1000
68-
if polling_mode:
69-
wait_time = 0
70-
event = self._host.service(wait_time)
71-
event_type = event.type
139+
def _read_messages(self):
140+
while self.running:
141+
event = self._host.service(1000)
72142

73143
if event.type == enet.EVENT_TYPE_NONE:
74-
if polling_mode:
75-
return None
76-
if event.type == enet.EVENT_TYPE_RECEIVE:
144+
continue # timeout
145+
elif event.type == enet.EVENT_TYPE_RECEIVE:
77146
try:
78-
return json.loads(event.packet.data)
147+
message = json.loads(event.packet.data)
148+
print(message["type"])
149+
self._buffer.put(message)
79150
except json.JSONDecodeError:
80151
# This happens at the end of a game for some reason?
81152
if len(event.packet.data) == 0:
82-
event_type = 0
153+
# TODO: figure out what to do in this case
83154
continue
84-
return None
85155
elif event.type == enet.EVENT_TYPE_CONNECT:
86156
handshake = json.dumps({
87157
"type" : "connect_request",
88158
"cursor" : 0,
89159
})
90160
self._peer.send(0, enet.Packet(handshake.encode()))
91161
elif event.type == enet.EVENT_TYPE_DISCONNECT:
92-
return None
93-
return None
162+
raise RuntimeError('enet disconnected')
94163

95-
def connect(self):
96-
""" Connect to the server
164+
def shutdown(self):
165+
""" Close down the socket and connection to the console """
166+
if self._worker:
167+
self._shutdown.set()
168+
self._worker.join()
169+
self._buffer.close()
170+
self._worker = None
171+
self.running = False
172+
173+
def dispatch(self, polling_mode: bool):
174+
"""Dispatch messages with the peer (read and write packets)"""
175+
assert self.running, "Can only dispatch while running."
97176

98-
Returns True on success, False on failure
99-
"""
100-
# Try to connect to the server and send a handshake
101-
try:
102-
self._peer = self._host.connect(enet.Address(bytes(self.address, 'utf-8'), int(self.port)), 1)
103-
except OSError:
104-
return False
105177
try:
106-
for _ in range(4):
107-
event = self._host.service(1000)
108-
if event.type == enet.EVENT_TYPE_CONNECT:
109-
handshake = json.dumps({
110-
"type" : "connect_request",
111-
"cursor" : 0,
112-
})
113-
self._peer.send(0, enet.Packet(handshake.encode()))
114-
return True
115-
return False
116-
except OSError:
117-
return False
178+
if polling_mode and not self._buffer.poll():
179+
return None
180+
message_bytes = self._buffer.recv_bytes()
181+
except EOFError:
182+
raise EnetDisconnected()
183+
184+
return json.loads(message_bytes)
185+
186+
def connect(self) -> bool:
187+
self._worker.start()
188+
connected = self._buffer.recv()
189+
if not connected:
190+
self.shutdown()
191+
else:
192+
self.running = True
193+
return connected

0 commit comments

Comments
 (0)