19
19
#include < mutex>
20
20
#include < fcntl.h>
21
21
22
- #define RSCP2P " 1.5 "
23
- #define RSCP2P_LONG " 1.5 .3.33 "
22
+ #define RSCP2P " 1.6 "
23
+ #define RSCP2P_LONG " 1.6 .3.34 "
24
24
25
25
#define AES_KEY_SIZE 32
26
26
#define AES_BLOCK_SIZE 32
@@ -739,15 +739,15 @@ void pushNotSupportedTag(uint32_t container, uint32_t tag) {
739
739
return ;
740
740
}
741
741
742
- bool existsAdditionalTag (uint32_t container, uint32_t tag, int index) {
742
+ bool existsAdditionalTag (uint32_t container, uint32_t tag, int index, int order ) {
743
743
for (std::vector<RSCP_MQTT::additional_tags_t >::iterator it = RSCP_MQTT::AdditionalTags.begin (); it != RSCP_MQTT::AdditionalTags.end (); ++it) {
744
- if ((it->req_container == container) && (it->req_tag == tag) && (it->req_index == index )) return (true );
744
+ if ((it->req_container == container) && (it->req_tag == tag) && (it->req_index == index ) && (it-> order == order) ) return (true );
745
745
}
746
746
return (false );
747
747
}
748
748
749
749
void pushAdditionalTag (uint32_t req_container, uint32_t req_tag, int req_index, int order, bool one_shot) {
750
- if (existsAdditionalTag (req_container, req_tag, req_index)) return ;
750
+ if (existsAdditionalTag (req_container, req_tag, req_index, order )) return ;
751
751
RSCP_MQTT::additional_tags_t v;
752
752
v.req_container = req_container;
753
753
v.req_tag = req_tag;
@@ -759,38 +759,40 @@ void pushAdditionalTag(uint32_t req_container, uint32_t req_tag, int req_index,
759
759
return ;
760
760
}
761
761
762
- bool updateRawData (char *topic, char *payload) {
762
+ void initRawData () {
763
+ for (std::vector<RSCP_MQTT::raw_data_t >::iterator it = RSCP_MQTT::rawData.begin (); it != RSCP_MQTT::rawData.end (); ++it) {
764
+ it->handled = false ;
765
+ it->changed = false ;
766
+ }
767
+ return ;
768
+ }
769
+
770
+ int mergeRawData (char *topic, char *payload, bool *changed) {
771
+ int i = 0 ;
763
772
if (topic && payload) {
764
- for (std::vector<RSCP_MQTT::mqtt_data_t >::iterator it = RSCP_MQTT::rawData.begin (); it != RSCP_MQTT::rawData.end (); ++it) {
765
- if (!strcmp (it->topic , topic)) {
773
+ for (std::vector<RSCP_MQTT::raw_data_t >::iterator it = RSCP_MQTT::rawData.begin (); it != RSCP_MQTT::rawData.end (); ++it) {
774
+ if (!strcmp (it->topic , topic) && !it-> handled && (i == it-> nr ) ) {
766
775
if (strcmp (it->payload , payload)) {
767
776
if (strlen (it->payload ) != strlen (payload)) it->payload = (char *)realloc (it->payload , strlen (payload) + 1 );
768
777
strcpy (it->payload , payload);
778
+ it->changed = true ;
779
+ *changed = true ;
769
780
}
770
- return (true );
781
+ it->handled = true ;
782
+ return (i);
771
783
}
784
+ if (!strcmp (it->topic , topic) && it->handled ) i++;
772
785
}
773
- }
774
- return (false );
775
- }
776
-
777
- void insertRawData (char *topic, char *payload) {
778
- if (topic && payload) {
779
- RSCP_MQTT::mqtt_data_t v;
786
+ RSCP_MQTT::raw_data_t v;
780
787
v.topic = strdup (topic);
781
788
v.payload = strdup (payload);
789
+ v.handled = true ;
790
+ v.changed = true ;
791
+ v.nr = i;
782
792
RSCP_MQTT::rawData.push_back (v);
793
+ *changed = true ;
783
794
}
784
- return ;
785
- }
786
-
787
- char *readRawData (char *topic) {
788
- for (std::vector<RSCP_MQTT::mqtt_data_t >::iterator it = RSCP_MQTT::rawData.begin (); it != RSCP_MQTT::rawData.end (); ++it) {
789
- if (!strcmp (it->topic , topic)) {
790
- return (it->payload );
791
- }
792
- }
793
- return (NULL );
795
+ return (i);
794
796
}
795
797
796
798
void refreshCache (std::vector<RSCP_MQTT::cache_t > & v, char *payload) {
@@ -942,6 +944,13 @@ float getFloatValue(std::vector<RSCP_MQTT::cache_t> & c, uint32_t container, uin
942
944
return (value);
943
945
}
944
946
947
+ void resetHandleFlag (std::vector<RSCP_MQTT::cache_t > & c) {
948
+ for (std::vector<RSCP_MQTT::cache_t >::iterator it = c.begin (); it != c.end (); ++it) {
949
+ it->handled = false ;
950
+ }
951
+ return ;
952
+ }
953
+
945
954
void preparePayload (RscpProtocol *protocol, SRscpValue *response, char **buf) {
946
955
switch (response->dataType ) {
947
956
case RSCP::eTypeBool: {
@@ -1000,8 +1009,7 @@ int storeResponseValue(std::vector<RSCP_MQTT::cache_t> & c, RscpProtocol *protoc
1000
1009
int rc = -1 ;
1001
1010
1002
1011
for (std::vector<RSCP_MQTT::cache_t >::iterator it = c.begin (); it != c.end (); ++it) {
1003
- if ((it->container > container) && (it->tag > response->tag )) break ;
1004
- if ((!it->container || (it->container == container)) && (it->tag == response->tag ) && (it->index == index )) {
1012
+ if ((!it->container || (it->container == container)) && (it->tag == response->tag ) && (it->index == index ) && !it->handled ) {
1005
1013
switch (response->dataType ) {
1006
1014
case RSCP::eTypeBool: {
1007
1015
if (protocol->getValueAsBool (response)) strcpy (buf, " true" );
@@ -1144,6 +1152,8 @@ int storeResponseValue(std::vector<RSCP_MQTT::cache_t> & c, RscpProtocol *protoc
1144
1152
if ((atoi (it->payload ) == 0 ) && (battery_soc > 1 )) snprintf (it->payload , PAYLOAD_SIZE, " %d" , battery_soc--);
1145
1153
else battery_soc = atoi (it->payload );
1146
1154
}
1155
+ it->handled = true ;
1156
+ break ;
1147
1157
}
1148
1158
}
1149
1159
return (rc);
@@ -1970,21 +1980,24 @@ void createRequest(SRscpFrameBuffer * frameBuffer) {
1970
1980
return ;
1971
1981
}
1972
1982
1973
- void publishRaw (RscpProtocol *protocol, SRscpValue *response, char *topic) {
1974
- char *payload_new = (char *)malloc (PAYLOAD_SIZE * sizeof (char ) + 1 );
1975
- char *payload_old = readRawData (topic);
1976
- memset (payload_new, 0 , PAYLOAD_SIZE);
1977
- preparePayload (protocol, response, &payload_new);
1978
- if (payload_old && payload_new && strcmp (payload_new, " " ) && strcmp (payload_old, payload_new)) {
1979
- publishImmediately (topic, payload_new);
1980
- updateRawData (topic, payload_new);
1981
- } else if (!payload_old && payload_new && strcmp (payload_new, " " )) {
1982
- publishImmediately (topic, payload_new);
1983
- insertRawData (topic, payload_new);
1984
- }
1985
- if (payload_new) free (payload_new);
1986
- return ;
1987
- }
1983
+ void publishRaw (RscpProtocol *protocol, SRscpValue *response, char *topic_in) {
1984
+ char topic[TOPIC_SIZE];
1985
+ char *payload = (char *)malloc (PAYLOAD_SIZE * sizeof (char ) + 1 );
1986
+ bool changed = false ;
1987
+ memset (payload, 0 , PAYLOAD_SIZE);
1988
+ preparePayload (protocol, response, &payload);
1989
+
1990
+ int nr = mergeRawData (topic_in, payload, &changed);
1991
+ if (nr > 0 ) {
1992
+ if (snprintf (topic, TOPIC_SIZE, " %s/%d" , topic_in, nr) >= TOPIC_SIZE) {
1993
+ logMessage (cfg.logfile , (char *)__FILE__, __LINE__, (char *)" publishRaw: Buffer overflow\n " );
1994
+ return ;
1995
+ }
1996
+ if (changed) publishImmediately (topic, payload);
1997
+ } else if (changed) publishImmediately (topic_in, payload);
1998
+ if (payload) free (payload);
1999
+ return ;
2000
+ }
1988
2001
1989
2002
void handleRaw (RscpProtocol *protocol, SRscpValue *response, uint32_t *cache, int level) {
1990
2003
int l = level + 1 ;
@@ -2460,6 +2473,7 @@ static int processReceiveBuffer(const unsigned char * ucBuffer, int iLength) {
2460
2473
battery_nr = 0 ;
2461
2474
pm_nr = 0 ;
2462
2475
wb_nr = 0 ;
2476
+ if (cfg.raw_mode ) initRawData ();
2463
2477
for (size_t i = 0 ; i < frame.data .size (); i++)
2464
2478
handleResponseValue (&protocol, &frame.data [i]);
2465
2479
@@ -2591,6 +2605,8 @@ static void mainLoop(void) {
2591
2605
2592
2606
gettimeofday (&start, NULL );
2593
2607
2608
+ resetHandleFlag (RSCP_MQTT::RscpMqttCache);
2609
+
2594
2610
// create an RSCP frame with requests to some example data
2595
2611
createRequest (&frameBuffer);
2596
2612
0 commit comments