Skip to content

Commit f543e54

Browse files
committed
Poll enet in a separate process to avoid timeouts.
1 parent 1c42641 commit f543e54

File tree

2 files changed

+129
-73
lines changed

2 files changed

+129
-73
lines changed

melee/console.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,8 @@ def __handle_slippstream_events(self, event_bytes, gamestate):
485485
print("WARNING: Something went wrong unpacking events. Data is probably missing")
486486
print("\tDidn't have enough data for event")
487487
return False
488-
if EventType(event_bytes[0]) == EventType.PAYLOADS:
488+
event_type = EventType(event_bytes[0])
489+
if event_type == EventType.PAYLOADS:
489490
cursor = 0x2
490491
payload_size = event_bytes[1]
491492
num_commands = (payload_size - 1) // 3
@@ -496,10 +497,10 @@ def __handle_slippstream_events(self, event_bytes, gamestate):
496497
cursor += 3
497498
event_bytes = event_bytes[payload_size + 1:]
498499

499-
elif EventType(event_bytes[0]) == EventType.FRAME_START:
500+
elif event_type == EventType.FRAME_START:
500501
event_bytes = event_bytes[event_size:]
501502

502-
elif EventType(event_bytes[0]) == EventType.GAME_START:
503+
elif event_type == EventType.GAME_START:
503504
self.__game_start(gamestate, event_bytes)
504505
event_bytes = event_bytes[event_size:]
505506
# The game needs to know what to press on the first frame of the game
@@ -508,22 +509,22 @@ def __handle_slippstream_events(self, event_bytes, gamestate):
508509
controller.release_all()
509510
controller.flush()
510511

511-
elif EventType(event_bytes[0]) == EventType.GAME_END:
512+
elif event_type == EventType.GAME_END:
512513
event_bytes = event_bytes[event_size:]
513514
return self._use_manual_bookends
514515

515-
elif EventType(event_bytes[0]) == EventType.PRE_FRAME:
516+
elif event_type == EventType.PRE_FRAME:
516517
self.__pre_frame(gamestate, event_bytes)
517518
event_bytes = event_bytes[event_size:]
518519

519-
elif EventType(event_bytes[0]) == EventType.POST_FRAME:
520+
elif event_type == EventType.POST_FRAME:
520521
self.__post_frame(gamestate, event_bytes)
521522
event_bytes = event_bytes[event_size:]
522523

523-
elif EventType(event_bytes[0]) == EventType.GECKO_CODES:
524+
elif event_type == EventType.GECKO_CODES:
524525
event_bytes = event_bytes[event_size:]
525526

526-
elif EventType(event_bytes[0]) == EventType.FRAME_BOOKEND:
527+
elif event_type == EventType.FRAME_BOOKEND:
527528
self.__frame_bookend(gamestate, event_bytes)
528529
event_bytes = event_bytes[event_size:]
529530
# If this is an old frame, then don't return it.
@@ -532,7 +533,7 @@ def __handle_slippstream_events(self, event_bytes, gamestate):
532533
self._frame = gamestate.frame
533534
return True
534535

535-
elif EventType(event_bytes[0]) == EventType.ITEM_UPDATE:
536+
elif event_type == EventType.ITEM_UPDATE:
536537
self.__item_update(gamestate, event_bytes)
537538
event_bytes = event_bytes[event_size:]
538539

melee/slippstream.py

