-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathx0smartpower.py
172 lines (132 loc) · 5.76 KB
/
x0smartpower.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
##
## imports
##
import time
import x0opcmd
import x0refcmd
import logging
##
## globals
##
##
## code
##
class X0SmartPower:
"""Change power state and ensure it changes"""
def __init__(self, inComm, useLog, timeoutConfig):
self.log = useLog
self.cfg = timeoutConfig
self.desired = None
self.jvcip = inComm
self.state = ''
self.timeout = 0
self.timeoutOffset = timeoutConfig['jvcPowerTimeout']
self.operation = None
self.delayValidationUntil = 0
def action(self):
# 1. Sent Power State
# 2. Validate Power State
# 3. Not not correct state, go back to 1
# Flow:
# ''
# validating
# potentially finish with timeout
# potentially finish successfully
# potentilly go to delay-validation
# validating
# potentially go to delay-restart
# restart
# setting
# validating
if not self.desired:
self.log.error('Called with nothing to do')
# True means nothing more to do
return (True,None)
if self.state == '':
self.log.debug(f'State "{self.state}"')
self.log.info(f'First checking whether JVC power state is {self.desired}')
# we set this timeout once regardless of how many verifications or restarts occur
# this timeout is the max time the process can take regardless of what individual steps occur
self.timeout = time.time() + self.timeoutOffset
# build the reference command
self.operation = x0refcmd.X0RefCmd(self.jvcip, self.log, self.cfg)
self.operation.set('PW',b'')
self.state = 'validating'
elif self.state == 'restart':
self.log.debug(f'State "{self.state}"')
self.log.info(f'Telling JVC to set power state to {self.desired}')
# need to make a opcmd
self.operation = x0opcmd.X0OpCmd(self.jvcip, self.log, self.cfg)
self.operation.set('PW',self.desired)
self.state = 'setting'
elif self.state == 'setting':
finished, rsp = self.operation.action()
if finished:
self.log.info(f'JVC performed power operation and returned {rsp}')
# now we need to verify the JVC actually did what it said
self.operation = x0refcmd.X0RefCmd(self.jvcip, self.log, self.cfg)
self.operation.set('PW',b'')
self.state = 'validating'
elif self.state == 'validating':
finished, rsp = self.operation.action()
if finished:
self.log.debug(f'JVC says power state is rsp:{rsp}')
if time.time() > self.timeout:
# Too much time has passed
# unfortuntaely there's nothing else we can do, we are giving up
self.log.error(f'UNABLE TO GET JVC TO CHANGE POWER STATE. rsp:{rsp}')
self.state = ''
self.desired = None
else:
# 0 - standby
# 1 - lamp on
# 2 - cooling
# 3 - reserved (warming up?)
# 4 - error
if (rsp == b'3') or (rsp == b'2'):
# JVC is apparently warming up or cooling down
s = 'WARMING UP'
if rsp == b'2':
s = 'COOLING DOWN'
self.log.info(f'JVC is {s}... Waiting... Desired: {self.desired} rsp: {rsp}')
# now we need to verify again -- see if we come into a full mode (1-on or 0-standby)
self.delayValidationUntil = time.time() + 2
self.state = 'delay-validation'
elif ((self.desired == b'1' and rsp == b'1') or (self.desired == b'0' and rsp == b'0')):
self.log.info(f'SUCCESS! Power state is as expected. desired:{self.desired} rsp:{rsp}')
self.state = ''
self.desired = None
else:
self.log.info(f'Inconsistent Power State, attempting to change. desired:{self.desired} rsp:{rsp}')
self.state = 'delay-restart'
self.delayRestartUntil = time.time() + 1
elif self.state == 'delay-validation':
if time.time() > self.delayValidationUntil:
self.log.debug('delay-validation complete, validating power state again')
self.operation = x0refcmd.X0RefCmd(self.jvcip, self.log, self.cfg)
self.operation.set('PW',b'')
self.state = 'validating'
elif self.state == 'delay-restart':
if time.time() > self.delayRestartUntil:
self.log.debug('delay-restart complete, attempting to change power state')
self.state = 'restart'
else:
self.log.error(f'**** YIKES {self.state}')
if not self.desired:
# we are done
return (True,None)
else:
# more to do
return (False,None)
def set(self, cmd:str, data: bytearray):
if self.desired:
# same mode
self.log.error(f'!!!! Already sending command {self.desired} {cmd} {data}')
else:
self.desired = b'0'
if cmd == 'on':
self.desired = b'1'
self.log.info(f'Asked to set Power State to {self.desired}')
# definitely need to restart the state machine
self.state = ''
self.action()