mirror of
https://github.com/gryf/python-linak-desk-control.git
synced 2025-12-17 11:30:29 +01:00
Roughly pepify the code.
This commit is contained in:
@@ -12,11 +12,14 @@
|
|||||||
# You should have received a copy of the GNU General Public License along with this
|
# You should have received a copy of the GNU General Public License along with this
|
||||||
# program. If not, see <https://www.gnu.org/licenses/>.
|
# program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
from ctypes import sizeof
|
import argparse
|
||||||
|
import ctypes
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import usb1
|
import usb1
|
||||||
|
|
||||||
|
|
||||||
REQ_INIT = 0x0303
|
REQ_INIT = 0x0303
|
||||||
REQ_GET_STATUS = 0x0304
|
REQ_GET_STATUS = 0x0304
|
||||||
REQ_MOVE = 0x0305
|
REQ_MOVE = 0x0305
|
||||||
@@ -47,6 +50,7 @@ HEIGHT_MOVE_DOWNWARDS = 32767
|
|||||||
HEIGHT_MOVE_UPWARDS = 32768
|
HEIGHT_MOVE_UPWARDS = 32768
|
||||||
HEIGHT_MOVE_END = 32769
|
HEIGHT_MOVE_END = 32769
|
||||||
|
|
||||||
|
|
||||||
class Status(object):
|
class Status(object):
|
||||||
positionLost = True
|
positionLost = True
|
||||||
antiColision = True
|
antiColision = True
|
||||||
@@ -55,8 +59,8 @@ class Status(object):
|
|||||||
unknown = 4
|
unknown = 4
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def fromBuf(sr, buf):
|
def from_buf(cls, buf):
|
||||||
self = sr()
|
self = cls()
|
||||||
attr = ['positionLost', 'antiColision', 'overloadDown', 'overloadUp']
|
attr = ['positionLost', 'antiColision', 'overloadDown', 'overloadUp']
|
||||||
bitlist = '{:0>8s}'.format(bin(int(buf, base=16)).lstrip('0b'))
|
bitlist = '{:0>8s}'.format(bin(int(buf, base=16)).lstrip('0b'))
|
||||||
for i in range(0, 4):
|
for i in range(0, 4):
|
||||||
@@ -66,20 +70,22 @@ class Status(object):
|
|||||||
|
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
|
||||||
class StatusPositionSpeed(object):
|
class StatusPositionSpeed(object):
|
||||||
pos = None
|
pos = None
|
||||||
status = None
|
status = None
|
||||||
speed = 0
|
speed = 0
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def fromBuf(sr, buf):
|
def from_buf(cls, buf):
|
||||||
self = sr()
|
self = cls()
|
||||||
self.pos = int(buf[2:4] + buf[:2], 16)
|
self.pos = int(buf[2:4] + buf[:2], 16)
|
||||||
self.status = Status.fromBuf(buf[4:6])
|
self.status = Status.from_buf(buf[4:6])
|
||||||
self.speed = int(buf[6:8], 16)
|
self.speed = int(buf[6:8], 16)
|
||||||
|
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
|
||||||
class ValidFlags(object):
|
class ValidFlags(object):
|
||||||
ID00_Ref1_pos_stat_speed = True
|
ID00_Ref1_pos_stat_speed = True
|
||||||
ID01_Ref2_pos_stat_speed = True
|
ID01_Ref2_pos_stat_speed = True
|
||||||
@@ -99,15 +105,31 @@ class ValidFlags(object):
|
|||||||
unknown = True
|
unknown = True
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def fromBuf(sr, buf):
|
def from_buf(cls, buf):
|
||||||
self = sr()
|
self = cls()
|
||||||
attr = ['ID00_Ref1_pos_stat_speed', 'ID01_Ref2_pos_stat_speed', 'ID02_Ref3_pos_stat_speed', 'ID03_Ref4_pos_stat_speed', 'ID10_Ref1_controlInput', 'ID11_Ref2_controlInput', 'ID12_Ref3_controlInput', 'ID13_Ref4_controlInput', 'ID04_Ref5_pos_stat_speed', 'ID28_Diagnostic', 'ID05_Ref6_pos_stat_speed', 'ID37_Handset1command', 'ID38_Handset2command', 'ID06_Ref7_pos_stat_speed', 'ID07_Ref8_pos_stat_speed', 'unknown']
|
attr = ['ID00_Ref1_pos_stat_speed',
|
||||||
|
'ID01_Ref2_pos_stat_speed',
|
||||||
|
'ID02_Ref3_pos_stat_speed',
|
||||||
|
'ID03_Ref4_pos_stat_speed',
|
||||||
|
'ID10_Ref1_controlInput',
|
||||||
|
'ID11_Ref2_controlInput',
|
||||||
|
'ID12_Ref3_controlInput',
|
||||||
|
'ID13_Ref4_controlInput',
|
||||||
|
'ID04_Ref5_pos_stat_speed',
|
||||||
|
'ID28_Diagnostic',
|
||||||
|
'ID05_Ref6_pos_stat_speed',
|
||||||
|
'ID37_Handset1command',
|
||||||
|
'ID38_Handset2command',
|
||||||
|
'ID06_Ref7_pos_stat_speed',
|
||||||
|
'ID07_Ref8_pos_stat_speed',
|
||||||
|
'unknown']
|
||||||
bitlist = '{:0>16s}'.format(bin(int(buf, base=16)).lstrip('0b'))
|
bitlist = '{:0>16s}'.format(bin(int(buf, base=16)).lstrip('0b'))
|
||||||
for i in range(0, len(bitlist)):
|
for i in range(0, len(bitlist)):
|
||||||
setattr(self, attr[i], True if bitlist[i] == '1' else False)
|
setattr(self, attr[i], True if bitlist[i] == '1' else False)
|
||||||
|
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
|
||||||
class StatusReport(object):
|
class StatusReport(object):
|
||||||
featureRaportID = 0
|
featureRaportID = 0
|
||||||
numberOfBytes = 0
|
numberOfBytes = 0
|
||||||
@@ -131,39 +153,40 @@ class StatusReport(object):
|
|||||||
undefined2 = None
|
undefined2 = None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def fromBuf(sr, buf):
|
def from_buf(cls, buf):
|
||||||
self = sr()
|
self = cls()
|
||||||
raw = buf.hex()
|
raw = buf.hex()
|
||||||
self.featureRaportID = buf[0]
|
self.featureRaportID = buf[0]
|
||||||
self.numberOfBytes = buf[1]
|
self.numberOfBytes = buf[1]
|
||||||
self.validFlag = ValidFlags.fromBuf(raw[4:8])
|
self.validFlag = ValidFlags.from_buf(raw[4:8])
|
||||||
self.ref1 = StatusPositionSpeed.fromBuf(raw[8:8+8])
|
self.ref1 = StatusPositionSpeed.from_buf(raw[8:8+8])
|
||||||
self.ref2 = StatusPositionSpeed.fromBuf(raw[16:16+8])
|
self.ref2 = StatusPositionSpeed.from_buf(raw[16:16+8])
|
||||||
self.ref3 = StatusPositionSpeed.fromBuf(raw[24:24+8])
|
self.ref3 = StatusPositionSpeed.from_buf(raw[24:24+8])
|
||||||
self.ref4 = StatusPositionSpeed.fromBuf(raw[32:32+8])
|
self.ref4 = StatusPositionSpeed.from_buf(raw[32:32+8])
|
||||||
self.ref1cnt = int(raw[42:44] + raw[40:42], 16)
|
self.ref1cnt = int(raw[42:44] + raw[40:42], 16)
|
||||||
self.ref2cnt = int(raw[46:48] + raw[44:46], 16)
|
self.ref2cnt = int(raw[46:48] + raw[44:46], 16)
|
||||||
self.ref3cnt = int(raw[50:52] + raw[48:50], 16)
|
self.ref3cnt = int(raw[50:52] + raw[48:50], 16)
|
||||||
self.ref4cnt = int(raw[54:56] + raw[52:54], 16)
|
self.ref4cnt = int(raw[54:56] + raw[52:54], 16)
|
||||||
self.ref5 = StatusPositionSpeed.fromBuf(raw[56:56+8])
|
self.ref5 = StatusPositionSpeed.from_buf(raw[56:56+8])
|
||||||
self.diagnostic = raw[64:64+16]
|
self.diagnostic = raw[64:64+16]
|
||||||
self.undefined1 = raw[80:84]
|
self.undefined1 = raw[80:84]
|
||||||
self.handset1 = int(raw[86:88] + raw[84:86], 16)
|
self.handset1 = int(raw[86:88] + raw[84:86], 16)
|
||||||
self.handset2 = int(raw[88:90] + raw[86:88], 16)
|
self.handset2 = int(raw[88:90] + raw[86:88], 16)
|
||||||
self.ref6 = StatusPositionSpeed.fromBuf(raw[90:90+8])
|
self.ref6 = StatusPositionSpeed.from_buf(raw[90:90+8])
|
||||||
self.ref7 = StatusPositionSpeed.fromBuf(raw[98:98+8])
|
self.ref7 = StatusPositionSpeed.from_buf(raw[98:98+8])
|
||||||
self.ref8 = StatusPositionSpeed.fromBuf(raw[106:106+8])
|
self.ref8 = StatusPositionSpeed.from_buf(raw[106:106+8])
|
||||||
self.undefined2 = raw[114:]
|
self.undefined2 = raw[114:]
|
||||||
|
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
|
||||||
class LinakController(object):
|
class LinakController(object):
|
||||||
_handle = None
|
_handle = None
|
||||||
_ctx = None
|
_ctx = None
|
||||||
|
|
||||||
def __init__(self, vendor_id=0x12d3, product_id=0x0002):
|
def __init__(self, vendor_id=0x12d3, product_id=0x0002):
|
||||||
self._ctx =usb1.USBContext()
|
self._ctx = usb1.USBContext()
|
||||||
#self._ctx.setDebug(4)
|
# self._ctx.setDebug(4)
|
||||||
self._handle = self._ctx.openByVendorIDAndProductID(
|
self._handle = self._ctx.openByVendorIDAndProductID(
|
||||||
vendor_id,
|
vendor_id,
|
||||||
product_id,
|
product_id,
|
||||||
@@ -173,7 +196,7 @@ class LinakController(object):
|
|||||||
raise Exception('Could not connect to usb device')
|
raise Exception('Could not connect to usb device')
|
||||||
|
|
||||||
self._handle.claimInterface(0)
|
self._handle.claimInterface(0)
|
||||||
self._initDevice()
|
self._init_device()
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
if self._handle:
|
if self._handle:
|
||||||
@@ -182,24 +205,25 @@ class LinakController(object):
|
|||||||
del(self._handle)
|
del(self._handle)
|
||||||
del(self._ctx)
|
del(self._ctx)
|
||||||
|
|
||||||
def _controlWriteRead(self, request_type, request, value, index, data, timeout=0):
|
def _control_write_read(self, request_type, request, value, index, data,
|
||||||
|
timeout=0):
|
||||||
data, data_buffer = usb1.create_initialised_buffer(data)
|
data, data_buffer = usb1.create_initialised_buffer(data)
|
||||||
transferred = self._handle._controlTransfer(request_type, request, value, index, data,
|
transferred = self._handle._controlTransfer(request_type, request,
|
||||||
sizeof(data), timeout)
|
value, index, data,
|
||||||
|
ctypes.sizeof(data),
|
||||||
|
timeout)
|
||||||
return transferred, data_buffer[:transferred]
|
return transferred, data_buffer[:transferred]
|
||||||
|
|
||||||
def _getStatusReport(self):
|
def _get_status_report(self):
|
||||||
buf = bytearray(b'\x00'*LEN_STATUS_REPORT)
|
buf = bytearray(b'\x00'*LEN_STATUS_REPORT)
|
||||||
buf[0] = CMD_STATUS_REPORT
|
buf[0] = CMD_STATUS_REPORT
|
||||||
#print('> {:s}'.format(buf.hex()))
|
# print('> {:s}'.format(buf.hex()))
|
||||||
x, buf = self._controlWriteRead(
|
_, buf = self._control_write_read(TYPE_GET_CI,
|
||||||
TYPE_GET_CI,
|
|
||||||
HID_REPORT_GET,
|
HID_REPORT_GET,
|
||||||
REQ_GET_STATUS,
|
REQ_GET_STATUS,
|
||||||
0,
|
0,
|
||||||
buf,
|
buf,
|
||||||
LINAK_TIMEOUT
|
LINAK_TIMEOUT)
|
||||||
)
|
|
||||||
|
|
||||||
# check if the response match to request!
|
# check if the response match to request!
|
||||||
if buf[0] != CMD_STATUS_REPORT:
|
if buf[0] != CMD_STATUS_REPORT:
|
||||||
@@ -207,14 +231,14 @@ class LinakController(object):
|
|||||||
|
|
||||||
return buf
|
return buf
|
||||||
|
|
||||||
def _setStatusReport(self):
|
def _set_status_report(self):
|
||||||
buf = bytearray(b'\x00'*LEN_STATUS_REPORT)
|
buf = bytearray(b'\x00' * LEN_STATUS_REPORT)
|
||||||
buf[0] = CMD_MODE_OF_OPERATION
|
buf[0] = CMD_MODE_OF_OPERATION
|
||||||
buf[1] = DEF_MODE_OF_OPERATION
|
buf[1] = DEF_MODE_OF_OPERATION
|
||||||
buf[2] = 0
|
buf[2] = 0
|
||||||
buf[3] = 251
|
buf[3] = 251
|
||||||
|
|
||||||
x, buf = self._controlWriteRead(
|
x, buf = self._control_write_read(
|
||||||
TYPE_SET_CI,
|
TYPE_SET_CI,
|
||||||
HID_REPORT_SET,
|
HID_REPORT_SET,
|
||||||
REQ_INIT,
|
REQ_INIT,
|
||||||
@@ -224,7 +248,8 @@ class LinakController(object):
|
|||||||
)
|
)
|
||||||
|
|
||||||
if x != LEN_STATUS_REPORT:
|
if x != LEN_STATUS_REPORT:
|
||||||
raise Exception('Device is not ready yet. Initialization failed in step 1.')
|
raise Exception('Device is not ready yet. Initialization failed '
|
||||||
|
'in step 1.')
|
||||||
|
|
||||||
def _move(self, height):
|
def _move(self, height):
|
||||||
buf = bytearray(b'\x00' * LEN_STATUS_REPORT)
|
buf = bytearray(b'\x00' * LEN_STATUS_REPORT)
|
||||||
@@ -243,7 +268,7 @@ class LinakController(object):
|
|||||||
buf[7] = hHigh
|
buf[7] = hHigh
|
||||||
buf[8] = hLow
|
buf[8] = hLow
|
||||||
|
|
||||||
x, buf = self._controlWriteRead(
|
x, buf = self._control_write_read(
|
||||||
TYPE_SET_CI,
|
TYPE_SET_CI,
|
||||||
HID_REPORT_SET,
|
HID_REPORT_SET,
|
||||||
REQ_MOVE,
|
REQ_MOVE,
|
||||||
@@ -253,16 +278,16 @@ class LinakController(object):
|
|||||||
)
|
)
|
||||||
return x == LEN_STATUS_REPORT
|
return x == LEN_STATUS_REPORT
|
||||||
|
|
||||||
def _moveDown(self):
|
def _move_down(self):
|
||||||
return self._move(HEIGHT_MOVE_DOWNWARDS)
|
return self._move(HEIGHT_MOVE_DOWNWARDS)
|
||||||
|
|
||||||
def _moveUp(self):
|
def _move_up(self):
|
||||||
return self._move(HEIGHT_MOVE_UPWARDS)
|
return self._move(HEIGHT_MOVE_UPWARDS)
|
||||||
|
|
||||||
def _moveEnd(self):
|
def _move_end(self):
|
||||||
return self._move(HEIGHT_MOVE_END)
|
return self._move(HEIGHT_MOVE_END)
|
||||||
|
|
||||||
def _isStatusReportNotReady(self, buf):
|
def _is_status_report_not_ready(self, buf):
|
||||||
if buf[0] != CMD_STATUS_REPORT or buf[1] != NRB_STATUS_REPORT:
|
if buf[0] != CMD_STATUS_REPORT or buf[1] != NRB_STATUS_REPORT:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@@ -272,17 +297,18 @@ class LinakController(object):
|
|||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _initDevice(self):
|
def _init_device(self):
|
||||||
buf = self._getStatusReport()
|
buf = self._get_status_report()
|
||||||
if not self._isStatusReportNotReady(buf):
|
if not self._is_status_report_not_ready(buf):
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
print('Device not ready!')
|
print('Device not ready!')
|
||||||
|
|
||||||
self._setStatusReport()
|
self._set_status_report()
|
||||||
time.sleep(1000/1000000.0)
|
time.sleep(1000/1000000.0)
|
||||||
if not _moveEnd():
|
if not _move_end():
|
||||||
raise Exception('Device not ready - initialization failed on step 2 (moveEnd)')
|
raise Exception('Device not ready - initialization failed on step '
|
||||||
|
'2 (move_end)')
|
||||||
|
|
||||||
time.sleep(100000/1000000.0)
|
time.sleep(100000/1000000.0)
|
||||||
|
|
||||||
@@ -295,8 +321,8 @@ class LinakController(object):
|
|||||||
self._move(target)
|
self._move(target)
|
||||||
time.sleep(200000/1000000.0)
|
time.sleep(200000/1000000.0)
|
||||||
|
|
||||||
buf = self._getStatusReport()
|
buf = self._get_status_report()
|
||||||
r = StatusReport.fromBuf(buf)
|
r = StatusReport.from_buf(buf)
|
||||||
distance = r.ref1cnt - r.ref1.pos
|
distance = r.ref1cnt - r.ref1.pos
|
||||||
delta = oldH-r.ref1.pos
|
delta = oldH-r.ref1.pos
|
||||||
if abs(distance) <= epsilon or abs(delta) <= epsilon or oldH == r.ref1.pos:
|
if abs(distance) <= epsilon or abs(delta) <= epsilon or oldH == r.ref1.pos:
|
||||||
@@ -318,14 +344,14 @@ class LinakController(object):
|
|||||||
|
|
||||||
return abs(r.ref1.pos - target) <= epsilon
|
return abs(r.ref1.pos - target) <= epsilon
|
||||||
|
|
||||||
def getHeight(self):
|
def get_height(self):
|
||||||
buf = self._getStatusReport()
|
buf = self._get_status_report()
|
||||||
r = StatusReport.fromBuf(buf)
|
r = StatusReport.from_buf(buf)
|
||||||
|
|
||||||
return r.ref1.pos, r.ref1.pos/98.0
|
return r.ref1.pos, r.ref1.pos/98.0
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
import argparse
|
|
||||||
parser = argparse.ArgumentParser(description='Get the control on your desk!')
|
parser = argparse.ArgumentParser(description='Get the control on your desk!')
|
||||||
parser.add_argument('command', choices=['move', 'height'], help='Command to execute.')
|
parser.add_argument('command', choices=['move', 'height'], help='Command to execute.')
|
||||||
parser.add_argument('height', type=int, nargs='?', help='For command "move", give the destination height.')
|
parser.add_argument('height', type=int, nargs='?', help='For command "move", give the destination height.')
|
||||||
@@ -345,7 +371,7 @@ if __name__ == '__main__':
|
|||||||
else:
|
else:
|
||||||
print('Command failed')
|
print('Command failed')
|
||||||
elif args.command == 'height':
|
elif args.command == 'height':
|
||||||
h, hcm = co.getHeight()
|
h, hcm = co.get_height()
|
||||||
print('Current height is: {:d} / {:.2f} cm'.format(h, hcm))
|
print('Current height is: {:d} / {:.2f} cm'.format(h, hcm))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
co.close()
|
co.close()
|
||||||
|
|||||||
Reference in New Issue
Block a user