77 #include <PJONDefines.h>
78 #include <ReconnectingMqttClient.h>
// Default broker port; 1883 is the IANA-registered port for unencrypted MQTT.
83 #define MQTTT_DEFAULT_PORT 1883
// Response timeout; a value defined before this point takes precedence.
// NOTE(review): the time unit is not stated here - confirm against the PJON core.
84 #ifndef MQTTT_RESPONSE_TIMEOUT
85 #define MQTTT_RESPONSE_TIMEOUT (uint32_t) 10000
// Size in bytes of packet_buffer; defaults to the largest PJON packet.
89 #ifndef MQTTT_BUFFER_SIZE
90 #define MQTTT_BUFFER_SIZE (uint32_t) PJON_PACKET_MAX_LENGTH
// Size of the `key` scratch array and of every translation table entry,
// terminating NUL included.
94 #ifndef MQTTT_KEY_SIZE
95 #define MQTTT_KEY_SIZE 15
// Size of the `value` scratch array, terminating NUL included.
97 #ifndef MQTTT_VALUE_SIZE
98 #define MQTTT_VALUE_SIZE 15
// Values MQTTT_MODE can take; the preprocessor conditions further down select
// the code for exactly one of them.
101 #define MQTTT_MODE_BUS_RAW 0
102 #define MQTTT_MODE_BUS_JSON 1
103 #define MQTTT_MODE_MIRROR_TRANSLATE 2
104 #define MQTTT_MODE_MIRROR_DIRECT 3
// Mode used by this build.
// NOTE(review): confirm this definition is wrapped in #ifndef MQTTT_MODE so a
// sketch can pick another mode.
108 #define MQTTT_MODE MQTTT_MODE_BUS_RAW
// Number of rows in the pjon_keys / mqtt_keys translation tables.
112 #ifndef MQTTT_TRANSLATION_TABLE_SIZE
113 #define MQTTT_TRANSLATION_TABLE_SIZE 5
// Value reported by get_receive_time().
117 #ifndef MQTTT_RECEIVE_TIME
118 #define MQTTT_RECEIVE_TIME 0
// Compiled only when MQTTT_USE_MAC is defined and one of the two MIRROR modes
// is selected.
121 #if defined(MQTTT_USE_MAC) && ((MQTTT_MODE == MQTTT_MODE_MIRROR_TRANSLATE) || (MQTTT_MODE == MQTTT_MODE_MIRROR_DIRECT))
// Outcome of the latest send_frame() call; receive_response() maps it to
// PJON_ACK or PJON_FAIL.
127 bool last_send_success =
false;
// Length in bytes of the packet waiting in packet_buffer; 0 means nothing is
// pending.
129 uint16_t incoming_packet_size = 0;
// Holds the packet stored by receiver() until receive_frame() copies it out.
// begin() and the BUS_JSON branch of send_frame() also use it as scratch space.
131 uint8_t packet_buffer[MQTTT_BUFFER_SIZE];
// Writes "/" followed by the six bytes of `mac` as upper-case hexadecimal into
// the buffer at p.
// p must have room for 13 characters plus the terminating NUL; sprintf does no
// bounds checking.
// Every byte is printed with %02X so it always occupies two hex digits. A plain
// width of 2 pads with a space, which would put a blank into the topic segment
// for any byte below 0x10.
137 char *add_mac(
char *p)
139 sprintf(p,
"/%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
// Key translation support, compiled only in MIRROR_TRANSLATE mode.
145 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_TRANSLATE)
// Scratch buffers for the key and the value currently being processed by
// receiver() and send_frame().
146 char key[MQTTT_KEY_SIZE];
147 char value[MQTTT_VALUE_SIZE];
// Number of rows in use in the two tables below (add_translation() refuses to
// go past MQTTT_TRANSLATION_TABLE_SIZE).
149 uint8_t translation_count = 0;
// Parallel tables: row i pairs a PJON-side key with its MQTT-side key.
150 char pjon_keys[MQTTT_TRANSLATION_TABLE_SIZE][MQTTT_KEY_SIZE];
151 char mqtt_keys[MQTTT_TRANSLATION_TABLE_SIZE][MQTTT_KEY_SIZE];
// Replaces `key` in place with its counterpart from the translation table.
//   key     - NUL-terminated key to look up; overwritten on a match
//   len     - capacity of the buffer behind key
//   to_mqtt - true: look up in pjon_keys and write the mqtt_keys entry;
//             false: the opposite direction
// The comparison is an exact strcmp. Callers use the result as a boolean
// (send_frame() lowercases the key when it is false) - presumably "a match was
// found"; TODO confirm.
// NOTE(review): strncpy copies at most min(len, MQTTT_KEY_SIZE) bytes and does
// not terminate the string when len is smaller than the replacement plus NUL.
153 bool translate(
char *key, uint8_t len,
bool to_mqtt)
155 for (uint8_t i=0; i<translation_count; i++) {
156 if (strcmp(key, to_mqtt ? pjon_keys[i] : mqtt_keys[i]) == 0) {
157 strncpy(key, to_mqtt ? mqtt_keys[i] : pjon_keys[i],
min(len, MQTTT_KEY_SIZE));
// Static trampoline handed to mqttclient.set_receive_callback() in begin().
// callback_object is the MQTTTranslate instance registered there; the message
// is forwarded to its receiver(). A null callback_object is ignored.
166 static void static_receiver(
const char *topic,
const uint8_t *payload, uint16_t len,
167 void *callback_object)
169 if (callback_object) {
170 ((
MQTTTranslate*)callback_object)->receiver(topic, payload, len);
// JSON helpers, compiled only in BUS_JSON mode.
174 #if (MQTTT_MODE == MQTTT_MODE_BUS_JSON)
// Moves *p forward to the next JSON key.
//   p    - in/out cursor into the JSON text
//   last - upper bound for the cursor; every scan also stops at a NUL
// Two scans are made: first up to the next '{' or ',', then up to the opening
// quote of the key. Each scan is followed by a check that it neither ran into a
// NUL nor reached `last`.
// receiver() compares *p against `to"`, `from"` and `data"` right after a
// successful call, so on success *p presumably addresses the first character
// of the key name - TODO confirm.
175 bool find_next_json_key(
const char **p,
const char *
last)
177 while (**p && *p <
last && **p !=
'{' && **p !=
',') {
180 if (!**p || *p >=
last) {
184 while (**p && *p <
last && **p !=
'\"') {
187 if (**p !=
'\"' || *p >=
last) {
// Moves *p from a JSON key to the value that belongs to it.
//   p    - in/out cursor into the JSON text
//   last - upper bound for the cursor; every scan also stops at a NUL
// Scans up to the ':' separator, then over any spaces and tabs, checking after
// each scan that neither a NUL nor `last` was reached. The final test detects a
// value that starts with a double quote (a string value).
// receiver() passes *p straight to atoi() for numeric members, so on success
// *p presumably addresses the first character of the value - TODO confirm.
194 bool find_next_json_value(
const char **p,
const char *
last)
196 while (**p && *p <
last && **p !=
':') {
199 if (!**p || *p >=
last) {
203 while (**p && *p <
last && (**p ==
' ' || **p ==
'\t')) {
206 if (!**p || *p >=
last) {
209 if (**p ==
'\"' && *p <
last) {
// Handles one incoming MQTT message.
//   topic   - topic the message arrived on
//   payload - message body, len bytes long
// Depending on MQTTT_MODE the message is turned into a PJON packet that is
// stored in packet_buffer; incoming_packet_size is set to its length so that
// receive_frame() can hand it over.
216 void receiver(
const char *topic,
const uint8_t *payload, uint16_t len)
// BUS_RAW: the payload is stored unchanged as the pending packet, provided it
// fits into the buffer.
218 #if (MQTTT_MODE == MQTTT_MODE_BUS_RAW)
219 if(len <= MQTTT_BUFFER_SIZE) {
220 memcpy(packet_buffer, payload, len);
221 incoming_packet_size = len;
// BUS_JSON: the members "to", "from" and "data" are picked out of the JSON
// text and a PJON packet is composed from them.
224 #if (MQTTT_MODE == MQTTT_MODE_BUS_JSON)
227 uint8_t sender_id = 0, receiver_id = 0;
// `last` addresses the final payload byte and bounds the scans below.
228 const char *p = (
const char*)payload, *
last = p+len-1;
230 while (find_next_json_key(&p,
last)) {
231 if (strncmp(p,
"to\"", 3)==0 && find_next_json_value(&p,
last)) {
232 receiver_id = atoi(p);
233 }
else if (strncmp(p,
"from\"", 5)==0 && find_next_json_value(&p,
last)) {
235 }
else if (strncmp(p,
"data\"", 5)==0 && find_next_json_value(&p,
last)) {
// Advance p2 until the closing quote of the data string, a NUL, or the end of
// the payload.
237 while (p2-(
const char*)payload+1 < len && *p2 && *p2 !=
'\"') {
// From here on `payload` refers to the text of the data member.
242 payload = (uint8_t*)p;
// Checks for a missing receiver id (0) or a false `found` flag (presumably set
// when a data member was located - TODO confirm).
247 if (receiver_id == 0 || !found) {
// NOTE(review): the TX_INFO bit is OR-ed into the member `header`, while
// compose_packet() below is given `h`. Confirm that the bit actually reaches
// the composed packet and that altering the member for later packets is meant.
252 if (sender_id != 0) {
253 header |= PJON_TX_INFO_BIT;
255 incoming_packet_size = PJONTools::compose_packet(sender_id, bus_id, receiver_id,
256 bus_id, packet_buffer, payload, len, h);
// MIRROR modes: the receiver id defaults to my_id and is otherwise read from
// the digits that follow "/device" in the topic.
258 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_TRANSLATE || MQTTT_MODE == MQTTT_MODE_MIRROR_DIRECT)
259 uint8_t receiver_id = my_id;
// Two alternative lookups of the device segment follow - presumably selected by
// a preprocessor condition (MAC based topics versus "/device<id>"); TODO confirm.
262 const char *device_start = strstr(topic,
"/");
265 const char *device_start = strstr(topic,
"/device");
// 7 == strlen("/device"): the id starts right behind that segment.
267 receiver_id = (uint8_t) atoi(&device_start[7]);
// MIRROR_TRANSLATE: the topic segment found by the searches below becomes the
// key (translated back to its PJON name), the payload becomes the value.
271 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_TRANSLATE)
// NOTE(review): the memchr lengths are derived from the distance between the
// topic and the payload pointers, which is only meaningful if both live in one
// contiguous buffer - confirm against ReconnectingMqttClient.
278 const char *start = (
const char*)memchr(device_start+1,
'/',
279 (
const char*)payload-device_start+len-2);
282 start = (
const char*)memchr(start+1,
'/', (
const char*)payload-start+len-2);
// Copy lengths are capped so that one byte of key/value stays free for the
// terminating NUL.
285 uint8_t l =
min(start - device_start + len -1,
sizeof key -1);
286 strncpy(key, start+1, l);
288 translate(key,
sizeof key,
false);
289 l =
min(len,
sizeof value-1);
290 strncpy(value, (
const char*)payload, l);
// The packet body is the string `s`, sent including its terminating NUL
// (presumably "key=value", the form send_frame() parses - TODO confirm).
// receiver_id is used as both sender and receiver id.
296 incoming_packet_size = PJONTools::compose_packet(receiver_id, bus_id, receiver_id,
297 bus_id, packet_buffer, s.c_str(), s.length()+1, header);
// MIRROR_DIRECT: the payload is forwarded unchanged as the packet body, again
// with receiver_id as both sender and receiver id.
302 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_DIRECT)
304 incoming_packet_size = PJONTools::compose_packet(receiver_id, bus_id, receiver_id,
305 bus_id, packet_buffer, payload, len, header);
// MQTT client used for every subscribe, publish and connect in this class.
310 ReconnectingMqttClient mqttclient;
// Root of all topic names: begin() and send_frame() copy it to the start of
// the topic before appending further segments.
313 String topic =
"pjon";
// Device id; receiver() uses it as the default receiver id in the MIRROR modes.
314 uint8_t my_id = PJON_NOT_ASSIGNED;
// Bus id handed to PJONTools::compose_packet(); set through set_config().
315 uint8_t bus_id[4] = {0,0,0,0};
// When true, send_frame() lowercases keys that translate() did not handle
// (MIRROR_TRANSLATE mode).
317 bool lowercase_topics =
true;
// NOTE(review): presumably controlled by set_subscribe_all() and evaluated when
// subscribing in begin() - confirm.
318 bool subscribe_all =
false;
// Stores the PJON configuration used when composing packets.
//   id     - device id (its use is not visible in these lines; presumably
//            stored in my_id - TODO confirm)
//   bus_id - 4-byte bus id; copied into this->bus_id unless NULL is passed
//   header - packet header configuration; stored in this->header
320 void set_config(uint8_t
id,
const uint8_t bus_id[4], uint8_t header)
323 if (bus_id != NULL) {
324 memcpy(this->bus_id, bus_id, 4);
326 this->header = header;
// Sets the MQTT quality-of-service level (presumably stored in the `qos`
// member that begin() and send_frame() pass to the client - TODO confirm).
328 void set_qos(uint8_t qos)
// Sets the retain flag that send_frame() passes to mqttclient.publish().
332 void set_retain(
bool retain)
334 this->retain = retain;
// Sets the root topic (presumably the `topic` member - TODO confirm).
336 void set_topic(
const char *topic)
// Switches subscribing to all devices on or off (presumably via the
// `subscribe_all` member - TODO confirm).
346 void set_subscribe_all(
bool yes)
// Broker address parameters: IPv4 address, port and MQTT client id.
355 const uint8_t server_ip[4],
356 const uint16_t server_port,
357 const char *client_id
// All three values are forwarded unchanged to the MQTT client.
360 mqttclient.set_address(server_ip, server_port, client_id);
// Maps the number of send attempts made so far to a back-off delay.
// NOTE(review): formula and time unit of the result to be confirmed.
366 uint32_t back_off(uint8_t attempts)
// Registers the receive callback, builds the subscription topic and connects.
//   device_id - id appended after "/device" in the subscription topic
// Returns the result of mqttclient.connect().
374 bool begin(uint8_t device_id = 0)
// `this` is passed as callback context so static_receiver() can reach receiver().
380 mqttclient.set_receive_callback(static_receiver,
this);
// packet_buffer doubles as scratch space for composing the topic string.
// NOTE(review): `topic` is copied without a length check against
// MQTTT_BUFFER_SIZE - confirm the buffer is always large enough.
381 char *p = (
char*)packet_buffer;
382 strcpy(p, topic.c_str());
392 strcpy(p,
"/device");
// uint8toa() returns the number of characters written, so p ends up behind the id.
394 p += mqttclient.uint8toa(device_id, p);
// MIRROR_TRANSLATE subscribes to every topic one level below ".../input".
397 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_TRANSLATE)
398 strcat(p,
"/input/+");
400 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_DIRECT)
403 mqttclient.subscribe((
const char*)packet_buffer, qos);
404 return mqttclient.connect();
412 return mqttclient.connect();
// Reports how many send attempts are allowed (presumably a fixed constant -
// TODO confirm).
418 static uint8_t get_max_attempts()
// Reports the receive time, which is the compile-time value MQTTT_RECEIVE_TIME.
426 static uint16_t get_receive_time()
428 return MQTTT_RECEIVE_TIME;
// Intentionally empty: no collision handling is performed.
434 void handle_collision() { };
// Hands the pending packet, if any, to the caller.
//   data       - destination buffer
//   max_length - capacity of data in bytes
// A packet is copied only when one is pending and it fits into max_length;
// incoming_packet_size is then reset to 0 so the packet is delivered once.
// `len` keeps the packet length across that reset (presumably the value
// returned - TODO confirm).
439 uint16_t receive_frame(uint8_t *
data, uint16_t max_length)
// Nothing pending yet (presumably the client is polled for new messages here -
// TODO confirm).
441 if (incoming_packet_size == 0) {
444 if (incoming_packet_size > 0 && incoming_packet_size <= max_length) {
445 memcpy(
data, packet_buffer, incoming_packet_size);
446 uint16_t len = incoming_packet_size;
447 incoming_packet_size = 0;
// Reports the outcome of the latest send_frame(): PJON_ACK when the publish
// succeeded, PJON_FAIL otherwise. No response is awaited from the receiver.
456 uint16_t receive_response()
458 return last_send_success ? PJON_ACK : PJON_FAIL;
// Response hook of the strategy interface.
// NOTE(review): presumably a no-op, since receive_response() relies on the
// publish result only - confirm.
464 void send_response(uint8_t response)
// Publishes one outgoing PJON packet to MQTT.
//   data   - raw PJON packet; its header is parsed into _packet_info first
//   length - packet length in bytes
// The topic is assembled in mqttclient.topic_buf(), starting with `topic`; each
// segment is appended only after a size check against SMCTOPICSIZE. What is
// published depends on MQTTT_MODE:
//   MIRROR_TRANSLATE - the payload is split at ',' into key=value pairs and
//                      each value is published under a topic ending in its key
//   BUS_JSON         - the payload is wrapped into {"to":..,"from":..,"data":".."}
//   other modes      - see the final publish at the end
// The outcome is stored in last_send_success.
471 void send_frame(uint8_t *
data, uint16_t length)
474 PJONTools::parse_header(
data, _packet_info);
// len is a size_t on purpose: an 8-bit length wraps for topics of 256 or more
// characters, slips past the SMCTOPICSIZE guard below and lets strcpy overrun
// topic_buf().
477 size_t len = strlen(topic.c_str());
478 if (len >= SMCTOPICSIZE) {
481 strcpy(mqttclient.topic_buf(), topic.c_str());
// p always addresses the end of the topic written so far.
482 char *p = &mqttclient.topic_buf()[len];
484 if (p-mqttclient.topic_buf()+7+7 >= SMCTOPICSIZE) {
488 strcpy(p,
"/output");
// Room for "/device" (7), up to three id digits and "/output" (7).
491 if (p-mqttclient.topic_buf()+7+3+7 >= SMCTOPICSIZE) {
494 strcpy(p,
"/device");
// MIRROR modes publish below the sender's device topic.
496 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_TRANSLATE || MQTTT_MODE == MQTTT_MODE_MIRROR_DIRECT)
497 p += mqttclient.uint8toa(_packet_info.tx.id, p);
498 strcat(p,
"/output");
500 #else // One of the bus modes, publish to receiver device
501 mqttclient.uint8toa(_packet_info.rx.id, p);
// Every mode except BUS_RAW needs to locate the payload inside the packet.
504 #if (MQTTT_MODE != MQTTT_MODE_BUS_RAW)
505 uint8_t overhead = PJONTools::packet_overhead(_packet_info.header);
506 uint8_t crc_size = PJONTools::crc_overhead(_packet_info.header);
510 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_TRANSLATE)
// send_cnt accumulates the publish() results over all key=value pairs.
515 uint8_t send_cnt = 0;
// d: start of the payload, v: start of the current pair, c: its separator,
// e: its '=' sign.
516 const char *d = (
const char*)&
data[overhead - crc_size], *v = d, *c, *e;
517 uint16_t plen = length - overhead;
518 while (v && (c = find_value_separator(v, d-v+plen))) {
// The assignment inside the condition is intentional: pairs without '=' are
// not processed.
519 if (e = (
const char *)memchr(v,
'=', (uint16_t)(c-v))) {
// Copy lengths are capped so that one byte stays free for the terminating NUL.
520 uint8_t l =
min(e-v,
sizeof key-1);
523 l =
min(c-e-1,
sizeof value-1);
524 strncpy(value, e+1, l);
// Keys without a table entry are lowercased when lowercase_topics is set.
526 if (!translate(key,
sizeof key,
true))
527 if (lowercase_topics)
for (
char *k=key; *k!=0; k++) {
530 if (p-mqttclient.topic_buf()+1+strlen(key) >= SMCTOPICSIZE) {
535 send_cnt += mqttclient.publish(mqttclient.topic_buf(), (uint8_t*)value, strlen(value), retain, qos);
// Continue behind the separator, or stop once the payload is exhausted.
536 v = c-d >= plen ? NULL : c+1;
// Success means that at least one pair was published.
539 last_send_success = send_cnt > 0;
542 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_DIRECT)
547 #if (MQTTT_MODE == MQTTT_MODE_BUS_JSON)
// The JSON text is composed in packet_buffer.
550 p = (
char *) packet_buffer;
// Size check before anything is written. The constants are the lengths of the
// JSON fragments written below (6, 8, 9, plus 2 for the closing part) and up
// to three digits for each id. The payload length is computed from
// length - overhead directly because payload_len is only declared further
// down and cannot be referenced at this point.
551 if (6+3+8+3+9+(length - overhead)+2 >= MQTTT_BUFFER_SIZE) {
554 strcpy(p,
"{\"to\":");
556 p += mqttclient.uint8toa(_packet_info.rx.id, p);
557 strcpy(p,
",\"from\":");
559 p += mqttclient.uint8toa(_packet_info.tx.id, p);
560 strcpy(p,
",\"data\":\"");
562 uint8_t payload_len = length - overhead;
563 strncpy(p, (
const char*)&
data[overhead - crc_size], payload_len);
// Publish the composed JSON text instead of the raw packet.
568 data = packet_buffer;
569 length = ((uint8_t*)p - packet_buffer);
573 last_send_success = mqttclient.publish(mqttclient.topic_buf(),
data, length, retain, qos);
// Scans forward from `value`, looking at no more than len characters, and
// stops at the first ',' or NUL.
// send_frame() treats a null result as "no further pair" and otherwise uses the
// result as the end of the current key=value pair (presumably the position
// where the scan stopped - TODO confirm).
576 const char *find_value_separator(
const char *value, uint16_t len)
579 const char *p = value;
580 while (p != NULL && (p-value < len) && *p !=
',' && *p != 0) {
586 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_TRANSLATE)
587 bool add_translation(
const char *pjon_key,
const char *mqtt_key)
589 if (translation_count >= MQTTT_TRANSLATION_TABLE_SIZE) {
592 strncpy(pjon_keys[translation_count], pjon_key, MQTTT_KEY_SIZE);
593 pjon_keys[translation_count][MQTTT_KEY_SIZE-1] = 0;
594 strncpy(mqtt_keys[translation_count], mqtt_key, MQTTT_KEY_SIZE);
595 mqtt_keys[translation_count][MQTTT_KEY_SIZE-1] = 0;
596 for (
char *p=mqtt_keys[translation_count]; *p!=0; p++) {