잡동사니

임베디드환경에서 MQTT 사용해보기 본문

IT

임베디드환경에서 MQTT 사용해보기

yeTi 2021. 2. 22. 14:19

안녕하세요. yeTi입니다.
오늘은 임베디드환경(tx2)기반에서 MQTT 브로커를 설치하고 메세징을 해보려고 합니다.

MQTT brocker 설치

MQTT란, IoT용 message broker 선택하기를 기반으로 mosquittomqtt broker로 진행했습니다.

도커 이미지를 기반으로 mosquitto broker를 구동합니다.

$ docker run -it -p 1883:1883 -p 9001:9001 eclipse-mosquitto

Subscriber 구현

먼저 mqtt를 활용하기 위해서 paho 모듈을 설치합니다.

$ pip install paho-mqtt==1.5.1

다음과 같이 Subscriber 클래스를 작성합니다.

import paho.mqtt.client as mqtt

class Subscriber:

    def __init__(self, on_message):
        self._set_client(on_message)

    def _set_client(self, on_message):
        self.mqttc = mqtt.Client()

        self.mqttc.on_connect = self._on_connect
        self.mqttc.on_disconnect = self._on_disconnect
        self.mqttc.on_subscribe = self._on_subscribe
        self.mqttc.on_message = on_message

    def _on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            print("Subscriber connected")
        else:
            print("Bad connection Returned code=", rc)

    def _on_disconnect(self, client, userdata, flags, rc=0):
        print(str(rc))

    def _on_subscribe(self, client, userdata, mid, granted_qos):
        print("subscribed: " + str(mid) + " " + str(granted_qos))

    def start(self):
        self.mqttc.connect('localhost', 1883)

        self.mqttc.subscribe('common/test', 1)
        self.mqttc.loop_forever()

    def stop(self):
        self.mqttc.disconnect()

Publisher구현

다음과 같이 Publisher 클래스를 작성합니다.

import paho.mqtt.client as mqtt

class Publisher:

    def __init__(self):
        self._set_client()

    def _set_client(self):
        self.mqttc = mqtt.Client()

        self.mqttc.on_connect = self._on_connect
        self.mqttc.on_disconnect = self._on_disconnect
        self.mqttc.on_publish = self._on_publish

    def _on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            print("Publisher connected")
        else:
            print("Bad connection Returned code=", rc)

    def _on_disconnect(self, client, userdata, flags, rc=0):
        print(str(rc))

    def _on_publish(self, client, userdata, mid):
        print("Publish success, callback mid= ", mid)

    def start(self):
        self.mqttc.connect('localhost', 1883)
        self.mqttc.loop_start()

    def stop(self):
        self.mqttc.loop_stop()
        self.mqttc.disconnect()

    def publish(self, data):
        self.mqttc.publish('common/test', data, 1)

예제 구동하기

다음과 같이 subscriber 예제를 구현합니다.

from subscriber import Subscriber

def on_message(client, userdata, msg):
#    print('subscribe msg:', msg)
    print(str(msg.payload.decode("utf-8")))

if __name__=='__main__':
    subscriber = Subscriber(on_message)
    subscriber.start()

다음과 같이 publisher 예제를 구현합니다.

import time
from publisher import Publisher

if __name__=='__main__':
    publisher = Publisher()
    publisher.start()

    while True:
        publisher.publish('msg 1')
        time.sleep(1)

각각의 예제를 구동하면 msg 1이라는 데이터를 송신하고 수신하는것을 확인할 수 있습니다.

Comments