From 19938a28f191bddf865443369c18919ecb45c881 Mon Sep 17 00:00:00 2001 From: gryf Date: Tue, 23 Feb 2021 20:23:40 +0100 Subject: [PATCH] Roughly pepify the code. --- linak-desk-control.py | 142 +++++++++++++++++++++++++----------------- 1 file changed, 84 insertions(+), 58 deletions(-) diff --git a/linak-desk-control.py b/linak-desk-control.py index 9a297bf..35931ef 100644 --- a/linak-desk-control.py +++ b/linak-desk-control.py @@ -12,11 +12,14 @@ # You should have received a copy of the GNU General Public License along with this # program. If not, see . -from ctypes import sizeof +import argparse +import ctypes import sys import time + import usb1 + REQ_INIT = 0x0303 REQ_GET_STATUS = 0x0304 REQ_MOVE = 0x0305 @@ -47,6 +50,7 @@ HEIGHT_MOVE_DOWNWARDS = 32767 HEIGHT_MOVE_UPWARDS = 32768 HEIGHT_MOVE_END = 32769 + class Status(object): positionLost = True antiColision = True @@ -55,8 +59,8 @@ class Status(object): unknown = 4 @classmethod - def fromBuf(sr, buf): - self = sr() + def from_buf(cls, buf): + self = cls() attr = ['positionLost', 'antiColision', 'overloadDown', 'overloadUp'] bitlist = '{:0>8s}'.format(bin(int(buf, base=16)).lstrip('0b')) for i in range(0, 4): @@ -66,20 +70,22 @@ class Status(object): return self + class StatusPositionSpeed(object): pos = None status = None speed = 0 @classmethod - def fromBuf(sr, buf): - self = sr() + def from_buf(cls, buf): + self = cls() self.pos = int(buf[2:4] + buf[:2], 16) - self.status = Status.fromBuf(buf[4:6]) + self.status = Status.from_buf(buf[4:6]) self.speed = int(buf[6:8], 16) return self + class ValidFlags(object): ID00_Ref1_pos_stat_speed = True ID01_Ref2_pos_stat_speed = True @@ -99,15 +105,31 @@ class ValidFlags(object): unknown = True @classmethod - def fromBuf(sr, buf): - self = sr() - attr = ['ID00_Ref1_pos_stat_speed', 'ID01_Ref2_pos_stat_speed', 'ID02_Ref3_pos_stat_speed', 'ID03_Ref4_pos_stat_speed', 'ID10_Ref1_controlInput', 'ID11_Ref2_controlInput', 'ID12_Ref3_controlInput', 'ID13_Ref4_controlInput', 'ID04_Ref5_pos_stat_speed', 'ID28_Diagnostic', 'ID05_Ref6_pos_stat_speed', 'ID37_Handset1command', 'ID38_Handset2command', 'ID06_Ref7_pos_stat_speed', 'ID07_Ref8_pos_stat_speed', 'unknown'] + def from_buf(cls, buf): + self = cls() + attr = ['ID00_Ref1_pos_stat_speed', + 'ID01_Ref2_pos_stat_speed', + 'ID02_Ref3_pos_stat_speed', + 'ID03_Ref4_pos_stat_speed', + 'ID10_Ref1_controlInput', + 'ID11_Ref2_controlInput', + 'ID12_Ref3_controlInput', + 'ID13_Ref4_controlInput', + 'ID04_Ref5_pos_stat_speed', + 'ID28_Diagnostic', + 'ID05_Ref6_pos_stat_speed', + 'ID37_Handset1command', + 'ID38_Handset2command', + 'ID06_Ref7_pos_stat_speed', + 'ID07_Ref8_pos_stat_speed', + 'unknown'] bitlist = '{:0>16s}'.format(bin(int(buf, base=16)).lstrip('0b')) for i in range(0, len(bitlist)): setattr(self, attr[i], True if bitlist[i] == '1' else False) return self + class StatusReport(object): featureRaportID = 0 numberOfBytes = 0 @@ -131,39 +153,40 @@ class StatusReport(object): undefined2 = None @classmethod - def fromBuf(sr, buf): - self = sr() + def from_buf(cls, buf): + self = cls() raw = buf.hex() self.featureRaportID = buf[0] self.numberOfBytes = buf[1] - self.validFlag = ValidFlags.fromBuf(raw[4:8]) - self.ref1 = StatusPositionSpeed.fromBuf(raw[8:8+8]) - self.ref2 = StatusPositionSpeed.fromBuf(raw[16:16+8]) - self.ref3 = StatusPositionSpeed.fromBuf(raw[24:24+8]) - self.ref4 = StatusPositionSpeed.fromBuf(raw[32:32+8]) + self.validFlag = ValidFlags.from_buf(raw[4:8]) + self.ref1 = StatusPositionSpeed.from_buf(raw[8:8+8]) + self.ref2 = StatusPositionSpeed.from_buf(raw[16:16+8]) + self.ref3 = StatusPositionSpeed.from_buf(raw[24:24+8]) + self.ref4 = StatusPositionSpeed.from_buf(raw[32:32+8]) self.ref1cnt = int(raw[42:44] + raw[40:42], 16) self.ref2cnt = int(raw[46:48] + raw[44:46], 16) self.ref3cnt = int(raw[50:52] + raw[48:50], 16) self.ref4cnt = int(raw[54:56] + raw[52:54], 16) - self.ref5 = StatusPositionSpeed.fromBuf(raw[56:56+8]) + self.ref5 = StatusPositionSpeed.from_buf(raw[56:56+8]) self.diagnostic = raw[64:64+16] self.undefined1 = raw[80:84] self.handset1 = int(raw[86:88] + raw[84:86], 16) self.handset2 = int(raw[88:90] + raw[86:88], 16) - self.ref6 = StatusPositionSpeed.fromBuf(raw[90:90+8]) - self.ref7 = StatusPositionSpeed.fromBuf(raw[98:98+8]) - self.ref8 = StatusPositionSpeed.fromBuf(raw[106:106+8]) + self.ref6 = StatusPositionSpeed.from_buf(raw[90:90+8]) + self.ref7 = StatusPositionSpeed.from_buf(raw[98:98+8]) + self.ref8 = StatusPositionSpeed.from_buf(raw[106:106+8]) self.undefined2 = raw[114:] return self + class LinakController(object): _handle = None _ctx = None def __init__(self, vendor_id=0x12d3, product_id=0x0002): - self._ctx =usb1.USBContext() - #self._ctx.setDebug(4) + self._ctx = usb1.USBContext() + # self._ctx.setDebug(4) self._handle = self._ctx.openByVendorIDAndProductID( vendor_id, product_id, @@ -173,7 +196,7 @@ class LinakController(object): raise Exception('Could not connect to usb device') self._handle.claimInterface(0) - self._initDevice() + self._init_device() def close(self): if self._handle: @@ -182,24 +205,25 @@ class LinakController(object): del(self._handle) del(self._ctx) - def _controlWriteRead(self, request_type, request, value, index, data, timeout=0): + def _control_write_read(self, request_type, request, value, index, data, + timeout=0): data, data_buffer = usb1.create_initialised_buffer(data) - transferred = self._handle._controlTransfer(request_type, request, value, index, data, - sizeof(data), timeout) + transferred = self._handle._controlTransfer(request_type, request, + value, index, data, + ctypes.sizeof(data), + timeout) return transferred, data_buffer[:transferred] - def _getStatusReport(self): + def _get_status_report(self): buf = bytearray(b'\x00'*LEN_STATUS_REPORT) buf[0] = CMD_STATUS_REPORT - #print('> {:s}'.format(buf.hex())) - x, buf = self._controlWriteRead( - TYPE_GET_CI, - HID_REPORT_GET, - REQ_GET_STATUS, - 0, - buf, - LINAK_TIMEOUT - ) + # print('> {:s}'.format(buf.hex())) + _, buf = self._control_write_read(TYPE_GET_CI, + HID_REPORT_GET, + REQ_GET_STATUS, + 0, + buf, + LINAK_TIMEOUT) # check if the response match to request! if buf[0] != CMD_STATUS_REPORT: @@ -207,14 +231,14 @@ class LinakController(object): return buf - def _setStatusReport(self): - buf = bytearray(b'\x00'*LEN_STATUS_REPORT) + def _set_status_report(self): + buf = bytearray(b'\x00' * LEN_STATUS_REPORT) buf[0] = CMD_MODE_OF_OPERATION buf[1] = DEF_MODE_OF_OPERATION buf[2] = 0 buf[3] = 251 - x, buf = self._controlWriteRead( + x, buf = self._control_write_read( TYPE_SET_CI, HID_REPORT_SET, REQ_INIT, @@ -224,7 +248,8 @@ class LinakController(object): ) if x != LEN_STATUS_REPORT: - raise Exception('Device is not ready yet. Initialization failed in step 1.') + raise Exception('Device is not ready yet. Initialization failed ' + 'in step 1.') def _move(self, height): buf = bytearray(b'\x00' * LEN_STATUS_REPORT) @@ -243,7 +268,7 @@ class LinakController(object): buf[7] = hHigh buf[8] = hLow - x, buf = self._controlWriteRead( + x, buf = self._control_write_read( TYPE_SET_CI, HID_REPORT_SET, REQ_MOVE, @@ -253,16 +278,16 @@ class LinakController(object): ) return x == LEN_STATUS_REPORT - def _moveDown(self): + def _move_down(self): return self._move(HEIGHT_MOVE_DOWNWARDS) - def _moveUp(self): + def _move_up(self): return self._move(HEIGHT_MOVE_UPWARDS) - def _moveEnd(self): + def _move_end(self): return self._move(HEIGHT_MOVE_END) - def _isStatusReportNotReady(self, buf): + def _is_status_report_not_ready(self, buf): if buf[0] != CMD_STATUS_REPORT or buf[1] != NRB_STATUS_REPORT: return False @@ -272,17 +297,18 @@ class LinakController(object): return True - def _initDevice(self): - buf = self._getStatusReport() - if not self._isStatusReportNotReady(buf): + def _init_device(self): + buf = self._get_status_report() + if not self._is_status_report_not_ready(buf): return else: print('Device not ready!') - self._setStatusReport() + self._set_status_report() time.sleep(1000/1000000.0) - if not _moveEnd(): - raise Exception('Device not ready - initialization failed on step 2 (moveEnd)') + if not _move_end(): + raise Exception('Device not ready - initialization failed on step ' + '2 (move_end)') time.sleep(100000/1000000.0) @@ -295,8 +321,8 @@ class LinakController(object): self._move(target) time.sleep(200000/1000000.0) - buf = self._getStatusReport() - r = StatusReport.fromBuf(buf) + buf = self._get_status_report() + r = StatusReport.from_buf(buf) distance = r.ref1cnt - r.ref1.pos delta = oldH-r.ref1.pos if abs(distance) <= epsilon or abs(delta) <= epsilon or oldH == r.ref1.pos: @@ -318,14 +344,14 @@ class LinakController(object): return abs(r.ref1.pos - target) <= epsilon - def getHeight(self): - buf = self._getStatusReport() - r = StatusReport.fromBuf(buf) + def get_height(self): + buf = self._get_status_report() + r = StatusReport.from_buf(buf) return r.ref1.pos, r.ref1.pos/98.0 + if __name__ == '__main__': - import argparse parser = argparse.ArgumentParser(description='Get the control on your desk!') parser.add_argument('command', choices=['move', 'height'], help='Command to execute.') parser.add_argument('height', type=int, nargs='?', help='For command "move", give the destination height.') @@ -345,7 +371,7 @@ if __name__ == '__main__': else: print('Command failed') elif args.command == 'height': - h, hcm = co.getHeight() + h, hcm = co.get_height() print('Current height is: {:d} / {:.2f} cm'.format(h, hcm)) except Exception as e: co.close()