File python-can_winmate_ucan03.patch of Package python-can

diff -urN python-can-3.3.4/can/interfaces/__init__.py python-can-3.3.4-patch/can/interfaces/__init__.py
--- python-can-3.3.4/can/interfaces/__init__.py	2023-07-28 16:35:53.833232070 +0300
+++ python-can-3.3.4-patch/can/interfaces/__init__.py	2023-07-28 16:34:55.442160528 +0300
@@ -25,6 +25,7 @@
     'canalystii':       ('can.interfaces.canalystii',       'CANalystIIBus'),
     'systec':           ('can.interfaces.systec',           'UcanBus'),
     'neousys':          ('can.interfaces.neousys',          'NeousysBus'),
+    'ucan03':           ('can.interfaces.winmate',          'WinmateBus'),
 }
 
 BACKENDS.update({
diff -urN python-can-3.3.4/can/interfaces/winmate/__init__.py python-can-3.3.4-patch/can/interfaces/winmate/__init__.py
--- python-can-3.3.4/can/interfaces/winmate/__init__.py	1970-01-01 02:00:00.000000000 +0200
+++ python-can-3.3.4-patch/can/interfaces/winmate/__init__.py	2023-07-28 16:33:07.460187459 +0300
@@ -0,0 +1,3 @@
+""" Winmate UCAN03 CAN bus driver """
+
+from can.interfaces.winmate.winmate import WinmateBus
diff -urN python-can-3.3.4/can/interfaces/winmate/winmate.py python-can-3.3.4-patch/can/interfaces/winmate/winmate.py
--- python-can-3.3.4/can/interfaces/winmate/winmate.py	1970-01-01 02:00:00.000000000 +0200
+++ python-can-3.3.4-patch/can/interfaces/winmate/winmate.py	2023-08-09 13:22:17.642997564 +0300
@@ -0,0 +1,315 @@
+""" Winmate UCAN03 driver """
+
+# This is driver for Winmate UCAN03 CAN bus board which is connected
+# to computer as FTDI interface. As it needs direct FTDI and not linux
+# FTDI module one should blacklist ftdi_sio.
+#
+# echo "blacklist ftdi_sio" >> /etc/modprobe.d/local-blacklist-ftdi_sio.conf
+#
+# Without blacklist nor SDK or this driver won't work
+#
+# UCAN03 has SJA1000 CAN controller and one can have 2 CAN interfaces separated
+# from each other.
+#
+# Driver uses pyftdi rather than libftdi interface as I didn't get it working
+#
+# UCAN03 documenation mentions that SDK can be obtained from
+# git clone ssh://ucan@sw.winmate.com.tw/home/prj/CANBUS/UCAN_Linux_Ubuntu/
+#
+# And yes this is reverse engineered withouth help from Winmate so
+# there can be problems.
+#
+
+# pylint: disable=too-few-public-methods
+# pylint: disable=too-many-instance-attributes
+# pylint: disable=wrong-import-position
+# pylint: disable=method-hidden
+# pylint: disable=unused-import
+
+import queue
+import logging
+import platform
+import time
+import binascii
+
+from can import BusABC, Message, CanError
+
+pyftdi_module = True
+
+try:
+    from pyftdi import FtdiLogger
+    from pyftdi.ftdi import Ftdi, FtdiError
+    from pyftdi.usbtools import UsbTools, UsbToolsError
+except ImportError:
+    raise CanError(
+        "You won't be able to use the Winmate UCAN03 can backend without "
+        "the pyftdi module installed!"
+    )
+    pyftdi_false = False
+
+
+UCAN03_BAUDRATE_20k = b"\x00"
+UCAN03_BAUDRATE_40k = b"\x01"
+UCAN03_BAUDRATE_50k = b"\x02"
+UCAN03_BAUDRATE_80k = b"\x03"
+UCAN03_BAUDRATE_100k = b"\x04"
+UCAN03_BAUDRATE_125k = b"\x05"
+UCAN03_BAUDRATE_200k = b"\x06"
+UCAN03_BAUDRATE_250k = b"\x07"
+UCAN03_BAUDRATE_400k = b"\x08"
+UCAN03_BAUDRATE_500k = b"\x09"
+UCAN03_BAUDRATE_666k = b"\x0A"
+UCAN03_BAUDRATE_800k = b"\x0B"
+UCAN03_BAUDRATE_1000k = b"\x0C"
+
+# UCAN03 separation header is '_WM_' which
+# and it's same on sending and receiving
+UCAN03_HEADER = b"\x5f\x57\x4d\x5f"
+
+UCAN03_CMD_RESET = b"\x10"
+UCAN03_CMD_SET_MODE = b"\x12"
+UCAN03_CMD_SET_ID = b"\x13"
+UCAN03_CMD_SET_MASK = b"\x14"
+UCAN03_CMD_SET_BAUD = b"\x15"
+UCAN03_CMD_RESTART = b"\x17"
+UCAN03_CMD_STOP_RECEIVE = b"\x18"
+UCAN03_CMD_SEND = b"\x30"
+
+UCAN03_VENDOR = 0x403
+UCAN03_PRODUCT = 0x6010
+# This just the device that I have and I hope there are correct
+# in every version on UCAN03
+UCAN03_SERIAL_01 = "00000041"
+UCAN03_SERIAL_02 = "00000042"
+
+
+class WinmateBus(BusABC):
+    """Winmate UCAN03 board CAN bus Class"""
+
+    def __init__(self, channel, device=0, bitrate=500000, **kwargs):
+        """
+        :param channel: channel number
+        :param device: device number
+        :param bitrate: bit rate. Renamed to bitrate in next release.
+        """
+
+        if pyftdi_module is False:
+            raise CanError("The pyftdi module is not installed")
+
+        super().__init__(channel, **kwargs)
+        self.channel = channel
+        self.device = device
+
+        self.channel_info = "Winmate UCAN03 Can: device {}, channel {}".format(
+            self.device, self.channel
+        )
+
+        self.queue = queue.Queue()
+        self.ftdi = Ftdi()
+
+        device_tuple = [(0x0403, 0x6010)]
+        self.devices = self.ftdi.find_all(device_tuple)
+
+        port_count = 0
+
+        for device in self.devices:
+            if device[0].sn == UCAN03_SERIAL_01 or device[0].sn == UCAN03_SERIAL_02:
+                port_count = port_count + 1
+
+        if port_count != 2:
+            raise CanError("Can't find Winmate UCAN03 FTDI ports")
+
+        self.usb_serial = UCAN03_SERIAL_01
+
+        if channel == 2:
+            self.usb_serial = UCAN03_SERIAL_02
+
+        self.ftdi.open(
+            vendor=UCAN03_VENDOR,
+            product=UCAN03_PRODUCT,
+            serial=self.usb_serial,
+            interface=1,
+        )
+
+        # This is sequence that we have to do
+        # before CAN is operational
+        self._reset()
+        self._stop_receive()
+        self._set_id(b"\x01\x01\xff\xff")
+        self._set_mask(b"\xff\xff\xff\xff")
+        self._set_baud(UCAN03_BAUDRATE_125k)
+        self._set_mode(b"\x00")
+        self._restart()
+
+        # Messages are in echo so they
+        # have to read out
+        for i in range(8):
+            self._read_data()
+
+    def send(self, msg, timeout=None):
+        """
+        :param msg: message to send
+        :param timeout: timeout is not used here
+        :return:
+        """
+        data_len = msg.dlc + 5
+
+        try:
+            msg_id = msg.arbitration_id.to_bytes(2, "big")
+        except OverflowError:
+            msg_id = msg.arbitration_id.to_bytes(4, "big")
+
+        # If we don't have extended..
+        if len(msg_id) == 2:
+            tmp_id = msg_id
+            msg_id = tmp_id + b"\xff\xff"
+
+        ucan03_write_msg = bytearray(UCAN03_HEADER)
+        ucan03_write_msg.extend(UCAN03_CMD_SEND)
+        ucan03_write_msg.append(data_len)
+        ucan03_write_msg.extend(msg_id)
+        ucan03_write_msg.append(msg.dlc)
+        ucan03_write_msg.extend(msg.data)
+        self._write_data(ucan03_write_msg)
+
+    def _read_data(self):
+        try:
+            return self.ftdi.read_data_bytes(4096)
+        except FtdiError as exc:
+            raise CanError("FTDI error in read: %s" % str(exc)) from None
+
+    def _recv_internal(self, timeout):
+        # It only reports full frames so if we can read
+        # much we should always have full frame
+        ucan03_read_bytes = self._read_data()
+
+        # Buffer can be empty and then
+        # we should not do parsing as it not
+        # worth it
+        if len(ucan03_read_bytes) > 0:
+            header_byte = 0
+            msg_len = 0
+            can_index = 0
+            can_data_len = 0
+            can_id = bytearray()
+            can_data = bytearray()
+            can_start = 0
+            can_rtr = 0
+            is_header = False
+
+            # UCAN03 full message is
+            # 4 bytes header _WM_
+            # 1 byte CMD
+            # 1 byte length message
+            # 6-13 CAN message
+            # There can be several messages in one
+            for byte in ucan03_read_bytes:
+                if byte == 0x5F and header_byte == 0 and is_header is False:
+                    header_byte = header_byte + 1
+                elif byte == 0x57 and header_byte == 1:
+                    header_byte = header_byte + 1
+                elif byte == 0x4D and header_byte == 2:
+                    header_byte = header_byte + 1
+                elif byte == 0x5F and header_byte == 3:
+                    header_byte = header_byte + 1
+                elif header_byte == 4:
+                    # This is command??
+                    header_byte = header_byte + 1
+                elif header_byte == 5:
+                    # Len
+                    header_byte = 0
+                    is_header = True
+                    msg_len = byte
+                else:
+                    if can_index == 0:
+                        can_start = byte
+                    elif can_index < 5:
+                        if byte != 0xFF:
+                            can_id.append(byte)
+                    elif can_index == 5:
+                        can_data_len = byte
+                    else:
+                        can_data.append(byte)
+
+                    can_index = can_index + 1
+                    msg_len = msg_len - 1
+                    if msg_len == 0:
+                        msg = Message(
+                            timestamp=time.time(),
+                            arbitration_id=int.from_bytes(can_id, "big"),
+                            is_remote_frame=False,
+                            is_extended_id=False,
+                            channel=self.channel,
+                            dlc=can_data_len,
+                            data=can_data,
+                        )
+
+                        header_byte = 0
+                        msg_len = 0
+                        can_index = 0
+                        can_rtr = 0
+                        can_data_len = 0
+                        can_id = bytearray()
+                        can_data = bytearray()
+                        can_start = 0
+                        is_header = False
+
+                        # If we are very slow in reading
+                        # there can be several messages
+                        # waiting.
+                        # In 99% cases this not needed but
+                        # if there is some delay in
+                        # reading and there is some
+                        # faster sender this is needed
+                        try:
+                            self.queue.put(msg)
+                        except queue.Full:
+                            raise CanError("Winmate message Queue is full") from None
+
+        # If we have some packets in Queue return them if not return None
+        try:
+            return self.queue.get(block=False, timeout=None), False
+        except queue.Empty:
+            return None, False
+
+    def shutdown(self):
+        self.ftdi.close()
+
+    def fileno(self):
+        # Return an invalid file descriptor as not used
+        return -1
+
+    def _write_data(self, buf):
+        try:
+            return self.ftdi.write_data(buf)
+        except FtdiError as exc:
+            raise CanError("FTDI error in write: %s" % str(exc)) from None
+
+    def _reset(self):
+        self._write_data(UCAN03_HEADER + UCAN03_CMD_RESET + b"\x01\x00")
+
+    def _restart(self):
+        self._write_data(UCAN03_HEADER + UCAN03_CMD_RESTART + b"\x01\x00")
+
+    def _stop_receive(self):
+        self._write_data(UCAN03_HEADER + UCAN03_CMD_STOP_RECEIVE + b"\x01\x00")
+
+    def _set_id(self, id_bytes):
+        self._write_data(UCAN03_HEADER + UCAN03_CMD_SET_ID + b"\x04" + id_bytes)
+
+    def _set_mask(self, mask_bytes):
+        self._write_data(UCAN03_HEADER + UCAN03_CMD_SET_MASK + b"\x04" + mask_bytes)
+
+    def _set_baud(self, baud_byte):
+        self._write_data(UCAN03_HEADER + UCAN03_CMD_SET_BAUD + b"\x01" + baud_byte)
+
+    def _set_mode(self, mode_byte):
+        self._write_data(UCAN03_HEADER + UCAN03_CMD_SET_MODE + b"\x01" + mode_byte)
+
+    @staticmethod
+    def _detect_available_configs():
+        channels = []
+
+        # There is 2 channels
+        channels.append({"interface": "ucan03", "channel": 2})
+        return channels
openSUSE Build Service is sponsored by