File python-can_axiomtek_ax9290_hack.patch of Package python-can
diff -urN python-can-3.3.4/can/interfaces/serial/serial_can.py python-can-3.3.4-patch/can/interfaces/serial/serial_can.py
--- python-can-3.3.4/can/interfaces/serial/serial_can.py 2020-10-04 23:05:14.000000000 +0300
+++ python-can-3.3.4-patch/can/interfaces/serial/serial_can.py 2021-01-14 13:46:54.865536708 +0200
@@ -11,6 +11,8 @@
import logging
import struct
+import binascii
+import time
from can import BusABC, Message
@@ -33,7 +35,7 @@
"""
- def __init__(self, channel, baudrate=115200, timeout=0.1, rtscts=False,
+ def __init__(self, channel, baudrate=115200, timeout=0.5, rtscts=False,
*args, **kwargs):
"""
:param str channel:
@@ -57,8 +59,9 @@
raise ValueError("Must specify a serial port.")
self.channel_info = "Serial interface: " + channel
+ self.send_tx = 0
self.ser = serial.serial_for_url(
- channel, baudrate=baudrate, timeout=timeout, rtscts=rtscts)
+ channel, baudrate=baudrate, timeout=(timeout / 1000), rtscts=rtscts)
super(SerialBus, self).__init__(channel=channel, *args, **kwargs)
@@ -86,25 +89,44 @@
used instead.
"""
- try:
- timestamp = struct.pack('<I', int(msg.timestamp * 1000))
- except struct.error:
- raise ValueError('Timestamp is out of range')
- try:
- a_id = struct.pack('<I', msg.arbitration_id)
- except struct.error:
- raise ValueError('Arbitration Id is out of range')
+ # @t send CAN A message
+ #
+ # Example string is like
+ # @t10001000110021133
byte_msg = bytearray()
- byte_msg.append(0xAA)
- for i in range(0, 4):
- byte_msg.append(timestamp[i])
- byte_msg.append(msg.dlc)
- for i in range(0, 4):
- byte_msg.append(a_id[i])
- for i in range(0, msg.dlc):
- byte_msg.append(msg.data[i])
- byte_msg.append(0xBB)
+
+ # CAN port number, only 1 is allowed for AX92903-series.
+ byte_msg.extend("@t1".encode())
+ # Transmission intervals in hexadecimal.. Four byte milliseconds
+ byte_msg.extend("0001".encode())
+ # Repeat counter in hexadecimal. Range: 0000~FFFF (0000 = Send once)
+ byte_msg.extend("0001".encode())
+ byte_msg.extend('{:03x}'.format(msg.arbitration_id).encode())
+ byte_msg.extend('{:1x}'.format(msg.dlc).encode())
+ byte_msg.extend(binascii.hexlify(msg.data))
+
+ # If we want to change the default active (Command) Mode to
+ # frame transmission (Data mode), the only is using command
+ # string “+++” to make it.
+ #
+ # The string “+++” is defined by the STM chip which uses
+ # their firmware to convert the signal from the UART to CAN Bus.
+ self.ser.write(str.encode('+++'))
+ time.sleep(0.1)
+
+ # Somewho Tx buffer is full.. tell to empty it
+ # ERROR05 CAN Tx buffer is full
+ if self.send_tx == 1:
+ # print("Empty buffer")
+ self.ser.write(str.encode('@Tx'))
+ self.send_tx = 0
+
self.ser.write(byte_msg)
+ self.ser.write(str.encode('\r\n'))
+
+ # Then get back to operate mode
+ self.ser.write(str.encode('@S3\r\n'))
+ time.sleep(0.8)
def _recv_internal(self, timeout):
"""
@@ -129,28 +151,53 @@
try:
# ser.read can return an empty string
# or raise a SerialException
- rx_byte = self.ser.read()
+ error = 0
+ while True:
+ rx_byte = self.ser.read()
+ if rx_byte == b'' or ord(rx_byte) == ord('@'):
+ break
+ elif ord(rx_byte) == ord('#'):
+ rxd_byte = self.ser.read()
+ if rxd_byte and ord(rxd_byte) == ord('O'):
+ rxd_byte = self.ser.read(3)
+ elif rxd_byte and ord(rxd_byte) == ord('E'):
+ rxd_byte = self.ser.read(8)
+ # ERROR05 CAN Tx buffer is full. This should not
+ # happend but it does
+ if rxd_byte[5] == ord('5'):
+ # print("Request empty")
+ self.send_tx = 1
except serial.SerialException:
return None, False
- if rx_byte and ord(rx_byte) == 0xAA:
- s = bytearray(self.ser.read(4))
- timestamp = (struct.unpack('<I', s))[0]
- dlc = ord(self.ser.read())
-
- s = bytearray(self.ser.read(4))
- arb_id = (struct.unpack('<I', s))[0]
-
- data = self.ser.read(dlc)
-
- rxd_byte = ord(self.ser.read())
- if rxd_byte == 0xBB:
- # received message data okay
- msg = Message(timestamp=timestamp/1000,
- arbitration_id=arb_id,
- dlc=dlc,
- data=data)
- return msg, False
+ if rx_byte and ord(rx_byte) == ord('@'):
+ rx_byte = ord(self.ser.read())
+
+ if rx_byte == ord('F'):
+ s = bytearray(self.ser.read(4))
+ timestamp = time.time()
+
+ ext = int(self.ser.read())
+
+ s = bytearray(self.ser.read(3))
+ arb_id = int(s, 16)
+
+ s = bytearray(self.ser.read(2))
+ dlc = int(s)
+
+ orig_data = self.ser.read(dlc * 2)
+ data = bytearray.fromhex(orig_data.decode('utf-8'))
+
+ rxd_byte = ord(self.ser.read())
+ if rxd_byte == ord('\r'):
+ # received message data okay
+ msg = Message(timestamp=timestamp,
+ arbitration_id=arb_id,
+ dlc=dlc,
+ data=data,
+ channel=self.channel_info,
+ is_extended_id=ext)
+ return msg, False
else:
return None, False
@@ -160,3 +207,4 @@
return self.ser.fileno()
# Return an invalid file descriptor on Windows
return -1
+