diff --git a/Makefile b/Makefile index b4d97c7..ea37fca 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,21 @@ -SRC := src/main.c src/mqtt.c +SRC := src/main.c src/mqtt.c src/module.c src/modules/*.c INC := -I src -I include/paho-mqtt -LIBS := -Llibs/paho-mqtt -lpthread -lpaho-mqtt3a -lpaho-mqtt3c +LIBS := -Llibs/paho-mqtt -lpthread -lpaho-mqtt3a -lpaho-mqtt3c -lpthread all: build build: $(SRC) - gcc $(SRC) $(LIBS) -o relayClient $(INC) + gcc $(SRC) $(LIBS) -o mqttClient $(INC) + +debug: $(SRC) + gcc $(SRC) $(LIBS) -g -o mqttClient $(INC) + +install: + mkdir -p $$HOME/.local/bin + cp mqttClient $$HOME/.local/bin/mqttClient + chmod +x $$HOME/.local/bin/mqttClient + # sed -e "s//$$(id -un)/g" mqttClient.service >/etc/systemd/system/mqttClient.service + +clean: + rm mqttClient diff --git a/mqttClient b/mqttClient new file mode 100755 index 0000000..bd6b0e1 Binary files /dev/null and b/mqttClient differ diff --git a/relayClient.service b/relayClient.service index c77edf9..64c6cdc 100644 --- a/relayClient.service +++ b/relayClient.service @@ -1,18 +1,17 @@ # systemd service file to start relayClient [Unit] -Description=MQTT client for switching relays +Description=MQTT client #After=network.target After=mosquitto.service [Service] Type=simple -# Run as normal pi user - change to the user name you wish to run relayClient -User=mreenen -Group=mreenen -WorkingDirectory=/home/mreenen/relayClient +User= +Group= +WorkingDirectory=/home/ -ExecStart=/home/mreenen/relayClient/relayClient +ExecStart=/home//.local/bin/relayClient # Use SIGINT to stop KillSignal=SIGINT # Auto restart on crash diff --git a/src/conf.h b/src/conf.h index 47b2c98..2920098 100644 --- a/src/conf.h +++ b/src/conf.h @@ -1,12 +1,12 @@ #if !defined(CONF_H) #define CONF_H -#define ADDRESS "tcp://localhost:1883" -#define CLIENTID "mqttClock" -#define TOPIC "coolhaven/clock" +#define ADDRESS "tcp://10.2.0.3:1883" +#define CLIENTID "mqttClient" +#define BASE_TOPIC "/cool/" #define TIMEOUT 10000L -#define RECON_TIMEOUT 60 +#define RECON_TIMEOUT 60 // seconds #if defined(_WIN32) #define SLEEP(n) Sleep(n*10) diff --git a/src/main.c b/src/main.c index 1892e76..8daa456 100644 --- a/src/main.c +++ b/src/main.c @@ -7,64 +7,24 @@ #include "MQTTAsync.h" #include "mqtt.h" - -void onConnect(void* context){ - return; -} - -void onMessage(char* topicName, int topicLen, MQTTAsync_message* message){ - return; -} - -void sendTick(char* name, struct tm* t) -{ - - char topic[100] = TOPIC "/"; - strcat(&topic[0], name); - - char payload[50]; - sprintf(&payload[0], "%04d-%02d-%02dT%02d:%02d:%02d", - t->tm_year, - t->tm_mon + 1, - t->tm_mday, - t->tm_hour, - t->tm_min, - t->tm_sec - ); - - MQTT_publish(topic, &payload[0], 1); -} +#include "module.h" int main(int argc, char* argv[]){ printf("==============================\n" - "=== mqttClock ================\n" + "=== mqttClient ===============\n" "==============================\n\n"); - clientConf_t* client = MQTT_connect(&onConnect, &onMessage); - - if(client != NULL){ - int lastMin = 0; - int lastHour = 0; + clientConf_t* client = MQTT_connect(NULL); + if(client != NULL) + { + usleep((int)500e3); + Modules_Init(); + Modules_StartAll(); + int returnCode = 1; - while (returnCode = 1) + while(returnCode == 1) { - time_t rowNow = time(NULL); - struct tm* now = gmtime(&rowNow); - - sendTick("second", now); - if (now->tm_min != lastMin) - { - sendTick("minute", now); - lastMin = now->tm_min; - - if (now->tm_hour != lastHour) - { - sendTick("hour", now); - lastHour = now->tm_hour; - } - } - char ch = getchar(); switch (ch) { case 'q': @@ -76,7 +36,7 @@ int main(int argc, char* argv[]){ } else { - printf("failt to connect to mqtt\n"); + printf("CRITICAL: main(): failt to connect to mqtt\n"); return -1; } diff --git a/src/module.c b/src/module.c new file mode 100644 index 0000000..28c89f2 --- /dev/null +++ b/src/module.c @@ -0,0 +1,66 @@ +#include +#include +#include + +#include "module.h" + +Module_t *Modules = NULL; +unsigned int Modules_len = 0; +unsigned int Modules_alloc = 0; + +void Modules_Add(Module_t module) +{ + printf("INFO: Modules_Add(): add module '%s' to the list\n", module.Name); + if (Modules == NULL) + { + Modules = (Module_t*) malloc(sizeof(Module_t) * 5); + Modules_alloc = 5; + } + + if (Modules_len >= Modules_alloc) + { + Module_t *new_subs = (Module_t *) malloc(sizeof(Module_t) * (Modules_alloc + 5)); + memcpy(new_subs, Modules, sizeof(Module_t) * Modules_alloc); + Module_t *old_subs = Modules; + Modules = new_subs; + free(old_subs); + } + + memcpy(Modules + sizeof(Module_t)*Modules_len, &module, sizeof(Module_t)); + Modules_len++; +} + +void Modules_Init() +{ + // extern Module_t Module_Clock(); + // Modules_Add(Module_Clock()); + extern Module_t Module_Button(); + Modules_Add(Module_Button()); +} + +void Modules_StartOne(Module_t module) +{ + printf("INFO: Modules_StartOne(): starting module '%s'\n", module.Name); + (*(module.Start))(); +} + +void Modules_StopOne(Module_t module) +{ + (*(module.Stop))(); +} + +void Modules_StartAll() +{ + for (int i=0; i +#include +#include +#include + +#include "conf.h" +#include "module.h" +#include "mqtt.h" + +void button1(char *topicName, int topicLen, MQTTAsync_message *message) +{ + printf("DEBUG: buttons.button1(): payload = %s", (char*)message->payload); +} + +void Buttons_Stop() +{ + return; +} + +void Buttons_Start() +{ + MQTT_subscribe("button1", 1, &button1); + return; +} + +Module_t Module_Button() +{ + Module_t module; + module.Name = "buttons"; + module.Start = &Buttons_Start; + module.Stop = &Buttons_Stop; + return module; +} diff --git a/src/mqtt.c b/src/mqtt.c index 5783327..4624a8d 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -1,5 +1,6 @@ #include #include +#include #include #include "conf.h" @@ -8,136 +9,17 @@ #include "MQTTAsync.h" #include "mqtt.h" -clientConf_t* client; - -void reconnect(){ - int rc; - - if(client->last_reconn.t + RECON_TIMEOUT > time(NULL)){ - if(client->last_reconn.c < 10){ - printf("Wait %d seconds until reconect. reconect counter on %d\n", RECON_TIMEOUT, client->last_reconn.c); - SLEEP(RECON_TIMEOUT); - } else { - printf("Wait one hour until reconect. reconect counter on %d\n", client->last_reconn.c); - SLEEP(3600); - } - } - - printf("Reconnecting to MQTT server\n"); - rc = MQTTAsync_connect(client->client, &(client->conn_opts)); - if (rc != MQTTASYNC_SUCCESS){ - printf("ERROR: Failed to reconnect, return code %d\n", rc); - client->last_reconn.t = time(NULL); - client->last_reconn.c++; - } else { - client->last_reconn.c = 0; - } -} - -void connlost(void *context, char *cause){ - printf("\nERROR: Lost connection to MQTT\n"); - if(cause){ - printf(" cause: %s\n", cause); - } - - reconnect(); -} - -int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message){ - printf("Message arrived\n"); - printf(" topic: %s\n", topicName); - printf(" message: %.*s\n", message->payloadlen, (char*)message->payload); - - (*(client->onMessage))(topicName, topicLen, message); - - MQTTAsync_freeMessage(&message); - MQTTAsync_free(topicName); - return 1; -} - -void onConnectFailure(void* context, MQTTAsync_failureData* response){ - printf("Connect failed, rc %d\n", response->code); - reconnect(); -} - -void onConn(void* context, MQTTAsync_successData* response){ - printf("connected to MQTT server\n"); - (*(client->onConnect))(client); -} - - -clientConf_t* MQTT_connect(void (*onConnect_cb)(void* context), void (*onMessage)(char* topicName, int topicLen, MQTTAsync_message* message)){ - client = (clientConf_t*) malloc(sizeof(clientConf_t)); // keep this util i say so - int rc; - int ch; - - rc = MQTTAsync_create(&(client->client), ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); - if (rc != MQTTASYNC_SUCCESS){ - printf("Failed to create client, return code %d\n", rc); - free(client); - return NULL; - } - - client->onMessage = onMessage; - client->onConnect = onConnect_cb; - client->last_reconn.c = 0; - client->last_reconn.t = 0; - - rc = MQTTAsync_setCallbacks(client->client, client, &connlost, &msgarrvd, NULL); - if (rc != MQTTASYNC_SUCCESS){ - printf("Failed to set callbacks, return code %d\n", rc); - MQTTAsync_destroy(&(client->client)); - free(client); - return NULL; - } - - MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; - client->conn_opts = conn_opts; - client->conn_opts.keepAliveInterval = 20; - client->conn_opts.cleansession = 1; - client->conn_opts.onSuccess = onConn; - client->conn_opts.onFailure = onConnectFailure; - client->conn_opts.context = &client; - printf("connecting to MQTT server (%s)\n", ADDRESS); - rc = MQTTAsync_connect(client->client, &(client->conn_opts)); - if(rc != MQTTASYNC_SUCCESS){ - printf("Failed to start connect, return code %d\n", rc); - MQTTAsync_destroy(&(client->client)); - free(client); - return NULL; - } - - return client; -} - - - -void onDisconnectFailure(void* context, MQTTAsync_failureData* response){ - printf("Faild to discontect from MQTT, rc %d\n", response->code); - MQTTAsync_destroy(client->client); - free(client); -} - -void onDisconnect(void* context, MQTTAsync_successData* response){ - printf("disconnected from MQTT server\n"); - MQTTAsync_destroy(client->client); - free(client); -} - -void MQTT_disconnect(){ - int rc; - MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; - disc_opts.onSuccess = &onDisconnect; - disc_opts.onFailure = &onDisconnectFailure; - rc = MQTTAsync_disconnect(client->client, &disc_opts); - if (rc != MQTTASYNC_SUCCESS){ - printf("Failed to start disconnect, return code %d\n", rc); - MQTTAsync_destroy(client->client); - return; - } -} +typedef struct Subscription_s { + char* topic; + char qos; + void (*onMessage)(char* topicName, int topicLen, MQTTAsync_message* message); +} Subscription_t; +clientConf_t* Client; +Subscription_t *Subscriptions = NULL; +unsigned int Subscriptions_len = 0; +unsigned int Subscriptions_aloc = 0; void onSubscribe(void* context, MQTTAsync_successData* response){ // printf("Successful subscribed to %s\n", (char*) context); @@ -148,15 +30,57 @@ void onSubscribeFailure(void* context, MQTTAsync_failureData* response){ printf("ERROR: Subscribe failed, rc %d\n", response->code); } -void MQTT_subscribe(char* topic, int qos){ +void MQTT_subscribe(char* topic, int qos, void (*onMessage)(char* topicName, int topicLen, MQTTAsync_message* message)) +{ int rc; - printf("Subscribing to topic %s (QoS%d)\n", topic, qos); + + if (Subscriptions == NULL) + { + Subscriptions = (Subscription_t *) malloc(sizeof(Subscription_t) * 5); + Subscriptions_aloc = 5; + } + + if (Subscriptions_len >= Subscriptions_aloc) + { + Subscription_t *new_subs = (Subscription_t *) malloc(sizeof(Subscription_t) * (Subscriptions_aloc + 5)); + memcpy(new_subs, Subscriptions, sizeof(Subscription_t) * Subscriptions_aloc); + Subscription_t *old_subs = Subscriptions; + Subscriptions = new_subs; + free(old_subs); + } + + Subscription_t sub; + sub.qos = qos; + sub.onMessage = onMessage; + + if (topic[0] != '/') + { + char *newTopic = malloc(strlen(BASE_TOPIC) + strlen(topic) + 1); + memcpy(newTopic, BASE_TOPIC, strlen(BASE_TOPIC)); + memcpy(newTopic + strlen(BASE_TOPIC), topic, strlen(topic) + 1); + sub.topic = newTopic; + } + else + { + // copy topic to allow passing local variable + char *newTopic = malloc(strlen(topic) + 1); + memcpy(newTopic, topic, strlen(topic) + 1); + sub.topic = newTopic; + } + + memcpy(Subscriptions + sizeof(Subscription_t)*(Subscriptions_len), &sub, sizeof(Subscription_t)); + Subscriptions_len++; + + printf("INFO: MQTT_subscribe(): Subscribing to topic %s (QoS%d)\n", sub.topic, sub.qos); + MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; opts.onSuccess = &onSubscribe; opts.onFailure = &onSubscribeFailure; - opts.context = topic; - rc = MQTTAsync_subscribe(client->client, topic, qos, &opts); + opts.context = ⊂ + rc = MQTTAsync_subscribe(Client->client, "#", qos, &opts); + // rc = MQTTAsync_subscribe(Client->client, topic, qos, &opts); + if (rc != MQTTASYNC_SUCCESS){ printf("ERROR: Failed to start subscribe, return code %d\n", rc); } @@ -164,5 +88,159 @@ void MQTT_subscribe(char* topic, int qos){ void MQTT_publish(char* topic, char* payload, int qos) { - MQTTClient_publish(client->client, topic, len(topic), payload, qos, 0, NULL); + printf("Publishing to %s", topic); + MQTTClient_publish(Client->client, topic, strlen(payload), payload, qos, 0, NULL); } + +int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message){ + printf("Message arrived\n"); + printf(" topic: %s\n", topicName); + printf(" message: %.*s\n", message->payloadlen, (char*)message->payload); + + //(*(Client->onMessage))(topicName, topicLen, message); + + MQTTAsync_freeMessage(&message); + MQTTAsync_free(topicName); + return 1; +} + +void reconnect(){ + int rc; + + if(Client->last_reconn.t + RECON_TIMEOUT > time(NULL)){ + if(Client->last_reconn.c < 10){ + printf("INFO: MQTT.reconnect(): Wait %d seconds until reconect. reconect counter on %d\n", RECON_TIMEOUT, Client->last_reconn.c); + sleep(RECON_TIMEOUT); + } else { + printf("INFO: MQTT.reconnect(): Wait min minutes until reconect. reconect counter on %d\n", Client->last_reconn.c); + sleep(60 * 10); + } + } + + printf("INFO: MQTT.reconnect(): Reconnecting to MQTT server\n"); + rc = MQTTAsync_connect(Client->client, &(Client->conn_opts)); + if (rc != MQTTASYNC_SUCCESS){ + printf("ERROR: MQTT.reconnect(): Failed to reconnect, return code %d\n", rc); + } else { + printf("INFO: MQTT.reconnect(): Reconnect sucsesfull\n"); + } + Client->last_reconn.t = time(NULL); + Client->last_reconn.c++; +} + +void onConnectFailure(void* context, MQTTAsync_failureData* response){ + printf("ERROR: MQTT.onConnectFailure(): Connection failed, rc %d\n", response->code); + reconnect(); +} + +void connlost(void *context, char *cause){ + printf("\nERROR: MQTT.connlost(): Lost connection to MQTT\n"); + if (cause != NULL) + { + printf(" cause: %s\n", cause); + } + + reconnect(); +} + +void onConn(void* context, MQTTAsync_successData* response){ + printf("INFO: MQTT.onConn(): connected to MQTT server\n"); + + Client->last_reconn.c = 0; + + int rc; + for (int i=0; i < Subscriptions_len; i++) + { + Subscription_t sub = *(Subscriptions + sizeof(Subscription_t)*i); + + printf("INFO: MQTT.onConn(): Subscribing to topic %s (QoS%d)\n", sub.topic, sub.qos); + + MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; + opts.onSuccess = &onSubscribe; + opts.onFailure = &onSubscribeFailure; + opts.context = ⊂ + rc = MQTTAsync_subscribe(Client->client, sub.topic, sub.qos, &opts); + + if (rc != MQTTASYNC_SUCCESS){ + printf("ERROR: MQTT.onConn(): Failed to start subscribe, return code %d\n", rc); + } + } + + if (Client->onConnect != NULL) + { + (*(Client->onConnect))(Client); + } +} + + +clientConf_t* MQTT_connect(void (*onConnect_cb)(void* context)){ + Client = (clientConf_t*) malloc(sizeof(clientConf_t)); // keep this util i say so + int rc; + int ch; + + rc = MQTTAsync_create(&(Client->client), ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); + if (rc != MQTTASYNC_SUCCESS){ + printf("Failed to create client, return code %d\n", rc); + free(Client); + return NULL; + } + + Client->onConnect = onConnect_cb; + Client->last_reconn.c = 0; + Client->last_reconn.t = time(NULL); + + rc = MQTTAsync_setCallbacks(Client->client, Client, &connlost, &msgarrvd, NULL); + if (rc != MQTTASYNC_SUCCESS){ + printf("Failed to set callbacks, return code %d\n", rc); + MQTTAsync_destroy(&(Client->client)); + free(Client); + return NULL; + } + + MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; + Client->conn_opts = conn_opts; + Client->conn_opts.keepAliveInterval = 20; + Client->conn_opts.cleansession = 1; + Client->conn_opts.onSuccess = onConn; + Client->conn_opts.onFailure = onConnectFailure; + Client->conn_opts.context = &Client; + printf("connecting to MQTT server (%s)\n", ADDRESS); + rc = MQTTAsync_connect(Client->client, &(Client->conn_opts)); + if(rc != MQTTASYNC_SUCCESS){ + printf("Failed to start connect, return code %d\n", rc); + MQTTAsync_destroy(&(Client->client)); + free(Client); + return NULL; + } + + return Client; +} + + + +void onDisconnectFailure(void* context, MQTTAsync_failureData* response){ + printf("Faild to discontect from MQTT, rc %d\n", response->code); + MQTTAsync_destroy(Client->client); + free(Client); +} + +void onDisconnect(void* context, MQTTAsync_successData* response){ + printf("disconnected from MQTT server\n"); + MQTTAsync_destroy(Client->client); + free(Client); +} + +void MQTT_disconnect(){ + int rc; + MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; + disc_opts.onSuccess = &onDisconnect; + disc_opts.onFailure = &onDisconnectFailure; + rc = MQTTAsync_disconnect(Client->client, &disc_opts); + if (rc != MQTTASYNC_SUCCESS){ + printf("Failed to start disconnect, return code %d\n", rc); + MQTTAsync_destroy(Client->client); + return; + } +} + + diff --git a/src/mqtt.h b/src/mqtt.h index 9d6068b..f551adc 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -16,10 +16,10 @@ typedef struct clientConf_s { } clientConf_t; -clientConf_t* MQTT_connect(void (*onConnect)(void* context), void (*onMessage)(char* topicName, int topicLen, MQTTAsync_message* message)); +clientConf_t* MQTT_connect(void (*onConnect)(void* context)); void MQTT_disconnect(); -void MQTT_subscribe(char* topic, int qos); +void MQTT_subscribe(char* topic, int qos, void (*onMessage)(char* topicName, int topicLen, MQTTAsync_message* message)); void MQTT_publish(char* topic, char* payload, int qos); #endif