File rdserialtool-0.2.1+20200101.obscpio of Package rdserialtool
07070100000000000081A40000273B00000064000000015E0D4DE40000011E000000000000000000000000000000000000002500000000rdserialtool-0.2.1+20200101/MakefilePYTHON := python3
PANDOC := pandoc
all: build
build:
$(PYTHON) setup.py build
test: build
$(PYTHON) setup.py test
install: build
$(PYTHON) setup.py install
clean:
$(PYTHON) setup.py clean
$(RM) -r build MANIFEST
doc: README
README: README.md
$(PANDOC) -s -t plain -o $@ $<
07070100000001000081A40000273B00000064000000015E0D4DE400001A2F000000000000000000000000000000000000002300000000rdserialtool-0.2.1+20200101/README
RDSERIALTOOL - RDTECH UM/DPS/RD SERIES DEVICE INTERFACE TOOL
_This program is currently in an early stage and could change
significantly._
This program provides monitor, control and configuration access to
RDTech (RuiDeng, Riden) UM, DPS and RD series devices.
The UM24C, UM25C and UM34C are low-cost USB pass-through power
measurement devices, and support a decent number of collection features,
as well as full control via Bluetooth. (The non-C versions of these
devices support the same features as the C versions, but without
Bluetooth control.)
The DPS series are programmable DC-DC power supplies, and many devices
in the series support external communication via the Modbus RTU serial
protocol over USB or Bluetooth.
The RD6006 is a logical continuation of the DPS series and also uses
Modbus communication, but the registers are incompatible with previous
DPS series, so “RD” is treated as a separate series.
Compatibility
- UM24C, UM25C and UM34C support is complete and tested.
- DPS5005 support is complete and tested. Other devices in the DPS
series (DPS3005, DPS5015, DPS5020, DPS8005, DPH5005) should perform
identically. (Status reports and bugs welcome.)
- RD6006 has basic support and testing. Reading and writing most
states work.
- Tested under Python 3.6, but should work with 3.4 or later.
- Linux: Tested fine with both PyBluez (direct) and pyserial
(e.g. /dev/rfcomm0 via rfcomm bind), as well as direct USB serial
(e.g. /dev/ttyUSB0) on DPS devices.
- Windows: Tested fine with pyserial (e.g. COM4 as set up
automatically by Windows). Author could not get PyBluez
compiled/installed.
- MacOS: When using pyserial (e.g. /dev/cu.UM24C-Port as set up
automatically by MacOS), writes to the device would succeed
(e.g. 0xf2 to rotate the screen on UM series), but reads from the
device never arrive. Author could not get PyBluez
compiled/installed.
Setup
rdserialtool requires Python 3, and PyBluez and/or pyserial modules,
depending on which method you use to connect. Installation varies by
operating system, but on Debian/Ubuntu, these are available via the
python3-pybluez and python3-serial packages, respectively.
To install rdserialtool:
$ sudo python3 setup.py install
rdserialtool may also be run directly from its source directory without
installation.
Bluetooth setup
Varies by operating system. If the pairing procedure asks for a PIN,
enter 1234.
For command-line installation on Linux:
$ bluetoothctl
Agent registered
[bluetooth]# scan on
Discovery started
[NEW] Device 00:90:72:56:98:D7 UM24C
[CHG] Device 00:90:72:56:98:D7 RSSI: -60
[bluetooth]# pair 00:90:72:56:98:D7
Attempting to pair with 00:90:72:56:98:D7
[CHG] Device 00:90:72:56:98:D7 Connected: yes
Request PIN code
[UM241m[agent] Enter PIN code: 1234
[CHG] Device 00:90:72:56:98:D7 UUIDs: 00001101-0000-1000-8000-00805f9b34fb
[CHG] Device 00:90:72:56:98:D7 ServicesResolved: yes
[CHG] Device 00:90:72:56:98:D7 Paired: yes
Pairing successful
[bluetooth]# trust 00:90:72:56:98:D7
[CHG] Device 00:90:72:56:98:D7 Trusted: yes
Changing 00:90:72:56:98:D7 trust succeeded
[bluetooth]# exit
Agent unregistered
Device MAC address will vary. Again, the PIN for the device is 1234.
If you then want to use rdserialtool via direct serial, bind it via
rfcomm:
$ sudo rfcomm bind 0 00:90:72:56:98:D7
Usage
A number of options common to device access are available to all
commands; see:
$ rdserialtool --help
After the common options, a command is required (commands available are
in --help above). For example, to get device information from a UM24C
via PyBluez:
$ rdserialtool --device=um24c --bluetooth-address=00:90:72:56:98:D7
Or via pyserial:
$ rdserialtool --device=um24c --serial-device=/dev/rfcomm0
To turn the output on for a DPS device:
$ rdserialtool --device=dps --bluetooth-address=00:BA:68:00:47:3A --on
Example
$ rdserialtool --device=um25c --bluetooth-address=00:15:A6:00:36:2F
rdserialtool
Copyright (C) 2019 Ryan Finnie
Connecting to UM25C 00:15:A6:00:36:2F
Connection established
USB: 5.062V, 0.1146A, 0.580W, 44.1Ω
Data: 0.01V(+), 0.00V(-), charging mode: DCP 1.5A
Recording (off): 0.000Ah, 0.000Wh, 0 sec at >= 0.13A
Data groups:
*0: 0.001Ah, 0.009Wh 5: 0.000Ah, 0.000Wh
1: 0.000Ah, 0.000Wh 6: 0.000Ah, 0.000Wh
2: 0.000Ah, 0.000Wh 7: 0.000Ah, 0.000Wh
3: 0.000Ah, 0.000Wh 8: 0.000Ah, 0.000Wh
4: 0.000Ah, 0.000Wh 9: 0.000Ah, 0.000Wh
UM25C, temperature: 25C ( 78F)
Screen: 1/6, brightness: 4/5, timeout: 2 min
Collection time: 2019-02-23 22:53:08.468732
$ rdserialtool --device=dps --bluetooth-address=00:BA:68:00:47:3A
rdserialtool
Copyright (C) 2019 Ryan Finnie
Connecting to DPS 00:BA:68:00:47:3A
Connection established
Setting: 5.00V, 5.100A (CV)
Output (on) : 5.00V, 0.15A, 0.07W
Input: 19.30V, protection: good
Brightness: 4/5, key lock: off
Model: 5005, firmware: 14
Collection time: 2019-02-23 22:55:24.721946
$ rdserialtool --device=rd --serial-device=/dev/ttyUSB0 --baud=115200
rdserialtool
Copyright (C) 2019 Ryan Finnie
Connecting to RD /dev/ttyUSB0
Connection established
Setting: 15.00V, 0.998A (CV)
Output (on) : 14.99V, 0.14A, 0.20W
Input: 50.15V, protection: good
Brightness: 4/5, key lock: off
Model: 60062, firmware: 125, serial: 5403
Collection time: 2019-12-28 21:16:07.114146
About
Copyright (C) 2019 Ryan Finnie
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or (at
your option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
This tool is not affiliated with or endorsed by RDTech.
See also
- RDTech UM series on the sigrok wiki, which contains a lot of
information and reverse engineering of the protocol used on these
devices.
- DPS5005 communication protocol and Android/Windows software, from
the manufacturer.
- opendps, a replacement firmware package for the DPS5005.
(Incompatible with rdserialtool, as opendps uses its own
communication interface.)
07070100000002000081A40000273B00000064000000015E0D4DE400001C78000000000000000000000000000000000000002600000000rdserialtool-0.2.1+20200101/README.md# rdserialtool - RDTech UM/DPS/RD series device interface tool
*This program is currently in an early stage and could change significantly.*
This program provides monitor, control and configuration access to [RDTech (RuiDeng, Riden)](https://rdtech.aliexpress.com/store/923042) UM, DPS and RD series devices.
The [UM24C](https://www.aliexpress.com/item/RD-UM24-UM24C-for-APP-USB-2-0-LCD-Display-Voltmeter-ammeter-battery-charge-voltage-current/32845522857.html), [UM25C](https://www.aliexpress.com/store/product/RD-UM25-UM25C-for-APP-USB-2-0-Type-C-LCD-Voltmeter-ammeter-voltage-current-meter/923042_32855845265.html) and [UM34C](https://www.aliexpress.com/store/product/RD-UM34-UM34C-for-APP-USB-3-0-Type-C-DC-Voltmeter-ammeter-voltage-current-meter/923042_32880908871.html) are low-cost USB pass-through power measurement devices, and support a decent number of collection features, as well as full control via Bluetooth. (The non-C versions of these devices support the same features as the C versions, but without Bluetooth control.)
The [DPS series](https://rdtech.aliexpress.com/store/923042) are programmable DC-DC power supplies, and many devices in the series support external communication via the [Modbus](https://en.wikipedia.org/wiki/Modbus) RTU serial protocol over USB or Bluetooth.
The RD6006 is a logical continuation of the DPS series and also uses Modbus communication, but the registers are incompatible with previous DPS series, so "RD" is treated as a separate series.
## Compatibility
* UM24C, UM25C and UM34C support is complete and tested.
* DPS5005 support is complete and tested. Other devices in the DPS series (DPS3005, DPS5015, DPS5020, DPS8005, DPH5005) should perform identically. (Status reports and bugs welcome.)
* RD6006 has basic support and testing. Reading and writing most states work.
* Tested under Python 3.6, but should work with 3.4 or later.
* Linux: Tested fine with both PyBluez (direct) and pyserial (e.g. /dev/rfcomm0 via ```rfcomm bind```), as well as direct USB serial (e.g. /dev/ttyUSB0) on DPS devices.
* Windows: Tested fine with pyserial (e.g. COM4 as set up automatically by Windows). Author could not get PyBluez compiled/installed.
* MacOS: When using pyserial (e.g. /dev/cu.UM24C-Port as set up automatically by MacOS), writes to the device would succeed (e.g. 0xf2 to rotate the screen on UM series), but reads from the device never arrive. Author could not get PyBluez compiled/installed.
## Setup
rdserialtool requires Python 3, and [PyBluez](https://pypi.org/project/PyBluez/) and/or [pyserial](https://pypi.org/project/pyserial/) modules, depending on which method you use to connect. Installation varies by operating system, but on Debian/Ubuntu, these are available via the python3-pybluez and python3-serial packages, respectively.
To install rdserialtool:
```
$ sudo python3 setup.py install
```
rdserialtool may also be run directly from its source directory without installation.
## Bluetooth setup
Varies by operating system. If the pairing procedure asks for a PIN, enter 1234.
For command-line installation on Linux:
```
$ bluetoothctl
Agent registered
[bluetooth]# scan on
Discovery started
[NEW] Device 00:90:72:56:98:D7 UM24C
[CHG] Device 00:90:72:56:98:D7 RSSI: -60
[bluetooth]# pair 00:90:72:56:98:D7
Attempting to pair with 00:90:72:56:98:D7
[CHG] Device 00:90:72:56:98:D7 Connected: yes
Request PIN code
[UM241m[agent] Enter PIN code: 1234
[CHG] Device 00:90:72:56:98:D7 UUIDs: 00001101-0000-1000-8000-00805f9b34fb
[CHG] Device 00:90:72:56:98:D7 ServicesResolved: yes
[CHG] Device 00:90:72:56:98:D7 Paired: yes
Pairing successful
[bluetooth]# trust 00:90:72:56:98:D7
[CHG] Device 00:90:72:56:98:D7 Trusted: yes
Changing 00:90:72:56:98:D7 trust succeeded
[bluetooth]# exit
Agent unregistered
```
Device MAC address will vary. Again, the PIN for the device is 1234.
If you then want to use rdserialtool via direct serial, bind it via rfcomm:
```
$ sudo rfcomm bind 0 00:90:72:56:98:D7
```
## Usage
A number of options common to device access are available to all commands; see:
```
$ rdserialtool --help
```
After the common options, a command is required (commands available are in ```--help``` above). For example, to get device information from a UM24C via PyBluez:
```
$ rdserialtool --device=um24c --bluetooth-address=00:90:72:56:98:D7
```
Or via pyserial:
```
$ rdserialtool --device=um24c --serial-device=/dev/rfcomm0
```
To turn the output on for a DPS device:
```
$ rdserialtool --device=dps --bluetooth-address=00:BA:68:00:47:3A --on
```
## Example
```
$ rdserialtool --device=um25c --bluetooth-address=00:15:A6:00:36:2F
rdserialtool
Copyright (C) 2019 Ryan Finnie
Connecting to UM25C 00:15:A6:00:36:2F
Connection established
USB: 5.062V, 0.1146A, 0.580W, 44.1Ω
Data: 0.01V(+), 0.00V(-), charging mode: DCP 1.5A
Recording (off): 0.000Ah, 0.000Wh, 0 sec at >= 0.13A
Data groups:
*0: 0.001Ah, 0.009Wh 5: 0.000Ah, 0.000Wh
1: 0.000Ah, 0.000Wh 6: 0.000Ah, 0.000Wh
2: 0.000Ah, 0.000Wh 7: 0.000Ah, 0.000Wh
3: 0.000Ah, 0.000Wh 8: 0.000Ah, 0.000Wh
4: 0.000Ah, 0.000Wh 9: 0.000Ah, 0.000Wh
UM25C, temperature: 25C ( 78F)
Screen: 1/6, brightness: 4/5, timeout: 2 min
Collection time: 2019-02-23 22:53:08.468732
```
```
$ rdserialtool --device=dps --bluetooth-address=00:BA:68:00:47:3A
rdserialtool
Copyright (C) 2019 Ryan Finnie
Connecting to DPS 00:BA:68:00:47:3A
Connection established
Setting: 5.00V, 5.100A (CV)
Output (on) : 5.00V, 0.15A, 0.07W
Input: 19.30V, protection: good
Brightness: 4/5, key lock: off
Model: 5005, firmware: 14
Collection time: 2019-02-23 22:55:24.721946
```
```
$ rdserialtool --device=rd --serial-device=/dev/ttyUSB0 --baud=115200
rdserialtool
Copyright (C) 2019 Ryan Finnie
Connecting to RD /dev/ttyUSB0
Connection established
Setting: 15.00V, 0.998A (CV)
Output (on) : 14.99V, 0.14A, 0.20W
Input: 50.15V, protection: good
Brightness: 4/5, key lock: off
Model: 60062, firmware: 125, serial: 5403
Collection time: 2019-12-28 21:16:07.114146
```
## About
Copyright (C) 2019 Ryan Finnie
> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version.
>
> This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
This tool is not affiliated with or endorsed by RDTech.
## See also
* [RDTech UM series](https://sigrok.org/wiki/RDTech_UM_series) on the sigrok wiki, which contains a lot of information and reverse engineering of the protocol used on these devices.
* [DPS5005 communication protocol](https://www.mediafire.com/folder/3iogirsx1s0vp/DPS_communication_upper_computer#napmdzd4qt2dt) and Android/Windows software, from the manufacturer.
* [opendps](https://github.com/kanflo/opendps), a replacement firmware package for the DPS5005. (Incompatible with rdserialtool, as opendps uses its own communication interface.)
07070100000003000041ED0000273B00000064000000065E0D4DE400000000000000000000000000000000000000000000002500000000rdserialtool-0.2.1+20200101/rdserial07070100000004000081A40000273B00000064000000015E0D4DE40000033B000000000000000000000000000000000000003100000000rdserialtool-0.2.1+20200101/rdserial/__init__.py# rdserialtool
# Copyright (C) 2019 Ryan Finnie
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
import sys
assert(sys.version_info > (3, 4))
__version__ = '0.2.1'
07070100000005000041ED0000273B00000064000000025E0D4DE400000000000000000000000000000000000000000000002C00000000rdserialtool-0.2.1+20200101/rdserial/device07070100000006000081A40000273B00000064000000015E0D4DE400000DE9000000000000000000000000000000000000003800000000rdserialtool-0.2.1+20200101/rdserial/device/__init__.py# rdserialtool
# Copyright (C) 2019 Ryan Finnie
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
import logging
try:
import bluetooth
HAS_BLUETOOTH = True
except ImportError:
HAS_BLUETOOTH = False
try:
import serial
HAS_SERIAL = True
except ImportError:
HAS_SERIAL = False
class Serial:
def __init__(self, port, baudrate=9600):
if not HAS_SERIAL:
raise NotImplementedError('pyserial not available')
self.port = port
self.baudrate = baudrate
self.socket = None
def connect(self):
if self.socket:
return True
logging.debug('Serial: Connecting to {}'.format(self.port))
self.socket = serial.Serial()
self.socket.port = self.port
self.socket.baudrate = self.baudrate
self.socket.writeTimeout = 0
self.socket.open()
return self.socket is not None
def close(self):
if self.socket:
self.socket.close()
self.socket = None
def send(self, request):
if not request:
return 0
logging.debug('Serial: SEND begin ({})'.format(request))
size = self.socket.write(request)
logging.debug('Serial: SEND end ({} bytes)'.format(size))
return size
def recv(self, size):
result = b''
logging.debug('Serial: RECV begin')
while len(result) < size:
buf = self.socket.read()
result += buf
logging.debug('Serial: RECV end ({})'.format(result))
return result
def __str__(self):
return '%s' % self.port
class Bluetooth:
def __init__(self, address, port=1):
if not HAS_BLUETOOTH:
raise NotImplementedError('pybluez not available')
self.address = address
self.port = port
self.socket = None
def connect(self):
if self.socket:
return True
logging.debug('Bluetooth: Connecting to {} port {}'.format(self.address, self.port))
self.socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
self.socket.connect((self.address, self.port))
return self.socket is not None
def close(self):
if self.socket:
self.socket.close()
self.socket = None
def send(self, request):
if not request:
return 0
logging.debug('Bluetooth: SEND begin ({})'.format(request))
size = self.socket.send(request)
logging.debug('Bluetooth: SEND end ({} bytes)'.format(size))
return size
def recv(self, size):
result = b''
logging.debug('Bluetooth: RECV begin')
while len(result) < size:
buf = self.socket.recv(size)
result += buf
logging.debug('Bluetooth: RECV end ({})'.format(result))
return result
def __str__(self):
return '%s:%s' % (self.address, self.port)
07070100000007000041ED0000273B00000064000000025E0D4DE400000000000000000000000000000000000000000000002900000000rdserialtool-0.2.1+20200101/rdserial/dps07070100000008000081A40000273B00000064000000015E0D4DE4000033B9000000000000000000000000000000000000003500000000rdserialtool-0.2.1+20200101/rdserial/dps/__init__.py# rdserialtool
# Copyright (C) 2019 Ryan Finnie
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
import datetime
PROTECTION_GOOD = 0
PROTECTION_OV = 1
PROTECTION_OC = 2
PROTECTION_OP = 3
def _simple_int(multiple=1):
return {
'from_int': lambda x: x / multiple,
'to_int': lambda x: int(x * multiple),
}
def _simple_bool():
return {
'from_int': lambda x: bool(x),
'to_int': lambda x: int(x),
}
class DPSDeviceState:
def __init__(self, collection_time=None):
self.register_properties = {
'setting_volts': {
'description': 'Voltage setting',
'register': 0x00,
**_simple_int(100),
},
'setting_amps': {
'description': 'Amperage setting',
'register': 0x01,
**_simple_int(1000),
},
'volts': {
'description': 'Output volts',
'register': 0x02,
**_simple_int(100),
},
'amps': {
'description': 'Output amps',
'register': 0x03,
**_simple_int(100),
},
'watts': {
'description': 'Output watts',
'register': 0x04,
**_simple_int(100),
},
'input_volts': {
'description': 'Input volts',
'register': 0x05,
**_simple_int(100),
},
'key_lock': {
'description': 'Key lock',
'register': 0x06,
**_simple_bool(),
},
'protection': {
'description': 'Protection status',
'register': 0x07,
**_simple_int(),
},
'constant_current': {
'description': 'Constant current mode',
'register': 0x08,
**_simple_bool(),
},
'output_state': {
'description': 'Output state',
'register': 0x09,
**_simple_bool(),
},
'brightness': {
'description': 'Brightness level',
'register': 0x0a,
**_simple_int(),
},
'model': {
'description': 'Device model',
'register': 0x0b,
**_simple_int(),
},
'firmware': {
'description': 'Device firmware',
'register': 0x0c,
**_simple_int(),
},
'group_loader': {
'description': 'Group loader',
'register': 0x23,
'from_int': lambda x: 0, # Write-only
'to_int': lambda x: int(x),
},
}
if collection_time is None:
collection_time = datetime.datetime.now()
self.collection_time = collection_time
for name in self.register_properties:
setattr(self, name, self.register_properties[name]['from_int'](0))
self.groups = {}
def load(self, data, offset=0):
pos_map = {v['register']: k for k, v in self.register_properties.items()}
i = 0
for raw_val in data:
val_pos = offset + i
if val_pos in pos_map:
name = pos_map[val_pos]
translation = self.register_properties[name]['from_int']
val = translation(raw_val)
setattr(self, name, val)
i = i + 1
class DPSGroupState:
def __init__(self, group):
self.group = group
self.register_properties = {
'setting_volts': {
'description': 'Voltage setting',
'register': 0x50 + (0x10 * group),
**_simple_int(100),
},
'setting_amps': {
'description': 'Amperage setting',
'register': 0x51 + (0x10 * group),
**_simple_int(1000),
},
'cutoff_volts': {
'description': 'Volts cutoff',
'register': 0x52 + (0x10 * group),
**_simple_int(100),
},
'cutoff_amps': {
'description': 'Amps cutoff',
'register': 0x53 + (0x10 * group),
**_simple_int(1000),
},
'cutoff_watts': {
'description': 'Watts cutoff',
'register': 0x54 + (0x10 * group),
**_simple_int(10),
},
'brightness': {
'description': 'Brightness level',
'register': 0x55 + (0x10 * group),
**_simple_int(),
},
'maintain_output': {
'description': 'Maintain output state during group change',
'register': 0x56 + (0x10 * group),
**_simple_bool(),
},
'poweron_output': {
'description': 'Enable output on power-on',
'register': 0x57 + (0x10 * group),
**_simple_bool(),
},
}
for name in self.register_properties:
setattr(self, name, self.register_properties[name]['from_int'](0))
def load(self, data, offset=0):
pos_map = {v['register']: k for k, v in self.register_properties.items()}
i = 0
for raw_val in data:
val_pos = offset + i
if val_pos in pos_map:
name = pos_map[val_pos]
translation = self.register_properties[name]['from_int']
val = translation(raw_val)
setattr(self, name, val)
i = i + 1
class RDDeviceState:
def __init__(self, collection_time=None):
self.register_properties = {
'model': {
'description': 'Device model',
'register': 0x00,
**_simple_int(),
},
'serial': {
'description': 'Device serial',
'register': 0x02, # 0x01 high?
**_simple_int(),
},
'firmware': {
'description': 'Device firmware',
'register': 0x03,
**_simple_int(),
},
'fan_temp_c': {
'description': 'Fan start temperature (C)',
'register': 0x05, # 0x04 high?
**_simple_int(),
},
'fan_temp_f': {
'description': 'Fan start temperature (F)',
'register': 0x07, # 0x06 high?
**_simple_int(),
},
'setting_volts': {
'description': 'Voltage setting',
'register': 0x08,
**_simple_int(100),
},
'setting_amps': {
'description': 'Amperage setting',
'register': 0x09,
**_simple_int(1000),
},
'volts': {
'description': 'Output volts',
'register': 0x0a,
**_simple_int(100),
},
'amps': {
'description': 'Output amps',
'register': 0x0b,
**_simple_int(100),
},
'watts': {
'description': 'Output watts',
'register': 0x0d, # 0x0c high?
**_simple_int(100),
},
'input_volts': {
'description': 'Input volts',
'register': 0x0e,
**_simple_int(100),
},
'key_lock': {
'description': 'Key lock',
'register': 0x0f,
**_simple_bool(),
},
'protection': {
'description': 'Protection status',
'register': 0x10,
**_simple_int(),
},
'constant_current': {
'description': 'Constant current mode',
'register': 0x11,
**_simple_bool(),
},
'output_state': {
'description': 'Output state',
'register': 0x12,
**_simple_bool(),
},
'group_loader': {
'description': 'Group loader',
'register': 0x13,
'from_int': lambda x: 0, # Write-only
'to_int': lambda x: int(x),
},
# 0x14 - 0x2f: All 0
# 0x21: Unknown, 1/2/3 observed
'temp_c': {
'description': 'Temperature (C)',
'register': 0x23, # 0x22 high?
**_simple_int(),
},
'temp_f': {
'description': 'Temperature (F)',
'register': 0x25, # 0x24 high?
**_simple_int(),
},
'cumulative_charge': {
'description': 'Cumulative charge (Ah)',
'register': 0x27, # 0x26 high?
**_simple_int(1000),
},
'cumulative_energy': {
'description': 'Cumulative energy (Wh)',
'register': 0x29, # 0x28 high?
**_simple_int(1000),
},
'datetime_year': {'description': 'Year', 'register': 0x30, **_simple_int()},
'datetime_month': {'description': 'Month', 'register': 0x31, **_simple_int()},
'datetime_day': {'description': 'Day', 'register': 0x32, **_simple_int()},
'datetime_hour': {'description': 'Hour', 'register': 0x33, **_simple_int()},
'datetime_minute': {'description': 'Minute', 'register': 0x34, **_simple_int()},
'datetime_second': {'description': 'Second', 'register': 0x35, **_simple_int()},
'brightness': {
'description': 'Brightness level',
'register': 0x48,
**_simple_int(),
},
'ovp': {
'description': 'Over-voltage limit (V)',
'register': 0x52,
**_simple_int(100),
},
'ocp': {
'description': 'Over-current limit (A)',
'register': 0x53,
**_simple_int(1000),
},
}
if collection_time is None:
collection_time = datetime.datetime.now()
self.collection_time = collection_time
for name in self.register_properties:
setattr(self, name, self.register_properties[name]['from_int'](0))
self.groups = {}
def load(self, data, offset=0):
pos_map = {v['register']: k for k, v in self.register_properties.items()}
i = 0
for raw_val in data:
val_pos = offset + i
if val_pos in pos_map:
name = pos_map[val_pos]
translation = self.register_properties[name]['from_int']
val = translation(raw_val)
setattr(self, name, val)
i = i + 1
class RDGroupState:
def __init__(self, group):
self.group = group
self.register_properties = {
'setting_volts': {
'description': 'Voltage setting',
'register': 0x50 + (0x04 * group),
**_simple_int(100),
},
'setting_amps': {
'description': 'Amperage setting',
'register': 0x51 + (0x04 * group),
**_simple_int(1000),
},
'cutoff_volts': {
'description': 'Volts cutoff',
'register': 0x52 + (0x04 * group),
**_simple_int(100),
},
'cutoff_amps': {
'description': 'Amps cutoff',
'register': 0x53 + (0x04 * group),
**_simple_int(1000),
},
}
for name in self.register_properties:
setattr(self, name, self.register_properties[name]['from_int'](0))
def load(self, data, offset=0):
pos_map = {v['register']: k for k, v in self.register_properties.items()}
i = 0
for raw_val in data:
val_pos = offset + i
if val_pos in pos_map:
name = pos_map[val_pos]
translation = self.register_properties[name]['from_int']
val = translation(raw_val)
setattr(self, name, val)
i = i + 1
07070100000009000081A40000273B00000064000000015E0D4DE400003182000000000000000000000000000000000000003100000000rdserialtool-0.2.1+20200101/rdserial/dps/tool.py# rdserialtool
# Copyright (C) 2019 Ryan Finnie
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
import logging
import json
import datetime
import time
import statistics
import rdserial.dps
import rdserial.modbus
dps_supported_devices = ['dps', 'dps3005', 'dps5005', 'dps5015', 'dps5020', 'dps8005', 'dph5005']
rd_supported_devices = ['rd', 'rd6006']
supported_devices = dps_supported_devices + rd_supported_devices
class Tool:
def __init__(self, parent=None):
self.trends = {}
if parent is not None:
self.args = parent.args
self.socket = parent.socket
def trend_s(self, name, value):
if not self.args.watch:
return ''
if name in self.trends:
trend = statistics.mean(self.trends[name])
self.trends[name] = self.trends[name][1:] + [value]
if value > trend:
return '\u2197'
elif value < trend:
return '\u2198'
else:
return ' '
else:
self.trends[name] = [value for x in range(self.args.trend_points)]
return ' '
def send_commands(self):
register_commands = {}
device_state = self.device_state_class()
command_map = (
('set_volts', 'setting_volts'),
('set_amps', 'setting_amps'),
('set_output_state', 'output_state'),
('set_key_lock', 'key_lock'),
('set_brightness', 'brightness'),
('load_group', 'group_loader'),
)
group_command_map = (
('set_group_volts', 'setting_volts'),
('set_group_amps', 'setting_amps'),
('set_group_cutoff_volts', 'cutoff_volts'),
('set_group_cutoff_amps', 'cutoff_amps'),
)
if self.device_mode == 'dps':
group_command_map += (
('set_group_cutoff_watts', 'cutoff_watts'),
('set_group_brightness', 'brightness'),
('set_group_maintain_output', 'maintain_output'),
('set_group_poweron_output', 'poweron_output'),
)
for arg_name, register_name in command_map:
arg_val = getattr(self.args, arg_name)
if arg_val is None:
continue
translation = device_state.register_properties[register_name]['to_int']
description = device_state.register_properties[register_name]['description']
register_num = device_state.register_properties[register_name]['register']
register_val = translation(arg_val)
logging.info('Setting "{}" to {}'.format(
description, arg_val
))
logging.debug('{} "{}" (register {}): {} ({})'.format(
register_name, description, register_num, arg_val, register_val
))
register_commands[register_num] = register_val
if getattr(self.args, 'set_clock'):
logging.info('Setting device clock')
now = datetime.datetime.now()
for name in ('year', 'month', 'day', 'hour', 'minute', 'second'):
register_name = 'datetime_{}'.format(name)
val = getattr(now, name)
translation = device_state.register_properties[register_name]['to_int']
description = device_state.register_properties[register_name]['description']
register_num = device_state.register_properties[register_name]['register']
register_val = translation(val)
logging.debug('{} "{}" (register {}): {} ({})'.format(
register_name, description, register_num, val, register_val
))
register_commands[register_num] = register_val
if self.args.all_groups:
groups = range(10)
elif self.args.group is not None:
groups = self.args.group
else:
groups = []
for group in groups:
device_group_state = self.device_group_state_class(group)
for arg_name, register_name in group_command_map:
arg_val = getattr(self.args, arg_name)
if arg_val is None:
continue
translation = device_group_state.register_properties[register_name]['to_int']
description = device_group_state.register_properties[register_name]['description']
register_num = device_group_state.register_properties[register_name]['register']
register_val = translation(arg_val)
logging.info('Setting group {} "{}" to {}'.format(
group, description, arg_val
))
logging.debug('Group {} {} "{}" (register {}): {} ({})'.format(
group, register_name, description, register_num, arg_val, register_val
))
register_commands[register_num] = register_val
if len(register_commands) > 0:
logging.info('')
# Optimize into a set of minimal register writes
register_commands_opt = {}
for register in sorted(register_commands.keys()):
found_opt = False
for g in register_commands_opt:
if (register == g + len(register_commands_opt[g])) and (len(register_commands_opt[g]) < 32):
register_commands_opt[g].append(register_commands[register])
found_opt = True
break
if not found_opt:
register_commands_opt[register] = [register_commands[register]]
for register_base in register_commands_opt:
logging.debug('Writing {} register(s) ({}) at base {}'.format(
len(register_commands_opt[register_base]),
register_commands_opt[register_base],
register_base,
))
self.modbus_client.write_registers(
register_base, register_commands_opt[register_base], unit=self.args.modbus_unit,
)
def print_human(self, device_state):
protection_map = {
rdserial.dps.PROTECTION_GOOD: 'good',
rdserial.dps.PROTECTION_OV: 'over-voltage',
rdserial.dps.PROTECTION_OC: 'over-current',
rdserial.dps.PROTECTION_OP: 'over-power',
}
print('Setting: {:5.02f}V, {:6.03f}A ({})'.format(
device_state.setting_volts,
device_state.setting_amps,
('CC' if device_state.constant_current else 'CV'),
))
print('Output {:5}: {:5.02f}V{}, {:5.02f}A{}, {:6.02f}W{}'.format(
('(on)' if device_state.output_state else '(off)'),
device_state.volts,
self.trend_s('volts', device_state.volts),
device_state.amps,
self.trend_s('amps', device_state.amps),
device_state.watts,
self.trend_s('watts', device_state.watts),
))
print('Input: {:5.02f}V{}, protection: {}'.format(
device_state.input_volts,
self.trend_s('input_volts', device_state.input_volts),
protection_map[device_state.protection],
))
print('Brightness: {}/5, key lock: {}'.format(
device_state.brightness,
'on' if device_state.key_lock else 'off',
))
if hasattr(device_state, 'serial'):
print('Model: {}, firmware: {}, serial: {}'.format(device_state.model, device_state.firmware, device_state.serial))
else:
print('Model: {}, firmware: {}'.format(device_state.model, device_state.firmware))
print('Collection time: {}'.format(device_state.collection_time))
if len(device_state.groups) > 0:
print()
for group, device_group_state in sorted(device_state.groups.items()):
print('Group {}:'.format(group))
print(' Setting: {:5.02f}V, {:6.03f}A'.format(device_group_state.setting_volts, device_group_state.setting_amps))
if hasattr(device_group_state, 'cutoff_watts'):
print(' Cutoff: {:5.02f}V, {:6.03f}A, {:5.01f}W'.format(
device_group_state.cutoff_volts,
device_group_state.cutoff_amps,
device_group_state.cutoff_watts,
))
else:
print(' Cutoff: {:5.02f}V, {:6.03f}A'.format(
device_group_state.cutoff_volts,
device_group_state.cutoff_amps,
))
if hasattr(device_group_state, 'brightness'):
print(' Brightness: {}/5'.format(device_group_state.brightness))
if hasattr(device_group_state, 'maintain_output'):
print(' Maintain output state: {}'.format(device_group_state.maintain_output))
if hasattr(device_group_state, 'poweron_output'):
print(' Output on power-on: {}'.format(device_group_state.poweron_output))
def print_json(self, device_state):
out = {x: getattr(device_state, x) for x in device_state.register_properties}
out['collection_time'] = (device_state.collection_time - datetime.datetime.fromtimestamp(0)).total_seconds()
out['groups'] = {}
for group, device_group_state in device_state.groups.items():
out['groups'][group] = {x: getattr(device_group_state, x) for x in device_group_state.register_properties}
print(json.dumps(out, sort_keys=True))
def assemble_device_state(self):
device_state = self.device_state_class()
registers_length = (85 if self.device_mode == 'rd' else 13)
registers = self.modbus_client.read_registers(
0x00, registers_length, unit=self.args.modbus_unit,
)
device_state.load(registers)
if self.args.all_groups:
groups = range(10)
elif self.args.group is not None:
groups = self.args.group
else:
groups = []
for group in groups:
register_offset = (0x04 if self.device_mode == 'rd' else 0x10)
device_group_state = self.device_group_state_class(group)
registers = self.modbus_client.read_registers(
0x50 + (register_offset * group),
len(device_group_state.register_properties),
unit=self.args.modbus_unit,
)
device_group_state.load(registers, offset=(0x50 + (register_offset * group)))
device_state.groups[group] = device_group_state
return device_state
def loop(self):
while True:
try:
device_state = self.assemble_device_state()
if self.args.json:
self.print_json(device_state)
else:
self.print_human(device_state)
except KeyboardInterrupt:
raise
except Exception:
if self.args.watch:
logging.exception('An exception has occurred')
else:
raise
if self.args.watch:
if not self.args.json:
print()
time.sleep(self.args.watch_seconds)
else:
return
def main(self):
if self.args.device in rd_supported_devices:
self.device_mode = 'rd'
self.device_state_class = rdserial.dps.RDDeviceState
self.device_group_state_class = rdserial.dps.RDGroupState
else:
self.device_mode = 'dps'
self.device_state_class = rdserial.dps.DPSDeviceState
self.device_group_state_class = rdserial.dps.DPSGroupState
self.modbus_client = rdserial.modbus.RTUClient(
self.socket,
baudrate=self.args.baud,
)
try:
self.send_commands()
self.loop()
except KeyboardInterrupt:
pass
0707010000000A000041ED0000273B00000064000000025E0D4DE400000000000000000000000000000000000000000000002C00000000rdserialtool-0.2.1+20200101/rdserial/modbus0707010000000B000081A40000273B00000064000000015E0D4DE4000019AD000000000000000000000000000000000000003800000000rdserialtool-0.2.1+20200101/rdserial/modbus/__init__.py# rdserialtool
# Copyright (C) 2019 Ryan Finnie
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
# Note that this is a massively simplified Modbus RTU client,
# and is not suitable for general Modbus use.
import time
import struct
import logging
def modbus_crc(data):
lookup_table = (
0x0000, 0xc0c1, 0xc181, 0x0140, 0xc301, 0x03c0, 0x0280, 0xc241,
0xc601, 0x06c0, 0x0780, 0xc741, 0x0500, 0xc5c1, 0xc481, 0x0440,
0xcc01, 0x0cc0, 0x0d80, 0xcd41, 0x0f00, 0xcfc1, 0xce81, 0x0e40,
0x0a00, 0xcac1, 0xcb81, 0x0b40, 0xc901, 0x09c0, 0x0880, 0xc841,
0xd801, 0x18c0, 0x1980, 0xd941, 0x1b00, 0xdbc1, 0xda81, 0x1a40,
0x1e00, 0xdec1, 0xdf81, 0x1f40, 0xdd01, 0x1dc0, 0x1c80, 0xdc41,
0x1400, 0xd4c1, 0xd581, 0x1540, 0xd701, 0x17c0, 0x1680, 0xd641,
0xd201, 0x12c0, 0x1380, 0xd341, 0x1100, 0xd1c1, 0xd081, 0x1040,
0xf001, 0x30c0, 0x3180, 0xf141, 0x3300, 0xf3c1, 0xf281, 0x3240,
0x3600, 0xf6c1, 0xf781, 0x3740, 0xf501, 0x35c0, 0x3480, 0xf441,
0x3c00, 0xfcc1, 0xfd81, 0x3d40, 0xff01, 0x3fc0, 0x3e80, 0xfe41,
0xfa01, 0x3ac0, 0x3b80, 0xfb41, 0x3900, 0xf9c1, 0xf881, 0x3840,
0x2800, 0xe8c1, 0xe981, 0x2940, 0xeb01, 0x2bc0, 0x2a80, 0xea41,
0xee01, 0x2ec0, 0x2f80, 0xef41, 0x2d00, 0xedc1, 0xec81, 0x2c40,
0xe401, 0x24c0, 0x2580, 0xe541, 0x2700, 0xe7c1, 0xe681, 0x2640,
0x2200, 0xe2c1, 0xe381, 0x2340, 0xe101, 0x21c0, 0x2080, 0xe041,
0xa001, 0x60c0, 0x6180, 0xa141, 0x6300, 0xa3c1, 0xa281, 0x6240,
0x6600, 0xa6c1, 0xa781, 0x6740, 0xa501, 0x65c0, 0x6480, 0xa441,
0x6c00, 0xacc1, 0xad81, 0x6d40, 0xaf01, 0x6fc0, 0x6e80, 0xae41,
0xaa01, 0x6ac0, 0x6b80, 0xab41, 0x6900, 0xa9c1, 0xa881, 0x6840,
0x7800, 0xb8c1, 0xb981, 0x7940, 0xbb01, 0x7bc0, 0x7a80, 0xba41,
0xbe01, 0x7ec0, 0x7f80, 0xbf41, 0x7d00, 0xbdc1, 0xbc81, 0x7c40,
0xb401, 0x74c0, 0x7580, 0xb541, 0x7700, 0xb7c1, 0xb681, 0x7640,
0x7200, 0xb2c1, 0xb381, 0x7340, 0xb101, 0x71c0, 0x7080, 0xb041,
0x5000, 0x90c1, 0x9181, 0x5140, 0x9301, 0x53c0, 0x5280, 0x9241,
0x9601, 0x56c0, 0x5780, 0x9741, 0x5500, 0x95c1, 0x9481, 0x5440,
0x9c01, 0x5cc0, 0x5d80, 0x9d41, 0x5f00, 0x9fc1, 0x9e81, 0x5e40,
0x5a00, 0x9ac1, 0x9b81, 0x5b40, 0x9901, 0x59c0, 0x5880, 0x9841,
0x8801, 0x48c0, 0x4980, 0x8941, 0x4b00, 0x8bc1, 0x8a81, 0x4a40,
0x4e00, 0x8ec1, 0x8f81, 0x4f40, 0x8d01, 0x4dc0, 0x4c80, 0x8c41,
0x4400, 0x84c1, 0x8581, 0x4540, 0x8701, 0x47c0, 0x4680, 0x8641,
0x8201, 0x42c0, 0x4380, 0x8341, 0x4100, 0x81c1, 0x8081, 0x4040,
)
crc = 0xffff
for b in data:
n = b ^ crc
crc >>= 8
crc ^= lookup_table[n % 256]
return crc
class RTUClient:
def __init__(self, socket, baudrate):
self.socket = socket
self._last_frame_end = time.time()
if baudrate > 19200:
self._silent_interval = 1.75/1000
else:
self._silent_interval = 3.5 * (1 + 8 + 2) / baudrate
def read_registers(self, base, length, unit=1):
request = struct.pack('>B', unit) + \
struct.pack('>B', 0x03) + \
struct.pack('>H', base) + \
struct.pack('>H', length)
request += struct.pack('<H', modbus_crc(request))
self.send(request)
expected_response_length = 5 + (2 * length)
response = self.recv(expected_response_length)
assert(struct.unpack('<H', response[-2:])[0] == modbus_crc(response[0:-2]))
assert(struct.unpack('>B', response[0:1])[0] == unit)
assert(struct.unpack('>B', response[1:2])[0] == 0x03)
assert(struct.unpack('>B', response[2:3])[0] == (length * 2))
registers = []
for i in range(length):
pos = 3 + (i * 2)
val = struct.unpack('>H', response[pos:pos+2])[0]
logging.debug('Register 0x{:02x}: {}'.format(pos, val))
registers.append(val)
return registers
def write_register(self, register, value, unit=1):
request = struct.pack('>B', unit) + \
struct.pack('>B', 0x06) + \
struct.pack('>H', register) + \
struct.pack('>H', value)
request += struct.pack('<H', modbus_crc(request))
self.send(request)
expected_response_length = 8
response = self.recv(expected_response_length)
assert(response == request)
def write_registers(self, register, values, unit=1):
request = struct.pack('>B', unit) + \
struct.pack('>B', 0x10) + \
struct.pack('>H', register) + \
struct.pack('>H', len(values)) + \
struct.pack('>B', len(values) * 2)
for value in values:
request += struct.pack('>H', value)
request += struct.pack('<H', modbus_crc(request))
self.send(request)
expected_response_length = 8
response = self.recv(expected_response_length)
assert(struct.unpack('<H', response[-2:])[0] == modbus_crc(response[0:-2]))
assert(struct.unpack('>B', response[0:1])[0] == unit)
assert(struct.unpack('>B', response[1:2])[0] == 0x10)
assert(struct.unpack('>H', response[2:4])[0] == register)
assert(struct.unpack('>H', response[4:6])[0] == len(values))
def send(self, data):
ts = time.time()
if ts < self._last_frame_end + self._silent_interval:
to_sleep = self._last_frame_end + self._silent_interval - ts
logging.debug('Sleeping {} for 3.5 char ({}) quiet period'.format(
to_sleep,
self._silent_interval,
))
time.sleep(to_sleep)
result = self.socket.send(data)
self._last_frame_end = time.time()
return result
def recv(self, size):
result = self.socket.recv(size)
self._last_frame_end = time.time()
return result
0707010000000C000081A40000273B00000064000000015E0D4DE400002673000000000000000000000000000000000000002D00000000rdserialtool-0.2.1+20200101/rdserial/tool.py# rdserialtool
# Copyright (C) 2019 Ryan Finnie
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
import argparse
import sys
import os
import logging
import time
from rdserial import __version__
import rdserial.device
import rdserial.um.tool
import rdserial.dps.tool
def parse_args(argv=None):
"""Parse user arguments."""
if argv is None:
argv = sys.argv
def loose_bool(val):
return val.lower() in ('on', 'true', 'yes')
def validate_set_record_threshold(string):
val = float(string)
if val not in [x / 100 for x in range(31)]:
raise argparse.ArgumentTypeError('Must be between 0.00 and 0.30, in 0.01 steps')
return val
parser = argparse.ArgumentParser(
description='rdserialtool ({})'.format(__version__),
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
prog=os.path.basename(argv[0]),
)
parser.add_argument(
'--version', '-V', action='version',
version=__version__,
help='Report the program version',
)
parser.add_argument(
'--quiet', '-q', action='store_true',
help='Suppress human-readable stderr information',
)
parser.add_argument(
'--debug', action='store_true',
help='Print extra debugging information.',
)
supported_devices = []
supported_devices += rdserial.um.tool.supported_devices
supported_devices += rdserial.dps.tool.supported_devices
parser.add_argument(
'--device', '-d', required=True,
choices=sorted(supported_devices),
help='Device type',
)
device_group = parser.add_mutually_exclusive_group(required=True)
device_group.add_argument(
'--bluetooth-address', '-b',
help='Bluetooth EUI-48 address of the device',
)
device_group.add_argument(
'--serial-device', '-s',
help='Serial filename (e.g. /dev/rfcomm0) of the device',
)
parser.add_argument(
'--bluetooth-port', type=int, default=1,
help='Bluetooth RFCOMM port number',
)
parser.add_argument(
'--baud', type=int, default=9600,
help='Serial port baud rate',
)
parser.add_argument(
'--connect-delay', type=float, default=0.3,
help='Seconds to wait after connecting to the serial port',
)
parser.add_argument(
'--json', action='store_true',
help='Output JSON data',
)
parser.add_argument(
'--watch', action='store_true',
help='Repeat data collection until cancelled',
)
parser.add_argument(
'--watch-seconds', type=float, default=2.0,
help='Number of seconds between collections in watch mode',
)
parser.add_argument(
'--trend-points', type=int, default=5,
help='Number of points to remember for determining a trend in watch mode',
)
parser_group_dps = parser.add_argument_group(
'DPS/RD-related arguments'
)
parser_group_dps.add_argument(
'--modbus-unit', type=int, default=1,
help='Modbus unit number',
)
parser_group_dps.add_argument(
'--group', type=int, action='append',
help='Display/set selected group(s)',
)
parser_group_dps.add_argument(
'--all-groups', action='store_true',
help='Display/set all groups',
)
parser_group_dps.add_argument(
'--set-volts', type=float, default=None,
help='Set voltage setting',
)
parser_group_dps.add_argument(
'--set-amps', type=float, default=None,
help='Set current setting',
)
parser_group_dps.add_argument(
'--set-clock', action='store_true',
help='Set clock to current time [RD]',
)
onoff_group = parser_group_dps.add_mutually_exclusive_group(required=False)
onoff_group.add_argument(
'--set-output-state', type=loose_bool, dest='set_output_state', default=None,
help='Set output on/off',
)
onoff_group.add_argument(
'--on', action='store_true', dest='set_output_state',
help='Set output on',
)
onoff_group.add_argument(
'--off', action='store_false', dest='set_output_state',
help='Set output off',
)
parser_group_dps.add_argument(
'--set-key-lock', type=loose_bool, default=None,
help='Set key lock on/off',
)
parser_group_dps.add_argument(
'--set-brightness', type=int, choices=range(6), default=None,
help='Set screen brightness',
)
parser_group_dps.add_argument(
'--load-group', type=int, choices=range(10), default=None,
help='Load group settings into group 0',
)
parser_group_dps.add_argument(
'--set-group-volts', type=float, default=None,
help='Set group voltage setting',
)
parser_group_dps.add_argument(
'--set-group-amps', type=float, default=None,
help='Set group current setting',
)
parser_group_dps.add_argument(
'--set-group-cutoff-volts', type=float, default=None,
help='Set group cutoff volts',
)
parser_group_dps.add_argument(
'--set-group-cutoff-amps', type=float, default=None,
help='Set group cutoff amps',
)
parser_group_dps.add_argument(
'--set-group-cutoff-watts', type=float, default=None,
help='Set group cutoff watts',
)
parser_group_dps.add_argument(
'--set-group-brightness', type=int, choices=range(6), default=None,
help='Set group screen brightness',
)
parser_group_dps.add_argument(
'--set-group-maintain-output', type=loose_bool, default=None,
help='Set group maintain output state during group change',
)
parser_group_dps.add_argument(
'--set-group-poweron-output', type=loose_bool, default=None,
help='Set group enable output on power-on',
)
parser_group_um = parser.add_argument_group(
'UM-related arguments'
)
parser_group_um.add_argument(
'--next-screen', action='store_true',
help='Go to the next screen on the display',
)
parser_group_um.add_argument(
'--rotate-screen', action='store_true',
help='Rotate the screen 90 degrees clockwise',
)
parser_group_um.add_argument(
'--clear-data-group', action='store_true',
help='Clear the current data group',
)
parser_group_um.add_argument(
'--set-record-threshold', type=validate_set_record_threshold, default=None,
help='Set the recording threshold, 0.00-0.30 inclusive',
)
parser_group_um.add_argument(
'--set-screen-brightness', type=int, choices=range(6), default=None,
help='Set the screen brightness',
)
parser_group_um.add_argument(
'--set-screen-timeout', type=int, choices=range(10), default=None,
help='Set the screen timeout',
)
parser_group_um.add_argument(
'--previous-screen', action='store_true',
help='Go to the previous screen on the display',
)
parser_group_um.add_argument(
'--set-data-group', type=int, choices=range(10), default=None,
help='Set the selected data group',
)
parser_group_um.add_argument(
'--next-data-group', action='store_true',
help='Change to the next data group',
)
args = parser.parse_args(args=argv[1:])
return args
class RDSerialTool:
def setup_logging(self):
logging_format = '%(message)s'
if self.args.debug:
logging_level = logging.DEBUG
logging_format = '%(asctime)s %(levelname)s: %(message)s'
elif self.args.quiet:
logging_level = logging.ERROR
else:
logging_level = logging.INFO
logging.basicConfig(
format=logging_format,
level=logging_level,
)
def main(self):
self.args = parse_args()
self.setup_logging()
logging.info('rdserialtool {}'.format(__version__))
logging.info('Copyright (C) 2019 Ryan Finnie')
logging.info('')
if self.args.serial_device:
logging.info('Connecting to {} {}'.format(self.args.device.upper(), self.args.serial_device))
self.socket = rdserial.device.Serial(
self.args.serial_device,
baudrate=self.args.baud,
)
else:
logging.info('Connecting to {} {}'.format(self.args.device.upper(), self.args.bluetooth_address))
self.socket = rdserial.device.Bluetooth(
self.args.bluetooth_address,
port=self.args.bluetooth_port,
)
self.socket.connect()
logging.info('Connection established')
logging.info('')
time.sleep(self.args.connect_delay)
if self.args.device in rdserial.um.tool.supported_devices:
tool = rdserial.um.tool.Tool(self)
elif self.args.device in rdserial.dps.tool.supported_devices:
tool = rdserial.dps.tool.Tool(self)
ret = tool.main()
self.socket.close()
return ret
def main():
return RDSerialTool().main()
if __name__ == '__main__':
sys.exit(main())
0707010000000D000041ED0000273B00000064000000025E0D4DE400000000000000000000000000000000000000000000002800000000rdserialtool-0.2.1+20200101/rdserial/um0707010000000E000081A40000273B00000064000000015E0D4DE40000244D000000000000000000000000000000000000003400000000rdserialtool-0.2.1+20200101/rdserial/um/__init__.py# rdserialtool
# Copyright (C) 2019 Ryan Finnie
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
import struct
import datetime
import logging
CHARGING_UNKNOWN = 0
CHARGING_QC2 = 1
CHARGING_QC3 = 2
CHARGING_APP2_4A = 3
CHARGING_APP2_1A = 4
CHARGING_APP1_0A = 5
CHARGING_APP0_5A = 6
CHARGING_DCP1_5A = 7
CHARGING_SAMSUNG = 8
class DataGroup:
group = 0
amp_hours = 0
watt_hours = 0
def __repr__(self):
return ('<DataGroup {}: {:0.03f}Ah, {:0.03f}Wh>'.format(
self.group,
self.amp_hours,
self.watt_hours,
))
def __init__(self, group=0):
self.group = group
class Response:
def __repr__(self):
return ('<Response: {} at {}, {:0.02f}V, {:0.03f}A>'.format(
self.device_type,
self.collection_time,
self.volts,
self.amps,
))
def __init__(self, data=None, collection_time=None, device_type='UM24C'):
self.device_type = device_type
if device_type == 'UM25C':
self.device_multiplier = 10
else:
self.device_multiplier = 1
self.field_properties = {
'start': {
'description': 'Start bytes',
'position': 0,
'length': 2,
'from_int': lambda x: x,
'to_int': lambda x: int(x),
},
'volts': {
'description': 'Volts',
'position': 2,
'length': 2,
'from_int': lambda x: x / (100 * self.device_multiplier),
'to_int': lambda x: int(x * (100 * self.device_multiplier)),
},
'amps': {
'description': 'Amps',
'position': 4,
'length': 2,
'from_int': lambda x: x / (1000 * self.device_multiplier),
'to_int': lambda x: int(x * (1000 * self.device_multiplier)),
},
'watts': {
'description': 'Watts',
'position': 6,
'length': 4,
'from_int': lambda x: x / 1000,
'to_int': lambda x: int(x * 1000),
},
'temp_c': {
'description': 'Temperature (Celsius)',
'position': 10,
'length': 2,
'from_int': lambda x: x,
'to_int': lambda x: int(x),
},
'temp_f': {
'description': 'Temperature (Fahrenheit)',
'position': 12,
'length': 2,
'from_int': lambda x: x,
'to_int': lambda x: int(x),
},
'data_group_selected': {
'description': 'Currently selected data group',
'position': 14,
'length': 2,
'from_int': lambda x: x,
'to_int': lambda x: int(x),
},
'data_line_positive_volts': {
'description': 'Positive data line volts',
'position': 96,
'length': 2,
'from_int': lambda x: x / 100,
'to_int': lambda x: int(x * 100),
},
'data_line_negative_volts': {
'description': 'Negative data line volts',
'position': 98,
'length': 2,
'from_int': lambda x: x / 100,
'to_int': lambda x: int(x * 100),
},
'charging_mode': {
'description': 'Charging mode',
'position': 100,
'length': 2,
'from_int': lambda x: x,
'to_int': lambda x: int(x),
},
'record_amphours': {
'description': 'Recorded amp-hours',
'position': 102,
'length': 4,
'from_int': lambda x: x / 1000,
'to_int': lambda x: int(x * 1000),
},
'record_watthours': {
'description': 'Recorded watt-hours',
'position': 106,
'length': 4,
'from_int': lambda x: x / 1000,
'to_int': lambda x: int(x * 1000),
},
'record_threshold': {
'description': 'Recording threshold (Amps)',
'position': 110,
'length': 2,
'from_int': lambda x: x / 100,
'to_int': lambda x: int(x * 100),
},
'record_seconds': {
'description': 'Recorded time (Seconds)',
'position': 112,
'length': 4,
'from_int': lambda x: x,
'to_int': lambda x: int(x),
},
'recording': {
'description': 'Recording',
'position': 116,
'length': 2,
'from_int': lambda x: bool(x),
'to_int': lambda x: int(x),
},
'screen_timeout': {
'description': 'Screen timeout (Minutes)',
'position': 118,
'length': 2,
'from_int': lambda x: x,
'to_int': lambda x: int(x),
},
'screen_brightness': {
'description': 'Screen brightness',
'position': 120,
'length': 2,
'from_int': lambda x: x,
'to_int': lambda x: int(x),
},
'resistance': {
'description': 'Resistance (Ohms)',
'position': 122,
'length': 4,
'from_int': lambda x: x / 10,
'to_int': lambda x: int(x * 10),
},
'screen_selected': {
'description': 'Currently selected screen',
'position': 126,
'length': 2,
'from_int': lambda x: x,
'to_int': lambda x: int(x),
},
'end': {
'description': 'End bytes',
'position': 128,
'length': 2,
'from_int': lambda x: x,
'to_int': lambda x: int(x),
},
}
if collection_time is None:
collection_time = datetime.datetime.now()
self.collection_time = collection_time
for name in self.field_properties:
setattr(self, name, 0)
self.data_groups = [DataGroup(x) for x in range(10)]
if data:
self.load(data)
def dump(self):
data = bytearray(130)
for name in self.field_properties:
pos = self.field_properties[name]['position']
pos_len = self.field_properties[name]['length']
if pos_len == 2:
pack_format = '>H'
elif pos_len == 4:
pack_format = '>L'
else:
pack_format = 'B'
conversion_dump = self.field_properties[name]['to_int']
data[pos:pos+pos_len] = struct.pack(pack_format, conversion_dump(getattr(self, name)))
for data_group in self.data_groups:
if (data_group.group > 9) or (data_group.group < 0):
continue
pos = 16 + (data_group.group * 8)
data[pos:pos+4] = struct.pack('>L', int(data_group.amp_hours * 1000))
data[pos+4:pos+8] = struct.pack('>L', int(data_group.watt_hours * 1000))
return bytes(data)
def load(self, data):
if len(data) != 130:
raise ValueError('Invalid data length', data)
logging.debug('Start: 0x{:02x}{:02x}, end: 0x{:02x}{:02x}'.format(data[0], data[1], data[128], data[129]))
for name in self.field_properties:
pos = self.field_properties[name]['position']
pos_len = self.field_properties[name]['length']
if pos_len == 2:
pack_format = '>H'
elif pos_len == 4:
pack_format = '>L'
else:
pack_format = 'B'
conversion_load = self.field_properties[name]['from_int']
val = conversion_load(struct.unpack(pack_format, data[pos:pos+pos_len])[0])
setattr(self, name, val)
self.data_groups = []
for i in range(10):
data_group = DataGroup(i)
pos = 16 + (i * 8)
data_group.amp_hours = struct.unpack('>L', data[pos:pos+4])[0] / 1000
data_group.watt_hours = struct.unpack('>L', data[pos+4:pos+8])[0] / 1000
self.data_groups.append(data_group)
0707010000000F000081A40000273B00000064000000015E0D4DE4000020E0000000000000000000000000000000000000003000000000rdserialtool-0.2.1+20200101/rdserial/um/tool.py# rdserialtool
# Copyright (C) 2019 Ryan Finnie
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
import json
import time
import datetime
import logging
import statistics
import rdserial.um
supported_devices = ['um24c', 'um25c', 'um34c']
class Tool:
def __init__(self, parent=None):
self.trends = {}
if parent is not None:
self.args = parent.args
self.socket = parent.socket
def trend_s(self, name, value):
if not self.args.watch:
return ''
if name in self.trends:
trend = statistics.mean(self.trends[name])
self.trends[name] = self.trends[name][1:] + [value]
if value > trend:
return '\u2197'
elif value < trend:
return '\u2198'
else:
return ' '
else:
self.trends[name] = [value for x in range(self.args.trend_points)]
return ' '
def print_json(self, response):
out = {x: getattr(response, x) for x in response.field_properties}
out['data_groups'] = [{'amp_hours': x.amp_hours, 'watt_hours': x.watt_hours} for x in response.data_groups]
out['collection_time'] = (response.collection_time - datetime.datetime.fromtimestamp(0)).total_seconds()
print(json.dumps(out, sort_keys=True))
def print_human(self, response):
logging.debug('DUMP: {}'.format(repr(response.dump())))
charging_map = {
rdserial.um.CHARGING_UNKNOWN: 'Unknown / Normal',
rdserial.um.CHARGING_QC2: 'Quick Charge 2.0',
rdserial.um.CHARGING_QC3: 'Quick Charge 3.0',
rdserial.um.CHARGING_APP2_4A: 'Apple 2.4A',
rdserial.um.CHARGING_APP2_1A: 'Apple 2.1A',
rdserial.um.CHARGING_APP1_0A: 'Apple 1.0A',
rdserial.um.CHARGING_APP0_5A: 'Apple 0.5A',
rdserial.um.CHARGING_DCP1_5A: 'DCP 1.5A',
rdserial.um.CHARGING_SAMSUNG: 'Samsung',
}
if self.args.device == 'um25c':
usb_format = 'USB: {:5.03f}V{}, {:6.04f}A{}, {:6.03f}W{}, {:6.01f}Ω{}'
else:
usb_format = 'USB: {:5.02f}V{}, {:6.03f}A{}, {:6.03f}W{}, {:6.01f}Ω{}'
print(usb_format.format(
response.volts,
self.trend_s('volts', response.volts),
response.amps,
self.trend_s('amps', response.amps),
response.watts,
self.trend_s('watts', response.watts),
response.resistance,
self.trend_s('resistance', response.resistance),
))
print('Data: {:5.02f}V(+){}, {:5.02f}V(-){}, charging mode: {}'.format(
response.data_line_positive_volts,
self.trend_s('data_line_positive_volts', response.data_line_positive_volts),
response.data_line_negative_volts,
self.trend_s('data_line_negative_volts', response.data_line_negative_volts),
charging_map[response.charging_mode],
))
print('Recording {:5}: {:8.03f}Ah{}, {:8.03f}Wh{}, {:6d}{} sec at >= {:4.02f}A'.format(
'(on)' if response.recording else '(off)',
response.record_amphours,
self.trend_s('record_amphours', response.record_amphours),
response.record_watthours,
self.trend_s('record_watthours', response.record_watthours),
response.record_seconds,
self.trend_s('record_seconds', response.record_seconds),
response.record_threshold,
))
def make_dgpart(response, idx):
data_group = response.data_groups[idx]
return '{}{:d}: {:8.03f}Ah{}, {:8.03f}Wh{}'.format(
'*' if data_group.group == response.data_group_selected else ' ',
data_group.group,
data_group.amp_hours,
self.trend_s('dg_{}_amp_hours'.format(data_group.group), data_group.amp_hours),
data_group.watt_hours,
self.trend_s('dg_{}_watt_hours'.format(data_group.group), data_group.watt_hours),
)
print('Data groups:')
print(' {:32}{}'.format(
make_dgpart(response, 0),
make_dgpart(response, 5),
))
print(' {:32}{}'.format(
make_dgpart(response, 1),
make_dgpart(response, 6),
))
print(' {:32}{}'.format(
make_dgpart(response, 2),
make_dgpart(response, 7),
))
print(' {:32}{}'.format(
make_dgpart(response, 3),
make_dgpart(response, 8),
))
print(' {:32}{}'.format(
make_dgpart(response, 4),
make_dgpart(response, 9),
))
print('{:>5s}, temperature: {:3d}C{} ({:3d}F{})'.format(
self.args.device.upper(),
response.temp_c,
self.trend_s('temp_c', response.temp_c),
response.temp_f,
self.trend_s('temp_f', response.temp_f),
))
print('Screen: {:d}/6, brightness: {:d}/5, timeout: {}'.format(
response.screen_selected,
response.screen_brightness,
'{:d} min'.format(response.screen_timeout) if response.screen_timeout else 'off',
))
if response.collection_time:
print('Collection time: {}'.format(response.collection_time))
def send_commands(self):
for arg, command_val in [
('next_screen', b'\xf1'),
('rotate_screen', b'\xf2'),
('next_data_group', b'\xf3'),
('previous_screen', b'\xf3'),
('clear_data_group', b'\xf4'),
('set_data_group', lambda x: bytes([0xa0 + x])),
('set_record_threshold', lambda x: bytes([0xb0 + int(x * 100)])),
('set_screen_brightness', lambda x: bytes([0xd0 + x])),
('set_screen_timeout', lambda x: bytes([0xe0 + x])),
]:
if not hasattr(self.args, arg):
continue
arg_val = getattr(self.args, arg)
if (arg_val is None) or (arg_val is False):
continue
if type(command_val) != bytes:
command_val = command_val(getattr(self.args, arg))
logging.info('Setting {} to {}'.format(arg, getattr(self.args, arg)))
self.socket.send(command_val)
# Sometimes you can send multiple commands quickly, but sometimes
# it'll eat commands. Sleeping 0.5s between commands is safe.
time.sleep(0.5)
def loop(self):
while True:
try:
self.socket.send(b'\xf0')
if self.args.json:
self.print_json(rdserial.um.Response(
self.socket.recv(130),
collection_time=datetime.datetime.now(),
device_type=self.args.device.upper(),
))
else:
self.print_human(rdserial.um.Response(
self.socket.recv(130),
collection_time=datetime.datetime.now(),
device_type=self.args.device.upper(),
))
except KeyboardInterrupt:
raise
except Exception:
if self.args.watch:
logging.exception('An exception has occurred')
else:
raise
if self.args.watch:
if not self.args.json:
print()
time.sleep(self.args.watch_seconds)
else:
return
def main(self):
try:
self.send_commands()
self.loop()
except KeyboardInterrupt:
pass
07070100000010000081ED0000273B00000064000000015E0D4DE400000374000000000000000000000000000000000000002900000000rdserialtool-0.2.1+20200101/rdserialtool#!/usr/bin/env python3
# rdserialtool
# Copyright (C) 2019 Ryan Finnie
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
if __name__ == '__main__':
import sys
import rdserial.tool
sys.exit(rdserial.tool.main())
07070100000011000081A40000273B00000064000000015E0D4DE4000005E1000000000000000000000000000000000000002500000000rdserialtool-0.2.1+20200101/setup.py#!/usr/bin/env python3
import os
import sys
from setuptools import setup, find_packages
assert(sys.version_info > (3, 4))
def read(filename):
return open(os.path.join(os.path.dirname(__file__), filename)).read()
__version__ = None
with open(os.path.join(os.path.dirname(__file__), 'rdserial', '__init__.py')) as f:
for line in f:
if not line.startswith('__version__ = '):
continue
__version__ = eval(line.rsplit(None, 1)[-1])
break
setup(
name='rdserialtool',
description='RDTech UM/DPS series device interface tool',
long_description=read('README'),
version=__version__,
license='GPLv2+',
platforms=['Unix'],
author='Ryan Finnie',
author_email='ryan@finnie.org',
url='https://github.com/rfinnie/rdserialtool',
download_url='https://github.com/rfinnie/rdserialtool',
packages=find_packages(),
classifiers=[
'Development Status :: 4 - Beta',
'Intended Audience :: Science/Research',
'License :: OSI Approved :: GNU General Public License v2 or later (GPLv2+)',
'Programming Language :: Python :: 3 :: Only',
'Programming Language :: Python :: Implementation :: PyPy',
'Topic :: Scientific/Engineering :: Information Analysis',
'Topic :: Scientific/Engineering :: Interface Engine/Protocol Translator',
],
entry_points={
'console_scripts': [
'rdserialtool = rdserial.tool:main',
],
},
test_suite='tests',
)
07070100000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000B00000000TRAILER!!!165 blocks