mirror of
https://github.com/gryf/python-linak-desk-control.git
synced 2025-12-17 11:30:29 +01:00
Further fixes here and there.
This commit is contained in:
@@ -19,6 +19,8 @@ import time
|
|||||||
|
|
||||||
import usb1
|
import usb1
|
||||||
|
|
||||||
|
VENDOR_ID = 0x12d3 # Linak
|
||||||
|
PRODUCT_ID = 0x0002 # DeskLine CBD Control Box
|
||||||
|
|
||||||
REQ_INIT = 0x0303
|
REQ_INIT = 0x0303
|
||||||
REQ_GET_STATUS = 0x0304
|
REQ_GET_STATUS = 0x0304
|
||||||
@@ -63,10 +65,10 @@ class Status(object):
|
|||||||
self = cls()
|
self = cls()
|
||||||
attr = ['positionLost', 'antiColision', 'overloadDown', 'overloadUp']
|
attr = ['positionLost', 'antiColision', 'overloadDown', 'overloadUp']
|
||||||
bitlist = '{:0>8s}'.format(bin(int(buf, base=16)).lstrip('0b'))
|
bitlist = '{:0>8s}'.format(bin(int(buf, base=16)).lstrip('0b'))
|
||||||
for i in range(0, 4):
|
for index, attr_name in enumerate(attr):
|
||||||
setattr(self, attr[i], True if bitlist[i] == '1' else False)
|
setattr(self, attr_name, bitlist[index] == '1')
|
||||||
# set unknown
|
# set unknown
|
||||||
self.unkown = int(buf[1:], 16)
|
self.unknown = int(buf[1:], 16)
|
||||||
|
|
||||||
return self
|
return self
|
||||||
|
|
||||||
@@ -123,9 +125,8 @@ class ValidFlags(object):
|
|||||||
'ID06_Ref7_pos_stat_speed',
|
'ID06_Ref7_pos_stat_speed',
|
||||||
'ID07_Ref8_pos_stat_speed',
|
'ID07_Ref8_pos_stat_speed',
|
||||||
'unknown']
|
'unknown']
|
||||||
bitlist = '{:0>16s}'.format(bin(int(buf, base=16)).lstrip('0b'))
|
for idx, bit in enumerate(bin(int(buf, base=16))[2:].zfill(16)):
|
||||||
for i in range(0, len(bitlist)):
|
setattr(self, attr[idx], bit == '1')
|
||||||
setattr(self, attr[i], True if bitlist[i] == '1' else False)
|
|
||||||
|
|
||||||
return self
|
return self
|
||||||
|
|
||||||
@@ -184,14 +185,12 @@ class LinakController(object):
|
|||||||
_handle = None
|
_handle = None
|
||||||
_ctx = None
|
_ctx = None
|
||||||
|
|
||||||
def __init__(self, vendor_id=0x12d3, product_id=0x0002):
|
def __init__(self):
|
||||||
self._ctx = usb1.USBContext()
|
self._ctx = usb1.USBContext()
|
||||||
# self._ctx.setDebug(4)
|
# self._ctx.setDebug(4)
|
||||||
self._handle = self._ctx.openByVendorIDAndProductID(
|
self._handle = self._ctx.openByVendorIDAndProductID(VENDOR_ID,
|
||||||
vendor_id,
|
PRODUCT_ID,
|
||||||
product_id,
|
skip_on_error=True)
|
||||||
skip_on_error=True,
|
|
||||||
)
|
|
||||||
if not self._handle:
|
if not self._handle:
|
||||||
raise Exception('Could not connect to usb device')
|
raise Exception('Could not connect to usb device')
|
||||||
|
|
||||||
@@ -202,11 +201,11 @@ class LinakController(object):
|
|||||||
if self._handle:
|
if self._handle:
|
||||||
self._handle.releaseInterface(0)
|
self._handle.releaseInterface(0)
|
||||||
|
|
||||||
del(self._handle)
|
del self._handle
|
||||||
del(self._ctx)
|
del self._ctx
|
||||||
|
|
||||||
def _control_write_read(self, request_type, request, value, index, data,
|
def _control_write_read(self, request_type, request, value, index, data,
|
||||||
timeout=0):
|
timeout=0):
|
||||||
data, data_buffer = usb1.create_initialised_buffer(data)
|
data, data_buffer = usb1.create_initialised_buffer(data)
|
||||||
transferred = self._handle._controlTransfer(request_type, request,
|
transferred = self._handle._controlTransfer(request_type, request,
|
||||||
value, index, data,
|
value, index, data,
|
||||||
@@ -215,15 +214,15 @@ class LinakController(object):
|
|||||||
return transferred, data_buffer[:transferred]
|
return transferred, data_buffer[:transferred]
|
||||||
|
|
||||||
def _get_status_report(self):
|
def _get_status_report(self):
|
||||||
buf = bytearray(b'\x00'*LEN_STATUS_REPORT)
|
buf = bytearray(b'\x00' * LEN_STATUS_REPORT)
|
||||||
buf[0] = CMD_STATUS_REPORT
|
buf[0] = CMD_STATUS_REPORT
|
||||||
# print('> {:s}'.format(buf.hex()))
|
# print('> {:s}'.format(buf.hex()))
|
||||||
_, buf = self._control_write_read(TYPE_GET_CI,
|
_, buf = self._control_write_read(TYPE_GET_CI,
|
||||||
HID_REPORT_GET,
|
HID_REPORT_GET,
|
||||||
REQ_GET_STATUS,
|
REQ_GET_STATUS,
|
||||||
0,
|
0,
|
||||||
buf,
|
buf,
|
||||||
LINAK_TIMEOUT)
|
LINAK_TIMEOUT)
|
||||||
|
|
||||||
# check if the response match to request!
|
# check if the response match to request!
|
||||||
if buf[0] != CMD_STATUS_REPORT:
|
if buf[0] != CMD_STATUS_REPORT:
|
||||||
@@ -238,16 +237,14 @@ class LinakController(object):
|
|||||||
buf[2] = 0
|
buf[2] = 0
|
||||||
buf[3] = 251
|
buf[3] = 251
|
||||||
|
|
||||||
x, buf = self._control_write_read(
|
amount, buf = self._control_write_read(TYPE_SET_CI,
|
||||||
TYPE_SET_CI,
|
HID_REPORT_SET,
|
||||||
HID_REPORT_SET,
|
REQ_INIT,
|
||||||
REQ_INIT,
|
0,
|
||||||
0,
|
buf,
|
||||||
buf,
|
LINAK_TIMEOUT)
|
||||||
LINAK_TIMEOUT
|
|
||||||
)
|
|
||||||
|
|
||||||
if x != LEN_STATUS_REPORT:
|
if amount != LEN_STATUS_REPORT:
|
||||||
raise Exception('Device is not ready yet. Initialization failed '
|
raise Exception('Device is not ready yet. Initialization failed '
|
||||||
'in step 1.')
|
'in step 1.')
|
||||||
|
|
||||||
@@ -268,15 +265,13 @@ class LinakController(object):
|
|||||||
buf[7] = hHigh
|
buf[7] = hHigh
|
||||||
buf[8] = hLow
|
buf[8] = hLow
|
||||||
|
|
||||||
x, buf = self._control_write_read(
|
amount, buf = self._control_write_read(TYPE_SET_CI,
|
||||||
TYPE_SET_CI,
|
HID_REPORT_SET,
|
||||||
HID_REPORT_SET,
|
REQ_MOVE,
|
||||||
REQ_MOVE,
|
0,
|
||||||
0,
|
buf,
|
||||||
buf,
|
LINAK_TIMEOUT)
|
||||||
LINAK_TIMEOUT
|
return amount == LEN_STATUS_REPORT
|
||||||
)
|
|
||||||
return x == LEN_STATUS_REPORT
|
|
||||||
|
|
||||||
def _move_down(self):
|
def _move_down(self):
|
||||||
return self._move(HEIGHT_MOVE_DOWNWARDS)
|
return self._move(HEIGHT_MOVE_DOWNWARDS)
|
||||||
@@ -305,30 +300,31 @@ class LinakController(object):
|
|||||||
print('Device not ready!')
|
print('Device not ready!')
|
||||||
|
|
||||||
self._set_status_report()
|
self._set_status_report()
|
||||||
time.sleep(1000/1000000.0)
|
time.sleep(0.001)
|
||||||
if not _move_end():
|
if not self._move_end():
|
||||||
raise Exception('Device not ready - initialization failed on step '
|
raise Exception('Device not ready - initialization failed on step '
|
||||||
'2 (move_end)')
|
'2 (move_end)')
|
||||||
|
|
||||||
time.sleep(100000/1000000.0)
|
time.sleep(0.1)
|
||||||
|
|
||||||
def move(self, target):
|
def move(self, target):
|
||||||
a = max_a = 3
|
retry_count = max_retry = 3
|
||||||
epsilon = 13
|
epsilon = 13
|
||||||
oldH = 0
|
prev_height = 0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
self._move(target)
|
self._move(target)
|
||||||
time.sleep(200000/1000000.0)
|
time.sleep(0.2)
|
||||||
|
|
||||||
buf = self._get_status_report()
|
buf = self._get_status_report()
|
||||||
r = StatusReport.from_buf(buf)
|
r = StatusReport.from_buf(buf)
|
||||||
distance = r.ref1cnt - r.ref1.pos
|
distance = r.ref1cnt - r.ref1.pos
|
||||||
delta = oldH-r.ref1.pos
|
delta = abs(prev_height - r.ref1.pos)
|
||||||
if abs(distance) <= epsilon or abs(delta) <= epsilon or oldH == r.ref1.pos:
|
if (abs(distance) <= epsilon or delta <= epsilon or
|
||||||
a -= 1
|
prev_height == r.ref1.pos):
|
||||||
|
retry_count -= 1
|
||||||
else:
|
else:
|
||||||
a = max_a
|
retry_count = max_retry
|
||||||
|
|
||||||
print(
|
print(
|
||||||
'Current height: {:d}; target height: {:d}; distance: {:d}'.format(
|
'Current height: {:d}; target height: {:d}; distance: {:d}'.format(
|
||||||
@@ -338,9 +334,10 @@ class LinakController(object):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
if a == 0:
|
if retry_count == 0:
|
||||||
break
|
break
|
||||||
oldH = r.ref1.pos
|
|
||||||
|
prev_height = r.ref1.pos
|
||||||
|
|
||||||
return abs(r.ref1.pos - target) <= epsilon
|
return abs(r.ref1.pos - target) <= epsilon
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user