diff --git a/man/mosquitto.conf.5.xml b/man/mosquitto.conf.5.xml index e27fb58..f429a6f 100644 --- a/man/mosquitto.conf.5.xml +++ b/man/mosquitto.conf.5.xml @@ -230,6 +230,24 @@ Reloaded on reload signal. + + [ true | false ] + + This option affects the scenario when a client + subscribes to a topic that has retained messages. It is + possible that the client that published the retained + message to the topic had access at the time they + published, but that access has been subsequently + removed. If is set + to true, the default, the source of a retained message + will be checked for access rights before it is + republished. When set to false, no check will be made + and the retained message will always be + published. + This option applies globally, regardless of the + option. + + prefix diff --git a/mosquitto.conf b/mosquitto.conf index df1aa8b..70f1f80 100644 --- a/mosquitto.conf +++ b/mosquitto.conf @@ -122,6 +122,15 @@ # This is a non-standard option explicitly disallowed by the spec. #upgrade_outgoing_qos false +# This option affects the scenario when a client subscribes to a topic that has +# retained messages. It is possible that the client that published the retained +# message to the topic had access at the time they published, but that access +# has been subsequently removed. If check_retain_source is set to true, the +# default, the source of a retained message will be checked for access rights +# before it is republished. When set to false, no check will be made and the +# retained message will always be published. This affects all listeners. +#check_retain_source true + # ================================================================= # Default listener # ================================================================= diff --git a/src/conf.c b/src/conf.c index 6edd705..a060827 100644 --- a/src/conf.c +++ b/src/conf.c @@ -971,6 +971,8 @@ int _config_read_file_core(struct mqtt3_config *config, bool reload, const char #else _mosquitto_log_printf(NULL, MOSQ_LOG_WARNING, "Warning: TLS support not available."); #endif + }else if(!strcmp(token, "check_retain_source")){ + if(_conf_parse_bool(&token, "check_retain_source", &config->check_retain_source, saveptr)) return MOSQ_ERR_INVAL; }else if(!strcmp(token, "ciphers")){ #ifdef WITH_TLS if(reload) continue; // Listeners not valid for reloading. diff --git a/src/database.c b/src/database.c index 6de68a9..a952337 100644 --- a/src/database.c +++ b/src/database.c @@ -161,6 +161,7 @@ void mosquitto__db_msg_store_remove(struct mosquitto_db *db, struct mosquitto_ms db->msg_store_count--; if(store->source_id) _mosquitto_free(store->source_id); + if(store->source_username) _mosquitto_free(store->source_username); if(store->dest_ids){ for(i=0; idest_id_count; i++){ if(store->dest_ids[i]) _mosquitto_free(store->dest_ids[i]); @@ -518,24 +519,24 @@ int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *cont }else{ source_id = ""; } - if(mqtt3_db_message_store(db, source_id, 0, topic, qos, payloadlen, payload, retain, &stored, 0)) return 1; + if(mqtt3_db_message_store(db, context, 0, topic, qos, payloadlen, payload, retain, &stored, 0)) return 1; return mqtt3_db_messages_queue(db, source_id, topic, qos, retain, &stored); } -int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id) +int mqtt3_db_message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id) { struct mosquitto_msg_store *temp; assert(db); assert(stored); - temp = _mosquitto_malloc(sizeof(struct mosquitto_msg_store)); + temp = _mosquitto_calloc(1, sizeof(struct mosquitto_msg_store)); if(!temp) return MOSQ_ERR_NOMEM; temp->ref_count = 0; - if(source){ - temp->source_id = _mosquitto_strdup(source); + if(source && source->id){ + temp->source_id = _mosquitto_strdup(source->id); }else{ temp->source_id = _mosquitto_strdup(""); } @@ -544,6 +545,18 @@ int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); return MOSQ_ERR_NOMEM; } + + if(source && source->username){ + temp->source_username = _mosquitto_strdup(source->username); + if(!temp->source_username){ + _mosquitto_free(temp->source_id); + _mosquitto_free(temp); + return MOSQ_ERR_NOMEM; + } + } + if(source){ + temp->source_listener = source->listener; + } temp->source_mid = source_mid; temp->mid = 0; temp->qos = qos; @@ -552,6 +565,7 @@ int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t temp->topic = _mosquitto_strdup(topic); if(!temp->topic){ _mosquitto_free(temp->source_id); + _mosquitto_free(temp->source_username); _mosquitto_free(temp); _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); return MOSQ_ERR_NOMEM; @@ -564,6 +578,7 @@ int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t temp->payload = _mosquitto_malloc(sizeof(char)*payloadlen); if(!temp->payload){ if(temp->source_id) _mosquitto_free(temp->source_id); + if(temp->source_username) _mosquitto_free(temp->source_username); if(temp->topic) _mosquitto_free(temp->topic); if(temp->payload) _mosquitto_free(temp->payload); _mosquitto_free(temp); @@ -576,6 +591,7 @@ int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t if(!temp->source_id || (payloadlen && !temp->payload)){ if(temp->source_id) _mosquitto_free(temp->source_id); + if(temp->source_username) _mosquitto_free(temp->source_username); if(temp->topic) _mosquitto_free(temp->topic); if(temp->payload) _mosquitto_free(temp->payload); _mosquitto_free(temp); diff --git a/src/mosquitto_broker.h b/src/mosquitto_broker.h index 8d19790..7d535cf 100644 --- a/src/mosquitto_broker.h +++ b/src/mosquitto_broker.h @@ -109,6 +109,7 @@ struct mqtt3_config { int auto_id_prefix_len; int autosave_interval; bool autosave_on_changes; + bool check_retain_source; char *clientid_prefixes; bool connection_messages; bool daemon; @@ -176,6 +177,8 @@ struct mosquitto_msg_store{ struct mosquitto_msg_store *prev; dbid_t db_id; char *source_id; + char *source_username; + struct _mqtt3_listener *source_listener; char **dest_ids; int dest_id_count; int ref_count; @@ -421,7 +424,7 @@ int mqtt3_db_message_write(struct mosquitto_db *db, struct mosquitto *context); int mqtt3_db_messages_delete(struct mosquitto_db *db, struct mosquitto *context); int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain); int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored); -int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id); +int mqtt3_db_message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id); int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored); void mosquitto__db_msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store); void mosquitto__db_msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *store); @@ -471,6 +474,7 @@ void mqtt3_bridge_packet_cleanup(struct mosquitto *context); /* ============================================================ * Security related functions * ============================================================ */ +int acl__find_acls(struct mosquitto_db *db, struct mosquitto *context); int mosquitto_security_module_init(struct mosquitto_db *db); int mosquitto_security_module_cleanup(struct mosquitto_db *db); diff --git a/src/persist.c b/src/persist.c index 7cf50b6..3f20b68 100644 --- a/src/persist.c +++ b/src/persist.c @@ -39,6 +39,8 @@ static uint32_t db_version; static int _db_restore_sub(struct mosquitto_db *db, const char *client_id, const char *sub, int qos); +static int persist__read_string(FILE *db_fptr, char **str); +static int persist__write_string(FILE *db_fptr, const char *str, bool nullok); static struct mosquitto *_db_find_or_add_context(struct mosquitto_db *db, const char *client_id, uint16_t last_mid) { @@ -148,10 +151,19 @@ static int mqtt3_db_message_store_write(struct mosquitto_db *db, FILE *db_fptr) }else{ tlen = 0; } - length = htonl(sizeof(dbid_t) + 2+strlen(stored->source_id) + + length = sizeof(dbid_t) + 2+strlen(stored->source_id) + sizeof(uint16_t) + sizeof(uint16_t) + 2+tlen + sizeof(uint32_t) + - stored->payloadlen + sizeof(uint8_t) + sizeof(uint8_t)); + stored->payloadlen + sizeof(uint8_t) + sizeof(uint8_t) + + 2*sizeof(uint16_t); + + if(stored->source_id){ + length += strlen(stored->source_id); + } + if(stored->source_username){ + length += strlen(stored->source_username); + } + length = htonl(length); i16temp = htons(DB_CHUNK_MSG_STORE); write_e(db_fptr, &i16temp, sizeof(uint16_t)); @@ -160,12 +172,15 @@ static int mqtt3_db_message_store_write(struct mosquitto_db *db, FILE *db_fptr) i64temp = stored->db_id; write_e(db_fptr, &i64temp, sizeof(dbid_t)); - slen = strlen(stored->source_id); - i16temp = htons(slen); - write_e(db_fptr, &i16temp, sizeof(uint16_t)); - if(slen){ - write_e(db_fptr, stored->source_id, slen); + if(persist__write_string(db_fptr, stored->source_id, false)) return 1; + if(persist__write_string(db_fptr, stored->source_username, true)) return 1; + if(stored->source_listener){ + i16temp = htons(stored->source_listener->port); + }else{ + i16temp = 0; } + write_e(db_fptr, &i16temp, sizeof(uint16_t)); + i16temp = htons(stored->source_mid); write_e(db_fptr, &i16temp, sizeof(uint16_t)); @@ -243,6 +258,60 @@ error: return 1; } + +static int persist__read_string(FILE *db_fptr, char **str) +{ + uint16_t i16temp; + uint16_t slen; + char *s = NULL; + + if(fread(&i16temp, 1, sizeof(uint16_t), db_fptr) != sizeof(uint16_t)){ + return MOSQ_ERR_INVAL; + } + + slen = ntohs(i16temp); + if(slen){ + s = _mosquitto_malloc(slen+1); + if(!s){ + fclose(db_fptr); + _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); + return MOSQ_ERR_NOMEM; + } + if(fread(s, 1, slen, db_fptr) != slen){ + _mosquitto_free(s); + return MOSQ_ERR_NOMEM; + } + s[slen] = '\0'; + } + + *str = s; + return MOSQ_ERR_SUCCESS; +} + + +static int persist__write_string(FILE *db_fptr, const char *str, bool nullok) +{ + uint16_t i16temp, slen; + + if(str){ + slen = strlen(str); + i16temp = htons(slen); + write_e(db_fptr, &i16temp, sizeof(uint16_t)); + write_e(db_fptr, str, slen); + }else if(nullok){ + i16temp = htons(0); + write_e(db_fptr, &i16temp, sizeof(uint16_t)); + }else{ + return 1; + } + + return MOSQ_ERR_SUCCESS; +error: + _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno)); + return 1; +} + + static int _db_subs_retain_write(struct mosquitto_db *db, FILE *db_fptr, struct _mosquitto_subhier *node, const char *topic, int level) { struct _mosquitto_subhier *subhier; @@ -555,9 +624,9 @@ static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) { dbid_t i64temp, store_id; uint32_t i32temp, payloadlen; - uint16_t i16temp, slen, source_mid; + uint16_t i16temp, source_mid, source_port = 0; uint8_t qos, retain, *payload = NULL; - char *source_id = NULL; + struct mosquitto source; char *topic = NULL; int rc = 0; struct mosquitto_msg_store *stored = NULL; @@ -574,41 +643,45 @@ static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) read_e(db_fptr, &i64temp, sizeof(dbid_t)); store_id = i64temp; - read_e(db_fptr, &i16temp, sizeof(uint16_t)); - slen = ntohs(i16temp); - if(slen){ - source_id = _mosquitto_malloc(slen+1); - if(!source_id){ + memset(&source, 0, sizeof(struct mosquitto)); + + rc = persist__read_string(db_fptr, &source.id); + if(rc){ + _mosquitto_free(load); + return rc; + } + if(db_version == 4){ + rc = persist__read_string(db_fptr, &source.username); + if(rc){ _mosquitto_free(load); - fclose(db_fptr); - _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; + return rc; + } + read_e(db_fptr, &i16temp, sizeof(uint16_t)); + source_port = ntohs(i16temp); + if(source_port){ + for(int i=0; iconfig->listener_count; i++){ + if(db->config->listeners[i].port == source_port){ + source.listener = &db->config->listeners[i]; + break; + } + } } - read_e(db_fptr, source_id, slen); - source_id[slen] = '\0'; } + read_e(db_fptr, &i16temp, sizeof(uint16_t)); source_mid = ntohs(i16temp); /* This is the mid - don't need it */ read_e(db_fptr, &i16temp, sizeof(uint16_t)); - read_e(db_fptr, &i16temp, sizeof(uint16_t)); - slen = ntohs(i16temp); - if(slen){ - topic = _mosquitto_malloc(slen+1); - if(!topic){ - _mosquitto_free(load); - fclose(db_fptr); - if(source_id) _mosquitto_free(source_id); - _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; - } - read_e(db_fptr, topic, slen); - topic[slen] = '\0'; - }else{ - topic = NULL; + rc = persist__read_string(db_fptr, &topic); + if(rc){ + _mosquitto_free(load); + fclose(db_fptr); + _mosquitto_free(source.id); + return rc; } + read_e(db_fptr, &qos, sizeof(uint8_t)); read_e(db_fptr, &retain, sizeof(uint8_t)); @@ -624,7 +693,7 @@ static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) if(!payload){ _mosquitto_free(load); fclose(db_fptr); - if(source_id) _mosquitto_free(source_id); + if(source.id) _mosquitto_free(source.id); _mosquitto_free(topic); _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); return MOSQ_ERR_NOMEM; @@ -632,14 +701,14 @@ static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) read_e(db_fptr, payload, payloadlen); } - rc = mqtt3_db_message_store(db, source_id, source_mid, topic, qos, payloadlen, payload, retain, &stored, store_id); + rc = mqtt3_db_message_store(db, &source, source_mid, topic, qos, payloadlen, payload, retain, &stored, store_id); load->db_id = stored->db_id; load->store = stored; HASH_ADD(hh, db->msg_store_load, db_id, sizeof(dbid_t), load); - if(source_id) _mosquitto_free(source_id); + if(source.id) _mosquitto_free(source.id); _mosquitto_free(topic); _mosquitto_free(payload); @@ -648,7 +717,7 @@ error: strerror_r(errno, err, 256); _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err); fclose(db_fptr); - if(source_id) _mosquitto_free(source_id); + if(source.id) _mosquitto_free(source.id); if(topic) _mosquitto_free(topic); if(payload) _mosquitto_free(payload); return 1; @@ -679,35 +748,24 @@ static int _db_retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) static int _db_sub_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) { - uint16_t i16temp, slen; uint8_t qos; char *client_id; char *topic; int rc = 0; char err[256]; - read_e(db_fptr, &i16temp, sizeof(uint16_t)); - slen = ntohs(i16temp); - client_id = _mosquitto_malloc(slen+1); - if(!client_id){ + rc = persist__read_string(db_fptr, &client_id); + if(rc){ fclose(db_fptr); - _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; + return rc; } - read_e(db_fptr, client_id, slen); - client_id[slen] = '\0'; - read_e(db_fptr, &i16temp, sizeof(uint16_t)); - slen = ntohs(i16temp); - topic = _mosquitto_malloc(slen+1); - if(!topic){ - fclose(db_fptr); - _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); + rc = persist__read_string(db_fptr, &topic); + if(rc){ _mosquitto_free(client_id); - return MOSQ_ERR_NOMEM; + fclose(db_fptr); + return rc; } - read_e(db_fptr, topic, slen); - topic[slen] = '\0'; read_e(db_fptr, &qos, sizeof(uint8_t)); if(_db_restore_sub(db, client_id, topic, qos)){ @@ -756,7 +814,9 @@ int mqtt3_db_restore(struct mosquitto_db *db) * Is your DB change still compatible with previous versions? */ if(db_version > MOSQ_DB_VERSION && db_version != 0){ - if(db_version == 2){ + if(db_version == 3){ + /* Addition of source_username and source_port to msg_store chunk in v4, v1.5.6 */ + }else if(db_version == 2){ /* Addition of disconnect_t to client chunk in v3. */ }else{ fclose(fptr); diff --git a/src/persist.h b/src/persist.h index 808b05f..fb6f474 100644 --- a/src/persist.h +++ b/src/persist.h @@ -17,7 +17,7 @@ Contributors: #ifndef PERSIST_H #define PERSIST_H -#define MOSQ_DB_VERSION 3 +#define MOSQ_DB_VERSION 4 /* DB read/write */ const unsigned char magic[15] = {0x00, 0xB5, 0x00, 'm','o','s','q','u','i','t','t','o',' ','d','b'}; diff --git a/src/read_handle.c b/src/read_handle.c index ddc16ce..51e88d4 100644 --- a/src/read_handle.c +++ b/src/read_handle.c @@ -220,7 +220,7 @@ int mqtt3_handle_publish(struct mosquitto_db *db, struct mosquitto *context) } if(!stored){ dup = 0; - if(mqtt3_db_message_store(db, context->id, mid, topic, qos, payloadlen, payload, retain, &stored, 0)){ + if(mqtt3_db_message_store(db, context, mid, topic, qos, payloadlen, payload, retain, &stored, 0)){ _mosquitto_free(topic); if(payload) _mosquitto_free(payload); return 1; @@ -266,7 +266,7 @@ process_bad_message: case 2: mqtt3_db_message_store_find(context, mid, &stored); if(!stored){ - if(mqtt3_db_message_store(db, context->id, mid, NULL, qos, 0, NULL, false, &stored, 0)){ + if(mqtt3_db_message_store(db, context, mid, NULL, qos, 0, NULL, false, &stored, 0)){ return 1; } res = mqtt3_db_message_insert(db, context, mid, mosq_md_in, qos, false, stored); diff --git a/src/read_handle_server.c b/src/read_handle_server.c index 2b9c8f5..c075344 100644 --- a/src/read_handle_server.c +++ b/src/read_handle_server.c @@ -89,7 +89,6 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) uint8_t username_flag, password_flag; char *username = NULL, *password = NULL; int rc; - struct _mosquitto_acl_user *acl_tail; struct mosquitto_client_msg *msg_tail, *msg_prev; struct mosquitto *found_context; int slen; @@ -475,26 +474,8 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) do_disconnect(db, found_context); } - /* Associate user with its ACL, assuming we have ACLs loaded. */ - if(db->acl_list){ - acl_tail = db->acl_list; - while(acl_tail){ - if(context->username){ - if(acl_tail->username && !strcmp(context->username, acl_tail->username)){ - context->acl_list = acl_tail; - break; - } - }else{ - if(acl_tail->username == NULL){ - context->acl_list = acl_tail; - break; - } - } - acl_tail = acl_tail->next; - } - }else{ - context->acl_list = NULL; - } + rc = acl__find_acls(db, context); + if(rc) return rc; if(will_struct){ context->will = will_struct; diff --git a/src/security_default.c b/src/security_default.c index a1d3ec1..8a39995 100644 --- a/src/security_default.c +++ b/src/security_default.c @@ -482,6 +482,39 @@ static int _acl_cleanup(struct mosquitto_db *db, bool reload) return MOSQ_ERR_SUCCESS; } + +int acl__find_acls(struct mosquitto_db *db, struct mosquitto *context) +{ + struct _mosquitto_acl_user *acl_tail; + + /* Associate user with its ACL, assuming we have ACLs loaded. */ + if(db->acl_list){ + acl_tail = db->acl_list; + while(acl_tail){ + if(context->username){ + if(acl_tail->username && !strcmp(context->username, acl_tail->username)){ + context->acl_list = acl_tail; + break; + } + }else{ + if(acl_tail->username == NULL){ + context->acl_list = acl_tail; + break; + } + } + acl_tail = acl_tail->next; + } + if(context->username && context->acl_list == NULL){ + return MOSQ_ERR_INVAL; + } + }else{ + context->acl_list = NULL; + } + + return MOSQ_ERR_SUCCESS; +} + + static int _pwfile_parse(const char *file, struct _mosquitto_unpwd **root) { FILE *pwfile; diff --git a/src/subs.c b/src/subs.c index 4f64b3e..7aed30f 100644 --- a/src/subs.c +++ b/src/subs.c @@ -681,6 +681,26 @@ static int _retain_process(struct mosquitto_db *db, struct mosquitto_msg_store * return rc; } + /* Check for original source access */ + if(db->config->check_retain_source && retained->source_id){ + struct mosquitto retain_ctxt; + memset(&retain_ctxt, 0, sizeof(struct mosquitto)); + + retain_ctxt.id = retained->source_id; + retain_ctxt.username = retained->source_username; + retain_ctxt.listener = retained->source_listener; + + rc = acl__find_acls(db, &retain_ctxt); + if(rc) return rc; + + rc = mosquitto_acl_check(db, &retain_ctxt, retained->topic, MOSQ_ACL_WRITE); + if(rc == MOSQ_ERR_ACL_DENIED){ + return MOSQ_ERR_SUCCESS; + }else if(rc != MOSQ_ERR_SUCCESS){ + return rc; + } + } + if (db->config->upgrade_outgoing_qos){ qos = sub_qos; } else {