Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some Fixes and Improvements ("Named MQTT Topics") #12

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
25 changes: 19 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Each configuration option is also available as command line argument.
| `mqtt_tls_version` | 'TLSv1.2' | `--mqtt_tls_version` | The TLS version to use for MQTT. One of TLSv1, TLSv1.1, TLSv1.2. |
| `mqtt_verify_mode` | 'CERT_REQUIRED' | `--mqtt_verify_mode` | The SSL certificate verification mode. One of CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED. |
| `mqtt_ssl_ca_path` | - | `--mqtt_ssl_ca_path` | The SSL certificate authority file to verify the MQTT server. |
| `mqtt_retain` | false | | Retain messages published on mqtt |
| `mqtt_resend_retained` | false | | Resend retained messages received on mqtt to knx |
| `mqtt_tls_no_verify` | - | `--mqtt_tls_no_verify` | Do not verify SSL/TLS constraints like hostname. |
| `knx_host` | 'localhost' | `--knx_host` | The address of the KNX tunnel device. |
| `knx_port` | 3671 | `--knx_port` | The port of the KNX tunnel device. |
Expand All @@ -75,32 +77,43 @@ Feel free to add routing or other options and open a pull request for this.

Then you can configure your bus topology as items.

Each item need an `address` (the group address) and a `type`.

Optionally you can specify `override_topic`, then the representation on mqtt will not be `1/2/3` (Adress) but the provided topic name.

The default operating mode for an object is to listen on the KNX and publish the telegram values to MQTT.

That may be changed using the following settings:

* `mqtt_subscribe` (default: false): if set to `true`, changes on any related MQTT topic will be processed

```
...
"items": [
{
"address": "5/0/10",
"type": "DPTTemperature"
"type": "DPTTemperature",
"mqtt_subscribe": true
},
{
"address": "5/0/20",
"type": "DPTHumidity"
},
{
"override_topic": "use/this/on/mqtt",
"address": "5/0/29",
"type": "DPTHumidity"
},
...
]
...
```

