Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
375 changes: 263 additions & 112 deletions doc/sphinx/stima_wifi/stimawifi_v3/stima_wifi_howto.rst

Large diffs are not rendered by default.

Binary file not shown.
Binary file not shown.
Binary file modified platformio/stima_v3/stimawifi/bin/lolin_c3_mini/firmware.bin
Binary file not shown.
Binary file not shown.
2 changes: 2 additions & 0 deletions platformio/stima_v3/stimawifi/bin/lolin_s3_mini/README.upload
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
esptool --chip esp32s3 --port "/dev/ttyACM0" --baud 460800 --before default-reset --after hard-reset write-flash -z --flash-mode dio --flash-freq 80m --flash-size detect 0x0000 bootloader.bin 0x8000 partitions.bin 0xe000 boot_app0.bin 0x10000 firmware.bin

Binary file not shown.
Binary file not shown.
Binary file modified platformio/stima_v3/stimawifi/bin/lolin_s3_mini/firmware.bin
Binary file not shown.
Binary file not shown.
17 changes: 14 additions & 3 deletions platformio/stima_v3/stimawifi/extra_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,20 @@
# Custom BIN from ELF
env.AddPostAction(
"$BUILD_DIR/${PROGNAME}.bin",
env.VerboseAction(" ".join([
env.VerboseAction(" ".join(
[
"mkdir", "-p", "$PROJECT_DIR/bin/${PIOENV};",
"cp",
"$BUILD_DIR/${PROGNAME}.bin",
"$PROJECT_DIR/bin/${PIOENV}/firmware.bin"
]), "Building $TARGET"))
"$PROJECT_DIR/bin/${PIOENV}/firmware.bin;"
"cp",
"$BUILD_DIR/bootloader.bin",
"$PROJECT_DIR/bin/${PIOENV}/;"
"cp",
"$BUILD_DIR/partitions.bin",
"$PROJECT_DIR/bin/${PIOENV}/;"
"cp",
"$PROJECT_PACKAGES_DIR/framework-arduinoespressif32/tools/partitions/boot_app0.bin",
"$PROJECT_DIR/bin/${PIOENV}/"
]
), "Building $TARGET"))
7 changes: 4 additions & 3 deletions platformio/stima_v3/stimawifi/include/sensors_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
\def SENSOR_MAX> 1
\brief Max number of sensor.
Max value here is 25 for stima atmega1284
NOTE: used to define queue len
*/
#if portNUM_PROCESSORS > 1
#define SENSORS_MAX (25)
#define SENSORS_MAX (10)
#else
#define SENSORS_MAX (3)
#endif
Expand All @@ -39,10 +40,10 @@ Max value here is 25 for stima atmega1284
\def SENSOR_UNIQUE_MAX
\brief Max number of unique sensor (Stima modules).
unique sensors are sensors that can have more driver but only one i2c address and only one setup and prepare
Max value here is 10
Max value here is 10 for stima atmega1284
*/
#if portNUM_PROCESSORS > 1
#define SENSORS_UNIQUE_MAX (10)
#define SENSORS_UNIQUE_MAX (5)
#else
#define SENSORS_UNIQUE_MAX (3)
#endif
Expand Down
28 changes: 17 additions & 11 deletions platformio/stima_v3/stimawifi/include/stimawifi_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
#define STIMAWIFI_CONFIG_H_

// increment on change
#define SOFTWARE_VERSION "2026-03-31T00:00" // date and time iso format
#define MAJOR_VERSION "20260331" // date YYYYMMDD
#define SOFTWARE_VERSION "2026-04-23T00:00" // date and time iso format
#define MAJOR_VERSION "20260423" // date YYYYMMDD
#define MINOR_VERSION "0" // time HHMM without leading 0

// SSID and password of WiFi for setup
Expand Down Expand Up @@ -100,20 +100,26 @@
#define SENSORDRIVER_META_LEN 30

// define parameter for queues len and communication
//#define DATA_BURST (SENSORS_MAX*VALUES_TO_READ_FROM_SENSOR_COUNT)

# if portNUM_PROCESSORS > 1
#define DATA_BURST (150)
#define DATA_BURST (SENSORS_MAX*VALUES_TO_READ_FROM_SENSOR_COUNT) // max burst of messages
#define DB_QUEUE_LEN (DATA_BURST*2)
#define MQTT_QUEUE_LEN (DATA_BURST*3)
#define RECOVERY_QUEUE_LEN (DATA_BURST*2)
#else
#define DATA_BURST (15)
#define DATA_BURST (15) // tipic burst of messages
#define DB_QUEUE_LEN (DATA_BURST)
#define MQTT_QUEUE_LEN (DATA_BURST*2)
#define RECOVERY_QUEUE_LEN (DATA_BURST)
#endif
#define DATA_BURST_RECOVERY (DATA_BURST)

