forked from airikka/spo2cms50dplus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspo2cms50d+
executable file
·136 lines (118 loc) · 3.51 KB
/
spo2cms50d+
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python2
# Copyright (c) 2016 Tommi Airikka <tommi@airikka.net>
# License: GPLv2
import sys, struct, serial, argparse
from dateutil.parser import parse
#####################
##### Variables #####
#####################
# Command-line interface: two required positionals plus an optional start
# time. The resulting module-level names (device, outfile, starttime, ser)
# are read by the functions defined further down in this script.
parser = argparse.ArgumentParser(
    description='Download stored data from a CMS50D+ oximeter.',
)
parser.add_argument(
    'device',
    type=str,
    help='path to device file',
)
parser.add_argument(
    'outfile',
    type=str,
    help='output file path',
)
parser.add_argument(
    '-s', '--start-time',
    dest='starttime',
    type=str,
    help='start time (\"YYYY-MM-DD HH:MM:SS\")',
)
args = parser.parse_args()

# starttime stays None when -s/--start-time is not given.
device, outfile, starttime = args.device, args.outfile, args.starttime

# Port object only; it is configured by configure_serial() and opened in
# get_raw_data().
ser = serial.Serial()
###################
##### Helpers #####
###################
# Pack little endian
def _ple(i):
return struct.pack("<I", i)
def _get_real_values((val1,val2)):
oval1 = ord(val1)
oval2 = ord(val2)
v1 = oval1 - 0x80
v2 = oval2 - 0x80
if v1 == 0 or oval1 == 0xFF:
v1 = 127
if v2 == 0 or oval2 == 0xFF:
v2 = 255
return (v1,v2)
def _parse_list(toparse, parsed):
    """Decode raw values pairwise from *toparse* and append them to *parsed*.

    The input is walked two items at a time. A pair equal to
    ('\\x0f', '\\x80') is skipped; every other pair is decoded with
    _get_real_values() and kept only when both decoded values are
    non-negative. A trailing unpaired item is discarded.

    The list is walked by index and cleared once at the end instead of
    calling ``pop(0)`` for every item, which made the original loop
    quadratic in the input length.

    Args:
        toparse: mutable list of raw values. It is consumed: on return it
            is empty, as callers of the original implementation observed.
        parsed: list that receives the decoded values, two per kept pair.
    """
    # Stop before a dangling last item so every index has a partner.
    for idx in range(0, len(toparse) - 1, 2):
        fs = (toparse[idx], toparse[idx + 1])
        if fs == ('\x0f', '\x80'):
            continue
        (f, s) = _get_real_values(fs)
        if f >= 0 and s >= 0:
            parsed.append(f)
            parsed.append(s)
    # Preserve the in-place draining side effect of the pop-based loop.
    del toparse[:]
#####################
##### Functions #####
#####################
def configure_serial(ser):
    """Apply the fixed line settings and the target port to *ser*.

    Settings are assigned in order: 115200 baud, 8 data bits, no parity,
    1 stop bit, XON/XOFF flow control enabled, a read timeout of 1, and
    the port taken from the module-level ``device`` path. The port is not
    opened here.

    Args:
        ser: serial port object whose attributes are set in place.
    """
    settings = (
        ("baudrate", 115200),
        ("bytesize", serial.EIGHTBITS),
        ("parity", serial.PARITY_NONE),
        ("stopbits", serial.STOPBITS_ONE),
        ("xonxoff", 1),  # XON/XOFF flow control
        ("timeout", 1),
        ("port", device),
    )
    for attribute, value in settings:
        setattr(ser, attribute, value)
