본문 바로가기

생활코딩

라즈베리파이 먼지센서 사용하기

라즈베리파이에 먼지센서(PMS7003) 장착 및 사용 방법입니다.

환경 및 준비물

라즈베리파이 설정

UART

PMS7003 먼지센서와 라즈베리파이는 UART를 사용하여 통신하게 됩니다. UART는 간단히 얘기하면 직렬 통신의 한 방식이자 해당 방식의 장치라고 할 수 있습니다. 라즈베리파이에는 다음과 같이 두 가지 유형의 UART가 있습니다. 하나는 PL001이고 다른 하나는 mini UART입니다

  • PL001: 16550 호한 UART
  • mini UART: 보다 적은 기능을 갖는 UART
    • 사용하기 위해서는 라즈베리파이의 VPU 코어 클럭을 고정 시켜야 합니다. 이유는 mini UART의 클록이 VPU와 연결돼 있어서 코어 클록이 변경되는 경우 mini UART의 baud rate이 변경되기 때문입니다.
    • 패리티 비트가 없습니다.
    • 프레임 오류 검출이 없습니다.
    • PL001과의 보다 상세한 차이점은 여기에서 확인하실 수 있습니다.

여러모로 봤을 때 PL001 유형을 사용하는 것이 좋습니다.

라즈베리파이 4에는 기본적인 PL001과 miniUART가 한 개씩 있고 추가적으로 4개의 PL001이 있습니다(추가적인 4개는 기본적으로 비활성화 돼 있습니다).

Name Type
UART0 PL011
UART1 mini UART
UART2 PL011
UART3 PL011
UART4 PL011
UART5 PL011

 

※ 라즈베리파이 3와 5에 대한 정보는 여기에서 확인하실 수 있습니다.

Primary UART

GPIO 14(transmit, 8번 핀)와 GPIO 15(receive, 10번 핀)가 Primary UART입니다. 기본적으로 리녹스 콘솔에 할당됩니다.

Secondary UART

Secondary UART는 GPIO 커넥터에 나타나지 않고 블루투스와 연결됩니다. 그러므로, 만약 Secondary UART를 사용해야 하는 경우에는 블루투스 기능을 비활성화 해야 합니다.

Model Primary/console Secondary/Bluetooth
Raspberry Pi Zero UART0 UART1
Raspberry Pi Zero W / Raspberry Pi Zero 2 W UART1 UART0
Raspberry Pi 1 UART0 UART1
Raspberry Pi 2 UART0 UART1
Raspberry Pi 3 UART1 UART0
Compute Module 3 & 3+ UART0 UART1
Raspberry Pi 4 UART1 UART0
Raspberry Pi 5 UART10 <dedicated UART>

 

표에 나오는 것처럼 라즈베리파이4의 경우 블루투스를 비활성화 하지 않는다고 하면 기본적으로 Primary UART를 사용해야 하는데 이는 UART1 입니다. UART1은 타입이 mini UART입니다. 그러므로, PL011 타입의 UART를 라즈베리파이 4에서 사용하기 위해서는 추가적인 4개의 PL011 타입 UART2 ~ 5 중 하나를 활성화하고 이를 사용해야 합니다.

UART2 활성화

UART2을 활성화하여 먼지 센서를 연결해 보겠습니다. vi나 nano를 사용하여 /boot/firmware/config.txt 파일을 엽니다.

sudo vi /boot/firmware/config.txt

 

파일의 맨 아래에 보면 [all] 섹션이 있는데 그 밑에 다음과 같은 내용의 줄을 추가합니다. 이렇게 하는 것을 디바이스 트리 오버레이라고 합니다. 관련해서 상세한 정보가 궁금하신 분은 여기를 참고하시기 바랍니다.

[all]
dtoverlay=uart2

 

라즈베리파이를 재부팅합니다.

sudo reboot

 

재부팅 후에 접속하여 재대로 활성화 됐는지 확인합니다.

ls /dev/ttyAMA*

 

/dev/ttyAMA2 라는 항목이 나오면 제대로 활성화가 된 것입니다.

 

