diff --git a/.gitignore b/.gitignore
index 82f9275..7b6caf3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -159,4 +159,4 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
-#.idea/
+.idea/
diff --git a/README.md b/README.md
index c66a6a8..68a2ab1 100644
--- a/README.md
+++ b/README.md
@@ -19,6 +19,37 @@ while allowing user to simulate a simplified test environment. The structure exp
controllers on both PEV and EVSE sides. It's possible use an ADVANTICS controller on one side and another controller
that supports the same charging standards on the other side.
+## Quick Install
+### On your machine
+
+Note: If you do not have a working version of Python3 installed, please refer to the more details section [Software Setup](#software-setup) below.
+You can install this utility globally on your machine using pipx:
+
+**Simulation tools**
+```
+pipx install "advsimulators@git+https://github.com/ADVANTICS/examples.git@dev#&subdirectory=charge-controllers/tutorials"
+```
+**Monitoring tools**
+```
+pipx install "advmonitors@git+https://github.com/ADVANTICS/examples.git@dev#&subdirectory=charge-controllers/monitors"
+```
+You will then be able to call the simulation utilities in command line from anywhere on your system.
+If you would rather install it in a virtual environement, you can use `pip` once your virtual environement is activated instead. In that case, the simulation commands will be only callable when your virtual environement is actived.
+
+### On the controllers
+You can install this package with pip (`pipx` is not available on controllers):
+
+**Simulation tools**
+
+```
+pip install "advsimulators@git+https://github.com/ADVANTICS/examples.git@dev#&subdirectory=charge-controllers/tutorials"
+```
+**Monitoring tools**
+```
+pip install "advmonitors@git+https://github.com/ADVANTICS/examples.git@dev#&subdirectory=charge-controllers/monitors"
+```
+
+
## Typical Real System
On both EVSE and PEV sides, the controller hardware with the charge controller software are provided by ADVANTICS. From
@@ -77,6 +108,13 @@ is ready to run a power transfer session.
## Software Setup
+> The simplest is to install the project using pip as described in the Install section.
+> However, if you want to run locally without installing the package, you can follow the instructions below.
+
+
+The simplest is to install the project using pip as described in the Install section.
+If you want
+
The test system used in this guide runs Python on Linux. The required pip packages of each module is listed in the
`requirements.txt` in the same directory.
@@ -87,16 +125,32 @@ Install the needed `apt` packages, clone this repository, create a virtual envir
```shell
sudo apt update
-sudo apt install git can-utils net-utils python3 python3-pip python3-venv
+sudo apt install git can-utils net-utils python3 python3-pip python3-venv pipx
+```
+If you want to install the utility globally on your system, use:
+```
+pipx install "advsimulators@git+https://github.com/ADVANTICS/examples.git@dev#&subdirectory=charge-controllers/tutorials"
+pipx install "advmonitors@git+https://github.com/ADVANTICS/examples.git@dev#&subdirectory=charge-controllers/monitors"
+```
+
+If you would rather like to run from source:
+```
git clone https://github.com/ADVANTICS/examples
cd examples
python3 -m venv venv
source venv/bin/activate
+```
+```
+# Use the following if you want to install the package from source
+pip install -e charge-controllers/monitors/
+pip install -e charge-controllers/tutorials/
+```
+```
+# ... or use the following instead if you want to run from the scripts without installing the package
pip install -r charge-controllers/monitors/requirements.txt
-pip install -r charge-controllers/tutorials/EVSE Generic Interface v3/requirements.txt
-pip install -r charge-controllers/tutorials/PEV Generic Interface v2/requirements.txt
+pip install -r charge-controllers/tutorials/requirements.txt
```
If you use a PEAK USB CAN
@@ -210,26 +264,59 @@ Restart the controller software for the new config to take effect:
Assuming the virtual environment of this project is active with all the steps are completed as described above, in four
separate terminal sessions:
+### For a global installation (pipx)
+**EVSE simulator**
+```
+evse-simulator-v3
+```
+
+**PEV simulator**
+```
+pev-simulator-v2
+```
+
+**EVSE monitor**
+```
+evse-monitor
+```
+
+**PEV monitor**
+```
+pev-monitor
+```
+
+### When running from local scripts
+
```shell
-cd charge-controllers/tutorials/EVSE\ Generic\ Interface\ v3
-python3 evse-simulator.py
+cd charge-controllers/tutorials
+python3 simulate_generic_evse_v3.py
```
```shell
cd charge-controllers/monitors
-python3 evse-monitor.py
+python3 evse_monitor.py
```
```shell
-cd charge-controllers/tutorials/PEV\ Generic\ Interface\ v2
-python3 pev-simulator.py
+cd charge-controllers/tutorials/
+python3 simulate_generic_pev_v2.py
```
```shell
cd charge-controllers/monitors
-python3 pev-monitor.py
+python3 pev_monitor_v2.py
```
+Note that the default CAN config (`can.conf`) will run on channel `can0`. You can pass you custom can config using `--can-config` flag. If you want to use virtual CAN instead (e.g., for preliminary tests), there is a `vcan.conf` that you can use directly, e.g.:
+
+```shell
+cd charge-controllers/tutorials/
+python3 simulate_generic_pev_v2.py --can-config vcan.conf
+```
+
+If you are unsure about how to set up a virtual CAN interface you can check the corresponding socketcan documentation [here](https://netmodule-linux.readthedocs.io/en/latest/howto/can.html).
+
+
> **NOTE:** Each module is connecting to CAN interface on start and closes it gracefully when CTRL+C arrives from the
> user, pressing CTRL+C repetitively or killing the process will cause CAN interface to close abruptly and make it **
> DOWN
diff --git a/charge-controllers/monitors/advmonitors/__init__.py b/charge-controllers/monitors/advmonitors/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/charge-controllers/tutorials/EVSE Generic Interface v3/can.conf b/charge-controllers/monitors/advmonitors/conf/can.conf
similarity index 100%
rename from charge-controllers/tutorials/EVSE Generic Interface v3/can.conf
rename to charge-controllers/monitors/advmonitors/conf/can.conf
diff --git a/charge-controllers/monitors/advmonitors/conf/vcan.conf b/charge-controllers/monitors/advmonitors/conf/vcan.conf
new file mode 100644
index 0000000..0d1106c
--- /dev/null
+++ b/charge-controllers/monitors/advmonitors/conf/vcan.conf
@@ -0,0 +1,4 @@
+[default]
+interface = socketcan
+channel = vcan0
+bitrate = 500000
diff --git a/charge-controllers/monitors/Advantics_Generic_EVSE_protocol_v3.kcd b/charge-controllers/monitors/advmonitors/dbs/Advantics_Generic_EVSE_protocol_v3.kcd
similarity index 89%
rename from charge-controllers/monitors/Advantics_Generic_EVSE_protocol_v3.kcd
rename to charge-controllers/monitors/advmonitors/dbs/Advantics_Generic_EVSE_protocol_v3.kcd
index 081c5b9..163de99 100644
--- a/charge-controllers/monitors/Advantics_Generic_EVSE_protocol_v3.kcd
+++ b/charge-controllers/monitors/advmonitors/dbs/Advantics_Generic_EVSE_protocol_v3.kcd
@@ -107,6 +107,7 @@
+
@@ -446,12 +447,15 @@
-
+
> message. As well as every time
vehicle changes these information.
+
+ Many of these information can be optional. There default value is 0 if the vehicle
+ did not send them.
]]>
@@ -459,22 +463,40 @@
- Total battery capacity (if provided).
+ Total battery capacity.
- Battery SoC in percent (informative, do not rely on it).
+ Battery SoC, in percent.
+
+
+
+
+ Minimum battery SoC vehicle wants, in percent.
+
+
+
+
+ Target battery SoC vehicle wants, in percent.
+
+
+
+
+ Maximum battery SoC vehicle wants, in percent.
-
+
> message. As well as every time
vehicle changes these information.
+
+ Many of these information can be optional. There default value is 0 if the vehicle
+ did not send them.
]]>
@@ -482,12 +504,21 @@
- Minimum vehicle voltage (if provided).
+ Minimum vehicle voltage.
- Maximum vehicle voltage (if provided).
+ Maximum vehicle voltage.
+
+
+
+
+
+ Present voltage reported by vehicle.
+
+ Note: Can be use in Constant Voltage mode to compensate for cable losses.
+
@@ -498,6 +529,9 @@
Sent at least once after first <> message. As well as every time
vehicle changes these information.
+
+ Many of these information can be optional. There default value is 0 if the vehicle
+ did not send them.
]]>
@@ -505,22 +539,22 @@
- Minimum vehicle charge current (if provided).
+ Minimum vehicle charge current.
- Maximum vehicle charge current (if provided).
+ Maximum vehicle charge current.
- Minimum vehicle charge power (if provided).
+ Minimum vehicle charge power.
- Maximum vehicle charge power (if provided).
+ Maximum vehicle charge power.
@@ -531,6 +565,9 @@
Sent at least once after first <> message. As well as every time
vehicle changes these information.
+
+ Many of these information can be optional. There default value is 0 if the vehicle
+ did not send them.
]]>
@@ -538,26 +575,120 @@
- Minimum vehicle discharge current (if provided).
+ Minimum vehicle discharge current.
- Maximum vehicle discharge current (if provided).
+ Maximum vehicle discharge current.
- Minimum vehicle discharge power (if provided).
+ Minimum vehicle discharge power.
- Maximum vehicle discharge power (if provided).
+ Maximum vehicle discharge power.
+
+ > message. As well as every time
+ vehicle changes these information.
+
+ Many of these information can be optional. There default value is 0 if the vehicle
+ did not send them.
+ ]]>
+
+
+
+
+
+
+
+ Minimum (estimated) remaining energy vehicle will consume (positive value),
+ or provide (negative value).
+
+
+
+
+
+
+ Targeted (estimated) remaining energy vehicle will consume (positive value),
+ or provide (negative value).
+
+
+
+
+
+
+ Maximum (estimated) remaining energy vehicle will consume (positive value),
+ or provide (negative value).
+
+
+
+
+
+
+
+ The EV ID provided by the vehicle in CCS DIN SPEC 70121 and ISO 15118-2.
+ EV ID for CCS ISO 15118-20 is not supported by this message.
+ (the EV ID is defined as a string of max 255 characters in ISO 15118-20)
+
+
+
+
+
+
+
+
+ Byte 0 of the EV ID.
+
+
+
+
+
+
+ Byte 1 of the EV ID.
+
+
+
+
+
+
+ Byte 2 of the EV ID.
+
+
+
+
+
+
+ Byte 3 of the EV ID.
+
+
+
+
+
+
+ Byte 4 of the EV ID.
+
+
+
+
+
+
+ Byte 5 of the EV ID.
+
+
+
+
+
Controller (ADM-CO-CUI1 hardware variant, ie. "generic/mobile") is reporting various
@@ -741,7 +872,7 @@
]]>
-
+
@@ -1009,7 +1140,7 @@
Sets the logical state of DIG_OUT1 (CONN5, position 1).
- Needs to be declared as monitored in `/srv/config.cfg`:
+ Needs to be declared as CAN Controlled in `/srv/config.cfg`:
[hardware]
dig_out1 = CAN_Controlled
@@ -1020,7 +1151,7 @@
Reports the logical state of DIG_OUT2 (CONN5, position 2).
- Needs to be declared as monitored in `/srv/config.cfg`:
+ Needs to be declared as CAN Controlled in `/srv/config.cfg`:
[hardware]
dig_out2 = CAN_Controlled
@@ -1031,7 +1162,7 @@
Reports the logical state of DIG_OUT3 (CONN5, position 3).
- Needs to be declared as monitored in `/srv/config.cfg`:
+ Needs to be declared as CAN Controlled in `/srv/config.cfg`:
[hardware]
dig_out3 = CAN_Controlled
@@ -1042,7 +1173,7 @@
Reports the logical state of DIG_OUT4 (CONN5, position 4).
- Needs to be declared as monitored in `/srv/config.cfg`:
+ Needs to be declared as CAN Controlled in `/srv/config.cfg`:
[hardware]
dig_out4 = CAN_Controlled
@@ -1056,4 +1187,4 @@
-
+
\ No newline at end of file
diff --git a/charge-controllers/monitors/Advantics_Generic_PEV_protocol_v2.kcd b/charge-controllers/monitors/advmonitors/dbs/Advantics_Generic_PEV_protocol_v1.kcd
similarity index 83%
rename from charge-controllers/monitors/Advantics_Generic_PEV_protocol_v2.kcd
rename to charge-controllers/monitors/advmonitors/dbs/Advantics_Generic_PEV_protocol_v1.kcd
index 0937801..908aad0 100644
--- a/charge-controllers/monitors/Advantics_Generic_PEV_protocol_v2.kcd
+++ b/charge-controllers/monitors/advmonitors/dbs/Advantics_Generic_PEV_protocol_v1.kcd
@@ -2,7 +2,7 @@
-
+
@@ -102,7 +102,7 @@
-
+
Information provided by the vehicle.
@@ -113,15 +113,6 @@
Battery SoC in percent (only used in HLC mode).
-
-
-
- The energy capacity of the EV battery.
- In case this value is 0 we default to the config file entry `energy_capacity`.
-
-
-
-
@@ -339,91 +330,6 @@
-
-
- ISO 15118-20 specific message.
- The energy of the EV corresponding to the SOC specified by the owner.
- The energy request is represented by an energy range including a target energy request.
-
- Energy requests should satisfy the following relationship:
- Minimum_Energy_Request ≤ Target_Energy_Request ≤ Maximum_Energy_Request
-
-
-
-
-
-
-
-
- The energy of the EV corresponding to the target SOC.
-
-
-
-
-
-
- The energy of the EV corresponding to the minimum SOC.
- In case this message is not sent we default to the config file entry `min_energy_request`.
-
-
-
-
-
-
- The energy of the EV corresponding the to maximum SOC.
-
- In case this message is not sent we default to the config file entry `max_energy_request`.
-
- Will be capped by Energy_Capacity.
-
-
-
-
-
-
-
- ISO 15118-20 specific message. (Optional)
- Indicates a preferred operational V2X range for bidirectional cycling.
- V2X energy requests should satisfy the following relationships:
- Minimum_Energy_Request ≤ Minimum_V2X_Energy_Request
- Maximum_V2X_Energy_Request ≤ Maximum_Energy_Request
-
-
-
-
-
-
-
- The minimum energy level for the bidirectional cycling activity range.
-
-
-
-
- The maximum energy level for the bidirectional cycling activity range.
-
-
-
-
-
-
- ISO 15118-20 specific message.
- EV extra bidirectional power transfer information.
-
-
-
-
-
-
-
-
- Indicate when the EV intends to finish the charging process
- The offset in seconds from the point in time of sending this message.
-
-
-
-
-
-
Extra information from CCS (for information only).
diff --git a/charge-controllers/monitors/advmonitors/dbs/Advantics_Generic_PEV_protocol_v2.kcd b/charge-controllers/monitors/advmonitors/dbs/Advantics_Generic_PEV_protocol_v2.kcd
new file mode 100644
index 0000000..2ff8eb8
--- /dev/null
+++ b/charge-controllers/monitors/advmonitors/dbs/Advantics_Generic_PEV_protocol_v2.kcd
@@ -0,0 +1,971 @@
+
+
+
+
+
+
+
+
+
+ Information about the EVSE (AC or DC source), when available.
+
+
+
+
+
+
+
+ Current internal state of the charge controller, reflecting communication stage.
+
+ Initialising:: Controller's applications are starting up.
+ Waiting_For_EVSE:: Controller is idle and ready for a plug-in.
+ Negotiating_Connection:: Controller is plugged to a charger and the connection is
+ being initialised. Important charge information are exchanged.
+ Connected_With_Full_Info:: All charge information from the EVSE were retrieved
+ and a charge session can be considered to be properly started.
+ Insulation_Test:: Insulation of charge cable is being tested by charger.
+ Precharge:: Charger is matching its output voltage to present voltage of the
+ battery.
+ Waiting_For_Charge:: PEV is about to begin the actual charge.
+ Charging:: Main charging loop going on.
+ Ending_Charge:: A normal charge stop condition happened and the PEV is exiting
+ the charging loop.
+ Welding_Detection:: PEV is testing if its contactors are welded.
+ Closing_Communication:: PEV can unplug and we are reinitialising in order to
+ then go back to __Waiting_For_EVSE__.
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ The charging protocol EVSE is using.
+
+
+
+
+
+
+
+
+
+
+ Pins on which current is supplied. __CCS_AC__ corresponds to any AC pins.
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Maximum current the on-board charger (AC) or BMS (DC) is allowed to draw.
+
+ In AC, it is defined as the minimum between EVSE max current
+ (ie. CP PWM duty cycle) and cable max current (ie. PP resistance).
+
+ In DC it corresponds to HLC data.
+
+
+
+
+
+
+ Status of EVSE Residual Current Device (only available in HLC mode, 0 otherwise).
+ For information only.
+
+
+
+
+
+
+
+
+ Reserved bits for future uses.
+
+
+
+
+ Information provided by the vehicle.
+
+
+
+
+
+
+ Battery SoC in percent (only used in HLC mode).
+
+
+
+
+
+ The energy capacity of the EV battery.
+ In case this value is 0 we default to the config file entry `energy_capacity`.
+
+
+
+
+
+
+
+ AC charging specific control (from charge controller to on-board charger).
+
+
+
+
+
+
+ > flag, and THEN start to draw current.
+
+ When going to __Not_Ready__ it means the AC source request a normal stop.
+ On-board charger should stop drawing current and THEN set its
+ <> flag to 0.
+ ]]>
+
+
+
+
+
+
+
+ Reserved bits for future uses.
+
+
+
+
+ AC charging specific status (from on-board charger to charge controller).
+
+
+
+
+
+
+
+ On-board charger is ready to charge (corresponds to asserting CP state C/D).
+ You should set it to 1 before starting to draw current.
+ And set it to 0 after current draw stops.
+
+ Only exception is in case of emergency stop, where you can set it to 0 without
+ waiting for the current draw to stop beforehand.
+
+
+
+
+
+
+
+
+ Reserved bits for future uses.
+
+
+
+
+ DC charging specific control (from charge controller to BMS).
+
+
+
+
+
+
+
+ Request for closing contactors. Close contactors when 1. Open them when 0.
+ If the message does not come after 1s timeout, consider it to be an emergency
+ situation and open contactors.
+
+ Alternatively, ADM-CS-EVCC unit has DC contactor drivers. To use them directly
+ instead of this CAN signal, enable them in the `/srv/config.cfg` file:
+
+ [vehicle]
+ dc_contactors_use_ios = true
+
+
+
+
+
+
+
+
+ Reserved bits for future uses.
+
+
+
+
+ DC charging specific status (from BMS to charge controller), message 1.
+
+
+
+
+
+
+ >, the target current will be set to the config file
+ entry "max_current". When doing so, you should set a safe `max_charge_voltage` to terminate the
+ charge earlier than at full pack (ie. bulk charging):
+
+ [vehicle]
+ no_bms = true
+ max_current = 20
+ max_charge_voltage = 380
+ target_voltage = 410
+ max_voltage = 450
+
+ In no BMS mode, this signal will be ignored.
+
+ NOTE: Signal is of signed data type for compatibility with future bidirectional
+ implementations.
+ ]]>
+
+
+
+
+ > documentation to know how the sensor should be wired.
+
+ NOTE: Signal is of signed data type for compatibility with future bidirectional
+ implementations.
+ ]]>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ DC charging specific status (from BMS to charge controller), message 2.
+
+
+
+
+
+
+
+ Report states of contactors.
+
+ Alternatively, ADM-CS-EVCC unit has DC contactors feedback inputs.
+ To use them directly instead of this CAN signal, enable them in
+ the `/srv/config.cfg` file:
+
+ [vehicle]
+ dc_contactors_use_ios = true
+
+
+
+
+
+
+
+
+
+ Vehicle request a normal charge termination
+ (user request, or soft battery voltage limit reached (eg. bulk charging)).
+
+ Only acts on change (so you need to cycle it back to 0).
+
+ Alternatively, you can also use the 12V input (R-3F) on ADM-CS-EVCC unit.
+
+
+
+
+
+
+
+
+
+ Vehicle request an emergency charge termination
+ Only acts on change (so you need to cycle it back to 0).
+
+
+
+
+
+
+
+
+ Reserved bits for future uses.
+
+
+
+ > documentation to know how the sensor should be wired.
+ ]]>
+
+
+
+
+ > documentation to know how the sensor should be wired.
+ ]]>
+
+
+
+
+
+ Used to report the status of the EV (from BMS to charge controller)
+
+
+
+
+
+
+
+ Allows the vehicle to delay the transition to powered states (starting from the insulation test) until the HV system is ready.
+
+ Use cases:
+ - Make sure the vehicle IMD is disabled before sending cablecheck request.
+ - Wait while preparing the HV system for power.
+
+ This signal should be set to 0 if the vehicle is ready for power.
+ When the charger is plugged-in and this signal is set to 1, the session will block at Communication_Stage.Connected_With_Full_Info state until this signal is set to 0 (and the inlet is locked).
+
+ 0=False (Hold off Not Requested), 1=True (Hold off)
+
+ Note: limited by wait_hv_ready_timeout_ms config entry. Default is 40 seconds (defined by the standards):
+ [ccs]
+ wait_hv_ready_timeout_ms = 40000
+
+
+
+
+
+
+
+
+
+ Reserved bits for future uses.
+
+
+
+
+
+ ISO 15118-20 specific message.
+ The energy of the EV corresponding to the SOC specified by the owner.
+ The energy request is represented by an energy range including a target energy request.
+
+ Energy requests should satisfy the following relationship:
+ Minimum_Energy_Request ≤ Target_Energy_Request ≤ Maximum_Energy_Request
+
+
+
+
+
+
+
+
+ The energy of the EV corresponding to the target SOC.
+
+
+
+
+
+
+ The energy of the EV corresponding the to maximum SOC.
+
+ In case this message is not sent we default to the config file entry `max_energy_request`.
+
+ Will be capped by Energy_Capacity.
+
+
+
+
+
+
+ The energy of the EV corresponding to the minimum SOC.
+ In case this message is not sent we default to the config file entry `min_energy_request`.
+
+
+
+
+
+
+
+ ISO 15118-20 specific message. (Optional)
+ Indicates a preferred operational V2X range for bidirectional cycling.
+ V2X energy requests should satisfy the following relationships:
+ Minimum_Energy_Request ≤ Minimum_V2X_Energy_Request
+ Maximum_V2X_Energy_Request ≤ Maximum_Energy_Request
+
+
+
+
+
+
+
+ The minimum energy level for the bidirectional cycling activity range.
+
+
+
+
+ The maximum energy level for the bidirectional cycling activity range.
+
+
+
+
+
+
+ ISO 15118-20 specific message.
+ EV extra bidirectional power transfer information.
+
+
+
+
+
+
+
+
+ Indicate when the EV intends to finish the charging process
+ The offset in seconds from the point in time of sending this message.
+
+
+
+
+
+
+
+ Extra information from CCS (for information only).
+
+
+
+
+
+
+ Measured PWM duty cycle on the CP line.
+
+
+
+
+ Measured PWM top voltage on the CP line.
+
+
+
+
+ Measured CP state.
+
+
+
+
+
+
+
+
+
+
+ Measured resistance from PP. 2550 Ohms means disconnected.
+
+
+
+
+ CCS Inlet lock state.
+
+
+
+
+
+
+
+
+ Reserved bits for future uses.
+
+
+
+
+
+ Controller (ADM-CS-EVCC and ADM-CS-MEVC hardware variant) has various outputs that
+ can be controlled through this message.
+
+
+
+
+
+
+
+
+ Sets the logical state of DIG_OUT1.
+ Needs to be declared as monitored in `/srv/config.cfg`:
+
+ [hardware]
+ dig_out1 = CAN_Controlled
+
+
+
+
+
+
+ Reports the logical state of DIG_OUT2.
+ Needs to be declared as monitored in `/srv/config.cfg`:
+
+ [hardware]
+ dig_out2 = CAN_Controlled
+
+
+
+
+
+
+ Reports the logical state of DIG_OUT3.
+ Needs to be declared as monitored in `/srv/config.cfg`:
+
+ [hardware]
+ dig_out3 = CAN_Controlled
+
+
+
+
+
+ Padding bits between digital outputs and leds.
+
+
+
+
+ Sets the brightness of LED1 (3C1) in a scale from 1 to 100.
+ Greater values than 100 will be treated as max brigthness.
+ Needs to be declared as monitored in `/srv/config.cfg`:
+
+ [hardware]
+ led1 = CAN_Controlled
+
+
+
+
+
+
+ Sets the brightness of LED2 (3C2) in a scale from 1 to 100.
+ Greater values than 100 will be treated as max brigthness.
+ Needs to be declared as monitored in `/srv/config.cfg`:
+
+ [hardware]
+ led2 = CAN_Controlled
+
+
+
+
+
+
+ Sets the brightness of LED3 (3C3) in a scale from 1 to 100.
+ Greater values than 100 will be treated as max brigthness.
+ Needs to be declared as monitored in `/srv/config.cfg`:
+
+ [hardware]
+ led3 = CAN_Controlled
+
+
+
+
+
+
+
+ Controller (ADM-CS-EVCC hardware variant) is reporting various information
+ about its inputs. It is sent every seconds (for temperature channels).
+ Or on change for other digital inputs.
+
+
+
+
+
+
+
+ Reports the logical state of R-2E.
+
+
+
+
+
+
+
+ Reports the logical state of R-2F.
+
+
+
+
+
+
+
+
+ Reports the logical state of R-1G.
+ Needs to be declared as monitored in `/srv/config.cfg`:
+
+ [hardware]
+ dig_in1 = Monitor
+
+
+
+
+
+
+ Reports the logical state of R-2G.
+ Needs to be declared as monitored in `/srv/config.cfg`:
+
+ [hardware]
+ dig_in2 = Monitor
+
+
+
+
+
+
+ Reports the logical state of R-3F (or any other input declared as Stop function).
+
+
+
+
+
+
+
+
+ Reserved bits for future uses.
+
+
+
+ Measured temperature sensor on L-2D.
+
+
+
+
+ Measured temperature sensor on L-1E.
+
+
+
+
+ Measured temperature sensor on L-3E.
+
+
+
+
+ Temperature reported by the CPU chip of the charge controller.
+
+
+
+
+ If using a CAN sensor, and it has a temperature channel.
+
+
+
+
+
+
+ Diagnostic status of the charge controller.
+ This message gets reset to default values when the controller the state cycles back to
+ Communication_Stage.Waiting_For_EVSE
+
+
+
+
+
+
+
+
+ Indicates a failure in locking the connector.
+
+
+
+
+ Unlock request declined due to voltage/current measured at the inlet.
+
+
+
+
+ Error due to high temperature detected.
+
+
+
+
+ Indicates a contactor fault. Can also be caused by wrong wiring or configuration of contactor feedbacks.
+
+
+
+
+ Voltage deviation detected beyond allowed limits.
+
+
+
+
+ Current deviation detected beyond allowed limits.
+
+
+
+
+ Precharge process did not complete within the expected time.
+
+
+
+
+ CCS PP signal not detected.
+
+
+
+
+ MCS ID signal not detected.
+
+
+
+
+ Fault detected on the CCS CP line.
+
+
+
+
+ Fault detected in MCS CE line.
+
+
+
+
+ Unexpected duty cycle detected on CCS CP line.
+
+
+
+
+ Unexpected state detected on CCS CP line.
+
+
+
+
+ Unexpected state detected in MCS CE line.
+
+
+
+
+ CCS CP state not supported by the system.
+
+
+
+
+ Power Line Communication (PLC) fault detected.
+ This is a generic flag, for more specific information (when available) refer to the V2GTP* flags below.
+
+
+
+
+ V2GTP TLS error occurred during communication.
+
+
+
+
+ V2GTP authorization process failed.
+
+
+
+
+ V2GTP service selection failed.
+
+
+
+
+ Invalid V2GTP protocol version detected.
+
+
+
+
+ Unexpected V2GTP session ID encountered.
+
+
+
+
+ Invalid payload detected in V2GTP communication.
+
+
+
+
+ V2GTP energy transfer mode not supported.
+
+
+
+
+ Sequence error detected in V2GTP communication.
+
+
+
+
+ Timeout occurred in V2GTP sequence.
+
+
+
+
+ Service Discovery Protocol (SDP) error detected.
+
+
+
+
+ Reserved bits for future uses.
+
+
+
+
+ High-Level Communication (HLC) response codes for EVSE communication protocols.
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Only relevant for DIN SPEC 70121 and ISO 15118-2 protocols.
+ (was removed in ISO 15118-20)
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Only relevant for DIN SPEC 70121 and ISO 15118-2 protocols.
+ (was removed in ISO 15118-20)
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/charge-controllers/monitors/evse-monitor.py b/charge-controllers/monitors/advmonitors/evse.py
similarity index 86%
rename from charge-controllers/monitors/evse-monitor.py
rename to charge-controllers/monitors/advmonitors/evse.py
index 6ae4128..bdd61b1 100644
--- a/charge-controllers/monitors/evse-monitor.py
+++ b/charge-controllers/monitors/advmonitors/evse.py
@@ -7,19 +7,21 @@
# spell-checker:ignore cantools EVSE SECC SLAC OCPP CHAdeMO exctype excinst exctb asciichartpy
from __future__ import annotations
-# System imports
import os
from collections import deque
from dataclasses import dataclass
-from pathlib import Path
-from typing import TYPE_CHECKING
from datetime import datetime
+# System imports
+from importlib import resources
+from typing import TYPE_CHECKING
+
# Third-party imports
import asciichartpy
import can
import cantools.database
import typer
+from cantools.database.errors import DecodeError
from rich import box, print
from rich.console import Console
from rich.layout import Layout
@@ -94,8 +96,12 @@ class VehicleStatus:
ready: str
battery_capacity: int
soc: int
+ min_soc: int
+ target_soc: int
+ max_soc: int
min_voltage: float
max_voltage: float
+ present_voltage: float
min_charge_current: float
max_charge_current: float
min_charge_power: int
@@ -104,24 +110,28 @@ class VehicleStatus:
max_discharge_current: float
min_discharge_power: int
max_discharge_power: int
+ min_energy_request: float
+ max_energy_request: float
+ target_energy_request: float
enable_can_log = False
logged_messages = {}
-log_filename = f"./evse_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log"
+log_filename = f'./evse_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
-def log_can_msg(msg_name, signals, senders=None):
+def log_can_msg(msg_name, signals, senders=None) -> None:
if not enable_can_log:
return
- if msg_name in logged_messages:
- if logged_messages[msg_name] == signals:
- # The same message is logged with the same body last time, do not repeat
- return
+ if msg_name in logged_messages and logged_messages[msg_name] == signals:
+ # The same message is logged with the same body last time, do not repeat
+ return
with open(log_filename, 'a') as file:
- file.write(f"{datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")}\t{senders}\t{msg_name}\t{signals}\n")
+ file.write(
+ f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")}\t{senders}\t{msg_name}\t{signals}\n'
+ )
logged_messages[msg_name] = signals
@@ -130,17 +140,20 @@ def __init__(self, app: Application, pistol_index: int) -> None:
self._app = app
self._bus = app.bus
self._index = pistol_index
- self._db: cantools.database.Database = cantools.database.load_file(
- 'Advantics_Generic_EVSE_protocol_v3.kcd',
- ) # type: ignore[reportAttributeAccessIssue]
+ can_db_path = (
+ resources.files('advmonitors') / 'dbs' / 'Advantics_Generic_EVSE_protocol_v3.kcd'
+ )
+ self._db: cantools.database.Database = cantools.database.load_file(can_db_path)
self._bus.set_filters(
[
- can.typechecking.CanFilterExtended(
- can_id=message.frame_id | ((self._index & 0x0F) << 24),
- can_mask=0x1FFFFFFF,
- extended=True,
- )
+ # can.typechecking.CanFilterExtended(
+ {
+ 'can_id': message.frame_id | ((self._index & 0x0F) << 24),
+ 'can_mask': 0x1FFFFFFF,
+ 'extended': True,
+ }
+ # )
for message in self._db.messages
# if message.frame_id & (1 << 15)
],
@@ -200,8 +213,12 @@ def __init__(self, app: Application, pistol_index: int) -> None:
ready='----',
battery_capacity=0,
soc=0,
+ min_soc=0,
+ target_soc=0,
+ max_soc=0,
min_voltage=0,
max_voltage=0,
+ present_voltage=0,
min_charge_current=0,
max_charge_current=0,
min_charge_power=0,
@@ -210,15 +227,18 @@ def __init__(self, app: Application, pistol_index: int) -> None:
max_discharge_current=0,
min_discharge_power=0,
max_discharge_power=0,
+ min_energy_request=0,
+ max_energy_request=0,
+ target_energy_request=0,
)
self._app.update_vehicle_status(self.vehicle_status)
def on_message_received(self, msg: can.Message) -> None: # noqa: C901, PLR0912, PLR0915
try:
message = self._db.get_message_by_frame_id(msg.arbitration_id & 0xFFFFFF)
- except KeyError:
+ signals = message.decode(msg.data)
+ except (KeyError, DecodeError):
return
- signals = message.decode(msg.data)
log_can_msg(message.name, signals, message.senders)
@@ -258,11 +278,15 @@ def on_message_received(self, msg: can.Message) -> None: # noqa: C901, PLR0912,
elif message.name == 'EV_Information_Battery':
self.vehicle_status.battery_capacity = int(signals['Battery_Capacity'])
self.vehicle_status.soc = int(signals['Present_State_of_Charge'])
+ self.vehicle_status.min_soc = int(signals['Minimum_State_of_Charge'])
+ self.vehicle_status.target_soc = int(signals['Target_State_of_Charge'])
+ self.vehicle_status.max_soc = int(signals['Maximum_State_of_Charge'])
self._app.update_vehicle_status(self.vehicle_status)
elif message.name == 'EV_Information_Voltages':
self.vehicle_status.min_voltage = float(signals['EV_Minimum_Voltage'])
self.vehicle_status.max_voltage = float(signals['EV_Maximum_Voltage'])
+ self.vehicle_status.present_voltage = float(signals['EV_Present_Voltage'])
self._app.update_vehicle_status(self.vehicle_status)
elif message.name == 'EV_Information_Charge_Limits':
@@ -274,23 +298,29 @@ def on_message_received(self, msg: can.Message) -> None: # noqa: C901, PLR0912,
elif message.name == 'EV_Information_Discharge_Limits':
self.vehicle_status.min_discharge_current = float(
- signals['EV_Minimum_Discharge_Current']
+ signals['EV_Minimum_Discharge_Current'],
)
self.vehicle_status.max_discharge_current = float(
- signals['EV_Maximum_Discharge_Current']
+ signals['EV_Maximum_Discharge_Current'],
)
self.vehicle_status.min_discharge_power = int(signals['EV_Minimum_Discharge_Power'])
self.vehicle_status.max_discharge_power = int(signals['EV_Maximum_Discharge_Power'])
self._app.update_vehicle_status(self.vehicle_status)
+ elif message.name == 'EV_Information_Energy':
+ self.vehicle_status.min_energy_request = float(signals['EV_Minimum_Energy_Request'])
+ self.vehicle_status.target_energy_request = float(signals['EV_Target_Energy_Request'])
+ self.vehicle_status.max_energy_request = float(signals['EV_Maximum_Energy_Request'])
+ self._app.update_vehicle_status(self.vehicle_status)
+
elif message.name == 'ADM_CO_CUI1_Inputs':
self.charger_status.digital_inputs = (
- f'0:{"H" if signals['SWITCH0'] else "L"} '
- f'1:{"H" if signals['SWITCH1'] else "L"} '
- f'2:{"H" if signals['SWITCH2'] else "L"} '
- f'3:{"H" if signals['SWITCH3'] else "L"} '
- f'4:{"H" if signals['SWITCH4'] else "L"} '
- f'5:{"H" if signals['SWITCH5'] else "L"}'
+ f'0:{"H" if signals["SWITCH0"] else "L"} '
+ f'1:{"H" if signals["SWITCH1"] else "L"} '
+ f'2:{"H" if signals["SWITCH2"] else "L"} '
+ f'3:{"H" if signals["SWITCH3"] else "L"} '
+ f'4:{"H" if signals["SWITCH4"] else "L"} '
+ f'5:{"H" if signals["SWITCH5"] else "L"}'
)
self.charger_status.cpu_temp = int(signals['CPU_Temperature'])
self.charger_status.pistol_ptc1 = int(signals['Pistol_PTC1'])
@@ -299,10 +329,10 @@ def on_message_received(self, msg: can.Message) -> None: # noqa: C901, PLR0912,
elif message.name == 'ADM_CS_SECC_Inputs':
self.charger_status.digital_inputs = (
- f'1:{"H" if signals['Digital_Input1'] else "L"} '
- f'2:{"H" if signals['Digital_Input2'] else "L"} '
- f'3:{"H" if signals['Digital_Input3'] else "L"} '
- f'4:{"H" if signals['Digital_Input4'] else "L"}'
+ f'1:{"H" if signals["Digital_Input1"] else "L"} '
+ f'2:{"H" if signals["Digital_Input2"] else "L"} '
+ f'3:{"H" if signals["Digital_Input3"] else "L"} '
+ f'4:{"H" if signals["Digital_Input4"] else "L"}'
)
self.charger_status.cpu_temp = int(signals['CPU_Temperature'])
self.charger_status.pistol_ptc1 = int(signals['Pistol_PTC1'])
@@ -328,22 +358,22 @@ def on_message_received(self, msg: can.Message) -> None: # noqa: C901, PLR0912,
elif message.name == 'DC_Power_Parameters':
self.charger_parameters.maximum_voltage = float(signals['Maximum_Voltage'])
self.charger_parameters.maximum_charge_current = float(
- signals['Maximum_Charge_Current']
+ signals['Maximum_Charge_Current'],
)
self.charger_parameters.maximum_discharge_current = float(
- signals['Maximum_Discharge_Current']
+ signals['Maximum_Discharge_Current'],
)
self.charger_parameters.range_target_current = float(signals['Range_Target_Current'])
self._app.update_charger_parameters(self.charger_parameters)
elif message.name == 'Sequence_Control':
self.charger_parameters.start_charge_authorisation = str(
- signals['Start_Charge_Authorisation']
+ signals['Start_Charge_Authorisation'],
)
self.charger_parameters.chademo_start_button = str(signals['CHAdeMO_Start_Button'])
self.charger_parameters.ccs_authorisation_done = str(signals['CCS_Authorisation_Done'])
self.charger_parameters.ccs_authorisation_valid = str(
- signals['CCS_Authorisation_Valid']
+ signals['CCS_Authorisation_Valid'],
)
self.charger_parameters.charge_parameters_done = str(signals['Charge_Parameters_Done'])
self.charger_parameters.user_stop_button = str(signals['User_Stop_Button'])
@@ -351,10 +381,10 @@ def on_message_received(self, msg: can.Message) -> None: # noqa: C901, PLR0912,
elif message.name == 'ADM_CS_SECC_Outputs':
self.charger_parameters.digital_outputs = (
- f'1:{"H" if signals['Digital_Output1'] else "L"} '
- f'2:{"H" if signals['Digital_Output2'] else "L"} '
- f'3:{"H" if signals['Digital_Output3'] else "L"} '
- f'4:{"H" if signals['Digital_Output4'] else "L"}'
+ f'1:{"H" if signals["Digital_Output1"] else "L"} '
+ f'2:{"H" if signals["Digital_Output2"] else "L"} '
+ f'3:{"H" if signals["Digital_Output3"] else "L"} '
+ f'4:{"H" if signals["Digital_Output4"] else "L"}'
)
self._app.update_charger_parameters(self.charger_parameters)
@@ -378,17 +408,17 @@ def __init__(self) -> None:
def __rich_console__(self, console: Console, options: ConsoleOptions) -> RenderResult:
texts = self.export_text(clear=False).split('\n')
- yield from texts[-options.height:]
+ yield from texts[-options.height :]
class PlotPanel:
def __init__(
- self,
- app: Application,
- kind: str,
- unit: str,
- color: str = '',
- ratio: int = 2,
+ self,
+ app: Application,
+ kind: str,
+ unit: str,
+ color: str = '',
+ ratio: int = 2,
) -> None:
self._app = app
self._data: deque[float] = deque(maxlen=500)
@@ -538,10 +568,10 @@ def __enter__(self) -> Self:
return self
def __exit__(
- self,
- exctype: type[BaseException] | None,
- excinst: BaseException | None,
- exctb: TracebackType | None,
+ self,
+ exctype: type[BaseException] | None,
+ excinst: BaseException | None,
+ exctb: TracebackType | None,
) -> bool:
self.shutdown()
return False
@@ -593,7 +623,10 @@ def update_charger_parameters(self, data: ChargerParameters) -> None:
table.add_column()
table.add_row('[b]Maximum voltage:[/]', f'{data.maximum_voltage:.2f} V')
table.add_row('[b]Maximum charge current:[/]', f'{data.maximum_charge_current:.2f} A')
- table.add_row('[b]Maximum discharge current:[/]', f'{data.maximum_discharge_current:.2f} A')
+ table.add_row(
+ '[b]Maximum discharge current:[/]',
+ f'{data.maximum_discharge_current:.2f} A',
+ )
table.add_row('[b]Range target current:[/]', f'{data.range_target_current:.2f} A')
table.add_section()
table.add_row(
@@ -640,9 +673,13 @@ def update_vehicle_status(self, data: VehicleStatus) -> None:
table.add_section()
table.add_row('[b]Battery capacity:[/]', f'{data.battery_capacity} kWh')
table.add_row('[b]State of charge:[/]', f'{data.soc} %')
+ table.add_row('[b]Min State of charge:[/]', f'{data.min_soc} %')
+ table.add_row('[b]Target State of charge:[/]', f'{data.target_soc} %')
+ table.add_row('[b]Max State of charge:[/]', f'{data.max_soc} %')
table.add_section()
table.add_row('[b]Min voltage:[/]', f'{data.min_voltage:0.2f} V')
table.add_row('[b]Max voltage:[/]', f'{data.max_voltage:0.2f} V')
+ table.add_row('[b]Present voltage:[/]', f'{data.present_voltage:0.2f} V')
table.add_section()
table.add_row('[b]Min charge current:[/]', f'{data.min_charge_current:0.2f} A')
table.add_row('[b]Max charge current:[/]', f'{data.max_charge_current:0.2f} A')
@@ -653,15 +690,19 @@ def update_vehicle_status(self, data: VehicleStatus) -> None:
table.add_row('[b]Max discharge current:[/]', f'{data.max_discharge_current:0.2f} A')
table.add_row('[b]Min discharge power:[/]', f'{data.min_discharge_power} kW')
table.add_row('[b]Max discharge power:[/]', f'{data.max_discharge_power} kW')
+ table.add_section()
+ table.add_row('[b]Min energy request:[/]', f'{data.min_energy_request:0.2f} kWh')
+ table.add_row('[b]Target energy request:[/]', f'{data.target_energy_request:0.2f} kWh')
+ table.add_row('[b]Max energy request:[/]', f'{data.max_energy_request:0.2f} kWh')
self.layout['First']['vehicle-status'].update(Panel(table, title='Vehicle Status'))
def update_wait_on( # noqa: C901, PLR0912, PLR0915
- self,
- charger_control: ChargerControl,
- charger_status: ChargerStatus,
- charger_parameters: ChargerParameters,
- session_status: SessionStatus,
- vehicle_status: VehicleStatus,
+ self,
+ charger_control: ChargerControl,
+ charger_status: ChargerStatus,
+ charger_parameters: ChargerParameters,
+ session_status: SessionStatus,
+ vehicle_status: VehicleStatus,
) -> None:
def _update_status(text: str) -> None:
self.layout['Hints'].update(Text(text, style='black on white'))
@@ -775,8 +816,7 @@ def _unknown_state(extra: str = '') -> None:
elif charger_control.power_function == 'Standby':
_update_status(
- 'Vehicle should have closed its contactors. '
- 'Waiting for it to start charging.',
+ 'Vehicle should have closed its contactors. Waiting for it to start charging.',
)
else:
@@ -794,7 +834,11 @@ def _unknown_state(extra: str = '') -> None:
if charger_control.setpoints_mode == 'Target_Mode':
target_current = charger_control.current_range_max
- if (target_current * 0.8) <= charger_status.present_current <= (target_current * 1.1):
+ if (
+ (target_current * 0.8)
+ <= charger_status.present_current
+ <= (target_current * 1.1)
+ ):
_update_status(f'{direction}! Waiting for charge to stop.')
else:
_update_status(
@@ -876,14 +920,15 @@ def _color_logic(self, ios: str) -> str:
def cli_main(
- can_config: Path = Path('can.conf'),
- pistol_index: int = 1,
- enable_can_logging: bool = False
+ can_config: str = 'can.conf',
+ pistol_index: int = 1,
+ enable_can_logging: bool = False,
) -> None:
global enable_can_log
enable_can_log = enable_can_logging
+ can_config_path = resources.files('advmonitors') / 'conf' / can_config
try:
- bus_config = can.util.load_config(path=can_config)
+ bus_config = can.util.load_config(path=can_config_path)
except can.exceptions.CanInterfaceNotImplementedError as ex:
print(f'[red]ERROR:[/] Incorrect CAN configuration. {ex}.')
raise typer.Abort from ex
@@ -892,5 +937,9 @@ def cli_main(
app.display()
-if __name__ == '__main__':
+def main() -> None:
typer.run(cli_main)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/charge-controllers/monitors/advmonitors/pev_v1.py b/charge-controllers/monitors/advmonitors/pev_v1.py
new file mode 100644
index 0000000..24abdf6
--- /dev/null
+++ b/charge-controllers/monitors/advmonitors/pev_v1.py
@@ -0,0 +1,460 @@
+# Copyright (c) 2024 ADVANTICS SAS
+# Original author: Axel Voitier
+# Part of Advantics examples
+# MIT licensed
+#
+# spell-checker:words renderable
+# spell-checker:ignore cantools EVSE EVCC exctype excinst exctb
+from __future__ import annotations
+
+import os
+from dataclasses import dataclass
+from datetime import datetime
+
+# System imports
+from importlib import resources
+from typing import TYPE_CHECKING
+
+# Third-party imports
+import can
+import cantools.database
+import typer
+from rich import box, print
+from rich.console import Console
+from rich.layout import Layout
+from rich.live import Live
+from rich.panel import Panel
+from rich.table import Table
+
+# Local imports
+
+if TYPE_CHECKING:
+ from pathlib import Path
+ from types import TracebackType
+ from typing import Any, Self
+
+ from rich.console import ConsoleOptions, RenderResult
+
+
+CAN_CONFIGS: dict[str, Path] = {
+ path.name: path for path in (resources.files('advmonitors') / 'conf').iterdir()
+}
+DEFAULT_CAN_CONFIG = CAN_CONFIGS['can.conf']
+DEFAULT_VCAN_CONFIG = CAN_CONFIGS['vcan.conf']
+
+CAN_DBS: dict[str, Path] = {
+ path.name: path for path in (resources.files('advmonitors') / 'dbs').iterdir()
+}
+
+
+@dataclass
+class VehicleControl:
+ ac_evse_is_ready: str
+ dc_close_contactors: str
+ ###
+ dc_contactor_pos_fb: str
+ dc_contactor_neg_fb: str
+ digital_inputs: str
+ stop_charge: str
+ ptc0: int
+ ptc1: int
+ ptc2: int
+ cpu_temp: int
+ can_sensor_temp: int
+
+
+@dataclass
+class ChargerStatus:
+ max_current: float
+ rcd_status: str
+
+
+@dataclass
+class SessionStatus:
+ communication_stage: str
+ protocol: str
+ pins: str
+ cp_duty_cycle: int
+ cp_top_voltage: float
+ cp_state: str
+ pp_resistance: int
+ inlet_lock_state: str
+
+
+@dataclass
+class VehicleStatus:
+ soc: int
+ ac_vehicle_ready: str
+ dc_current_request: float
+ dc_present_current: float
+ dc_contactors_closed: str
+ dc_normal_end_of_charge: str
+ dc_battery_voltage: float
+ dc_inlet_voltage: float
+
+
+enable_can_log = False
+logged_messages = {}
+log_filename = f'./pev_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
+
+
+def log_can_msg(msg_name, signals, senders=None) -> None:
+ if not enable_can_log:
+ return
+
+ if msg_name in logged_messages and logged_messages[msg_name] == signals:
+ # The same message is logged with the same body last time, do not repeat
+ return
+
+ with open(log_filename, 'a') as file:
+ file.write(
+ f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")}\t{senders}\t{msg_name}\t{signals}\n'
+ )
+ logged_messages[msg_name] = signals
+
+
+class AdvanticsPEVInterfaceV1(can.Listener):
+ def __init__(self, app: Application) -> None:
+ self._app = app
+ self._bus = app.bus
+ can_db_path = (
+ resources.files('advmonitors') / 'dbs' / 'Advantics_Generic_PEV_protocol_v1.kcd'
+ )
+ self._db: cantools.database.Database = cantools.database.load_file(can_db_path)
+
+ self._bus.set_filters(
+ [
+ can.typechecking.CanFilterExtended(
+ can_id=message.frame_id,
+ can_mask=0xFFF,
+ extended=False,
+ )
+ for message in self._db.messages
+ ],
+ )
+
+ self.vehicle_control = VehicleControl(
+ ac_evse_is_ready='----',
+ dc_close_contactors='----',
+ ###
+ dc_contactor_pos_fb='----',
+ dc_contactor_neg_fb='----',
+ digital_inputs='----',
+ stop_charge='----',
+ ptc0=-40,
+ ptc1=-40,
+ ptc2=-40,
+ cpu_temp=-40,
+ can_sensor_temp=-40,
+ )
+ self._app.update_vehicle_control(self.vehicle_control)
+
+ self.charger_status = ChargerStatus(
+ max_current=0,
+ rcd_status='----',
+ )
+ self._app.update_charger_status(self.charger_status)
+
+ self.session_status = SessionStatus(
+ communication_stage='----',
+ protocol='----',
+ pins='----',
+ cp_duty_cycle=0,
+ cp_top_voltage=0,
+ cp_state='----',
+ pp_resistance=0,
+ inlet_lock_state='----',
+ )
+ self._app.update_session_status(self.session_status)
+
+ self.vehicle_status = VehicleStatus(
+ soc=0,
+ ac_vehicle_ready='----',
+ dc_current_request=0,
+ dc_present_current=0,
+ dc_contactors_closed='----',
+ dc_normal_end_of_charge='----',
+ dc_battery_voltage=0,
+ dc_inlet_voltage=0,
+ )
+ self._app.update_vehicle_status(self.vehicle_status)
+
+ def on_message_received(self, msg: can.Message) -> None:
+ try:
+ message = self._db.get_message_by_frame_id(msg.arbitration_id)
+ except KeyError:
+ return
+ signals = message.decode(msg.data)
+
+ log_can_msg(message.name, signals, message.senders)
+
+ # Messages sent by the controller
+
+ if message.name == 'EVSE_Information':
+ self.session_status.communication_stage = str(signals['Communication_Stage'])
+ self.session_status.protocol = str(signals['Protocol'])
+ self.session_status.pins = str(signals['Pins'])
+ self._app.update_session_status(self.session_status)
+ self.charger_status.max_current = float(signals['Max_Current'])
+ self.charger_status.rcd_status = str(signals['RCD_Status'])
+ self._app.update_charger_status(self.charger_status)
+
+ elif message.name == 'AC_Control':
+ self.vehicle_control.ac_evse_is_ready = str(signals['Ready_To_Deliver_Power'])
+ self._app.update_vehicle_control(self.vehicle_control)
+
+ elif message.name == 'DC_Control':
+ self.vehicle_control.dc_close_contactors = str(signals['Close_Contactors'])
+ self._app.update_vehicle_control(self.vehicle_control)
+
+ elif message.name == 'CCS_Extra_Information':
+ self.session_status.cp_duty_cycle = int(signals['CP_Duty_Cycle'])
+ self.session_status.cp_top_voltage = float(signals['CP_Top_Voltage'])
+ self.session_status.cp_state = str(signals['CP_State'])
+ self.session_status.pp_resistance = int(signals['PP_Resistance'])
+ self.session_status.inlet_lock_state = str(signals['Inlet_Lock_State'])
+ self._app.update_session_status(self.session_status)
+
+ elif message.name == 'ADM_CS_EVCC_Inputs':
+ self.vehicle_control.dc_contactor_pos_fb = str(
+ signals['DC_Contactor_Positive_Feedback'],
+ )
+ self.vehicle_control.dc_contactor_neg_fb = str(
+ signals['DC_Contactor_Negative_Feedback'],
+ )
+ self.vehicle_control.digital_inputs = (
+ f'1:{"H" if signals["Digital_Input1"] else "L"} '
+ f'2:{"H" if signals["Digital_Input2"] else "L"}'
+ )
+ self.vehicle_control.stop_charge = str(signals['Stop_Charge'])
+ self.vehicle_control.ptc0 = int(signals['PTC0'])
+ self.vehicle_control.ptc1 = int(signals['PTC1'])
+ self.vehicle_control.ptc2 = int(signals['PTC2'])
+ self.vehicle_control.cpu_temp = int(signals['CPU_Temperature'])
+ self.vehicle_control.can_sensor_temp = int(signals['CAN_Sensor_Temperature'])
+ self._app.update_vehicle_control(self.vehicle_control)
+
+ # Do also the messages going to the controller
+ # in case we are just watching someone else session
+
+ elif message.name == 'EV_Information':
+ self.vehicle_status.soc = int(signals['State_of_Charge'])
+ self._app.update_vehicle_status(self.vehicle_status)
+
+ elif message.name == 'AC_Status':
+ self.vehicle_status.ac_vehicle_ready = str(signals['Ready_To_Charge'])
+ self._app.update_vehicle_status(self.vehicle_status)
+
+ elif message.name == 'DC_Status1':
+ self.vehicle_status.dc_current_request = float(signals['Current_Request'])
+ self.vehicle_status.dc_present_current = float(signals['Present_Current'])
+ self._app.update_vehicle_status(self.vehicle_status)
+
+ elif message.name == 'DC_Status2':
+ self.vehicle_status.dc_contactors_closed = str(signals['Contactors_Closed'])
+ self.vehicle_status.dc_normal_end_of_charge = str(signals['Normal_End_of_Charge'])
+ self.vehicle_status.dc_battery_voltage = float(signals['Battery_Voltage'])
+ self.vehicle_status.dc_inlet_voltage = float(signals['Inlet_Voltage'])
+ self._app.update_vehicle_status(self.vehicle_status)
+
+ def on_error(self, exc: Exception) -> None:
+ self._app.live.stop()
+ raise exc
+
+
+class RenderableConsole(Console):
+ def __init__(self) -> None:
+ super().__init__(record=True, file=open(os.devnull, 'w')) # noqa: PTH123, SIM115
+
+ def __rich_console__(self, console: Console, options: ConsoleOptions) -> RenderResult:
+ texts = self.export_text(clear=False).split('\n')
+ yield from texts[-options.height :]
+
+
+class Application:
+ def __init__(self, bus_config: can.typechecking.BusConfig, **interface_config: Any) -> None:
+ self._bus_config = bus_config
+ self._interface_config = interface_config
+ self._bus: can.BusABC | None = None
+ self._notifier: can.Notifier | None = None
+ self._pev: AdvanticsPEVInterfaceV1 | None = None
+
+ self.console = RenderableConsole()
+ self.layout = self._make_layout()
+ self.live = Live(self.layout, screen=True, redirect_stderr=True)
+
+ def _make_layout(self) -> Layout:
+ layout = Layout(name='EVSE')
+ layout.split_column(
+ Layout(name='First'),
+ Layout(name='Second'),
+ Layout(name='Monitor', visible=False),
+ )
+ layout['First'].split_row(
+ Layout(name='vehicle-control'),
+ Layout(name='charge-session'),
+ Layout(name='vehicle-status'),
+ )
+ layout['Second'].split_row(
+ Layout(name='charger-status'),
+ Layout(Panel(self.console, title='Console'), name='placeholder1', ratio=2),
+ )
+
+ return layout
+
+ @property
+ def bus(self) -> can.BusABC:
+ if (bus := self._bus) is None:
+ bus = self._bus = can.Bus(**self._bus_config)
+
+ return bus
+
+ @property
+ def pev(self) -> AdvanticsPEVInterfaceV1:
+ if (pev := self._pev) is None:
+ pev = self._pev = AdvanticsPEVInterfaceV1(self, **self._interface_config)
+
+ return pev
+
+ @property
+ def notifier(self) -> can.Notifier:
+ if (notifier := self._notifier) is None:
+ notifier = self._notifier = can.Notifier(self.bus, [self.pev])
+
+ return notifier
+
+ def start(self) -> None:
+ _ = self.notifier # Implicitly instantiate bus, pev, and notifier, which starts-up
+
+ def shutdown(self) -> None:
+ self.live.stop()
+
+ if self._notifier is not None:
+ self._notifier.stop()
+ self._notifier = None
+
+ if self._bus is not None:
+ self._bus.shutdown()
+ self._bus = None
+
+ def __enter__(self) -> Self:
+ self.start()
+ return self
+
+ def __exit__(
+ self,
+ exctype: type[BaseException] | None,
+ excinst: BaseException | None,
+ exctb: TracebackType | None,
+ ) -> bool:
+ self.shutdown()
+ return False
+
+ def display(self) -> None:
+ with self.live:
+ self.live._refresh_thread.join()
+
+ def update_vehicle_control(self, data: VehicleControl) -> None:
+ table = Table.grid(expand=True)
+ table.box = box.HORIZONTALS
+ table.add_column()
+ table.add_column()
+ table.add_row('[b]DC close contactors:[/]', data.dc_close_contactors)
+ table.add_section()
+ table.add_row('[b]AC EVSE ready:[/]', self._color_flag(data.ac_evse_is_ready))
+ table.add_section()
+ table.add_row('[b]DC contactor + feedback:[/]', data.dc_contactor_pos_fb)
+ table.add_row('[b]DC contactor - feedback:[/]', data.dc_contactor_neg_fb)
+ table.add_row(
+ '[b]Stop charge input:[/]',
+ self._color_flag(data.stop_charge, not_prefix='No_', invert=True),
+ )
+ table.add_row('[b]Digital inputs:[/]', self._color_logic(data.digital_inputs))
+ table.add_section()
+ table.add_row('[b]PTC 0:[/]', f'{data.ptc0} °C')
+ table.add_row('[b]PTC 1:[/]', f'{data.ptc1} °C')
+ table.add_row('[b]PTC 2:[/]', f'{data.ptc2} °C')
+ table.add_row('[b]CPU temperature:[/]', f'{data.cpu_temp} °C')
+ table.add_row('[b]CAN sensor temperature:[/]', f'{data.can_sensor_temp} °C')
+ self.layout['First']['vehicle-control'].update(Panel(table, title='Vehicle Control'))
+
+ def update_charger_status(self, data: ChargerStatus) -> None:
+ table = Table.grid(expand=True)
+ table.box = box.HORIZONTALS
+ table.add_column()
+ table.add_column()
+ table.add_row('[b]Max current:[/]', f'{data.max_current:.2f} A')
+ table.add_row(
+ '[b]RCD status:[/]',
+ self._color_flag(data.rcd_status, not_prefix='No_', invert=True),
+ )
+ self.layout['Second']['charger-status'].update(Panel(table, title='Charger Status'))
+
+ def update_session_status(self, data: SessionStatus) -> None:
+ table = Table.grid(expand=True)
+ table.box = box.HORIZONTALS
+ table.add_column()
+ table.add_column()
+ table.add_row('[b]Communication stage:[/]', data.communication_stage)
+ table.add_row('[b]Protocol:[/]', data.protocol)
+ table.add_row('[b]Pins:[/]', data.pins)
+ table.add_section()
+ table.add_row('[b]CP duty cycle:[/]', f'{data.cp_duty_cycle} %')
+ table.add_row('[b]CP top voltage:[/]', f'{data.cp_top_voltage:0.1f} V')
+ table.add_row('[b]CP state:[/]', data.cp_state)
+ table.add_row('[b]PP resistance:[/]', f'{data.pp_resistance} Ω')
+ table.add_row('[b]Inlet lock state:[/]', data.inlet_lock_state)
+ self.layout['First']['charge-session'].update(Panel(table, title='Charge Session'))
+
+ def update_vehicle_status(self, data: VehicleStatus) -> None:
+ table = Table.grid(expand=True)
+ table.box = box.HORIZONTALS
+ table.add_column()
+ table.add_column()
+ table.add_row('[b]State of charge:[/]', f'{data.soc} %')
+ table.add_section()
+ table.add_row('[b]DC inlet voltage:[/]', f'{data.dc_inlet_voltage:0.2f} V')
+ table.add_row('[b]DC battery voltage:[/]', f'{data.dc_battery_voltage:0.2f} V')
+ table.add_row('[b]DC current request:[/]', f'{data.dc_current_request:0.2f} A')
+ table.add_row('[b]DC present current:[/]', f'{data.dc_present_current:0.2f} A')
+ table.add_row('[b]DC contactors closed:[/]', data.dc_contactors_closed)
+ table.add_row(
+ '[b]DC normal end of charge:[/]',
+ self._color_flag(data.dc_normal_end_of_charge, not_prefix='No_', invert=True),
+ )
+ table.add_section()
+ table.add_row('[b]AC vehicle ready:[/]', self._color_flag(data.ac_vehicle_ready))
+ self.layout['First']['vehicle-status'].update(Panel(table, title='Vehicle Status'))
+
+ def _color_flag(self, flag: str, not_prefix: str = 'Not_', *, invert: bool = False) -> str:
+ if flag == '----':
+ return flag
+ if flag.startswith(not_prefix) != invert:
+ return f'[red]{flag}[/]'
+ else:
+ return f'[green]{flag}[/]'
+
+ def _color_logic(self, ios: str) -> str:
+ return ios.replace('H', '[green]H[/]').replace('L', '[red]L[/]')
+
+
+def cli_main(can_config: str = 'can.conf', enable_can_logging: bool = False) -> None:
+ global enable_can_log
+ enable_can_log = enable_can_logging
+ can_config_path = resources.files('advmonitors') / 'conf' / can_config
+ try:
+ bus_config = can.util.load_config(path=can_config_path)
+ except can.exceptions.CanInterfaceNotImplementedError as ex:
+ print(f'[red]ERROR:[/] Incorrect CAN configuration. {ex}.')
+ raise typer.Abort from ex
+
+ with Application(bus_config) as app:
+ app.display()
+
+
+def main() -> None:
+ typer.run(cli_main)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/charge-controllers/monitors/pev-monitor.py b/charge-controllers/monitors/advmonitors/pev_v2.py
similarity index 86%
rename from charge-controllers/monitors/pev-monitor.py
rename to charge-controllers/monitors/advmonitors/pev_v2.py
index 42fbaa9..88f6585 100644
--- a/charge-controllers/monitors/pev-monitor.py
+++ b/charge-controllers/monitors/advmonitors/pev_v2.py
@@ -7,13 +7,14 @@
# spell-checker:ignore cantools EVSE EVCC exctype excinst exctb
from __future__ import annotations
-# System imports
import os
from dataclasses import dataclass
-from pathlib import Path
-from typing import TYPE_CHECKING
from datetime import datetime
+# System imports
+from importlib import resources
+from typing import TYPE_CHECKING
+
# Third-party imports
import can
import cantools.database
@@ -83,30 +84,34 @@ class VehicleStatus:
soc: int
energy_capacity: float
ac_vehicle_ready: str
- dc_current_request: float
+ max_charge_current: float
dc_present_current: float
+ max_discharge_current: float
dc_contactors_closed: str
dc_normal_end_of_charge: str
+ dc_emergency_stop: str
dc_battery_voltage: float
dc_inlet_voltage: float
+ hv_preparing_hold_off: str
enable_can_log = False
logged_messages = {}
-log_filename = f"./pev_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log"
+log_filename = f'./pev_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
-def log_can_msg(msg_name, signals, senders=None):
+def log_can_msg(msg_name, signals, senders=None) -> None:
if not enable_can_log:
return
- if msg_name in logged_messages:
- if logged_messages[msg_name] == signals:
- # The same message is logged with the same body last time, do not repeat
- return
+ if msg_name in logged_messages and logged_messages[msg_name] == signals:
+ # The same message is logged with the same body last time, do not repeat
+ return
with open(log_filename, 'a') as file:
- file.write(f"{datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")}\t{senders}\t{msg_name}\t{signals}\n")
+ file.write(
+ f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")}\t{senders}\t{msg_name}\t{signals}\n'
+ )
logged_messages[msg_name] = signals
@@ -114,9 +119,10 @@ class AdvanticsPEVInterfaceV2(can.Listener):
def __init__(self, app: Application) -> None:
self._app = app
self._bus = app.bus
- self._db: cantools.database.Database = cantools.database.load_file(
- 'Advantics_Generic_PEV_protocol_v2.kcd',
- ) # type: ignore[reportAttributeAccessIssue]
+ can_db_path = (
+ resources.files('advmonitors') / 'dbs' / 'Advantics_Generic_PEV_protocol_v2.kcd'
+ )
+ self._db: cantools.database.Database = cantools.database.load_file(can_db_path)
self._bus.set_filters(
[
@@ -177,12 +183,15 @@ def __init__(self, app: Application) -> None:
soc=0,
energy_capacity=0,
ac_vehicle_ready='----',
- dc_current_request=0,
+ max_discharge_current=0,
dc_present_current=0,
+ max_charge_current=0,
dc_contactors_closed='----',
dc_normal_end_of_charge='----',
+ dc_emergency_stop='----',
dc_battery_voltage=0,
dc_inlet_voltage=0,
+ hv_preparing_hold_off='----',
)
self._app.update_vehicle_status(self.vehicle_status)
@@ -230,8 +239,8 @@ def on_message_received(self, msg: can.Message) -> None:
signals['DC_Contactor_Negative_Feedback']
)
self.vehicle_control.digital_inputs = (
- f'1:{"H" if signals['Digital_Input1'] else "L"} '
- f'2:{"H" if signals['Digital_Input2'] else "L"}'
+ f'1:{"H" if signals["Digital_Input1"] else "L"} '
+ f'2:{"H" if signals["Digital_Input2"] else "L"}'
)
self.vehicle_control.stop_charge = str(signals['Stop_Charge'])
self.vehicle_control.ptc0 = int(signals['PTC0'])
@@ -254,17 +263,23 @@ def on_message_received(self, msg: can.Message) -> None:
self._app.update_vehicle_status(self.vehicle_status)
elif message.name == 'DC_Status1':
- self.vehicle_status.dc_current_request = float(signals['Current_Request'])
+ self.vehicle_status.max_discharge_current = float(signals['Max_Discharge_Current'])
self.vehicle_status.dc_present_current = float(signals['Present_Current'])
+ self.vehicle_status.max_charge_current = float(signals['Max_Charge_Current'])
self._app.update_vehicle_status(self.vehicle_status)
elif message.name == 'DC_Status2':
self.vehicle_status.dc_contactors_closed = str(signals['Contactors_Closed'])
self.vehicle_status.dc_normal_end_of_charge = str(signals['Normal_End_of_Charge'])
+ self.vehicle_status.dc_emergency_stop = str(signals['Emergency_Stop'])
self.vehicle_status.dc_battery_voltage = float(signals['Battery_Voltage'])
self.vehicle_status.dc_inlet_voltage = float(signals['Inlet_Voltage'])
+ self.vehicle_status.dc_inlet_voltage = float(signals['Inlet_Voltage'])
self._app.update_vehicle_status(self.vehicle_status)
+ elif message.name == 'EV_Status':
+ self.vehicle_status.hv_preparing_hold_off = str(signals['HV_Preparing_Hold_Off'])
+
elif message.name == 'EV_Energy_Request':
self.vehicle_parameters.target_energy_request = float(signals['Target_Energy_Request'])
self.vehicle_parameters.min_energy_request = float(signals['Minimum_Energy_Request'])
@@ -273,10 +288,10 @@ def on_message_received(self, msg: can.Message) -> None:
elif message.name == 'EV_V2X_Energy_Request':
self.vehicle_parameters.min_v2x_energy_request = float(
- signals['Minimum_V2X_Energy_Request']
+ signals['Minimum_V2X_Energy_Request'],
)
self.vehicle_parameters.max_v2x_energy_request = float(
- signals['Maximum_V2X_Energy_Request']
+ signals['Maximum_V2X_Energy_Request'],
)
self._app.update_vehicle_parameters(self.vehicle_parameters)
@@ -295,7 +310,7 @@ def __init__(self) -> None:
def __rich_console__(self, console: Console, options: ConsoleOptions) -> RenderResult:
texts = self.export_text(clear=False).split('\n')
- yield from texts[-options.height:]
+ yield from texts[-options.height :]
class Application:
@@ -370,10 +385,10 @@ def __enter__(self) -> Self:
return self
def __exit__(
- self,
- exctype: type[BaseException] | None,
- excinst: BaseException | None,
- exctb: TracebackType | None,
+ self,
+ exctype: type[BaseException] | None,
+ excinst: BaseException | None,
+ exctb: TracebackType | None,
) -> bool:
self.shutdown()
return False
@@ -427,8 +442,14 @@ def update_vehicle_parameters(self, data: VehicleParameters) -> None:
table.add_row('[b]Minimum energy request:[/]', f'{data.min_energy_request:.2f} kWh')
table.add_row('[b]Maximum energy request:[/]', f'{data.max_energy_request:.2f} kWh')
table.add_section()
- table.add_row('[b]Minimum V2X energy request:[/]', f'{data.min_v2x_energy_request:.2f} kWh')
- table.add_row('[b]Maximum V2X energy request:[/]', f'{data.max_v2x_energy_request:.2f} kWh')
+ table.add_row(
+ '[b]Minimum V2X energy request:[/]',
+ f'{data.min_v2x_energy_request:.2f} kWh',
+ )
+ table.add_row(
+ '[b]Maximum V2X energy request:[/]',
+ f'{data.max_v2x_energy_request:.2f} kWh',
+ )
table.add_section()
table.add_row('[b]Departure time:[/]', f'{data.departure_time} s')
self.layout['Second']['vehicle-parameters'].update(Panel(table, title='Vehicle Parameters'))
@@ -459,15 +480,18 @@ def update_vehicle_status(self, data: VehicleStatus) -> None:
table.add_section()
table.add_row('[b]DC inlet voltage:[/]', f'{data.dc_inlet_voltage:0.2f} V')
table.add_row('[b]DC battery voltage:[/]', f'{data.dc_battery_voltage:0.2f} V')
- table.add_row('[b]DC current request:[/]', f'{data.dc_current_request:0.2f} A')
+ table.add_row('[b]DC max charge current:[/]', f'{data.max_charge_current:0.2f} A')
table.add_row('[b]DC present current:[/]', f'{data.dc_present_current:0.2f} A')
+ table.add_row('[b]DC max discharge current:[/]', f'{data.max_discharge_current:0.2f} A')
table.add_row('[b]DC contactors closed:[/]', data.dc_contactors_closed)
table.add_row(
'[b]DC normal end of charge:[/]',
self._color_flag(data.dc_normal_end_of_charge, not_prefix='No_', invert=True),
)
+ table.add_row('[b]DC emergency stop:[/]', data.dc_emergency_stop)
table.add_section()
table.add_row('[b]AC vehicle ready:[/]', self._color_flag(data.ac_vehicle_ready))
+ table.add_row('[b]HV preparing hold off:[/]', f'{data.hv_preparing_hold_off}')
self.layout['First']['vehicle-status'].update(Panel(table, title='Vehicle Status'))
def _color_flag(self, flag: str, not_prefix: str = 'Not_', *, invert: bool = False) -> str:
@@ -482,14 +506,12 @@ def _color_logic(self, ios: str) -> str:
return ios.replace('H', '[green]H[/]').replace('L', '[red]L[/]')
-def cli_main(
- can_config: Path = Path('can.conf'),
- enable_can_logging: bool = False
-) -> None:
+def cli_main(can_config: str = 'can.conf', enable_can_logging: bool = False) -> None:
global enable_can_log
enable_can_log = enable_can_logging
+ can_config_path = resources.files('advmonitors') / 'conf' / can_config
try:
- bus_config = can.util.load_config(path=can_config)
+ bus_config = can.util.load_config(path=can_config_path)
except can.exceptions.CanInterfaceNotImplementedError as ex:
print(f'[red]ERROR:[/] Incorrect CAN configuration. {ex}.')
raise typer.Abort from ex
@@ -498,5 +520,9 @@ def cli_main(
app.display()
-if __name__ == '__main__':
+def main() -> None:
typer.run(cli_main)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/charge-controllers/monitors/evse_monitor.py b/charge-controllers/monitors/evse_monitor.py
new file mode 100644
index 0000000..328dbbd
--- /dev/null
+++ b/charge-controllers/monitors/evse_monitor.py
@@ -0,0 +1,3 @@
+from advmonitors.evse import main
+
+main()
diff --git a/charge-controllers/monitors/pev_monitor_v1.py b/charge-controllers/monitors/pev_monitor_v1.py
new file mode 100644
index 0000000..670efd4
--- /dev/null
+++ b/charge-controllers/monitors/pev_monitor_v1.py
@@ -0,0 +1,3 @@
+from advmonitors.pev_v1 import main
+
+main()
diff --git a/charge-controllers/monitors/pev_monitor_v2.py b/charge-controllers/monitors/pev_monitor_v2.py
new file mode 100644
index 0000000..8511cf9
--- /dev/null
+++ b/charge-controllers/monitors/pev_monitor_v2.py
@@ -0,0 +1,3 @@
+from advmonitors.pev_v2 import main
+
+main()
diff --git a/charge-controllers/monitors/pyproject.toml b/charge-controllers/monitors/pyproject.toml
new file mode 100644
index 0000000..5c30f2d
--- /dev/null
+++ b/charge-controllers/monitors/pyproject.toml
@@ -0,0 +1,48 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "advmonitors"
+version="1.0.0"
+description = "Monitoring tools for the Advantics CAN interfaces"
+requires-python = ">=3.9"
+authors = [
+ { name = "Axel Voitier", email = "axel.voitier@advantics.fr" },
+ { name = "Can Berk Durmus", email = "can.durmus@advantics.fr" },
+ { name = "Baptiste Pestourie", email = "baptiste.pestourie@advantics.fr" },
+]
+
+# Note: project-level dependencies are automatically installed in the default environement
+# and all the envs inheriting from it
+dependencies = [
+ "python-can>=3.3.4",
+ "cantools>=36.0.0",
+ "typer~=0.12.0",
+ "asciichartpy~=1.5.0",
+]
+
+[project.urls]
+Homepage = "https://github.com/ADVANTICS/examples"
+
+[tool.hatch.build.targets.wheel]
+packages = ["advmonitors"]
+include = [
+ "advmonitors/conf/*.conf",
+ "advmonitors/dbs/*.kcd"
+]
+
+[project.scripts]
+evse-monitor="advmonitors.evse:main"
+pev-monitor-v1="advmonitors.pev_v1:main"
+pev-monitor-v2="advmonitors.pev_v2:main"
+
+[tool.mypy]
+namespace_packages = true
+explicit_package_bases= true
+ignore_missing_imports = true
+
+[tool.ruff.format]
+quote-style = "single"
+indent-style = "space"
+docstring-code-format = true
diff --git a/charge-controllers/monitors/requirements.txt b/charge-controllers/monitors/requirements.txt
index 09fea2c..084260b 100644
--- a/charge-controllers/monitors/requirements.txt
+++ b/charge-controllers/monitors/requirements.txt
@@ -1,4 +1,4 @@
-python-can ~= 4.4.0
+python-can >= 4.4.2
cantools ~= 39.4.0
typer ~= 0.12.0
asciichartpy ~= 1.5.0
diff --git a/charge-controllers/tutorials/EVSE Generic Interface v3/requirements.txt b/charge-controllers/tutorials/EVSE Generic Interface v3/requirements.txt
deleted file mode 100644
index be96ccf..0000000
--- a/charge-controllers/tutorials/EVSE Generic Interface v3/requirements.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-python-can ~= 4.4.0
-typer ~= 0.12.0
diff --git a/charge-controllers/tutorials/PEV Generic Interface v2/requirements.txt b/charge-controllers/tutorials/PEV Generic Interface v2/requirements.txt
deleted file mode 100644
index be96ccf..0000000
--- a/charge-controllers/tutorials/PEV Generic Interface v2/requirements.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-python-can ~= 4.4.0
-typer ~= 0.12.0
diff --git a/charge-controllers/tutorials/advsimulators/__init__.py b/charge-controllers/tutorials/advsimulators/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/charge-controllers/tutorials/PEV Generic Interface v2/can.conf b/charge-controllers/tutorials/advsimulators/conf/can.conf
similarity index 100%
rename from charge-controllers/tutorials/PEV Generic Interface v2/can.conf
rename to charge-controllers/tutorials/advsimulators/conf/can.conf
diff --git a/charge-controllers/tutorials/advsimulators/conf/vcan.conf b/charge-controllers/tutorials/advsimulators/conf/vcan.conf
new file mode 100644
index 0000000..0d1106c
--- /dev/null
+++ b/charge-controllers/tutorials/advsimulators/conf/vcan.conf
@@ -0,0 +1,4 @@
+[default]
+interface = socketcan
+channel = vcan0
+bitrate = 500000
diff --git a/charge-controllers/tutorials/advsimulators/evse/__init__.py b/charge-controllers/tutorials/advsimulators/evse/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/charge-controllers/tutorials/EVSE Generic Interface v3/evse-simulator.py b/charge-controllers/tutorials/advsimulators/evse/generic_v3.py
similarity index 82%
rename from charge-controllers/tutorials/EVSE Generic Interface v3/evse-simulator.py
rename to charge-controllers/tutorials/advsimulators/evse/generic_v3.py
index e208971..1ca27a5 100644
--- a/charge-controllers/tutorials/EVSE Generic Interface v3/evse-simulator.py
+++ b/charge-controllers/tutorials/advsimulators/evse/generic_v3.py
@@ -11,7 +11,7 @@
import enum
import struct
import time
-from pathlib import Path
+from importlib import resources
from threading import Event, Thread
from typing import TYPE_CHECKING
@@ -138,23 +138,23 @@ class Simulator(can.Listener):
_sequence_flags: int
def __init__(
- self,
- app: Application,
- *,
- pistol_index: int,
- ccs_authorisation_duration: float = 3,
- ccs_authorisation_success: bool = True,
- charge_parameters_negotiation_duration: float = 1.5,
- power_modules_wake_up_duration: float = 1,
- power_modules_dead_time: float = 1,
- voltage_ramp_up_slope: float = 200,
- voltage_ramp_down_slope: float = 100,
- current_ramp_up_slope: float = 20,
- current_ramp_down_slope: float = 20,
- charge_duration: float = 10,
- maximum_voltage: float = 500,
- maximum_charge_current: float = 120,
- maximum_discharge_current: float = 120,
+ self,
+ app: Application,
+ *,
+ pistol_index: int,
+ ccs_authorisation_duration: float = 3,
+ ccs_authorisation_success: bool = True,
+ charge_parameters_negotiation_duration: float = 1.5,
+ power_modules_wake_up_duration: float = 1,
+ power_modules_dead_time: float = 1,
+ voltage_ramp_up_slope: float = 200,
+ voltage_ramp_down_slope: float = 100,
+ current_ramp_up_slope: float = 20,
+ current_ramp_down_slope: float = 20,
+ charge_duration: float = 10,
+ maximum_voltage: float = 500,
+ maximum_charge_current: float = 120,
+ maximum_discharge_current: float = 120,
) -> None:
self._app = app
self._bus = app.bus
@@ -316,65 +316,64 @@ def on_message_received(self, msg: can.Message) -> None:
def update_state(self, new_state: ControllerState) -> None:
"""Uses [0x6B000] Advantics_Controller_Status.State to sequence non-powered things"""
- match new_state:
- case ControllerState.Not_Available:
- # The controller starts by default in Not_Available.
- # You need to send it once the Sequence_Control message with
- # flag Start_Charge_Authorisation set to 1.
- if self.sequence_flags & SequenceFlags.Start_Charge_Authorisation:
- self.send_sequence_control()
-
- case ControllerState.Waiting_For_PEV:
- # This indicates we terminated a charge session (or controller just started).
- # Use it to reset our internal states.
- self.reset()
- print('--------------------------------------')
-
- case ControllerState.CCS_Authorisation_Process:
- # In CCS_Authorisation_Process we want to update sequence flags
- # with both CCS_Authorisation_Done, and CCS_Authorisation_Valid.
- if not (self.sequence_flags & SequenceFlags.CCS_Authorisation_Done):
- print('Authorising user...')
- call_later(self.simulate_ccs_authorisation, self._ccs_authorisation_duration)
-
- case ControllerState.Connected_With_Full_Info:
- # Here we have to update sequence flags with Charge_Parameters_Done
- if not (self.sequence_flags & SequenceFlags.Charge_Parameters_Done):
- print('Determining charge parameters...')
- call_later(
- self.simulate_charge_parameters_done,
- self._charge_parameters_negotiation_duration,
- )
-
- case ControllerState.Insulation_Test:
- # Before we can receive setpoints for insulation test, we must signal
- # System_Enable to be 1. You could have it always set to 1 if you want.
- # But here we will use it for the intended feature of waiting
- # power modules wake-up.
- if not self.system_enable:
- print('Waking-up power modules...')
- call_later(
- self.simulate_power_modules_wake_up,
- self._power_modules_wake_up_duration,
- )
-
- case ControllerState.Charging:
- # Fall back to charging at max current in the beginning of charging every time.
- # To be overridden by OCPP if wanted.
- self._dynamic_target_current = self.current_range_max
- print(f'Update dynamic target current as {self._dynamic_target_current}')
- # To end the simulation after a set time
- call_later(self.simulate_normal_charge_stop, self._charge_duration)
-
- case _:
- pass
+ if new_state == ControllerState.Not_Available:
+ # The controller starts by default in Not_Available.
+ # You need to send it once the Sequence_Control message with
+ # flag Start_Charge_Authorisation set to 1.
+ if self.sequence_flags & SequenceFlags.Start_Charge_Authorisation:
+ self.send_sequence_control()
+
+ elif new_state == ControllerState.Waiting_For_PEV:
+ # This indicates we terminated a charge session (or controller just started).
+ # Use it to reset our internal states.
+ self.reset()
+ print('--------------------------------------')
+
+ elif new_state == ControllerState.CCS_Authorisation_Process:
+ # In CCS_Authorisation_Process we want to update sequence flags
+ # with both CCS_Authorisation_Done, and CCS_Authorisation_Valid.
+ if not (self.sequence_flags & SequenceFlags.CCS_Authorisation_Done):
+ print('Authorising user...')
+ call_later(
+ self.simulate_ccs_authorisation,
+ self._ccs_authorisation_duration,
+ )
+
+ elif new_state == ControllerState.Connected_With_Full_Info:
+ # Here we have to update sequence flags with Charge_Parameters_Done
+ if not (self.sequence_flags & SequenceFlags.Charge_Parameters_Done):
+ print('Determining charge parameters...')
+ call_later(
+ self.simulate_charge_parameters_done,
+ self._charge_parameters_negotiation_duration,
+ )
+
+ elif new_state == ControllerState.Insulation_Test:
+ # Before we can receive setpoints for insulation test, we must signal
+ # System_Enable to be 1. You could have it always set to 1 if you want.
+ # But here we will use it for the intended feature of waiting
+ # power modules wake-up.
+ if not self.system_enable:
+ print('Waking-up power modules...')
+ call_later(
+ self.simulate_power_modules_wake_up,
+ self._power_modules_wake_up_duration,
+ )
+
+ elif new_state == ControllerState.Charging:
+ # Fall back to charging at max current in the beginning of charging every time.
+ # To be overridden by OCPP if wanted.
+ self._dynamic_target_current = self.current_range_max
+ print(f'Update dynamic target current as {self._dynamic_target_current}')
+ # To end the simulation after a set time
+ call_later(self.simulate_normal_charge_stop, self._charge_duration)
def simulate_ccs_authorisation(self) -> None:
"""Delayed callback to proceed with user authorisation process"""
if self._ccs_authorisation_success:
self.sequence_flags |= (
- SequenceFlags.CCS_Authorisation_Done | SequenceFlags.CCS_Authorisation_Valid
+ SequenceFlags.CCS_Authorisation_Done | SequenceFlags.CCS_Authorisation_Valid
)
print('User is authorised to charge')
else:
@@ -411,21 +410,20 @@ def update_dc_power_control(self) -> None:
or range, the setpoints mode, and the output contactors and voltage lowering commands.
"""
- match self.power_function:
- case PowerFunction.Off:
- self.handle_off()
+ if self.power_function == PowerFunction.Off:
+ self.handle_off()
- case PowerFunction.Standby:
- self.handle_standby()
+ elif self.power_function == PowerFunction.Standby:
+ self.handle_standby()
- case PowerFunction.Insulation_Test:
- self.handle_insulation_test()
+ elif self.power_function == PowerFunction.Insulation_Test:
+ self.handle_insulation_test()
- case PowerFunction.Precharge:
- self.handle_precharge()
+ elif self.power_function == PowerFunction.Precharge:
+ self.handle_precharge()
- case PowerFunction.Power_Transfer:
- self.handle_power_transfer()
+ elif self.power_function == PowerFunction.Power_Transfer:
+ self.handle_power_transfer()
self._last_power_function = self.power_function
@@ -472,12 +470,12 @@ def _0v_reached() -> bool:
self.present_current = 0
if (
- self.lower_output_voltage
- and (self.present_voltage > 0)
- and self._ramps_simulator.target_voltage != 0
+ self.lower_output_voltage
+ and (self.present_voltage > 0)
+ and self._ramps_simulator.target_voltage != 0
):
print('Lowering output voltage...')
- # Set the target_current to zero to avoid the ramp sim from ramping back up to target from 0A
+ # Set target_current to zero to prevent the ramp sim from ramping back to target from 0A
self._ramps_simulator.set_target_current(0)
self._ramps_simulator.set_target_voltage(0, reached_cb=_0v_reached)
@@ -553,7 +551,11 @@ def handle_power_transfer(self) -> None:
# if in range_mode, get the target setpoint from _dynamic_target_current
if self.setpoints_mode == SetpointsMode.Range_Mode:
- target_current = self._cap(self._dynamic_target_current, self.current_range_min, self.current_range_max)
+ target_current = self._cap(
+ self._dynamic_target_current,
+ self.current_range_min,
+ self.current_range_max,
+ )
else:
# Simple unidirectional power delivery on maximum current
target_current = self.current_range_max
@@ -726,13 +728,13 @@ def wrapper() -> None:
class RampSimulator:
def __init__(
- self,
- simulator: Simulator,
- voltage_ramp_up: float,
- voltage_ramp_down: float,
- current_ramp_up: float,
- current_ramp_down: float,
- update_dt: float = 0.1,
+ self,
+ simulator: Simulator,
+ voltage_ramp_up: float,
+ voltage_ramp_down: float,
+ current_ramp_up: float,
+ current_ramp_down: float,
+ update_dt: float = 0.1,
) -> None:
self._simulator = simulator
self._update_dt = update_dt
@@ -749,20 +751,20 @@ def __init__(
self.current_reached_cb: Callable[[], bool | None] | None = None
def set_target_voltage(
- self,
- target_voltage: float,
- reached_cb: Callable[[], bool | None] | None = None,
- delay: float = 0,
+ self,
+ target_voltage: float,
+ reached_cb: Callable[[], bool | None] | None = None,
+ delay: float = 0,
) -> None:
self.voltage_reached_cb = reached_cb
self._delay = time.monotonic() + delay
self.target_voltage = target_voltage
def set_target_current(
- self,
- target_current: float,
- reached_cb: Callable[[], bool | None] | None = None,
- delay: float = 0,
+ self,
+ target_current: float,
+ reached_cb: Callable[[], bool | None] | None = None,
+ delay: float = 0,
) -> None:
self.current_reached_cb = reached_cb
self._delay = time.monotonic() + delay
@@ -886,10 +888,10 @@ def run(self) -> None:
###
def __exit__(
- self,
- exctype: type[BaseException] | None,
- excinst: BaseException | None,
- exctb: TracebackType | None,
+ self,
+ exctype: type[BaseException] | None,
+ excinst: BaseException | None,
+ exctb: TracebackType | None,
) -> bool:
"""Exits a with-statement by shutting down the application (incl. closing the bus).
Does not handle any exception."""
@@ -911,49 +913,53 @@ def shutdown(self) -> None:
def cli_main(
- can_config: Path = Path('can.conf'),
- *,
- pistol_index: int = 1,
- ccs_authorisation_duration: float = 3,
- ccs_authorisation_success: bool = True,
- charge_parameters_negotiation_duration: float = 1.5,
- power_modules_wake_up_duration: float = 1,
- power_modules_dead_time: float = 1,
- voltage_ramp_up_slope: float = 200,
- voltage_ramp_down_slope: float = 100,
- current_ramp_up_slope: float = 20,
- current_ramp_down_slope: float = 20,
- charge_duration: float = 10,
- maximum_voltage: float = 500,
- maximum_charge_current: float = 120,
- maximum_discharge_current: float = 120,
+ can_config: str = 'can.conf',
+ pistol_index: int = 1,
+ ccs_authorisation_duration: float = 3,
+ ccs_authorisation_success: bool = True,
+ charge_parameters_negotiation_duration: float = 1.5,
+ power_modules_wake_up_duration: float = 1,
+ power_modules_dead_time: float = 1,
+ voltage_ramp_up_slope: float = 200,
+ voltage_ramp_down_slope: float = 100,
+ current_ramp_up_slope: float = 20,
+ current_ramp_down_slope: float = 20,
+ charge_duration: float = 10,
+ maximum_voltage: float = 500,
+ maximum_charge_current: float = 120,
+ maximum_discharge_current: float = 120,
) -> None:
"""Simulator of power modules compatible with Advantics EVSE Generic CAN interface v3"""
+ can_config_path = resources.files('advsimulators') / 'conf' / can_config
try:
- bus_config = can.util.load_config(path=can_config)
+ bus_config = can.util.load_config(path=can_config_path)
except can.exceptions.CanInterfaceNotImplementedError as ex:
print(f'[red]ERROR:[/] Incorrect CAN configuration. {ex}.')
raise typer.Abort from ex
with Application(
- bus_config,
- pistol_index=pistol_index,
- ccs_authorisation_duration=ccs_authorisation_duration,
- ccs_authorisation_success=ccs_authorisation_success,
- charge_parameters_negotiation_duration=charge_parameters_negotiation_duration,
- power_modules_wake_up_duration=power_modules_wake_up_duration,
- power_modules_dead_time=power_modules_dead_time,
- voltage_ramp_up_slope=voltage_ramp_up_slope,
- voltage_ramp_down_slope=voltage_ramp_down_slope,
- current_ramp_up_slope=current_ramp_up_slope,
- current_ramp_down_slope=current_ramp_down_slope,
- charge_duration=charge_duration,
- maximum_voltage=maximum_voltage,
- maximum_charge_current=maximum_charge_current,
- maximum_discharge_current=maximum_discharge_current,
+ bus_config,
+ pistol_index=pistol_index,
+ ccs_authorisation_duration=ccs_authorisation_duration,
+ ccs_authorisation_success=ccs_authorisation_success,
+ charge_parameters_negotiation_duration=charge_parameters_negotiation_duration,
+ power_modules_wake_up_duration=power_modules_wake_up_duration,
+ power_modules_dead_time=power_modules_dead_time,
+ voltage_ramp_up_slope=voltage_ramp_up_slope,
+ voltage_ramp_down_slope=voltage_ramp_down_slope,
+ current_ramp_up_slope=current_ramp_up_slope,
+ current_ramp_down_slope=current_ramp_down_slope,
+ charge_duration=charge_duration,
+ maximum_voltage=maximum_voltage,
+ maximum_charge_current=maximum_charge_current,
+ maximum_discharge_current=maximum_discharge_current,
) as app:
app.run()
-if __name__ == '__main__':
+def main() -> None:
typer.run(cli_main)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/charge-controllers/tutorials/advsimulators/pev/__init__.py b/charge-controllers/tutorials/advsimulators/pev/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/charge-controllers/tutorials/advsimulators/pev/generic_v1.py b/charge-controllers/tutorials/advsimulators/pev/generic_v1.py
new file mode 100644
index 0000000..edeeed1
--- /dev/null
+++ b/charge-controllers/tutorials/advsimulators/pev/generic_v1.py
@@ -0,0 +1,596 @@
+# Copyright (c) 2025 ADVANTICS SAS
+# Original author: Can Durmus
+# Part of Advantics examples
+# MIT licensed
+#
+# spell-checker:words
+# spell-checker:ignore EVSE EVCC exctype excinst exctb incl
+from __future__ import annotations
+
+import struct
+import time
+from enum import IntEnum
+
+# System imports
+from importlib import resources
+from threading import Event, Thread
+from typing import TYPE_CHECKING
+
+# Third-party imports
+import can
+import typer
+from rich import print
+
+if TYPE_CHECKING:
+ from collections.abc import Callable
+ from types import TracebackType
+ from typing import Any, Self
+
+
+class FrameID(IntEnum):
+ """Enumeration of all CAN messages of the interface and their frame IDs"""
+
+ # Controller sends
+ EVSE_Information = 0x600
+ AC_Control = 0x601
+ DC_Control = 0x602
+ CCS_Extra_Information = 0x603
+ ADM_CS_EVCC_Inputs = 0x604
+
+ # BMS / vehicle side sends
+ EV_Information = 0x610
+ AC_Status = 0x611
+ DC_Status1 = 0x612
+ DC_Status2 = 0x613
+ # EV_Energy_Request = 0x614 # Not supported on PEV Generic Interface v1
+ # EV_V2X_Energy_Request = 0x615 # Not supported on PEV Generic Interface v1
+ # EV_Extra_BPT_Information = 0x616 # Not supported on PEV Generic Interface v1
+
+
+class CommunicationStage(IntEnum):
+ """Enumeration of communication stages.
+ Used in message [0x600] EVSE_Information.
+ """
+
+ Initialising = 0
+ Waiting_For_EVSE = 1
+ Negotiating_Connection = 2
+ Connected_With_Full_Info = 3
+ Insulation_Test = 4
+ Precharge = 5
+ Waiting_For_Charge = 6
+ Charging = 7
+ Ending_Charge = 8
+ Welding_Detection = 9
+ Closing_Communication = 10
+
+ def __str__(self) -> str:
+ return self.name
+
+
+class ChargingProtocol(IntEnum):
+ """Enumeration of charging protocols.
+ Used in message [0x600] EVSE_Information.
+ """
+
+ NONE = 0
+ CCS_PWM = 1
+ CCS_DIN_70121_2012_v2 = 2
+ CCS_ISO_15118_2013_v2 = 3
+ CCS_ISO_15118_2022 = 4
+
+
+class InletPins(IntEnum):
+ """Enumeration of inlet pins.
+ Used in message [0x600] EVSE_Information.
+ """
+
+ NONE = 0
+ CCS_AC = 1
+ CCS_AC_Single_Phase_Core = 2
+ CCS_AC_Three_Phase_Core = 3
+ CCS_DC_Core = 4
+ CCS_DC_Extended = 5
+
+
+class Simulator(can.Listener):
+ """Simulator of BMS/vehicle side compatible with Advantics PEV Generic CAN interface v2"""
+
+ # Internal states definitions
+ # Session info
+ session_stage: CommunicationStage
+ session_protocol: ChargingProtocol
+ session_pins: InletPins
+ # EVES info
+ evse_max_current: float
+ evse_ac_ready: bool
+ # EV info
+ ev_soc: int
+ # ev_energy_capacity: float # Not supported on PEV Generic Interface v1
+ ev_ac_ready: bool
+ ev_dc_battery_voltage: float
+ ev_dc_inlet_voltage: float
+ ev_dc_current_request: float
+ ev_dc_present_current: float
+ ev_dc_contactors_closed: bool
+ ev_dc_request_normal_stop: bool
+ # Commands from controller
+ command_close_contactors: bool
+
+ def __init__(
+ self,
+ app: Application,
+ charger_dead_time: float = 1,
+ charger_voltage_ramp_up_slope: float = 200,
+ contactors_delay: float = 0.6,
+ maximum_energy_request: float = 75.530, # 100%
+ target_energy_request: float = 60.424, # 80%
+ minimum_energy_request: float = 22.659, # 30%
+ maximum_v2x_energy_request: float = 60.424, # 80%
+ minimum_v2x_energy_request: float = 22.659, # 30%
+ departure_time: int = 86400, # 24h in s
+ ) -> None:
+ self._app = app
+ self._bus = app.bus
+
+ # Simulation parameters
+ self._charger_dead_time = charger_dead_time
+ self._charger_voltage_ramp_up_slope = charger_voltage_ramp_up_slope
+ self._contactors_delay = contactors_delay
+ self._maximum_energy_request: float = maximum_energy_request
+ self._target_energy_request: float = target_energy_request
+ self._minimum_energy_request: float = minimum_energy_request
+ self._maximum_v2x_energy_request: float = maximum_v2x_energy_request
+ self._minimum_v2x_energy_request: float = minimum_v2x_energy_request
+ self._departure_time: int = departure_time
+
+ # Initialise internal states
+ self._slope_start_voltage = 0
+ self.session_stage = CommunicationStage.Initialising
+ self._last_stage = CommunicationStage.Initialising
+ self.session_protocol = ChargingProtocol.NONE
+ self.session_pins = InletPins.NONE
+ self.evse_max_current = 0
+ self.evse_ac_ready = False
+ self.command_close_contactors = False
+ self._previous_contactors_command = False
+ self.reset()
+
+ # Periodic messages
+ # EV_Information
+ self._ev_information_msg = can.Message(
+ arbitration_id=FrameID.EV_Information,
+ is_extended_id=False,
+ data=self.encode_ev_information(),
+ )
+ self._ev_information_task = self._bus.send_periodic(
+ self._ev_information_msg,
+ period=0.1,
+ modifier_callback=self._send_callback_ev_information,
+ )
+ # DC_Status1
+ self._dc_status1_msg = can.Message(
+ arbitration_id=FrameID.DC_Status1,
+ is_extended_id=False,
+ data=self.encode_dc_status1(),
+ )
+ self._dc_status1_task = self._bus.send_periodic(
+ self._dc_status1_msg,
+ period=0.1,
+ modifier_callback=self._send_callback_dc_status1,
+ )
+ # DC_Status2
+ self._dc_status2_msg = can.Message(
+ arbitration_id=FrameID.DC_Status2,
+ is_extended_id=False,
+ data=self.encode_dc_status2(),
+ )
+ self._dc_status2_task = self._bus.send_periodic(
+ self._dc_status2_msg,
+ period=0.1,
+ modifier_callback=self._send_callback_dc_status2,
+ )
+
+ # Internal states
+
+ def reset(self) -> None:
+ """Reset our internal states"""
+ self.ev_soc = 30
+ self.ev_ac_ready = False
+ self.ev_dc_battery_voltage = 330
+ self.ev_dc_inlet_voltage = 0
+ self.ev_dc_current_request = 200
+ self.ev_dc_present_current = 0
+ self.ev_dc_contactors_closed = False
+ self.ev_dc_request_normal_stop = False
+
+ # Messages reading and state processing
+
+ def on_message_received(self, msg: can.Message) -> None:
+ """Main callback for every CAN message received.
+
+ Basic principle is we look for message that interest us. When there is one,
+ we decode it (NB: the decode functions directly change our internal state variables).
+ """
+
+ # Messages from the controller
+ if msg.arbitration_id == FrameID.EVSE_Information:
+ self.decode_evse_information(msg.data)
+ if self._last_stage != self.session_stage:
+ self.update_state()
+ self._last_stage = self.session_stage
+
+ elif msg.arbitration_id == FrameID.AC_Control:
+ self.decode_ac_control(msg.data)
+
+ elif msg.arbitration_id == FrameID.DC_Control:
+ self.decode_dc_control(msg.data)
+ self.handle_dc_control() # DC_Control contains commands
+
+ # elif msg.arbitration_id == FrameID.CCS_Extra_Information:
+ # self.decode_ccs_extra_information(msg.data)
+
+ # elif msg.arbitration_id == FrameID.ADM_CS_EVCC_Inputs:
+ # self.decode_adm_cs_evcc_inputs(msg.data)
+
+ # Messages to the controller
+ # We listen for those only to update our own internal states in case
+ # these are sent externally (eg. you playing around with sending some CAN messages)
+ # You do not have to do the same in your implementation.
+
+ elif msg.arbitration_id == FrameID.EV_Information:
+ self.decode_ev_information(msg.data)
+
+ elif msg.arbitration_id == FrameID.AC_Status:
+ self.decode_ac_status(msg.data)
+
+ elif msg.arbitration_id == FrameID.DC_Status1:
+ self.decode_dc_status1(msg.data)
+
+ elif msg.arbitration_id == FrameID.DC_Status2:
+ self.decode_dc_status2(msg.data)
+
+ def update_state(self) -> None:
+ if self.session_stage == CommunicationStage.Waiting_For_EVSE:
+ # This indicates we terminated a charge session (or controller just started).
+ # Use it to reset our internal states.
+ self.reset()
+
+ elif self.session_stage == CommunicationStage.Precharge:
+ print(f'Inlet voltage ramping up to {self.ev_dc_battery_voltage:.1f} V...')
+ self._slope_start_voltage = self.ev_dc_inlet_voltage
+ total_time = self._charger_dead_time + (
+ (self.ev_dc_battery_voltage - self.ev_dc_inlet_voltage)
+ / self._charger_voltage_ramp_up_slope
+ )
+ subdivide_dt(self.simulate_precharge, total_time, 0.1)
+
+ elif self.session_stage == CommunicationStage.Welding_Detection:
+ print('Setting present_current to 0 A')
+ self.ev_dc_present_current = 0
+
+ def simulate_precharge(self, elapsed: float, done: bool) -> None: # noqa: FBT001
+ if elapsed <= self._charger_dead_time:
+ return
+
+ if not done:
+ elapsed -= self._charger_dead_time
+ self.ev_dc_inlet_voltage = min(
+ self._slope_start_voltage + (self._charger_voltage_ramp_up_slope * elapsed),
+ self.ev_dc_battery_voltage,
+ )
+ else:
+ self.ev_dc_inlet_voltage = self.ev_dc_battery_voltage
+ print(f'Inlet voltage at {self.ev_dc_inlet_voltage:.1f} V')
+
+ def handle_dc_control(self) -> None:
+ if self.command_close_contactors != self._previous_contactors_command:
+ if self.command_close_contactors:
+ print('Closing contactors...')
+ call_later(self.simulate_close_contactors, self._contactors_delay)
+ else:
+ print('Opening contactors')
+ call_later(self.simulate_open_contactors, self._contactors_delay)
+
+ self._previous_contactors_command = self.command_close_contactors
+
+ def simulate_close_contactors(self) -> None:
+ self.ev_dc_contactors_closed = True
+ self.ev_dc_inlet_voltage = self.ev_dc_battery_voltage
+ print('Contactors closed')
+
+ def simulate_open_contactors(self) -> None:
+ self.ev_dc_contactors_closed = False
+ self.ev_dc_inlet_voltage = 0
+ print('Contactors opened')
+
+ # Message encoding, decoding, and sending methods
+
+ def _cap(self, value: float, min_value: float, max_value: float) -> float:
+ """Cap a given value to a minimum and maximum"""
+ return min(max(value, min_value), max_value)
+
+ # EVSE_Information
+
+ def decode_evse_information(self, data: bytes | bytearray) -> None:
+ stage, protocol, pins, max_current, _ = struct.unpack(' bytes:
+ return struct.pack(
+ ' None:
+ (soc,) = struct.unpack(' None:
+ """Callback provided to self._bus.send_periodic() for automatic update
+ of message content"""
+ msg.data = self.encode_ev_information()
+
+ # AC_Control
+
+ def decode_ac_control(self, data: bytes | bytearray) -> None:
+ (flags,) = struct.unpack(' bytes:
+ return struct.pack(' None:
+ (flags,) = struct.unpack(' None:
+ """Callback provided to self._bus.send_periodic() for automatic update
+ of message content"""
+ msg.data = self.encode_ac_status()
+
+ # DC_Control
+
+ def decode_dc_control(self, data: bytes | bytearray) -> None:
+ (flags,) = struct.unpack(' bytes:
+ return struct.pack(
+ ' None:
+ current_request, present_current, _, _ = struct.unpack(' None:
+ """Callback provided to self._bus.send_periodic() for automatic update
+ of message content"""
+ msg.data = self.encode_dc_status1()
+
+ # DC_Status2
+
+ def encode_dc_status2(self) -> bytes:
+ flags = 0
+ flags |= self.ev_dc_contactors_closed << 0
+ flags |= self.ev_dc_request_normal_stop << 1
+ return struct.pack(
+ ' None:
+ flags, battery_voltage, inlet_voltage = struct.unpack(' None:
+ """Callback provided to self._bus.send_periodic() for automatic update
+ of message content"""
+ msg.data = self.encode_dc_status2()
+
+
+# Wrappers around threads to implement simulated behaviours
+
+
+def call_later(callback: Callable[[], None], dt: float) -> Thread:
+ """Simple delayed execution of code"""
+
+ def wrapper() -> None:
+ try:
+ time.sleep(dt)
+ callback()
+ except Exception as ex: # noqa: BLE001
+ print(f'Got exception in call_later thread for {callback}.', str(ex))
+
+ thread = Thread(target=wrapper)
+ thread.start()
+
+ return thread
+
+
+def subdivide_dt(callback: Callable[[float, bool], None], dt: float, sub_dt: float) -> Thread:
+ """Calls a callback repeatedly, separated by time sub_dt, for a maximum total time of dt.
+ The callback has to take two arguments:
+ - A float (elapsed): This will be the time elapsed since the beginning.
+ - A boolean (done): Telling if the total time has elapsed (ie. True for the last call)
+
+ Guaranteed to have at least one call with done=False (unless dt=0).
+ Guaranteed to always have one call with done=True, and no subsequent call afterwards.
+ (Unless exception in callback)
+ """
+
+ def wrapper() -> None:
+ try:
+ started = time.monotonic()
+ now = started
+ ends_at = started + dt
+ while now < ends_at:
+ time.sleep(max(min(sub_dt, ends_at - now), 0))
+ now = time.monotonic()
+ callback(now - started, False) # noqa: FBT003
+ callback(now - started, True) # noqa: FBT003
+ except Exception as ex: # noqa: BLE001
+ print(f'Got exception in subdivide_dt thread for {callback}.', str(ex))
+
+ thread = Thread(target=wrapper)
+ thread.start()
+
+ return thread
+
+
+class Application:
+ """Main application class. Handles creation of various objects, and life cycle of it."""
+
+ def __init__(self, bus_config: can.typechecking.BusConfig, **interface_config: Any) -> None:
+ self._bus_config = bus_config
+ self._interface_config = interface_config
+
+ # Instantiation delayed on getter access
+ self._bus: can.BusABC | None = None
+ self._notifier: can.Notifier | None = None
+ self._simulator: Simulator | None = None
+
+ self.stop_event = Event()
+
+ @property
+ def bus(self) -> can.BusABC:
+ """A configured CAN bus object"""
+ if (bus := self._bus) is None:
+ bus = self._bus = can.Bus(**self._bus_config)
+
+ return bus
+
+ @property
+ def simulator(self) -> Simulator:
+ """Simulator instance"""
+ if (simulator := self._simulator) is None:
+ simulator = self._simulator = Simulator(self, **self._interface_config)
+
+ return simulator
+
+ @property
+ def notifier(self) -> can.Notifier:
+ """CAN notifier instance (ie. the main reading loop)"""
+ if (notifier := self._notifier) is None:
+ notifier = self._notifier = can.Notifier(self.bus, [self.simulator])
+
+ return notifier
+
+ # Application life cycle
+
+ def __enter__(self) -> Self:
+ """Enters a with-statement by starting the application and returns itself"""
+ self.start()
+ return self
+
+ def start(self) -> None:
+ """Instantiate bus, simulator, and notifier, and starts notifier up"""
+ self.stop_event.clear()
+ _ = self.notifier # Implicitly instantiation and start-up on getter access
+
+ ###
+
+ def run(self) -> None:
+ """Method blocking until the application stops, or is stopped by an exception.
+ NB: A Ctrl+C in the terminal will also work (raises a special exception)"""
+ try:
+ self.stop_event.wait()
+ except Exception:
+ self.stop_event.set()
+ raise
+
+ ###
+
+ def __exit__(
+ self,
+ exctype: type[BaseException] | None,
+ excinst: BaseException | None,
+ exctb: TracebackType | None,
+ ) -> bool:
+ """Exits a with-statement by shutting down the application (incl. closing the bus).
+ Does not handle any exception."""
+ self.shutdown()
+ return False
+
+ def shutdown(self) -> None:
+ """Stops the application by interrupting the run() blocking method,
+ stopping notifier, and closing CAN bus."""
+ self.stop_event.set()
+
+ if self._notifier is not None:
+ self._notifier.stop()
+ self._notifier = None
+
+ if self._bus is not None:
+ self._bus.shutdown()
+ self._bus = None
+
+
+def cli_main(
+ can_config: str = 'can.conf',
+ charger_dead_time: float = 1,
+ charger_voltage_ramp_up_slope: float = 200,
+ contactors_delay: float = 0.6,
+ maximum_energy_request: float = 75.530, # 100%
+ target_energy_request: float = 60.424, # 80%
+ minimum_energy_request: float = 22.659, # 30%
+ maximum_v2x_energy_request: float = 60.424, # 80%
+ minimum_v2x_energy_request: float = 22.659, # 30%
+ departure_time: int = 86400, # 24h in s
+) -> None:
+ """Simulator of BMS/vehicle side compatible with Advantics PEV Generic CAN interface v2"""
+ can_config_path = resources.files('advsimulators') / 'conf' / can_config
+ try:
+ bus_config = can.util.load_config(path=can_config_path)
+ except can.exceptions.CanInterfaceNotImplementedError as ex:
+ print(f'[red]ERROR:[/] Incorrect CAN configuration. {ex}.')
+ raise ex
+
+ with Application(
+ bus_config,
+ charger_dead_time=charger_dead_time,
+ charger_voltage_ramp_up_slope=charger_voltage_ramp_up_slope,
+ contactors_delay=contactors_delay,
+ maximum_energy_request=maximum_energy_request,
+ target_energy_request=target_energy_request,
+ minimum_energy_request=minimum_energy_request,
+ maximum_v2x_energy_request=maximum_v2x_energy_request,
+ minimum_v2x_energy_request=minimum_v2x_energy_request,
+ departure_time=departure_time,
+ ) as app:
+ app.run()
+
+
+def main() -> None:
+ typer.run(cli_main)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/charge-controllers/tutorials/PEV Generic Interface v2/pev-simulator.py b/charge-controllers/tutorials/advsimulators/pev/generic_v2.py
similarity index 90%
rename from charge-controllers/tutorials/PEV Generic Interface v2/pev-simulator.py
rename to charge-controllers/tutorials/advsimulators/pev/generic_v2.py
index ba6efe8..f699ec7 100644
--- a/charge-controllers/tutorials/PEV Generic Interface v2/pev-simulator.py
+++ b/charge-controllers/tutorials/advsimulators/pev/generic_v2.py
@@ -11,7 +11,7 @@
import struct
import time
from enum import IntEnum
-from pathlib import Path
+from importlib import resources
from threading import Event, Thread
from typing import TYPE_CHECKING
@@ -90,6 +90,7 @@ class InletPins(IntEnum):
CCS_AC_Three_Phase_Core = 3
CCS_DC_Core = 4
CCS_DC_Extended = 5
+ MCS = 6
class Simulator(can.Listener):
@@ -117,17 +118,17 @@ class Simulator(can.Listener):
command_close_contactors: bool
def __init__(
- self,
- app: Application,
- charger_dead_time: float = 1,
- charger_voltage_ramp_up_slope: float = 200,
- contactors_delay: float = 0.6,
- maximum_energy_request: float = 75.530, # 100%
- target_energy_request: float = 60.424, # 80%
- minimum_energy_request: float = 22.659, # 30%
- maximum_v2x_energy_request: float = 60.424, # 80%
- minimum_v2x_energy_request: float = 22.659, # 30%
- departure_time: int = 86400 # 24h in s
+ self,
+ app: Application,
+ charger_dead_time: float = 1,
+ charger_voltage_ramp_up_slope: float = 200,
+ contactors_delay: float = 0.6,
+ maximum_energy_request: float = 75.530, # 100%
+ target_energy_request: float = 60.424, # 80%
+ minimum_energy_request: float = 22.659, # 30%
+ maximum_v2x_energy_request: float = 60.424, # 80%
+ minimum_v2x_energy_request: float = 22.659, # 30%
+ departure_time: int = 86400, # 24h in s
) -> None:
self._app = app
self._bus = app.bus
@@ -296,27 +297,23 @@ def on_message_received(self, msg: can.Message) -> None:
self.decode_ev_extra_bpt_information(msg.data)
def update_state(self) -> None:
- match self.session_stage:
- case CommunicationStage.Waiting_For_EVSE:
- # This indicate we terminated a charge session (or controller just started).
- # Use it to reset our internal states.
- self.reset()
-
- case CommunicationStage.Precharge:
- print(f'Inlet voltage ramping up to {self.ev_dc_battery_voltage:.1f} V...')
- self._slope_start_voltage = self.ev_dc_inlet_voltage
- total_time = self._charger_dead_time + (
- (self.ev_dc_battery_voltage - self.ev_dc_inlet_voltage)
- / self._charger_voltage_ramp_up_slope
- )
- subdivide_dt(self.simulate_precharge, total_time, 0.1)
-
- case CommunicationStage.Welding_Detection:
- print('Setting present_current to 0 A')
- self.ev_dc_present_current = 0
-
- case _:
- pass
+ if self.session_stage == CommunicationStage.Waiting_For_EVSE:
+ # This indicates we terminated a charge session (or controller just started).
+ # Use it to reset our internal states.
+ self.reset()
+
+ elif self.session_stage == CommunicationStage.Precharge:
+ print(f'Inlet voltage ramping up to {self.ev_dc_battery_voltage:.1f} V...')
+ self._slope_start_voltage = self.ev_dc_inlet_voltage
+ total_time = self._charger_dead_time + (
+ (self.ev_dc_battery_voltage - self.ev_dc_inlet_voltage)
+ / self._charger_voltage_ramp_up_slope
+ )
+ subdivide_dt(self.simulate_precharge, total_time, 0.1)
+
+ elif self.session_stage == CommunicationStage.Welding_Detection:
+ print('Setting present_current to 0 A')
+ self.ev_dc_present_current = 0
def simulate_precharge(self, elapsed: float, done: bool) -> None: # noqa: FBT001
if elapsed <= self._charger_dead_time:
@@ -477,7 +474,10 @@ def encode_ev_energy_request(self) -> bytes:
)
def decode_ev_energy_request(self, data: bytes | bytearray) -> None:
- target_energy_request, minimum_energy_request, maximum_energy_request = struct.unpack(' None:
###
def __exit__(
- self,
- exctype: type[BaseException] | None,
- excinst: BaseException | None,
- exctb: TracebackType | None,
+ self,
+ exctype: type[BaseException] | None,
+ excinst: BaseException | None,
+ exctb: TracebackType | None,
) -> bool:
"""Exits a with-statement by shutting down the application (incl. closing the bus).
Does not handle any exception."""
@@ -660,38 +660,43 @@ def shutdown(self) -> None:
def cli_main(
- can_config: Path = Path('can.conf'),
- charger_dead_time: float = 1,
- charger_voltage_ramp_up_slope: float = 200,
- contactors_delay: float = 0.6,
- maximum_energy_request: float = 75.530, # 100%
- target_energy_request: float = 60.424, # 80%
- minimum_energy_request: float = 22.659, # 30%
- maximum_v2x_energy_request: float = 60.424, # 80%
- minimum_v2x_energy_request: float = 22.659, # 30%
- departure_time: int = 86400, # 24h in s
+ can_config: str = 'can.conf',
+ charger_dead_time: float = 1,
+ charger_voltage_ramp_up_slope: float = 200,
+ contactors_delay: float = 0.6,
+ maximum_energy_request: float = 75.530, # 100%
+ target_energy_request: float = 60.424, # 80%
+ minimum_energy_request: float = 22.659, # 30%
+ maximum_v2x_energy_request: float = 60.424, # 80%
+ minimum_v2x_energy_request: float = 22.659, # 30%
+ departure_time: int = 86400, # 24h in s
) -> None:
"""Simulator of BMS/vehicle side compatible with Advantics PEV Generic CAN interface v2"""
+ can_config_path = resources.files('advsimulators') / 'conf' / can_config
try:
- bus_config = can.util.load_config(path=can_config)
+ bus_config = can.util.load_config(path=can_config_path)
except can.exceptions.CanInterfaceNotImplementedError as ex:
print(f'[red]ERROR:[/] Incorrect CAN configuration. {ex}.')
raise typer.Abort from ex
with Application(
- bus_config,
- charger_dead_time=charger_dead_time,
- charger_voltage_ramp_up_slope=charger_voltage_ramp_up_slope,
- contactors_delay=contactors_delay,
- maximum_energy_request = maximum_energy_request,
- target_energy_request = target_energy_request,
- minimum_energy_request = minimum_energy_request,
- maximum_v2x_energy_request = maximum_v2x_energy_request,
- minimum_v2x_energy_request = minimum_v2x_energy_request,
- departure_time = departure_time,
+ bus_config,
+ charger_dead_time=charger_dead_time,
+ charger_voltage_ramp_up_slope=charger_voltage_ramp_up_slope,
+ contactors_delay=contactors_delay,
+ maximum_energy_request=maximum_energy_request,
+ target_energy_request=target_energy_request,
+ minimum_energy_request=minimum_energy_request,
+ maximum_v2x_energy_request=maximum_v2x_energy_request,
+ minimum_v2x_energy_request=minimum_v2x_energy_request,
+ departure_time=departure_time,
) as app:
app.run()
-if __name__ == '__main__':
+def main() -> None:
typer.run(cli_main)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/charge-controllers/tutorials/pyproject.toml b/charge-controllers/tutorials/pyproject.toml
new file mode 100644
index 0000000..331ef79
--- /dev/null
+++ b/charge-controllers/tutorials/pyproject.toml
@@ -0,0 +1,46 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "advsimulators"
+version="1.0.0"
+description = "Simulation tools for the Advantics CAN interfaces"
+requires-python = ">=3.9"
+authors = [
+ { name = "Axel Voitier", email = "axel.voitier@advantics.fr" },
+ { name = "Can Berk Durmus", email = "can.durmus@advantics.fr" },
+ { name = "Baptiste Pestourie", email = "baptiste.pestourie@advantics.fr" },
+]
+
+# Note: project-level dependencies are automatically installed in the default environement
+# and all the envs inheriting from it
+dependencies = [
+ "python-can>=4.4.2",
+ "typer==0.12.0",
+]
+
+[project.urls]
+Homepage = "https://github.com/ADVANTICS/examples"
+
+[tool.hatch.build.targets.wheel]
+packages = ["advsimulators"]
+include = [
+ "advsimulators/conf/*.conf",
+]
+
+[project.scripts]
+evse-simulator-v3="advsimulators.evse.generic_v3:main"
+pev-simulator-v1="advsimulators.pev.generic_v1:main"
+pev-simulator-v2="advsimulators.pev.generic_v2:main"
+
+
+[tool.mypy]
+namespace_packages = true
+explicit_package_bases= true
+ignore_missing_imports = true
+
+[tool.ruff.format]
+quote-style = "single"
+indent-style = "space"
+docstring-code-format = true
diff --git a/charge-controllers/tutorials/requirements.txt b/charge-controllers/tutorials/requirements.txt
new file mode 100644
index 0000000..7bc5553
--- /dev/null
+++ b/charge-controllers/tutorials/requirements.txt
@@ -0,0 +1,2 @@
+python-can >= 4.4.2
+typer ~= 0.12.0
diff --git a/charge-controllers/tutorials/simulate_generic_evse_v3.py b/charge-controllers/tutorials/simulate_generic_evse_v3.py
new file mode 100644
index 0000000..0821938
--- /dev/null
+++ b/charge-controllers/tutorials/simulate_generic_evse_v3.py
@@ -0,0 +1,8 @@
+"""
+Standalone script to run EVSE simulation locally without
+installing the package
+"""
+
+from advsimulators.evse.generic_v3 import main
+
+main()
diff --git a/charge-controllers/tutorials/simulate_generic_pev_v1.py b/charge-controllers/tutorials/simulate_generic_pev_v1.py
new file mode 100644
index 0000000..829c6d1
--- /dev/null
+++ b/charge-controllers/tutorials/simulate_generic_pev_v1.py
@@ -0,0 +1,8 @@
+"""
+Standalone script to run PEV simulation locally without
+installing the package
+"""
+
+from advsimulators.pev.generic_v1 import main
+
+main()
diff --git a/charge-controllers/tutorials/simulate_generic_pev_v2.py b/charge-controllers/tutorials/simulate_generic_pev_v2.py
new file mode 100644
index 0000000..b9d740d
--- /dev/null
+++ b/charge-controllers/tutorials/simulate_generic_pev_v2.py
@@ -0,0 +1,8 @@
+"""
+Standalone script to run PEV simulation locally without
+installing the package
+"""
+
+from advsimulators.pev.generic_v2 import main
+
+main()