Pulse Meter

Posted on Nov 10, 2021

So I recently setup a microcontroller (using ESPHome, this repository in particular) to measure the pulses my electricity meter outputs. It's been working ok, but I fear that the wifi connectivity in my basement is terrible and that the microcontroller is missing pulses1 due to this.

I located another SBC2 intending to use it as a small hotspot, and place it close to the microcontroller to improve things, but realized that I couldn't power it reliably with my cheap PoE to USB-adapter for that board in particular, that it was tricky to power things in that part of the basement using anything over than PoE (having a power outlet near the electricity meter, how absurd?!)…so I took a Raspberry Pi 4 instead (as the PoE to USB-C adapters I have are reliable), and spent an hour or so to write some Python.

Development setup, very advanced

Below is the script I came up with. I was fearing/hoping that this project would have me learn to use system calls like select, or kqueue/epoll, but it turned out that gpiozero is an excellent library for tasks such as this!

The first working version was much shorter, but I decided to stop juggling global variables for state variables everywhere and just tidy it up a bit. I'm just using the distribution provided versions of gpiozero & paho-mqtt due to lazyness.

  # apt install python3-paho-mqtt python3-gpiozero
  # https://forum.micropython.org/viewtopic.php?t=7224
  # https://github.com/sjmelia/pi-electricity-monitor/blob/master/monitor.py
  # https://www.vishay.com/resistors/pulse-energy-calculator/
  # https://learn.openenergymonitor.org/electricity-monitoring/pulse-counting/introduction-to-pulse-counting

  from paho.mqtt import client as mqtt_client

  from gpiozero import LED, DigitalInputDevice
  from signal import pause

  import random, time, os, logging, json, systemd.daemon

  MQTT_TOPIC = "test/test"
  MQTT_BROKER = "mqtt.local"
  MQTT_PORT = 1883
  MQTT_CLIENT_ID = f'python-mqtt-{random.randint(0, 1000)}'
  MQTT_USER = "test"
  MQTT_PASSWORD = "test"

  # Mandatory, which PIN is the photodiode connected to?
  PD_PIN = 23

  # This was used when writing this script, set to true and connect a
  # LED to the correct pin
  BLINK_LED = os.environ.get("BLINK_LED", False)
  LED_PIN = 24 # Optional


  class PulseMeter(object):
      def __init__(self, mqtt_topic=MQTT_TOPIC, pulse_rate=1000):
          self.last_pulse = False
          self.pulse_start_time = False
          self.delta = 0
          self.current_power = 0.0
          self.pulse_rate = pulse_rate
          self.mqtt_topic = mqtt_topic
          self.mqtt_client = self.connect_mqtt()
          self.mqtt_client.loop_start()

      def connect_mqtt(self):
          def on_connect(client, userdata, flags, rc):
              logging.debug(f"broker: {MQTT_BROKER}, port: {MQTT_PORT}, client id: {MQTT_CLIENT_ID}")
              if rc == 0:
                  logging.info("Connected to MQTT Broker!")
              else:
                  logging.warn("Failed to connect, return code %d\n", rc)

          client = mqtt_client.Client(MQTT_CLIENT_ID)
          client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
          client.on_connect = on_connect
          client.connect(MQTT_BROKER, MQTT_PORT)
          return client

      def pulse_start(self):
          # This isn't even used, but this looks cooler than just having
          # a pass call here. :)
          self.pulse_start_time = time.time()

      def pulse_end(self):
          # No last time? Probably in startup, ignore and wait for the
          # next pulse...
          if not self.last_pulse:
              self.last_pulse = time.time()
              return

          current_time = time.time()

          self.delta = current_time - self.last_pulse
          self.current_power = 3600 / ( self.delta * self.pulse_rate )
          self.last_pulse = current_time
          logging.debug(f"power: {self.current_power:.3f} kWh delta: {self.delta}")

          self.publish_current_power()

      def publish_current_power(self, precision=3):
          # JSON due to reasons
          output = dict([("power", round(self.current_power, precision))])
          logging.debug(f"Trying to publish: {output}")
          result = self.mqtt_client.publish(self.mqtt_topic, json.dumps(output))

          status = result[0]
          if status != 0:
              logging.warn(f"Failed to send message to topic {self.mqtt_topic}")


  def run():
      logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"))
      pm = PulseMeter()
      led = LED(LED_PIN)
      pd = DigitalInputDevice(PD_PIN)

      pd.when_activated = pm.pulse_start
      pd.when_deactivated = pm.pulse_end

      if BLINK_LED:
          led.blink(on_time=0.01, off_time=1.25)

      systemd.daemon.notify('READY=1')
      pause()

  if __name__ == "__main__":
      run()

systemd unit file

Here's a systemd unit file, place it in /etc/systemd/system/pulse-meter.service or such. I stole this from torfsen/python-systemd-tutorial on GitHub.

  [Unit]
  Description=Pulse metering service

  [Service]
  # CHANGE ME
  ExecStart=/usr/bin/python3 /path/to/pulse-meter.py
  # Disable Python's buffering of STDOUT and STDERR, so that output from the
  # service shows up immediately in systemd's logs
  Environment=PYTHONUNBUFFERED=1
  Restart=on-failure
  Type=notify
  # /dev/gpio seems to be owned by root/root
  # User=python_demo_service

  [Install]
  WantedBy=default.target

Enable it:

  # systemctl daemon-reload
  # systemctl enable --now pulse-meter.service
  # journalctl -u pulse-meter.service
  ...
  Nov 13 17:59:21 ubuntu systemd[1]: Starting Pulse metering service...
  Nov 13 17:59:22 ubuntu python3[11487]: INFO:root:Connected to MQTT Broker!
  Nov 13 17:59:22 ubuntu systemd[1]: Started Pulse metering service.

Now I just have to figure out how reliable this is. That shouldn't be so hard, right? (-:


1

Yes, I've stared at it and counted each pulse from the electricity meter and counted the 'echoes' from the microcontroller setup. They didn't match up, and I've been too lazy to read the actual source code for the protobuf based protocol that is used for reporting to Home Assistant.

2

An ASUS Tinkerboard, infamous for being hard to power with micro-USB, which ironically also is its primary power source. It's generally recommeded to power it through its VCC pin instead.