Each item need an `address` (the group address) and a `type`.
Unfortunately, the list of types is not part of the xknx documentation.
But the examples in the file I provide with the project may fit for the most purposes.
All supported types can be found in the [xknx sources](https://github.com/XKNX/xknx/blob/main/xknx/dpt/__init__.py).

The default operating mode for an object is to listen on the KNX and publish the telegram values to MQTT.

That may be changed using the following settings:

* `mqtt_subscribe` (default: false): if set to `true`, changes on any related MQTT topic will be processed

### Publishing

Expand Down
72 changes: 56 additions & 16 deletions knx2mqtt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ from xknx.io.knxip_interface import ConnectionType, ConnectionConfig
from xknx.telegram.address import GroupAddress, IndividualAddress
from xknx.telegram.apci import GroupValueWrite
from xknx.telegram import Telegram, TelegramDirection
from logging.handlers import TimedRotatingFileHandler


XKNX_DPT_MODULE_STR = "xknx.dpt"
Expand All @@ -41,7 +42,7 @@ knx_tunnel = None
mqtt_client = None
daemon_args = None
item_states = None

item_redirects = {}

def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
Expand All @@ -62,7 +63,7 @@ def extract_payload_from_telegram(group_address, telegram):
dpt_type = get_dpt_type_for_address(group_address)
payload = telegram.payload

logging.info("Address: {}, DPT Type: {}, Payload: {}".format(group_address, dpt_type, payload))
logging.info("Extracting KNX Payload: Address: {}, DPT Type: {}, Payload: {}".format(group_address, dpt_type, payload))

value = None

Expand All @@ -83,7 +84,7 @@ def extract_payload_from_telegram(group_address, telegram):
def create_payload_for_telegram(group_address, value):
dpt_type = get_dpt_type_for_address(group_address)

logging.info("Address: {}, DPT Type: {}, Value: {}".format(group_address, dpt_type, value))
logging.info("Creating Payload for KNX: Address: {}, DPT Type: {}, Value: {}".format(group_address, dpt_type, value))

payload = None

Expand All @@ -92,7 +93,7 @@ def create_payload_for_telegram(group_address, value):
payload = DPTBinary(int(str(value).lower() in ['true', '1', 'on', 'yes']))
else:
dpt_class = getattr(importlib.import_module(XKNX_DPT_MODULE_STR), dpt_type)
payload = DPTArray(dpt_class.to_knx(value))
payload = dpt_class.to_knx(value)
except Exception as e:
eprint(traceback.format_exc())

Expand All @@ -102,7 +103,7 @@ def create_payload_for_telegram(group_address, value):
def publish_to_knx(address, payload):
global daemon_args, knx_tunnel

logging.info("Address: {}, Payload: {}".format(address, payload))
logging.info("Publish to KNX: Address: {}, Payload: {}".format(address, payload))

source_address = IndividualAddress(daemon_args.knx_individual_address)
group_address = GroupAddress(address)
Expand All @@ -123,14 +124,17 @@ def publish_to_knx(address, payload):


def publish_to_mqtt(address, value):
global mqtt_client, daemon_args, item_states
global mqtt_client, daemon_args, item_states, item_redirects

topic = "{}/{}".format(daemon_args.mqtt_topic, address)
if address in item_redirects:
topic = "{}/{}".format(daemon_args.mqtt_topic, item_redirects[address])
else:
topic = "{}/{}".format(daemon_args.mqtt_topic, address)

item_states[address] = str(value)

logging.info("Topic: {}, Payload: {}".format(topic, value))
mqtt_client.publish(topic, str(value))
logging.info("Publisht to MQTT: Topic: {}, Payload: {}".format(topic, value))
mqtt_client.publish(topic, str(value), retain=daemon_args.mqtt_retain)
if daemon_args.timestamp:
mqtt_client.publish("{}/timestamp".format(topic), time.time(), retain=True)

Expand All @@ -157,16 +161,23 @@ def on_mqtt_connect(client, userdata, flags, reason_code, properties):


def on_mqtt_received(client, userdata, message):
global daemon_args, item_states
global daemon_args, item_states, item_redirects

try:
group_address = message.topic.replace("{}/".format(daemon_args.mqtt_topic), '')
value = str(message.payload.decode())

if group_address in item_redirects:
group_address = item_redirects[group_address]

if item_states[group_address] == value:
logging.info("Received value for {} is last state sent: {}".format(group_address, value))
return


if message.retain and not daemon_args.mqtt_resend_retained:
logging.info("Received value for {} is a retained value. Not resending due to config: {}".format(group_address, value))
return

payload = create_payload_for_telegram(group_address, value)

publish_to_knx(group_address, payload)
Expand Down Expand Up @@ -302,6 +313,10 @@ def parse_config():

with open(daemon_args.config, "r") as config_file:
data = json.load(config_file)

daemon_args.mqtt_retain = data['mqtt_retain'] if 'mqtt_retain' in data else False
daemon_args.mqtt_resend_retained = data['mqtt_resend_retained'] if 'mqtt_resend_retained' in data else False

if 'mqtt_host' in data:
daemon_args.mqtt_host = data['mqtt_host']
if 'mqtt_port' in data:
Expand Down Expand Up @@ -345,7 +360,7 @@ def parse_config():


def init_items():
global daemon_args, item_states
global daemon_args, item_states, item_redirects

daemon_args.dpt_types = {}
daemon_args.mqtt_subscribe = []
Expand All @@ -363,8 +378,17 @@ def init_items():

if 'type' in item:
daemon_args.dpt_types[item['address']] = item['type']
if item['mqtt_subscribe']:
daemon_args.mqtt_subscribe.append(item['address'])

if 'override_topic' in item:
item_redirects[item['address']] = item['override_topic']
item_redirects[item['override_topic']] = item['address']

if item['mqtt_subscribe']:
daemon_args.mqtt_subscribe.append(item['override_topic'])
else:
if item['mqtt_subscribe']:
daemon_args.mqtt_subscribe.append(item['address'])

if item['knx_subscribe']:
daemon_args.knx_subscribe.append(GroupAddress(item['address']))

Expand All @@ -374,15 +398,31 @@ def main():
daemon_args = parse_args()
parse_config()
init_items()

# Verbosity
if daemon_args.verbose:
logging.basicConfig(level=logging.DEBUG)
loglevel = logging.DEBUG
else:
loglevel = logging.INFO

logDir = "/var/log/knx2mqtt"

if not os.path.exists(logDir):
os.mkdir(logDir)

logging.basicConfig(format='%(asctime)s,%(msecs)d %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=loglevel,
handlers=[
TimedRotatingFileHandler(logDir + "/current.log", when="midnight", interval=1, backupCount=14),
logging.StreamHandler()
])

# MQTT connection
mqtt_client = init_mqtt()
mqtt_client.loop_start()
# KNX connection
start_knx()


if __name__ == "__main__":
main()
21 changes: 14 additions & 7 deletions knx2mqtt.conf.example
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
{
"mqtt_host": "localhost",
"mqtt_port": "1883"
"mqtt_port": "keepalive"
"mqtt_clientid": "knx2mqtt"
"mqtt_port": 1883,
"mqtt_clientid": "knx2mqtt",
"mqtt_keepalive": 30,
"mqtt_user": "knx2mqtt",
"mqtt_password": "t0p_s3cr3t",
"mqtt_topic": "bus/knx",
"mqtt_tls": "false",
"mqtt_tls": false,
"mqtt_retain": false,
"mqtt_resend_retained": false,
"mqtt_tls_version": "TLSv1.2",
"mqtt_verify_mode": "CERT_NONE",
"mqtt_ssl_ca_path": "/etc/ssl/myca.pem",
"mqtt_tls_no_verify": "false",
"mqtt_tls_no_verify": false,
"knx_host": "10.0.0.11",
"knx_local_ip": "10.0.7.12",
"knx_individual_address": "15.15.250",
"knx_no_queue": "true",
"verbose": "false",
"knx_no_queue": true,
"verbose": false,
"items": [
{
"address": "4/0/11",
Expand All @@ -37,6 +39,11 @@
{
"address": "5/0/18",
"type": "DPTPartsPerMillion"
},
{
"override_topic": "use/this/on/mqtt/instead/of/address",
"address": "6/1/29",
"type": "DPTTemperature"
}
]
}
4 changes: 2 additions & 2 deletions run
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
# (c) Gerrit Beine, 2019-2023
#

. venv/bin/activate
. /usr/local/lib/knx2mqtt/venv/bin/activate

exec /usr/bin/env python knx2mqtt
exec /usr/bin/env python3 /usr/local/lib/knx2mqtt/knx2mqtt