update alot

This commit is contained in:
mreenen 2023-03-05 17:32:32 +01:00
parent 9e2a88bb5f
commit 5d9a1be202
10 changed files with 366 additions and 199 deletions

View File

@ -1,9 +1,21 @@
SRC := src/main.c src/mqtt.c
SRC := src/main.c src/mqtt.c src/module.c src/modules/*.c
INC := -I src -I include/paho-mqtt
LIBS := -Llibs/paho-mqtt -lpthread -lpaho-mqtt3a -lpaho-mqtt3c
LIBS := -Llibs/paho-mqtt -lpthread -lpaho-mqtt3a -lpaho-mqtt3c -lpthread
all: build
build: $(SRC)
gcc $(SRC) $(LIBS) -o relayClient $(INC)
gcc $(SRC) $(LIBS) -o mqttClient $(INC)
debug: $(SRC)
gcc $(SRC) $(LIBS) -g -o mqttClient $(INC)
install:
mkdir -p $$HOME/.local/bin
cp mqttClient $$HOME/.local/bin/mqttClient
chmod +x $$HOME/.local/bin/mqttClient
# sed -e "s/<username>/$$(id -un)/g" mqttClient.service >/etc/systemd/system/mqttClient.service
clean:
rm mqttClient

BIN
mqttClient Executable file

Binary file not shown.

View File

@ -1,18 +1,17 @@
# systemd service file to start relayClient
[Unit]
Description=MQTT client for switching relays
Description=MQTT client
#After=network.target
After=mosquitto.service
[Service]
Type=simple
# Run as normal pi user - change to the user name you wish to run relayClient
User=mreenen
Group=mreenen
WorkingDirectory=/home/mreenen/relayClient
User=<username>
Group=<username>
WorkingDirectory=/home/<username>
ExecStart=/home/mreenen/relayClient/relayClient
ExecStart=/home/<username>/.local/bin/relayClient
# Use SIGINT to stop
KillSignal=SIGINT
# Auto restart on crash

View File

@ -1,12 +1,12 @@
#if !defined(CONF_H)
#define CONF_H
#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "mqttClock"
#define TOPIC "coolhaven/clock"
#define ADDRESS "tcp://10.2.0.3:1883"
#define CLIENTID "mqttClient"
#define BASE_TOPIC "/cool/"
#define TIMEOUT 10000L
#define RECON_TIMEOUT 60
#define RECON_TIMEOUT 60 // seconds
#if defined(_WIN32)
#define SLEEP(n) Sleep(n*10)

View File

@ -7,64 +7,24 @@
#include "MQTTAsync.h"
#include "mqtt.h"
void onConnect(void* context){
return;
}
void onMessage(char* topicName, int topicLen, MQTTAsync_message* message){
return;
}
void sendTick(char* name, struct tm* t)
{
char topic[100] = TOPIC "/";
strcat(&topic[0], name);
char payload[50];
sprintf(&payload[0], "%04d-%02d-%02dT%02d:%02d:%02d",
t->tm_year,
t->tm_mon + 1,
t->tm_mday,
t->tm_hour,
t->tm_min,
t->tm_sec
);
MQTT_publish(topic, &payload[0], 1);
}
#include "module.h"
int main(int argc, char* argv[]){
printf("==============================\n"
"=== mqttClock ================\n"
"=== mqttClient ===============\n"
"==============================\n\n");
clientConf_t* client = MQTT_connect(&onConnect, &onMessage);
if(client != NULL){
int lastMin = 0;
int lastHour = 0;
clientConf_t* client = MQTT_connect(NULL);
if(client != NULL)
{
usleep((int)500e3);
Modules_Init();
Modules_StartAll();
int returnCode = 1;
while (returnCode = 1)
while(returnCode == 1)
{
time_t rowNow = time(NULL);
struct tm* now = gmtime(&rowNow);
sendTick("second", now);
if (now->tm_min != lastMin)
{
sendTick("minute", now);
lastMin = now->tm_min;
if (now->tm_hour != lastHour)
{
sendTick("hour", now);
lastHour = now->tm_hour;
}
}
char ch = getchar();
switch (ch) {
case 'q':
@ -76,7 +36,7 @@ int main(int argc, char* argv[]){
}
else
{
printf("failt to connect to mqtt\n");
printf("CRITICAL: main(): failt to connect to mqtt\n");
return -1;
}

66
src/module.c Normal file
View File

@ -0,0 +1,66 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "module.h"
Module_t *Modules = NULL;
unsigned int Modules_len = 0;
unsigned int Modules_alloc = 0;
void Modules_Add(Module_t module)
{
printf("INFO: Modules_Add(): add module '%s' to the list\n", module.Name);
if (Modules == NULL)
{
Modules = (Module_t*) malloc(sizeof(Module_t) * 5);
Modules_alloc = 5;
}
if (Modules_len >= Modules_alloc)
{
Module_t *new_subs = (Module_t *) malloc(sizeof(Module_t) * (Modules_alloc + 5));
memcpy(new_subs, Modules, sizeof(Module_t) * Modules_alloc);
Module_t *old_subs = Modules;
Modules = new_subs;
free(old_subs);
}
memcpy(Modules + sizeof(Module_t)*Modules_len, &module, sizeof(Module_t));
Modules_len++;
}
void Modules_Init()
{
// extern Module_t Module_Clock();
// Modules_Add(Module_Clock());
extern Module_t Module_Button();
Modules_Add(Module_Button());
}
void Modules_StartOne(Module_t module)
{
printf("INFO: Modules_StartOne(): starting module '%s'\n", module.Name);
(*(module.Start))();
}
void Modules_StopOne(Module_t module)
{
(*(module.Stop))();
}
void Modules_StartAll()
{
for (int i=0; i<Modules_len; i++)
{
Modules_StartOne(*(Modules + sizeof(Module_t)*i));
}
}
void Modules_StopAll()
{
for (int i=0; i<Modules_len; i++)
{
Modules_StopOne(*(Modules + sizeof(Module_t)*i));
}
}

19
src/module.h Normal file
View File

@ -0,0 +1,19 @@
#if !defined(MODULE_BASE_H)
#define MODULE_BASE_H
typedef struct Module_s {
char *Name;
void (*Start)();
void (*Stop)();
} Module_t;
void Modules_Init();
void Modules_Add(Module_t module);
void Modules_StartOne(Module_t module);
void Modules_StopOne(Module_t module);
void Modules_StartAll();
void Modules_StopAll();
#endif

33
src/modules/buttons.c Normal file
View File

@ -0,0 +1,33 @@
#include <pthread.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#include "conf.h"
#include "module.h"
#include "mqtt.h"
void button1(char *topicName, int topicLen, MQTTAsync_message *message)
{
printf("DEBUG: buttons.button1(): payload = %s", (char*)message->payload);
}
void Buttons_Stop()
{
return;
}
void Buttons_Start()
{
MQTT_subscribe("button1", 1, &button1);
return;
}
Module_t Module_Button()
{
Module_t module;
module.Name = "buttons";
module.Start = &Buttons_Start;
module.Stop = &Buttons_Stop;
return module;
}

View File

@ -1,5 +1,6 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "conf.h"
@ -8,136 +9,17 @@
#include "MQTTAsync.h"
#include "mqtt.h"
clientConf_t* client;
void reconnect(){
int rc;
if(client->last_reconn.t + RECON_TIMEOUT > time(NULL)){
if(client->last_reconn.c < 10){
printf("Wait %d seconds until reconect. reconect counter on %d\n", RECON_TIMEOUT, client->last_reconn.c);
SLEEP(RECON_TIMEOUT);
} else {
printf("Wait one hour until reconect. reconect counter on %d\n", client->last_reconn.c);
SLEEP(3600);
}
}
printf("Reconnecting to MQTT server\n");
rc = MQTTAsync_connect(client->client, &(client->conn_opts));
if (rc != MQTTASYNC_SUCCESS){
printf("ERROR: Failed to reconnect, return code %d\n", rc);
client->last_reconn.t = time(NULL);
client->last_reconn.c++;
} else {
client->last_reconn.c = 0;
}
}
void connlost(void *context, char *cause){
printf("\nERROR: Lost connection to MQTT\n");
if(cause){
printf(" cause: %s\n", cause);
}
reconnect();
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message){
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: %.*s\n", message->payloadlen, (char*)message->payload);
(*(client->onMessage))(topicName, topicLen, message);
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
void onConnectFailure(void* context, MQTTAsync_failureData* response){
printf("Connect failed, rc %d\n", response->code);
reconnect();
}
void onConn(void* context, MQTTAsync_successData* response){
printf("connected to MQTT server\n");
(*(client->onConnect))(client);
}
clientConf_t* MQTT_connect(void (*onConnect_cb)(void* context), void (*onMessage)(char* topicName, int topicLen, MQTTAsync_message* message)){
client = (clientConf_t*) malloc(sizeof(clientConf_t)); // keep this util i say so
int rc;
int ch;
rc = MQTTAsync_create(&(client->client), ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
if (rc != MQTTASYNC_SUCCESS){
printf("Failed to create client, return code %d\n", rc);
free(client);
return NULL;
}
client->onMessage = onMessage;
client->onConnect = onConnect_cb;
client->last_reconn.c = 0;
client->last_reconn.t = 0;
rc = MQTTAsync_setCallbacks(client->client, client, &connlost, &msgarrvd, NULL);
if (rc != MQTTASYNC_SUCCESS){
printf("Failed to set callbacks, return code %d\n", rc);
MQTTAsync_destroy(&(client->client));
free(client);
return NULL;
}
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
client->conn_opts = conn_opts;
client->conn_opts.keepAliveInterval = 20;
client->conn_opts.cleansession = 1;
client->conn_opts.onSuccess = onConn;
client->conn_opts.onFailure = onConnectFailure;
client->conn_opts.context = &client;
printf("connecting to MQTT server (%s)\n", ADDRESS);
rc = MQTTAsync_connect(client->client, &(client->conn_opts));
if(rc != MQTTASYNC_SUCCESS){
printf("Failed to start connect, return code %d\n", rc);
MQTTAsync_destroy(&(client->client));
free(client);
return NULL;
}
return client;
}
void onDisconnectFailure(void* context, MQTTAsync_failureData* response){
printf("Faild to discontect from MQTT, rc %d\n", response->code);
MQTTAsync_destroy(client->client);
free(client);
}
void onDisconnect(void* context, MQTTAsync_successData* response){
printf("disconnected from MQTT server\n");
MQTTAsync_destroy(client->client);
free(client);
}
void MQTT_disconnect(){
int rc;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
disc_opts.onSuccess = &onDisconnect;
disc_opts.onFailure = &onDisconnectFailure;
rc = MQTTAsync_disconnect(client->client, &disc_opts);
if (rc != MQTTASYNC_SUCCESS){
printf("Failed to start disconnect, return code %d\n", rc);
MQTTAsync_destroy(client->client);
return;
}
}
typedef struct Subscription_s {
char* topic;
char qos;
void (*onMessage)(char* topicName, int topicLen, MQTTAsync_message* message);
} Subscription_t;
clientConf_t* Client;
Subscription_t *Subscriptions = NULL;
unsigned int Subscriptions_len = 0;
unsigned int Subscriptions_aloc = 0;
void onSubscribe(void* context, MQTTAsync_successData* response){
// printf("Successful subscribed to %s\n", (char*) context);
@ -148,15 +30,57 @@ void onSubscribeFailure(void* context, MQTTAsync_failureData* response){
printf("ERROR: Subscribe failed, rc %d\n", response->code);
}
void MQTT_subscribe(char* topic, int qos){
void MQTT_subscribe(char* topic, int qos, void (*onMessage)(char* topicName, int topicLen, MQTTAsync_message* message))
{
int rc;
printf("Subscribing to topic %s (QoS%d)\n", topic, qos);
if (Subscriptions == NULL)
{
Subscriptions = (Subscription_t *) malloc(sizeof(Subscription_t) * 5);
Subscriptions_aloc = 5;
}
if (Subscriptions_len >= Subscriptions_aloc)
{
Subscription_t *new_subs = (Subscription_t *) malloc(sizeof(Subscription_t) * (Subscriptions_aloc + 5));
memcpy(new_subs, Subscriptions, sizeof(Subscription_t) * Subscriptions_aloc);
Subscription_t *old_subs = Subscriptions;
Subscriptions = new_subs;
free(old_subs);
}
Subscription_t sub;
sub.qos = qos;
sub.onMessage = onMessage;
if (topic[0] != '/')
{
char *newTopic = malloc(strlen(BASE_TOPIC) + strlen(topic) + 1);
memcpy(newTopic, BASE_TOPIC, strlen(BASE_TOPIC));
memcpy(newTopic + strlen(BASE_TOPIC), topic, strlen(topic) + 1);
sub.topic = newTopic;
}
else
{
// copy topic to allow passing local variable
char *newTopic = malloc(strlen(topic) + 1);
memcpy(newTopic, topic, strlen(topic) + 1);
sub.topic = newTopic;
}
memcpy(Subscriptions + sizeof(Subscription_t)*(Subscriptions_len), &sub, sizeof(Subscription_t));
Subscriptions_len++;
printf("INFO: MQTT_subscribe(): Subscribing to topic %s (QoS%d)\n", sub.topic, sub.qos);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &onSubscribe;
opts.onFailure = &onSubscribeFailure;
opts.context = topic;
rc = MQTTAsync_subscribe(client->client, topic, qos, &opts);
opts.context = &sub;
rc = MQTTAsync_subscribe(Client->client, "#", qos, &opts);
// rc = MQTTAsync_subscribe(Client->client, topic, qos, &opts);
if (rc != MQTTASYNC_SUCCESS){
printf("ERROR: Failed to start subscribe, return code %d\n", rc);
}
@ -164,5 +88,159 @@ void MQTT_subscribe(char* topic, int qos){
void MQTT_publish(char* topic, char* payload, int qos)
{
MQTTClient_publish(client->client, topic, len(topic), payload, qos, 0, NULL);
printf("Publishing to %s", topic);
MQTTClient_publish(Client->client, topic, strlen(payload), payload, qos, 0, NULL);
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message){
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: %.*s\n", message->payloadlen, (char*)message->payload);
//(*(Client->onMessage))(topicName, topicLen, message);
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
void reconnect(){
int rc;
if(Client->last_reconn.t + RECON_TIMEOUT > time(NULL)){
if(Client->last_reconn.c < 10){
printf("INFO: MQTT.reconnect(): Wait %d seconds until reconect. reconect counter on %d\n", RECON_TIMEOUT, Client->last_reconn.c);
sleep(RECON_TIMEOUT);
} else {
printf("INFO: MQTT.reconnect(): Wait min minutes until reconect. reconect counter on %d\n", Client->last_reconn.c);
sleep(60 * 10);
}
}
printf("INFO: MQTT.reconnect(): Reconnecting to MQTT server\n");
rc = MQTTAsync_connect(Client->client, &(Client->conn_opts));
if (rc != MQTTASYNC_SUCCESS){
printf("ERROR: MQTT.reconnect(): Failed to reconnect, return code %d\n", rc);
} else {
printf("INFO: MQTT.reconnect(): Reconnect sucsesfull\n");
}
Client->last_reconn.t = time(NULL);
Client->last_reconn.c++;
}
void onConnectFailure(void* context, MQTTAsync_failureData* response){
printf("ERROR: MQTT.onConnectFailure(): Connection failed, rc %d\n", response->code);
reconnect();
}
void connlost(void *context, char *cause){
printf("\nERROR: MQTT.connlost(): Lost connection to MQTT\n");
if (cause != NULL)
{
printf(" cause: %s\n", cause);
}
reconnect();
}
void onConn(void* context, MQTTAsync_successData* response){
printf("INFO: MQTT.onConn(): connected to MQTT server\n");
Client->last_reconn.c = 0;
int rc;
for (int i=0; i < Subscriptions_len; i++)
{
Subscription_t sub = *(Subscriptions + sizeof(Subscription_t)*i);
printf("INFO: MQTT.onConn(): Subscribing to topic %s (QoS%d)\n", sub.topic, sub.qos);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &onSubscribe;
opts.onFailure = &onSubscribeFailure;
opts.context = &sub;
rc = MQTTAsync_subscribe(Client->client, sub.topic, sub.qos, &opts);
if (rc != MQTTASYNC_SUCCESS){
printf("ERROR: MQTT.onConn(): Failed to start subscribe, return code %d\n", rc);
}
}
if (Client->onConnect != NULL)
{
(*(Client->onConnect))(Client);
}
}
clientConf_t* MQTT_connect(void (*onConnect_cb)(void* context)){
Client = (clientConf_t*) malloc(sizeof(clientConf_t)); // keep this util i say so
int rc;
int ch;
rc = MQTTAsync_create(&(Client->client), ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
if (rc != MQTTASYNC_SUCCESS){
printf("Failed to create client, return code %d\n", rc);
free(Client);
return NULL;
}
Client->onConnect = onConnect_cb;
Client->last_reconn.c = 0;
Client->last_reconn.t = time(NULL);
rc = MQTTAsync_setCallbacks(Client->client, Client, &connlost, &msgarrvd, NULL);
if (rc != MQTTASYNC_SUCCESS){
printf("Failed to set callbacks, return code %d\n", rc);
MQTTAsync_destroy(&(Client->client));
free(Client);
return NULL;
}
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
Client->conn_opts = conn_opts;
Client->conn_opts.keepAliveInterval = 20;
Client->conn_opts.cleansession = 1;
Client->conn_opts.onSuccess = onConn;
Client->conn_opts.onFailure = onConnectFailure;
Client->conn_opts.context = &Client;
printf("connecting to MQTT server (%s)\n", ADDRESS);
rc = MQTTAsync_connect(Client->client, &(Client->conn_opts));
if(rc != MQTTASYNC_SUCCESS){
printf("Failed to start connect, return code %d\n", rc);
MQTTAsync_destroy(&(Client->client));
free(Client);
return NULL;
}
return Client;
}
void onDisconnectFailure(void* context, MQTTAsync_failureData* response){
printf("Faild to discontect from MQTT, rc %d\n", response->code);
MQTTAsync_destroy(Client->client);
free(Client);
}
void onDisconnect(void* context, MQTTAsync_successData* response){
printf("disconnected from MQTT server\n");
MQTTAsync_destroy(Client->client);
free(Client);
}
void MQTT_disconnect(){
int rc;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
disc_opts.onSuccess = &onDisconnect;
disc_opts.onFailure = &onDisconnectFailure;
rc = MQTTAsync_disconnect(Client->client, &disc_opts);
if (rc != MQTTASYNC_SUCCESS){
printf("Failed to start disconnect, return code %d\n", rc);
MQTTAsync_destroy(Client->client);
return;
}
}

View File

@ -16,10 +16,10 @@ typedef struct clientConf_s {
} clientConf_t;
clientConf_t* MQTT_connect(void (*onConnect)(void* context), void (*onMessage)(char* topicName, int topicLen, MQTTAsync_message* message));
clientConf_t* MQTT_connect(void (*onConnect)(void* context));
void MQTT_disconnect();
void MQTT_subscribe(char* topic, int qos);
void MQTT_subscribe(char* topic, int qos, void (*onMessage)(char* topicName, int topicLen, MQTTAsync_message* message));
void MQTT_publish(char* topic, char* payload, int qos);
#endif