def get_raw_data(ser):
    """Open the port, send the download request and return the reply.

    Progress messages are written to stdout. The port is closed in a
    ``finally`` clause, so it is released even if writing or reading
    raises.

    Args:
        ser: serial port object providing open(), write(), readall() and
            close().

    Returns:
        The reply as a list with one element per item of the object
        returned by ``ser.readall()``.

    Raises:
        SystemExit: with code 43 when at most one item was received.
    """
    sys.stdout.write("Connecting to device...")
    sys.stdout.flush()
    ser.open()
    try:
        sys.stdout.write("reading...")
        sys.stdout.flush()
        # NOTE(review): presumably the device's "send stored data" command
        # -- confirm against the CMS50D+ protocol.
        ser.write(b'\x7d\x81\xa6')
        raw = list(ser.readall())
    finally:
        ser.close()
    if len(raw) <= 1:
        print("no data received. Is the device on?")
        # sys.exit rather than the site-provided exit(), which is meant
        # for the interactive interpreter and is not always available.
        sys.exit(43)
    print("done!")
    return raw
def parse_raw_data(data):
    """Decode the raw device reply into a flat list of values.

    Prints progress to stdout and delegates the decoding to _parse_list(),
    which consumes *data* while filling the result list.

    Args:
        data: mutable list of raw values as returned by get_raw_data().

    Returns:
        A new list holding the decoded values.
    """
    sys.stdout.write("Parsing data...")
    sys.stdout.flush()
    values = []
    _parse_list(data, values)
    print("done!")
    return values
def get_len_of_parsed_data(parsed):
    """Return the number of samples in *parsed*.

    Each sample is two values (pulse and sats) recorded at 1 Hz, so the
    count is half the list length. Floor division keeps the result an
    integer on both Python 2 and Python 3; true division would give a
    float on Python 3, which cannot be packed as an unsigned integer.

    Args:
        parsed: flat list of decoded values.

    Returns:
        ``len(parsed) // 2`` as an int.
    """
    return len(parsed) // 2
def write_to_file(parsed, total_len, f):
    """Write the file header followed by the decoded samples to *f*.

    The header is 271 little-endian unsigned 32-bit words, in order:
    856, 1, 212 zero words, 1, 55 zero words, and finally *total_len*.
    The samples follow as one byte per value.

    The header is packed in a single struct call and the samples are
    written as one bytes object. Writing ``chr(e)`` per value fails on a
    binary file under Python 3 and issues one tiny write per byte.

    Args:
        parsed: flat list of ints in 0..255.
        total_len: number of samples, stored as the last header word.
        f: file-like object opened for binary writing.

    Raises:
        struct.error: if *total_len* does not fit an unsigned 32-bit int.
        ValueError: if a value in *parsed* is outside 0..255.
    """
    sys.stdout.write("Writing to file...")
    sys.stdout.flush()
    # NOTE(review): the first word (856) equals the byte offset of the
    # second 1 marker (2 words + 212 zero words = 856 bytes); the meaning
    # of the individual fields is not documented here -- confirm against
    # the target file format.
    header = [856, 1] + [0] * 212 + [1] + [0] * 55 + [total_len]
    f.write(struct.pack("<%dI" % len(header), *header))
    # bytes(bytearray(...)) yields a byte string on Python 2 and 3 alike.
    f.write(bytes(bytearray(parsed)))
def change_starttime(f):
    """Overwrite the start timestamp in *f* when one was requested.

    Does nothing when the module-level ``starttime`` is None. Otherwise
    the string is parsed (year first, not day first) and year, month,
    day, hour, minute and second are written as six consecutive
    little-endian unsigned 32-bit words starting at byte offset 0x420.

    Args:
        f: seekable file-like object opened for binary writing.
    """
    if starttime is None:
        return
    sys.stdout.write("changing start time...")
    sys.stdout.flush()
    moment = parse(starttime, dayfirst=False, yearfirst=True)
    fields = (
        moment.year,
        moment.month,
        moment.day,
        moment.hour,
        moment.minute,
        moment.second,
    )
    f.seek(0x420)
    f.write(b"".join(_ple(value) for value in fields))
################
##### Main #####
################
# Download, decode and store the recording.
configure_serial(ser)
data = get_raw_data(ser)
parsed = parse_raw_data(data)
total_len = get_len_of_parsed_data(parsed)
# The with-block guarantees the output file is closed even if writing the
# data or patching the start time raises part-way through.
with open(outfile, 'wb') as f:
    write_to_file(parsed, total_len, f)
    change_starttime(f)
# Completes the "Writing to file..." progress line started above.
print("done!")