본문 바로가기

생활코딩

LG ThinQ API로 가전 제품과 연동하기 - 2 (MQTT)

반응형
LG ThinQ Open API 연동 시리즈
1. LG ThinQ API 로 가전 제품과 연동하기  
2. LG ThinQ API로 가전 제품과 연동하기 - 2 (MQTT)  
3. 라즈베리파이 - Home Assistant - LG ThinQ 연동

 

ThinQ API 체계에서는 기기에서 발생하는 푸시 메시지나 상태 변경에 대한 이벤트 메시지를 MQTT 방식으로 제공합니다. 이벤트/푸시 구독 API를 통해서 특정 기기에 대해서 구독을 하면 이벤트나 푸시가 있는 경우 MQTT 브로커러 발행을 합니다. MQTT Subscriber(또는 클라이언트)로 접속하여 이 메시지를 지속적으로 수신할 수 있습니다.

1. 이벤트/푸시 구독

이벤트와 푸시 모두 구독 목록/구독/구독 해제 API로 구성돼 있고, 추가적으로 푸시쪽에는 디바이스 추가/삭제 알림 구독에 대한 클라이언트 등록/조회/삭제 API가 있습니다. 여기서는 이벤트와 푸시 구독 API만 살펴 보도록 하겠습니다. 다음은 푸시를 구독하는 코드입니다.

import asyncio
from pprint import pprint

from aiohttp import ClientSession
from thinqconnect import ThinQApi


async def main():
    async with ClientSession() as session:
        thinq_api = ThinQApi(
            session=session,
            access_token='자신의PAT',
            country_code='KR',
            client_id='등록한클라이언트아이디',
        )

        response = await thinq_api.async_post_push_subscribe(
            '디바이스아이디',
        )

        pprint(response)


if __name__ == '__main__':
    asyncio.run(main())

 

이벤트의 경우에는 thinq_api.sync_post_push_subscribe 만  thisq_api.sync_post_event_scribe로 변경해 주면 됩니다.

2. MQTT Subscriber(Client)

MQTT Subscriber 접속을 위해서는 MQTT 브로커 주소도 알아야하고 여러가지 작업들을 해야합니다. 브로커 주소(ThinQ 문서에서는 브로커 대신 서버라고 말하고 있습니다)는 API를 통해 확인할 수 있습니다. 브로커 주소와 GitHub의 내용을 보면 AWS IoT Core를 사용하는 것으로 보입니다. 이를 구독하기 위해서는 일반적인 MQTT 라이브러리를 사용하거나 AWS에서 제공하는 라이브러리를 사용해야 하는데 번거롭고 추가적인 내용들을 알아야 합니다. 그런데, 고맙게도 pythinqconnect 패키지에서 비교적 쉽게 접속 및 사용할 수 있도록 기능을 제공하고 있습니다(다만, 아직 사용법에 대한 설명이 없는게 아쉽습니다. 곧 추가 되겠죠).

import asyncio
import json
import signal
from pprint import pprint

from aiohttp import ClientSession
from thinqconnect import ThinQMQTTClient, ThinQApi


class ThinqMqttSubscriber:
    CLIENT_ID = '등록한클라이언트아이디'

    def __init__(self):
        # 로그 레벨 전역 설정. 디버깅이 필요 없으면 삭제나 레벨 변경.
        logging.basicConfig(level=logging.DEBUG)
        self.mqtt_client: ThinQMQTTClient | None = None

    @classmethod
    async def create(cls):
        session = ClientSession()
        thinq_api = ThinQApi(
            session=session,
            access_token='자신의PAT',
            country_code='KR',
            client_id=ThinqMqttSubscriber.CLIENT_ID,
        )

        instance = cls()
        instance.mqtt_client = await ThinQMQTTClient(
            thinq_api=thinq_api,
            client_id=ThinqMqttSubscriber.CLIENT_ID,
            on_message_received=instance.on_message_received,
            on_connection_success=instance.on_connection_success,
            on_connection_failure=instance.on_connection_failure,
        )

        return instance

    async def execute(self):
        # MQTT 준비 (CSR 생성 및 인증서 발급)
        if not await self.mqtt_client.async_prepare_mqtt():
            print('MQTT preparation failed')
            return

        await self.mqtt_client.async_connect_mqtt()

        # 메시지를 계속 받기 위해 무한 대기
        stop_event = asyncio.Event()

        def shutdown():
            print("Shutdown signal received! Disconnecting...")
            stop_event.set()

        signal.signal(signal.SIGINT, lambda s, f: shutdown())

        await stop_event.wait()

        # 연결 해제
        await self.mqtt_client.async_disconnect()

    @staticmethod
    async def main():
        task = await ThinqMqttSubscriber.create()
        await task.execute()

    def on_message_received(self, topic, payload, **kwargs):
        """MQTT 메시지 수신 콜백"""
        print(f"Message received on topic {topic}")
        """수신된 pyload 문자열을 Python Python Dictionary로 변환"""
        data = json.loads(payload.decode('utf-8'))
        pprint(data)

    def on_connection_success(self, connection, callback_data):
        """MQTT 연결 성공 콜백"""
        print(f"MQTT Connection Successful: {callback_data}")

    def on_connection_failure(self):
        """MQTT 연결 실패 콜백"""
        print("MQTT Connection Failed")


if __name__ == '__main__':
    asyncio.run(ThinqMqttSubscriber.main())

 

pythinqconnect가 비동기 방식을 사용해서 코드가 살짝 복잡하기는 합니다만, 이렇게 코드를 구성하면 그 이후는 on_message_received 부분에만 집중하면 됩니다. 이벤트나 푸시가 발생하면 수신되어 data라는 디셔너리로 변환되기 때문에 data 변수의 내용을 참고하여 원하는대로 데이터를 사용합니다.

수신되는 메시지의 형태는 아래와 같은데, 그 중에 report 부분이 디바이스 프로파일에서 해당 기기의 요청/응답 스키마에서 응답 스키마 부분과 같습니다. 그래서, data['report'] 부분에만 집중하면 됩니다.

 

수신 메시지 예: 스틱청소기 / 충전이 완료된 것에 대한 푸시

Topic: app/clients/등록한클라이언트/push

{
 'deviceId': '디바이스아이디',
 'deviceType': 'DEVICE_STICK_CLEANER',
 'pushType': 'DEVICE_STATUS',
 'report': {'runState': {'currentState': 'CHARGING_COMPLETE'}},
 'serviceId': '서비스아이디',
 'userList': ['자신의사용자아이디']
 }

 

이것으로 MQTT로 연동하는 것은 마무리하도록 하겠습니다.