MQTT 프로토콜과 RS-485를 활용하여 스마트 홈 장치들의 상태를 관리하고 제어하는 스크립트입니다.
1. 라이브러리 임포트
socket
,threading
,serial
: 네트워크 통신과 멀티스레딩을 위한 기본 라이브러리.paho.mqtt.client
: MQTT 프로토콜을 위한 클라이언트 라이브러리.json
,sys
,time
,logging
: 설정 파일 처리, 시스템 관련 작업, 로깅을 위한 라이브러리.
2. 디바이스 설정
RS485_DEVICE
: 각 장치의 ID, 명령어 코드 및 상태를 정의.DISCOVERY_DEVICE
,DISCOVERY_PAYLOAD
: Home Assistant와의 연동을 위한 MQTT 디스커버리 정보.
3. MQTT 및 로깅 초기화
mqtt
: MQTT 클라이언트 객체 생성 및 연결 설정.- 로깅 설정: 로그 파일 생성 및 포맷 설정.
4. 네트워크 소켓 및 시리얼 통신 클래스 (EzVilleSocket)
EzVilleSocket
클래스는 소켓 연결, 데이터 수신 및 송신 관련 메소드를 포함합니다. 여기서는 RS-485 프로토콜을 통해 장치와의 통신을 처리합니다.- 자이 스마트홈(Xi SmartHome) 이지빌(ezVille) 월패드 기준이지만, 코콤(Kocom)이나 삼성(Samsung) SDS 등 다양한 아파트 월패드에서도 활용이 가능합니다.
5. MQTT 메시지 처리
mqtt_on_connect
,mqtt_on_message
,mqtt_on_disconnect
: MQTT 서버 연결, 메시지 수신 및 연결 해제시 동작 정의.mqtt_device
: MQTT를 통해 수신된 메시지를 기반으로 RS-485 명령 생성 및 전송.
6. RS-485 데이터 패킷 처리
serial_generate_checksum
,serial_verify_checksum
: 패킷의 체크섬 생성 및 검증.serial_receive_state
: RS-485로부터 수신된 상태 데이터 처리 및 MQTT를 통해 Home Assistant로 전송.
7. 데몬 및 이벤트 루프
daemon
: 연속적인 RS-485 통신 및 MQTT 메시지 처리를 위한 루프.init_connect
: RS-485 통신 초기화 및 상태 덤프.
8. 메인 실행
- 스크립트 실행 시, MQTT 루프 시작, 네트워크 소켓 설정, 백그라운드 스레드 시작 등 초기화 작업 실행.
이 스크립트는 다양한 스마트 홈 장치의 상태를 모니터링하고 제어하는 복합적인 기능을 수행하며, 각 장치의 상태 변경 시 Home Assistant에 자동으로 업데이트되도록 설계되어 있습니다. 모든 기능은 실시간으로 수행되며, MQTT를 통한 원격 제어와 로컬 네트워크를 통한 직접 제어가 결합되어 있습니다.
아래는 코드를 단계별로 분리하고 각 모듈의 기능에 따라 구분하였으며, 향후 유지보수와 확장성을 고려한 구조입니다.
1. 설정 파일 관리 (config_manager.py)
설정 파일을 관리하는 코드입니다. 외부 JSON 설정 파일을 로드하여 필요한 구성 정보를 제공합니다.
import json
import os
class ConfigManager:
def __init__(self, default_config_path, user_config_path):
self.default_config = self.load_config(default_config_path)
self.user_config = self.load_config(user_config_path)
self.config = self.merge_configs(self.default_config, self.user_config)
@staticmethod
def load_config(path):
with open(path, 'r', encoding='utf-8') as file:
return json.load(file)
def merge_configs(self, default, user):
for key, value in user.items():
if isinstance(value, dict):
default[key].update(value)
else:
default[key] = value
return default
def get_config(self):
return self.config
2. 로깅 설정 (logger_manager.py)
로깅을 설정하는 코드입니다. 파일 및 콘솔 로깅을 설정하고, 로그 포맷을 정의합니다.
import logging
from logging.handlers import TimedRotatingFileHandler
import os
def init_logger(level=logging.INFO, log_to_file=False, filename='app.log'):
logger = logging.getLogger(__name__)
logger.setLevel(level)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
if log_to_file:
os.makedirs(os.path.dirname(filename), exist_ok=True)
file_handler = TimedRotatingFileHandler(filename, when='midnight', backupCount=7)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
3. MQTT 통신 관리 (mqtt_manager.py)
MQTT 통신을 관리하는 코드입니다. MQTT 서버에 연결하고 메시지를 수신 및 발행합니다.
import paho.mqtt.client as mqtt
class MQTTManager:
def __init__(self, config):
self.client = mqtt.Client()
self.config = config
self.setup_callbacks()
def setup_callbacks(self):
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
def on_connect(self, client, userdata, flags, rc):
print("Connected with result code "+str(rc))
self.client.subscribe(self.config['topic'])
def on_message(self, client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
def start(self):
self.client.connect(self.config['server'], self.config['port'], 60)
self.client.loop_forever()
def publish(self, topic, payload):
self.client.publish(topic, payload)
4. 주 프로그램 (main.py)
메인 프로그램 코드입니다. 필요한 설정을 로드하고 로깅을 설정하며, MQTT 통신을 시작합니다.
from config_manager import ConfigManager
from logger_manager import init_logger
from mqtt_manager import MQTTManager
def main():
config_manager = ConfigManager('default_config.json', 'user_config.json')
config = config_manager.get_config()
logger = init_logger(level=config['logging']['level'], log_to_file=config['logging']['log_to_file'], filename=config['logging']['filename'])
mqtt_config = config['mqtt']
mqtt_manager = MQTTManager(mqtt_config)
mqtt_manager.start()
if __name__ == "__main__":
main()
5. MQTT 메시지 처리 (mqtt_manager.py 확장)
MQTT 메시지를 수신하고 발송하는 구체적인 로직을 추가합니다. 이 과정에서 MQTT 토픽을 구독하고, 특정 이벤트에 따라 메시지를 발행하는 기능을 포함합니다.
import paho.mqtt.client as mqtt
class MQTTManager:
def __init__(self, config, message_handler):
self.client = mqtt.Client()
self.config = config
self.message_handler = message_handler
self.setup_callbacks()
def setup_callbacks(self):
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print("Connected successfully.")
self.client.subscribe(self.config['topic'])
else:
print(f"Failed to connect with result code {rc}")
def on_message(self, client, userdata, msg):
print(f"Received message on {msg.topic}: {msg.payload.decode()}")
self.message_handler.handle_message(msg.topic, msg.payload.decode())
def on_disconnect(self, client, userdata, rc):
print("Disconnected from MQTT server with result code {}".format(rc))
def start(self):
self.client.connect(self.config['server'], self.config['port'], 60)
self.client.loop_start() # Start the network loop in a non-blocking way.
def publish(self, topic, payload):
self.client.publish(topic, payload)
def stop(self):
self.client.loop_stop() # Stop the network loop.
self.client.disconnect() # Disconnect cleanly from the broker.
6. 메시지 처리 핸들러 (message_handler.py)
메시지를 처리하는 로직을 담당하는 클래스를 구현합니다. 이 클래스는 MQTT에서 수신한 메시지를 적절히 처리하고 필요한 경우 다른 MQTT 토픽으로 메시지를 발행합니다.
class MessageHandler:
def __init__(self, mqtt_manager):
self.mqtt_manager = mqtt_manager
def handle_message(self, topic, message):
print(f"Handling message from {topic}: {message}")
# 메시지에 따라 필요한 로직을 구현합니다.
# 예를 들어, 특정 조건을 충족하면 다른 토픽으로 메시지를 발송할 수 있습니다.
if "request" in message:
response = self.process_request(message)
self.mqtt_manager.publish("response/topic", response)
def process_request(self, message):
# 요청을 처리하고 응답 메시지를 생성합니다.
return "Processed message: " + message
7. 주 프로그램 (main.py 확장)
메인 프로그램에서 MQTT 메니저와 메시지 핸들러를 초기화하고 연결합니다.
from config_manager import ConfigManager
from logger_manager import init_logger
from mqtt_manager import MQTTManager
from message_handler import MessageHandler
def main():
config_manager = ConfigManager('default_config.json', 'user_config.json')
config = config_manager.get_config()
logger = init_logger(level=config['logging']['level'], log_to_file=config['logging']['log_to_file'], filename=config['logging']['filename'])
mqtt_config = config['mqtt']
message_handler = MessageHandler(None) # Initialize message handler without mqtt_manager.
mqtt_manager = MQTTManager(mqtt_config, message_handler)
message_handler.mqtt_manager = mqtt_manager # Set the mqtt_manager for the handler.
mqtt_manager.start()
try:
while True:
pass # Keep the application running to handle incoming messages.
except KeyboardInterrupt:
print("Application stopped by user.")
finally:
mqtt_manager.stop()
if __name__ == "__main__":
main()
이러한 추가된 단계들은 MQTT 메시지의 수신과 발송 로직을 구현하며, 실제 메시지 처리 기능을 포함합니다. 각 컴포넌트는 독립적으로 작동하면서 필요에 따라 상호 작용하게 설계되어 있습니다.
Home Assistant에서 디바이스가 추가되지 않는 문제를 해결하기 위해, MQTT 디스커버리 메시지가 제대로 전송되고 있는지 확인하고, 패킷 처리 로직을 점검해야 합니다. 디스커버리 메시지가 올바르게 전송되지 않으면 Home Assistant가 디바이스를 인식하지 못할 수 있습니다. 여기서 중요한 점은 디스커버리 메시지가 올바른 형식으로 전송되었는지, 그리고 Home Assistant가 이를 올바르게 수신했는지 확인하는 것입니다. 이를 위해, 패킷 처리 로직과 디스커버리 메시지 전송 로직을 점검하는 방법입니다.
1. mqtt_discovery 함수 확인
디스커버리 메시지를 올바르게 전송하기 위해 mqtt_discovery
함수에 retain 플래그를 설정합니다.
def mqtt_discovery(payload):
intg = payload.pop("_intg")
# MQTT 통합구성요소에 등록되기 위한 추가 내용
payload["device"] = DISCOVERY_DEVICE
payload["uniq_id"] = payload["name"]
# discovery에 등록
topic = f"homeassistant/{intg}/ezville_wallpad/{payload['name']}/config"
logger.info("Add new device: %s", topic)
mqtt.publish(topic, json.dumps(payload), retain=True)
2. serial_new_device 함수 확인
현관벨에 대한 디스커버리 메시지를 전송하는 로직을 확인합니다.
def serial_new_device(device, packet, idn=None):
prefix = Options["mqtt"]["prefix"]
if device == "doorbell":
payload = {
"_intg": "binary_sensor",
"~": f"{prefix}/doorbell",
"name": f"{prefix}_doorbell",
"stat_t": "~/state",
"dev_cla": "door",
}
mqtt_discovery(payload)
# 다른 장치들에 대한 Discovery...
3. serial_receive_state 함수 수정
패킷을 수신하여 적절히 처리하고 디스커버리 메시지를 전송합니다.
def serial_receive_state(device, packet):
form = RS485_DEVICE[device]["state"]
last = RS485_DEVICE[device]["last"]
idn = (packet[1] << 8) | packet[2]
if last.get(idn) == packet:
return
if Options["mqtt"]["_discovery"] and not last.get(idn):
serial_new_device(device, packet, idn)
last[idn] = True
return
else:
last[idn] = packet
prefix = Options["mqtt"]["prefix"]
if device == "doorbell":
event = {
0x02: "ring",
0x82: "talk",
0x01: "open",
0x11: "cancel",
}.get(packet[3], "unknown")
if event != "unknown":
topic = f"{prefix}/{device}/{event}/state"
value = "ON" if event == "ring" else "OFF"
mqtt.publish(topic, value)
logger.debug("publish to HA: %s = %s (%s)", topic, value, packet.hex())
# 다른 장치들에 대한 처리...
4. Home Assistant 설정 확인
configuration.yaml
파일을 확인하여 Home Assistant의 MQTT 설정이 올바른지 확인합니다.
mqtt:
broker: your_mqtt_broker_address
port: 1883
username: your_username
password: your_password
discovery: true
discovery_prefix: homeassistant
5. Home Assistant 로그 확인
Home Assistant 로그를 확인하여 디스커버리 메시지가 수신되었는지, 오류 메시지가 있는지 확인합니다.
tail -f /config/home-assistant.log
6. 디버깅
디스커버리 메시지가 제대로 전송되고 있는지 확인하기 위해 MQTT 클라이언트를 사용하여 디스커버리 메시지를 수신합니다.
mosquitto_sub -h your_mqtt_broker_address -t "homeassistant/#" -u "your_username" -P "your_password" -v
위 단계를 통해 문제를 진단하고 해결할 수 있습니다. 패킷이 제대로 수신되고 디스커버리 메시지가 올바르게 전송되었는지 확인한 후에도 문제가 지속되면, Home Assistant의 MQTT 통합 및 설정을 다시 점검해 보세요.
댓글