먼지 센서 연결

다음과 같이 GPIO 번호를 확인할 수 있습니다. Info에서 GPIO 0 ~ 3번이라고 나오는데, 이 중 0번이 tx, 1번이 rx입니다. ctsrts를 매개변수를 추가하면 2, 3번이 CTS와 RTS로 쓰이게 됩니다. CTS와 RTS는 UART의 흐름 제어와 관련된 것으로, 먼지 센서 연결시에는 신경쓰지 않아도 됩니다.

먼지 센서 인터페이스 보드

 

  • VCC: 5V 핀에 연결 
  • GND: ground 핀에 연결
  • TX: GPIO의 RX에 연결. 즉, GPIO 1번 (핀번호 28)
  • RX: GPIO의 TX에 연결. 즉, GPIO 0번 (핀번호 27)

출처:https://www.raspberrypi.com/documentation/computers/raspberry-pi.html

동작 테스트

다음은 동작 테스트입니다. 테스트를 위한 코드는 eleparts에서 제공하는 https://github.com/eleparts/PMS7003의 코드를 pythonic하게 정리 및 수정했습니다.

import logging
from typing import Any

import serial
import struct


class DustSensor:
    # PMS7003 protocol data (HEADER 2byte + 30byte)
    PMS7003_PROTOCOL_SIZE = 32

    # PMS7003 data list
    HEADER_HIGH            = 0  # 0x42
    HEADER_LOW             = 1  # 0x4d
    FRAME_LENGTH           = 2  # 2x13+2(data+check bytes)
    DUST_PM1_0_CF1         = 3  # PM1.0 concentration unit μ g/m3(CF=1,standard particle)
    DUST_PM2_5_CF1         = 4  # PM2.5 concentration unit μ g/m3(CF=1,standard particle)
    DUST_PM10_0_CF1        = 5  # PM10 concentration unit μ g/m3(CF=1,standard particle)
    DUST_PM1_0_ATM         = 6  # PM1.0 concentration unit μ g/m3(under atmospheric environment)
    DUST_PM2_5_ATM         = 7  # PM2.5 concentration unit μ g/m3(under atmospheric environment)
    DUST_PM10_0_ATM        = 8  # PM10 concentration unit μ g/m3  (under atmospheric environment)
    DUST_AIR_0_3           = 9  # indicates the number of particles with diameter beyond 0.3 um in 0.1 L of air.
    DUST_AIR_0_5           = 10 # indicates the number of particles with diameter beyond 0.5 um in 0.1 L of air.
    DUST_AIR_1_0           = 11 # indicates the number of particles with diameter beyond 1.0 um in 0.1 L of air.
    DUST_AIR_2_5           = 12 # indicates the number of particles with diameter beyond 2.5 um in 0.1 L of air.
    DUST_AIR_5_0           = 13 # indicates the number of particles with diameter beyond 5.0 um in 0.1 L of air.
    DUST_AIR_10_0          = 14 # indicates the number of particles with diameter beyond 10 um in 0.1 L of air.
    RESERVEDF              = 15 # Data13 Reserved high 8 bits
    RESERVEDB              = 16 # Data13 Reserved low 8 bits
    CHECKSUM               = 17 # Checksum code


    def check_header(self, buffer) -> bool:
        return buffer[self.HEADER_HIGH] == 66 and buffer[self.HEADER_LOW] == 77

    def calculate_checksum(self, buffer) -> int:
        buffer = buffer[0:self.PMS7003_PROTOCOL_SIZE]

        # data unpack (Byte -> Tuple (30 x unsigned char <B> + unsigned short <H>))
        checksum_data = struct.unpack('!30BH', buffer)

        checksum: int = 0

        for i in range(30):
            checksum = checksum + checksum_data[i]

        return checksum

    def check_checksum(self, buffer) -> bool:
        calculated_checksum = self.calculate_checksum(buffer)

        checksum_buffer = buffer[30:self.PMS7003_PROTOCOL_SIZE]
        checksum = struct.unpack('!H', checksum_buffer)

        return calculated_checksum == checksum[0]

    def check_protocol_size(self, buffer) -> bool:
        return self.PMS7003_PROTOCOL_SIZE <= len(buffer)

    def check_protocol(self, buffer) -> bool:
        result = False

        if not self.check_protocol_size(buffer):
            logging.error("protocol error")
        elif not self.check_header(buffer):
            logging.error('header error')
        elif not self.check_checksum(buffer):
            logging.error("checksum error")
        else:
            result = True

        return result

    # unpack data
    # <Tuple (13 x unsigned short <H> + 2 x unsigned char <B> + unsigned short <H>)>
    def unpack_data(self, buffer) -> tuple[Any, ...]:
        buffer = buffer[0:self.PMS7003_PROTOCOL_SIZE]

        # data unpack (Byte -> Tuple (13 x unsigned short <H> + 2 x unsigned char <B> + unsigned short <H>))
        return struct.unpack('!2B13H2BH', buffer)

    def read(self, uart) -> tuple[tuple[Any, ...], int]:
        data = None
        checksum = None

        ser = serial.Serial(uart, 9600, timeout=1)
        buffer = ser.read(1024)
        ser.close()

        if self.check_protocol(buffer):
            data = self.unpack_data(buffer)
            checksum = self.calculate_checksum(buffer)

        return data, checksum

    def test(self, uart):
        data, checksum = self.read(uart)

        if data:
            print(f'Header : {data[self.HEADER_HIGH]} {data[self.HEADER_LOW]} \t\t | Frame length : {data[self.FRAME_LENGTH]}')
            print(f'PM 1.0 (CF=1) : {data[self.DUST_PM1_0_CF1]}\t | PM 1.0 : {data[self.DUST_PM1_0_ATM]}')
            print(f'PM 2.5 (CF=1) : {data[self.DUST_PM2_5_CF1]}\t | PM 2.5 : {data[self.DUST_PM2_5_ATM]}')
            print(f'PM 10.0 (CF=1) : {data[self.DUST_PM10_0_CF1]}\t | PM 10.0 : {data[self.DUST_PM10_0_ATM]}')
            print(f'0.3um in 0.1L of air : {data[self.DUST_AIR_0_3]}')
            print(f'0.5um in 0.1L of air : {data[self.DUST_AIR_0_5]}')
            print(f'1.0um in 0.1L of air : {data[self.DUST_AIR_1_0]}')
            print(f'2.5um in 0.1L of air : {data[self.DUST_AIR_2_5]}')
            print(f'5.0um in 0.1L of air : {data[self.DUST_AIR_5_0]}')
            print(f'10.0um in 0.1L of air : {data[self.DUST_AIR_10_0]}')
            print(f'Reserved F : {data[self.RESERVEDF]} | Reserved B : {data[self.RESERVEDB]}')
            print(f'CHKSUM : {checksum} | read CHKSUM : {data[self.CHECKSUM]} | CHKSUM result : {checksum == data[self.CHECKSUM]}')
        else:
            print ("data read error")


if __name__ == '__main__':
    DustSensor().test('/dev/ttyAMA2')

 

이 코드를 dust_sensor.py로 저장하고 python dust_sensor.py 식으로 실행하면 다음과 같은 결과를 볼 수 있습니다.

 

코드에서 볼 수 있지만, 직렬 데이터를 읽어와서 헤더, 체크섬 등을 확인하고, 데이터시트(https://www.eleparts.co.kr/data/goods_old/data/DS_PMS7003.pdf)를 참고하여 제공되는 데이터를 구분해야 하는데 eleparts에서 제공하는 코드가 잘 돼 있어서 구분이 어렵지 않습니다.

위 코드를 다른 곳에서 사용할 때는 DustSensor 클래스를 import 한 후에 다음과 같이 사용하면 됩니다(주의할 점은 이 코드는 동시성이 보장 되지는 않습니다).

from dust_sensor import DustSensor


dust_sensor = DustSensor()
data, sensor = dust_sensor.read('/dev/ttyAMA2')
# data에서 각 항목을 읽는 부분은 DustSensor#test의 코드를 보시면 됩니다.

참고 자료