Lines changed: 119 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@
55
(i.e. the Project Slippi fork of Nintendont or Slippi Ishiiruka).
66
"""
77

8-
import socket
8+
from audioop import add
99
from enum import Enum
1010
import enet
1111
import json
12+
import multiprocessing as mp
13+
from multiprocessing.connection import Connection
14+
from multiprocessing.synchronize import Event
1215

1316
# pylint: disable=too-few-public-methods
1417
class EventType(Enum):
@@ -30,88 +33,140 @@ class CommType(Enum):
3033
KEEPALIVE = 0x03
3134
MENU = 0x04
3235

33-
class SlippstreamClient():
34-
""" Container representing a client to some SlippiComm server """
3536

36-
def __init__(self, address="127.0.0.1", port=51441, realtime=True):
37-
""" Constructor for this object """
38-
self._host = enet.Host(None, 1, 0, 0)
39-
self._peer = None
40-
self.buf = bytearray()
41-
self.realtime = realtime
37+
class SlippstreamWorker:
38+
def __init__(
39+
self,
40+
address: str,
41+
port: int,
42+
buffer: Connection,
43+
shutdown: Event,
44+
):
4245
self.address = address
4346
self.port = port
44-
# Not yet supported
45-
self.playedOn = "dolphin"
46-
self.timestamp = ""
47-
self.consoleNick = ""
48-
self.players = {}
49-
50-
def shutdown(self):
51-
""" Close down the socket and connection to the console """
52-
if self._peer:
53-
self._peer.send(0, enet.Packet())
54-
self._host.service(100)
55-
self._peer.disconnect()
56-
self._peer = None
47+
self._buffer = buffer
48+
self._shutdown = shutdown
5749

58-
if self._host:
59-
self._host = None
60-
return False
50+
self._host = enet.Host(None, 1, 0, 0)
51+
self._peer = None
6152

62-
def dispatch(self, polling_mode):
63-
"""Dispatch messages with the peer (read and write packets)"""
64-
event = None
65-
event_type = 1000
66-
while event_type not in [enet.EVENT_TYPE_RECEIVE]:
67-
wait_time = 1000
68-
if polling_mode:
69-
wait_time = 0
70-
event = self._host.service(wait_time)
71-
event_type = event.type
53+
self._handshake_data = json.dumps({
54+
"type" : "connect_request",
55+
"cursor" : 0,
56+
}).encode()
7257

73-
if event.type == enet.EVENT_TYPE_NONE:
74-
if polling_mode:
75-
return None
76-
if event.type == enet.EVENT_TYPE_RECEIVE:
77-
try:
78-
return json.loads(event.packet.data)
79-
except json.JSONDecodeError:
80-
# This happens at the end of a game for some reason?
81-
if len(event.packet.data) == 0:
82-
event_type = 0
83-
continue
84-
return None
85-
elif event.type == enet.EVENT_TYPE_CONNECT:
86-
handshake = json.dumps({
87-
"type" : "connect_request",
88-
"cursor" : 0,
89-
})
90-
self._peer.send(0, enet.Packet(handshake.encode()))
91-
elif event.type == enet.EVENT_TYPE_DISCONNECT:
92-
return None
93-
return None
58+
def _send_handshake(self):
59+
self._peer.send(0, enet.Packet(self._handshake_data))
9460

95-
def connect(self):
96-
""" Connect to the server
61+
def connect(self) -> bool:
62+
"""Connect to the server
9763
9864
Returns True on success, False on failure
9965
"""
10066
# Try to connect to the server and send a handshake
10167
try:
102-
self._peer = self._host.connect(enet.Address(bytes(self.address, 'utf-8'), int(self.port)), 1)
68+
self._peer = self._host.connect(
69+
enet.Address(bytes(self.address, 'utf-8'), self.port), 1)
10370
except OSError:
10471
return False
10572
try:
10673
for _ in range(4):
10774
event = self._host.service(1000)
10875
if event.type == enet.EVENT_TYPE_CONNECT:
109-
handshake = json.dumps({
110-
"type" : "connect_request",
111-
"cursor" : 0,
112-
})
113-
self._peer.send(0, enet.Packet(handshake.encode()))
76+
self._send_handshake()
11477
return True
11578
return False
11679
except OSError:
11780
return False
81+
82+
def run(self):
83+
connected = self.connect()
84+
self._buffer.send(connected)
85+
if not connected:
86+
return
87+
88+
while not self._shutdown.is_set():
89+
event = self._host.service(1000)
90+
91+
if event.type == enet.EVENT_TYPE_NONE:
92+
continue # timeout
93+
elif event.type == enet.EVENT_TYPE_RECEIVE:
94+
# This happens at the end of a game for some reason?
95+
if len(event.packet.data) == 0:
96+
# TODO: figure out what to do in this case
97+
continue
98+
self._buffer.send_bytes(event.packet.data)
99+
elif event.type == enet.EVENT_TYPE_CONNECT:
100+
# should this happen during the run loop?
101+
self._send_handshake()
102+
elif event.type == enet.EVENT_TYPE_DISCONNECT:
103+
self._buffer.close()
104+
return
105+
106+
def _run_worker(**kwargs):
107+
SlippstreamWorker(**kwargs).run()
108+
109+
class EnetDisconnected(Exception):
110+
"""Raised when we get an enet disconnection."""
111+
112+
class SlippstreamClient:
113+
""" Container representing a client to some SlippiComm server """
114+
115+
def __init__(
116+
self,
117+
address="127.0.0.1",
118+
port=51441,
119+
):
120+
self.address = address
121+
self.port = port
122+
self.running = False
123+
124+
# set up worker process
125+
self._buffer, worker_buffer = mp.Pipe(False)
126+
self._shutdown = mp.Event()
127+
self._worker = mp.Process(
128+
target=_run_worker,
129+
kwargs=dict(
130+
address=address,
131+
port=port,
132+
buffer=worker_buffer,
133+
shutdown=self._shutdown,
134+
)
135+
)
136+
137+
# Not yet supported
138+
self.playedOn = "dolphin"
139+
self.timestamp = ""
140+
self.consoleNick = ""
141+
self.players = {}
142+
143+
def shutdown(self):
144+
""" Close down the socket and connection to the console """
145+
if self._worker:
146+
self._shutdown.set()
147+
self._worker.join()
148+
self._buffer.close()
149+
self._worker = None
150+
self.running = False
151+
152+
def dispatch(self, polling_mode: bool):
153+
"""Dispatch messages with the peer (read and write packets)"""
154+
assert self.running, "Can only dispatch while running."
155+
156+
try:
157+
if polling_mode and not self._buffer.poll():
158+
return None
159+
message_bytes = self._buffer.recv_bytes()
160+
except EOFError:
161+
raise EnetDisconnected()
162+
163+
return json.loads(message_bytes)
164+
165+
def connect(self) -> bool:
166+
self._worker.start()
167+
connected = self._buffer.recv()
168+
if not connected:
169+
self.shutdown()
170+
else:
171+
self.running = True
172+
return connected

0 commit comments

Comments
 (0)