#define DB_QUEUE_LEN (DATA_BURST)
#define MQTT_QUEUE_LEN (DATA_BURST*3)
#define DATA_BURST_RECOVERY (DATA_BURST) // messages in recovery packet

#define MQTT_QUEUE_SPACELEFT_MEASURE (DATA_BURST)
#define MQTT_QUEUE_SPACELEFT_PUBLISH (DATA_BURST/2)
#define MQTT_QUEUE_SPACELEFT_RECOVERY (DATA_BURST*2)
//This is mechanism that prevents a system from becoming overwhelmed
//when data arrives faster than it can be processed (queue pressure)
#define QUEUE_SPACELEFT_MEASURE (0) // limit for enqueue messages by measure thread
#define QUEUE_SPACELEFT_PUBLISH (DATA_BURST) // limit for publish thread to ignore incoming message and send it to db
#define QUEUE_SPACELEFT_RECOVERY (0) // limit for enqueue recovered messages by db thread

// SD card SPI PIN assignment
// Micro SD Card Shield
Expand Down
43 changes: 18 additions & 25 deletions platformio/stima_v3/stimawifi/src/db_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,15 +456,15 @@ bool dbThread::archive_recovery(){

case ARCHIVE_RECOVERY_PUBLISH:

if (data->mqttqueue->NumSpacesLeft() < MQTT_QUEUE_SPACELEFT_RECOVERY){
if (data->recoveryqueue->NumSpacesLeft() <= QUEUE_SPACELEFT_RECOVERY){
data->logger->warning(F("db archive recovery no space in mqtt queue"));
archive_recovery_run=false;
break;
}

data->logger->notice(F("db archive recovery queuing message for publish %s:%s"),archive_recovery_message.topic,archive_recovery_message.payload);
archive_recovery_message.sent=true; // prevent publish task requeue for archive
if(!data->mqttqueue->Enqueue(&archive_recovery_message,pdMS_TO_TICKS(0))){
if(!data->recoveryqueue->Enqueue(&archive_recovery_message,pdMS_TO_TICKS(0))){
data->logger->warning(F("db archive recovery message for publish not queued"));
archive_recovery_run=false;
break;
Expand Down Expand Up @@ -655,8 +655,8 @@ bool dbThread::data_recovery(void){
// return false;
//}

if (data->mqttqueue->NumSpacesLeft() < MQTT_QUEUE_SPACELEFT_RECOVERY){
data->logger->warning(F("db recovery no space in mqtt queue"));
if (data->recoveryqueue->NumSpacesLeft() <= QUEUE_SPACELEFT_RECOVERY){
data->logger->warning(F("db recovery no space in recovery queue"));
return true;
}

Expand Down Expand Up @@ -705,7 +705,7 @@ bool dbThread::data_recovery(void){
strcpy(message.payload, (const char*)sqlite3_column_text(stmt, 2));

data->logger->notice(F("db recovery queuing message for publish %s:%s"),message.topic,message.payload);
if(!data->mqttqueue->Enqueue(&message,pdMS_TO_TICKS(0))){
if(!data->recoveryqueue->Enqueue(&message,pdMS_TO_TICKS(0))){
data->logger->warning(F("db recovery message for publish not queued"));
}
} else if(rc == SQLITE_DONE) {
Expand Down Expand Up @@ -833,8 +833,6 @@ bool dbThread::doDb(const mqttMessage_t& message) {
int rc;
char sql[] = "INSERT OR REPLACE INTO messages VALUES (?, strftime('%s',?), ?, ?)";
sqlite3_stmt* stmt; // will point to prepared stamement object

data->logger->notice(F("db spaceleft in mqtt queue %d"),data->mqttqueue->NumSpacesLeft());

sqlite3_prepare_v2(
db, // the handle to your (opened and ready) database
Expand Down Expand Up @@ -901,9 +899,6 @@ bool dbThread::doDb(const mqttMessage_t& message) {
sqlite_status=true;
data->status->database=ok;
data->logger->notice(F("db Data saved on SD %s:%s"),message.topic,message.payload);

mqttMessage_t tmpmsg;
data->dbqueue->Dequeue(&tmpmsg, pdMS_TO_TICKS( 0 )); // all done: dequeue the message

return true;
}
Expand Down Expand Up @@ -1176,12 +1171,15 @@ void dbThread::Run() {
// SetPriority(basePriority);

for(;;){



while (data->dbqueue->Peek(&message, pdMS_TO_TICKS( 1000 ))){ // peek one message
if (!doDb(message)) return; // return and terminate task if fatal error
if (!sqlite_status) sqlite_status = db_restart(); // try to restart SD card and sqlite
if (doDb(message)){
data->dbqueue->Dequeue(&message, pdMS_TO_TICKS( 0 )); // all done: dequeue the message
}
//threadPublish.mqttDisconnect();
if (!sqlite_status) esp_system_abort("SD do not restart; REBOOT");
if (!sqlite_status) sqlite_status = db_restart(); // try to restart SD card and sqlite
if (!sqlite_status) esp_system_abort("SD do not restart; REBOOT"); // return and terminate task if fatal error
}


Expand All @@ -1192,33 +1190,28 @@ void dbThread::Run() {
}

// check queue for rpc recovey
if(data->recoveryqueue->Dequeue(&rpcrecovery, pdMS_TO_TICKS( 0 )))recovery();
if(data->rpcRecoveryqueue->Dequeue(&rpcrecovery, pdMS_TO_TICKS( 0 )))recovery();

uint8_t basePriority = GetPriority();
SetPriority(TASK_BASE_PRIORITY); // slow down task; this take a long time and we do not want to freeze everything
while (db_recovery()); // run one subquery (one time step) of the big recovery on DB
while (archive_recovery()); // run until the mqtt queue is full (less then MQTT_QUEUE_SPACELEFT_RECOVERY)
while (archive_recovery()); // run until the mqtt queue is full (less/equal then QUEUE_SPACELEFT_RECOVERY)
SetPriority(basePriority);

while (data->dbqueue->Peek(&message, pdMS_TO_TICKS( 1000 ))){ // peek one message
if (!doDb(message)) return; // return and terminate task if fatal error
if (!sqlite_status) sqlite_status = db_restart(); // try to restart SD card and sqlite
//threadPublish.mqttDisconnect();
if (!sqlite_status) esp_system_abort("SD do not restart; REBOOT");
}


if (getArchiveRecoveryState() == ARCHIVE_RECOVERY_NONE and
getDbRecoveryState() == DB_RECOVERY_NONE) {
// check semaphore for data recovey
if(data->recoverysemaphore->Take(0)){
if(data->rpcRecoverysemaphore->Take(0)){
if (timeStatus() == timeSet) {
if (!data_purge()) data->logger->error(F("db purge DB")); // migrate old data to archive
}
if(!data_recovery()) data->logger->error(F("db recovery DB")); // try to publish unsent messages
}
}

data->logger->notice(F("db db queue space left %d"),data->dbqueue->NumSpacesLeft());
data->logger->notice(F("db recovery queue space left %d"),data->recoveryqueue->NumSpacesLeft());

// checks for heap and stack
//data->logger->notice(F("HEAP: %l"),esp_get_minimum_free_heap_size());
if( esp_get_minimum_free_heap_size() < HEAP_MIN_WARNING){
Expand Down
6 changes: 3 additions & 3 deletions platformio/stima_v3/stimawifi/src/db_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ struct db_data_t {
int id;
frtosLogging* logger;
Queue* dbqueue;
Queue* mqttqueue;
BinarySemaphore* recoverysemaphore;
BinaryQueue* recoveryqueue;
Queue* recoveryqueue;
BinarySemaphore* rpcRecoverysemaphore;
BinaryQueue* rpcRecoveryqueue;
dbStatus_t* status;
station_t* station;
File* logFile;
Expand Down
4 changes: 2 additions & 2 deletions platformio/stima_v3/stimawifi/src/measure_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ void measureThread::enqueueMqttMessage(uint8_t i ) {


// if there are enough space left on the publish queue send it
if (data->mqttqueue->NumSpacesLeft() > MQTT_QUEUE_SPACELEFT_MEASURE){
if (data->mqttqueue->NumSpacesLeft() > QUEUE_SPACELEFT_MEASURE){
data->logger->notice(F("measure enqueue for mqtt: %s ; %s"), mqtt_message.topic, mqtt_message.payload);
if(!data->mqttqueue->Enqueue(&mqtt_message,pdMS_TO_TICKS(0))){
data->logger->error(F("measure enqueue for mqtt: %s ; %s"), mqtt_message.topic, mqtt_message.payload);
Expand Down Expand Up @@ -253,7 +253,7 @@ void measureThread::doMeasure() {

// check queue for rpc calibrate
rpcCalibrate_t rpccalibrate;
if(data->calibratequeue->Dequeue(&rpccalibrate, pdMS_TO_TICKS( 0 ))){
if(data->rpccalibratequeue->Dequeue(&rpccalibrate, pdMS_TO_TICKS( 0 ))){
for (uint8_t i = 0; i < data->sensors_count; i++) {
if ( strcmp(rpccalibrate.type,sensorm[i].getSensorDriver()->getType())==0) {
uint8_t rc = sensorm[i].sensor->setForcedRecalibrationFactor(rpccalibrate.value);
Expand Down
2 changes: 1 addition & 1 deletion platformio/stima_v3/stimawifi/src/measure_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ struct measure_data_t { // thread communication data
summarydata_t* summarydata;
MutexStandard* i2cmutex;
georef_t* georef;
BinaryQueue* calibratequeue;
BinaryQueue* rpccalibratequeue;
sensor_t sensors[SENSORS_MAX];
uint8_t sensors_count;
};
Expand Down
33 changes: 19 additions & 14 deletions platformio/stima_v3/stimawifi/src/publish_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void publishThread::Run() {
for(;;){

// if there are no enough space left on the mqtt queue send it to the DB
while (data->mqttqueue->NumSpacesLeft() < MQTT_QUEUE_SPACELEFT_PUBLISH){
while (data->mqttqueue->NumSpacesLeft() <= QUEUE_SPACELEFT_PUBLISH){
store();
data->status->publish.publish=error;
}
Expand Down Expand Up @@ -123,15 +123,24 @@ void publishThread::Run() {
}

if (status_connected){
mqttMessage_t mqttMessage;
// wait for message and peek it from the queue
while (data->mqttqueue->Peek(&mqttMessage, pdMS_TO_TICKS( 1000 ))){
// publish message
if (!doPublish()) break;
mqttMessage_t message;
data->mqttqueue->Peek(&message, pdMS_TO_TICKS( 1000 )); // wait for a new incoming message
while(data->mqttqueue->Peek(&message, pdMS_TO_TICKS( 0 ))){
if (!doPublish(message)) break; // publish message
data->mqttqueue->Dequeue(&message, pdMS_TO_TICKS( 0 ));
}
while (data->recoveryqueue->Peek(&message, pdMS_TO_TICKS( 0 ))){
if (!doPublish(message)) break; // publish message
data->recoveryqueue->Dequeue(&message, pdMS_TO_TICKS( 0 ));
while(data->mqttqueue->Peek(&message, pdMS_TO_TICKS( 0 ))){
if (!doPublish(message)) break; // publish message
data->mqttqueue->Dequeue(&message, pdMS_TO_TICKS( 0 ));
}
}
}

data->logger->notice(F("publish mqtt queue space left %d"),data->mqttqueue->NumSpacesLeft());
data->logger->notice(F("publish mqtt queue space left %d"),data->mqttqueue->NumSpacesLeft());
data->logger->notice(F("publish recovery queue space left %d"),data->recoveryqueue->NumSpacesLeft());

// check heap and stack
//data->logger->notice(F("HEAP: %l"),esp_get_minimum_free_heap_size());
Expand Down Expand Up @@ -604,13 +613,9 @@ void publishThread::store() {

// try to send message to the broker
// send the same message to the queue for DB with flag to describe if publish is completed with success
bool publishThread::doPublish() {
bool publishThread::doPublish(mqttMessage_t mqtt_message) {

bool rc=false;

mqttMessage_t mqtt_message;
data->mqttqueue->Dequeue(&mqtt_message, pdMS_TO_TICKS( 0 ));

bool resend = mqtt_message.sent;

if(mqttPublish(mqtt_message,false)){
Expand Down Expand Up @@ -710,7 +715,7 @@ int calibrateRpc(JsonObject params, JsonObject result) {
strncpy(rpccalibrate.type,type,4);
rpccalibrate.value = params["value"];

if(publishThread::global_data->calibratequeue->Enqueue(&rpccalibrate)){
if(publishThread::global_data->rpcCalibratequeue->Enqueue(&rpccalibrate)){
publishThread::global_data->logger->notice(F("enqueue rpc calibrate : %s ; %d"), rpccalibrate.type, rpccalibrate.value);
result[F("state")] = F("done");
return 0;
Expand Down Expand Up @@ -767,7 +772,7 @@ int recoveryDataRpc(JsonObject params, JsonObject result) {
//strcpy(rpcrecovery.dtstart,"2024-05-09T00:00:00") ;
//strcpy(rpcrecovery.dtend, "2024-05-09T01:00:00") ;

if(publishThread::global_data->recoveryqueue->Enqueue(&rpcrecovery)){
if(publishThread::global_data->rpcRecoveryqueue->Enqueue(&rpcrecovery)){
publishThread::global_data->logger->notice(F("enqueue rpc recovery : %s ; %s"), rpcrecovery.dtstart, rpcrecovery.dtend);
result[F("state")] = F("done");
return 0;
Expand Down
7 changes: 4 additions & 3 deletions platformio/stima_v3/stimawifi/src/publish_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ struct publish_data_t {
frtosLogging* logger;
Queue* mqttqueue;
Queue* dbqueue;
BinaryQueue* recoveryqueue;
BinaryQueue* calibratequeue;
Queue* recoveryqueue;
BinaryQueue* rpcRecoveryqueue;
BinaryQueue* rpcCalibratequeue;
stimawifiStatus_t* status;
station_t* station;
};
Expand Down Expand Up @@ -52,7 +53,7 @@ class publishThread : public Thread {
void reset_status_summary();
bool publish_constantdata();
void store();
bool doPublish();
bool doPublish(mqttMessage_t message);
publish_data_t* data;
IPStack ipstack;
MQTT::Client<IPStack, Countdown, MQTT_PACKET_SIZE, 2 > mqttclient;
Expand Down
19 changes: 10 additions & 9 deletions platformio/stima_v3/stimawifi/src/stimawifi.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,23 +101,24 @@ gps_i2c_data_t gps_i2c_data={1,&frtosLog,&stimawifiStatus.gps,&georef,&frtosRTC,
gpsI2cThread threadGpsI2c(&gps_i2c_data);
#endif

Queue dbQueue(DB_QUEUE_LEN,sizeof(mqttMessage_t)); // ~ 1 minutes queue
Queue mqttQueue(MQTT_QUEUE_LEN,sizeof(mqttMessage_t)); // ~ 1.5 minutes queue
BinaryQueue recoveryQueue(sizeof(rpcRecovery_t));
BinaryQueue calibrateQueue(sizeof(rpcCalibrate_t));
BinarySemaphore recoverySemaphore(false);
Queue dbQueue(DB_QUEUE_LEN,sizeof(mqttMessage_t));
Queue recoveryQueue(RECOVERY_QUEUE_LEN,sizeof(mqttMessage_t));
Queue mqttQueue(MQTT_QUEUE_LEN,sizeof(mqttMessage_t));
BinaryQueue rpcRecoveryQueue(sizeof(rpcRecovery_t));
BinaryQueue rpcCalibrateQueue(sizeof(rpcCalibrate_t));
BinarySemaphore rpcRecoverySemaphore(false);
#if (ENABLE_SDCARD_LOGGING)
db_data_t db_data={1,&frtosLog,&dbQueue,&mqttQueue,&recoverySemaphore,&recoveryQueue,&stimawifiStatus.db,&station,&logFile};
db_data_t db_data={1,&frtosLog,&dbQueue,&recoveryQueue,&rpcRecoverySemaphore,&rpcRecoveryQueue,&stimawifiStatus.db,&station,&logFile};
#else
db_data_t db_data={1,&frtosLog,&dbQueue,&mqttQueue,&recoverySemaphore,&recoveryQueue,&stimawifiStatus.db,&station,NULL};
db_data_t db_data={1,&frtosLog,&dbQueue,&recoveryQueue,&rpcRecoverySemaphore,&rpcRecoveryQueue,&stimawifiStatus.db,&station,NULL};
#endif

dbThread threadDb(&db_data);

measure_data_t measure_data={1,&frtosLog,&mqttQueue,&dbQueue,&stimawifiStatus.measure,&station,&summarydata,&i2cmutex,&georef,&calibrateQueue};
measure_data_t measure_data={1,&frtosLog,&mqttQueue,&dbQueue,&stimawifiStatus.measure,&station,&summarydata,&i2cmutex,&georef,&rpcCalibrateQueue};
measureThread threadMeasure(&measure_data);

publish_data_t publish_data={1,&frtosLog,&mqttQueue,&dbQueue,&recoveryQueue,&calibrateQueue,&stimawifiStatus,&station};
publish_data_t publish_data={1,&frtosLog,&mqttQueue,&dbQueue,&recoveryQueue,&rpcRecoveryQueue,&rpcCalibrateQueue,&stimawifiStatus,&station};
publishThread threadPublish(&publish_data);

#if defined(ARDUINO_LOLIN_C3_MINI)
Expand Down
Loading
Loading