Penta SATA HAT enhanced display (oled, I/O, temp) and operation (button, fan)

The Quad SATA HAT (Raspberry Pi) software has recently received significant updates and upgrades from @RayMondDrakon.

The Penta SATA HAT (Rockpi 4B) software lacked these additions, so I have merged in the updates from the Quad SATA HAT software and made further modifications.

I used Armbian_20.05.02_Rockpi-4b_buster_legacy_4.4.213.img with openmediavault-omvextraorg_latest_all5.deb on a Rockpi 4B+.

The Penta SATA HAT codebase came from: https://github.com/akgnah/rockpi-penta

Display

There are three basic display pages:

page1
page2
page3

And three optional pages that are displayed depending on the /etc/rockpi-penta.conf settings:

  • [network]

    • interfaces = eth0

page4

  • [disk]

    • space_usage_mnt_points = /srv/dev-disk-by-uuid-7adfa17c-a353-46e7-8c9a-eec58f9c86e3

    • io_usage_mnt_points = /srv/dev-disk-by-uuid-7adfa17c-a353-46e7-8c9a-eec58f9c86e3

    • disks_temp = true

page5
page6

Display Updating and Data Sampling

Each individual page’s statistics are determined just prior to its display. This keeps the displayed data up-to-date. Additionally, the user can configure a refresh period so that a page’s data is refreshed and stays current even if the page is displayed for a long time.

  • [slider]

    • time = 10

    • refresh = 1.0
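As a rough illustration of how these two settings interact, the sketch below lists the ideal times at which a page would be refreshed while it is on screen. It is a simplified model, not the service's code: the function name refresh_offsets is hypothetical, and the 0.1 s threshold mirrors the refresh threshold used in main.py.

```python
def refresh_offsets(page_time, refresh, threshold=0.1):
    """Offsets (seconds after a page appears) at which it would be refreshed.

    Simplified model: the real service polls a shared timer every 0.1 s,
    so actual refreshes land slightly later than these ideal offsets.
    """
    if refresh <= threshold:        # refresh disabled or too short to be useful
        return []
    count = int(page_time / refresh)
    offsets = [round(i * refresh, 3) for i in range(1, count + 1)]
    # A refresh that coincides with the page advance is not needed.
    return [t for t in offsets if t < page_time]


print(refresh_offsets(10, 1.0))   # nine refreshes, at 1.0 s through 9.0 s
print(refresh_offsets(4, 0.0))    # refresh = 0 disables refreshing: []
```

With time = 10 and refresh = 1.0, a page is redrawn with fresh data about once a second until the slider advances to the next page.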

Data Transfer is fully captured

The data for the Network and Disk I/O pages is accumulated continuously rather than sampled over a short window. Each update of an I/O page covers all transfers since the last update (display or refresh) of that page, shown as a rate over that period. This means that transient loads are not missed.
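The idea can be sketched as follows: keep the last cumulative counter and timestamp, and on each update divide the difference by the elapsed time. This is only a sketch; the class name RateTracker is hypothetical, and treating 1 MB as 1024 * 1024 bytes is an assumption.

```python
import time


class RateTracker:
    """Turn a cumulative byte counter into an average MB/s between updates."""

    def __init__(self, total_bytes, now=None):
        self.last_bytes = total_bytes
        self.last_time = time.time() if now is None else now

    def update(self, total_bytes, now=None):
        """Return MB/s since the previous update and start a new interval."""
        now = time.time() if now is None else now
        elapsed = now - self.last_time
        delta = total_bytes - self.last_bytes
        self.last_bytes, self.last_time = total_bytes, now
        if elapsed <= 0:
            return 0.0
        return delta / elapsed / (1024 * 1024)


tracker = RateTracker(total_bytes=0, now=0.0)
# A 20 MiB burst at any point during a 10 s interval is still counted.
print(tracker.update(total_bytes=20 * 1024 * 1024, now=10.0))  # 2.0
```

Because the counter is cumulative, a burst that starts and ends between two updates still shows up in the next reading.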

Fan Speed

The Quad HAT code based the fan speed on the CPU and SATA board temperature sensors and, like the original code, had four speed steps (25%, 50%, 75% and 100%) whose temperatures could be configured.

The Penta HAT fan speed control has been updated so that it can also be configured to be linear between the 25% and 100% temperature settings rather than limited to a few steps, so it tracks the current system temperature(s) more closely.

The new code also allows the user to use disk temperatures in fan control, where the speed depends on the maximum of the CPU temperature and the average physical disk temperature. So if the disks get hot, the fan speed will increase.

  • [fan]

    • linear = true

    • temp_disks = true
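To make the difference between stepped and linear control concrete, here is a minimal sketch. It is not a copy of misc.fan_temp2dc: the function names are hypothetical, and the behaviour exactly at a threshold (>= rather than >) is an assumption.

```python
def fan_duty(temp, lv0, lv1, lv2, lv3, linear=False):
    """Map a temperature to a fan duty cycle in percent."""
    if temp < lv0:
        return 0.0                       # below lv0 the fan is off
    if temp >= lv3:
        return 100.0
    if linear:
        # Straight line from (lv0, 25%) to (lv3, 100%); lv1/lv2 are ignored.
        return 25.0 + 75.0 * (temp - lv0) / (lv3 - lv0)
    if temp >= lv2:
        return 75.0
    if temp >= lv1:
        return 50.0
    return 25.0


def control_temp(cpu_temp, disk_temps, use_disks=True):
    """Temperature that drives the fan: max of CPU and average disk temp."""
    if use_disks and disk_temps:
        return max(cpu_temp, sum(disk_temps) / len(disk_temps))
    return cpu_temp


t = control_temp(38.0, [41.0, 44.0])              # disks average 42.5
print(fan_duty(t, 35, 40, 45, 50))                # stepped: 50.0
print(fan_duty(t, 35, 40, 45, 50, linear=True))   # linear: 62.5
```

In this example the disks are warmer than the CPU, so they set the speed, and the linear mode lands between the 50% and 75% steps.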

Temperature Specification

As with the Quad HAT code, the temperature scale (Fahrenheit or Celsius) can be configured. This setting also applies to the temperatures used for setting the fan speed, so if you use Fahrenheit, the fan temperature levels must also be specified in Fahrenheit. Previously they were specified only in Celsius.
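If you switch to Fahrenheit, the lv0 to lv3 values in the config need converting as well. The snippet below is a small helper for that, using the same conversion as the code (°C * 1.8 + 32) and the fallback thresholds from misc.py; the helper name c_to_f is hypothetical.

```python
def c_to_f(celsius):
    """Convert degrees Celsius to degrees Fahrenheit."""
    return celsius * 1.8 + 32


# Fallback fan thresholds from misc.py, in Celsius.
defaults_c = {'lv0': 35, 'lv1': 40, 'lv2': 45, 'lv3': 50}
defaults_f = {key: round(c_to_f(value), 1) for key, value in defaults_c.items()}
print(defaults_f)   # lv0 becomes 95.0 and lv3 becomes 122.0
```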

Control Switch changes

The top-board switch (button) is used for manual page advance, turning the fan on or off, and shutdown or reset. This switch was not detected on my system, even with code to enable the Rockpi’s pullup. This may be a limitation of the Armbian release I used.

This meant that I had to solder a passive pullup between the top-board’s switch and 3.3V, as seen in the following picture. The code to enable the Rockpi’s pullup is still in the code and may work for you.

When starting the service I was getting gpio146: mode: Failed to open 'drive' for writing: No such file or directory, though it still ran with the pullup soldered on. I used a 1K resistor because it was the first one I grabbed; a 10K is probably a better value.

resistor
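As background on how the button is decoded: watch_key in misc.py samples the pin every 0.1 s, appends each reading to a string, and matches that string against three regular expressions. The sketch below reuses those expressions on hand-written sample strings. It assumes that 1 means released (pulled up) and 0 means pressed; the function name classify and its parameters are hypothetical, with 7 and 30 samples corresponding to twice = 0.7 and press = 3 in the config.

```python
import re


def classify(samples, wait_samples=7, press_samples=30):
    """Return 'click', 'twice', 'press' or None for a string of 0/1 samples."""
    patterns = {
        'click': re.compile(r'1+0+1{%d,}' % wait_samples),
        'twice': re.compile(r'1+0+1+0+1{3,}'),
        'press': re.compile(r'1+0{%d,}' % press_samples),
    }
    for name, pattern in patterns.items():
        if pattern.match(samples):
            return name
    return None


print(classify('11' + '0' + '1' * 7))   # click
print(classify('110110111'))            # twice
print(classify('1' + '0' * 30))         # press
print(classify('1111'))                 # None
```

Every pattern starts with 1+, so a pin that never reads high (no working pullup) can never produce a match, which is consistent with the button not being detected.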

Config

The config file has a number of sections and keys. Config processing now supplies a default for anything it does not find in the config file, so you only need to enter the values you want to change. If you make a mistake in a value, such as text in a numeric field, a message is logged and, in read_conf as posted, the built-in defaults are then used for every setting.

The defaults can be seen in the misc.py source.
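The defaulting relies on the fallback argument of Python's ConfigParser getters, which is what read_conf uses. A minimal standalone sketch, with an inline config string standing in for /etc/rockpi-penta.conf:

```python
from configparser import ConfigParser

cfg = ConfigParser()
cfg.read_string("""
[fan]
lv0 = 30
linear = true
""")

lv0 = cfg.getfloat('fan', 'lv0', fallback=35)            # from the file: 30.0
lv3 = cfg.getfloat('fan', 'lv3', fallback=50)            # missing key: 50
auto = cfg.getboolean('slider', 'auto', fallback=True)   # missing section: True
print(lv0, lv3, auto)

# Text in a numeric field is not covered by fallback; it raises ValueError.
bad = ConfigParser()
bad.read_string("[fan]\nlv0 = warm\n")
try:
    bad.getfloat('fan', 'lv0', fallback=35)
except ValueError as err:
    print("Config error:", repr(err))
```

So a config file containing only the [fan] lines above is enough to change lv0 and enable linear mode; everything else keeps its default.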

[fan]
# When the temperature is above lv0, the fan runs at 25% power,
# above lv1 at 50% power, above lv2 at 75% power, and above lv3 at 100% power.
# When the temperature is below lv0, the fan is essentially turned off.
# If linear is true, the fan speed varies continuously between
# the temperature limits of lv0 (25%) and lv3 (100%), and lv1/lv2 are ignored.
# If temp_disks is true, the average temperature of the /dev/sd* disks is used
# together with the CPU temperature (the maximum of the two) to set the fan speed.
# You can change these values if necessary.
lv0 = 25
lv1 = 40
lv2 = 45
lv3 = 50
linear = true
temp_disks = true

[key]
# You can customize the function of the key, currently available functions are
# slider: oled display next page
# switch: fan turn on/off switch
# reboot, poweroff
# If you have any good suggestions for key functions, 
# please add an issue on https://setq.me/rockpi-sata
click = slider
twice = switch
press = poweroff

[time]
# twice: maximum time between double clicking (seconds)
# press: long press time (seconds)
twice = 0.7
press = 3

[slider]
# Whether the oled automatically displays the next page, and the time interval (seconds)
# If refresh is non-zero it will be the period between successive refreshes
# of the data in the currently displayed page.
auto = true
time = 4
refresh = 1.0

[oled]
# Whether to rotate the oled text 180 degrees, and whether all temps are in Fahrenheit
# or Celsius, including fan lv# temperatures.
rotate = true
f-temp = false

[network]
# Name of the interfaces which should be measured (separated with |)
# Leave it blank (after the =) if you don't want to use it
# Option 'auto' means select them automatically by their link status
#  (every interface with link status is UP)
interfaces = eth0

[disk]
# Mount points for non-root disks to show space usage (separated with |)
# Mount points found through df -h, rather than drive names which change
space_usage_mnt_points = /srv/dev-disk-by-uuid-7adfa17c-a353-46e7-8c9a-eec58f9c86e3

# Mount points for disks to show I/O usage (separated with |)
# Leave it blank (after the =) if you don't want to use it
# Mount point name is from "df -Bg" command.
io_usage_mnt_points = /srv/dev-disk-by-uuid-7adfa17c-a353-46e7-8c9a-eec58f9c86e3

# Automatically detect the disks and show their temperatures
disks_temp = true 

main.py

#!/usr/bin/env python3
import sys
import time
import fan
import misc
import multiprocessing as mp

""" Conditionally import the oled functions and flag if
    the top_board seems to exist.
    Log any exception generated.
"""
try:
    import oled
    top_board = 1
except Exception as ex:
    top_board = 0
    print (ex)


q = mp.Queue()              # communication on watch_key/receive_key
display_queue = mp.Queue()  # communication in the display processor

refresh_theshold = 0.1    # we will not refresh if period is less than this (seconds)

action = {
    'none': lambda: 'nothing',
    'slider': lambda: display_queue.put(True),
    'switch': lambda: misc.fan_switch(),
    'reboot': lambda: misc.check_call('reboot'),
    'poweroff': lambda: misc.check_call('poweroff --halt'),
}

"""
   Receive a user input from the queue as a 'key' value
   to get the processing function and then run the action
   for it.
"""
def receive_key(q):
    while True:
        func = misc.get_func(q.get())
        action[func]()
        time.sleep(0.1)


def main():
    if sys.argv[-1] == 'on':
        if top_board:
            oled.welcome()
    elif sys.argv[-1] == 'off':
        if top_board:
            oled.goodbye()
        sys.exit(0)
        

if __name__ == '__main__':
    main()
    if top_board:
        p_key_processor = mp.Process(target=receive_key, args=(q,), name='Receive Key')
        p_key_processor.start()
        
        p_Key_decoder = mp.Process(target=misc.watch_key, args=(q,), name='Watch Key')
        p_Key_decoder.start()
        
        p_display_manager = mp.Process(target=oled.auto_slider, name='Auto Slider', args=(display_queue,))
        p_display_manager.start()
        
        p_display_process = mp.Process(target=oled.display_process, name='Display Process', args=(display_queue,))
        p_display_process.start()
        
        refresh_period = misc.get_refresh_period()
        if (refresh_period > refresh_theshold):
            p_refresh_display = mp.Process(target=oled.refresh_display, name='Refresh Display', args=(display_queue,))
            p_refresh_display.start()

    p_fan = mp.Process(target=fan.running, name='Fan')
    p_fan.start()
    
    p_fan.join()

fan.py

#!/usr/bin/env python3
"""
Control the PENTA HAT top fan according to the temperature of the RockPi.
We really should read the top hat temperature (as with the SATA HAT code).

Uses the mraa GPIO library for PWM.
   : https://iotdk.intel.com/docs/master/mraa/python/mraa.html#pwm
   PWM ratio is in range of 0.0 to 1.0 - it is the off ratio, rather than on ratio

Fan PWM frequency is 25KHz, according to Noctua fan white paper
   : https://noctua.at/pub/media/wysiwyg/Noctua_PWM_specifications_white_paper.pdf

"""

import time
import mraa  # pylint: disable=import-error
import misc

fan_pin = 13

pin13 = mraa.Pwm(fan_pin)
pin13.period_us(40)
pin13.enable(True)


"""
    Read the CPU temperature and include disks if we want
    to use their temperature as well. This means that we
    have to be capturing disk temperatures (auto display or polled).
"""
def read_cpu_temp():
    with open('/sys/class/thermal/thermal_zone0/temp') as f:
        t_cpu = float(f.read().strip()) / 1000.0
        if misc.is_temp_farenheit():
            t_cpu = t_cpu *1.8 + 32
    if misc.is_fan_cpu_and_disk():
        if (misc.get_last_disk_temp_poll() + misc.get_fan_poll_delay()) < time.time():    # poll disk temps
            misc.get_disk_temp_info()
        t_disk = misc.get_disk_temp_average()
    else:
        t_disk = 0.0
    cpu_temp = max(t_cpu, t_disk)
    return cpu_temp


"""
    Return the speed % of the top_board fan.
"""
def get_dc(cache={}):
    if not(misc.fan_running()):
        return 0.1      # 0.0% can make fan run faster

    # limit the update rate to once every 5 seconds
    if time.time() - cache.get('time', 0) > 5:
        cache['time'] = time.time()
        cache['dc'] = misc.fan_temp2dc(read_cpu_temp())
    return cache['dc']

"""
    Change the PWM off ratio for the fan if it changed.
    We receive a percent and need a 0.0 to 1.0 off ratio.
"""
def change_dc(dc, cache={}):
    if dc != cache.get('dc'):
        cache['dc'] = dc
        pin13.write(1 - (dc / 100))

"""
    Main loop updating the fan's speed according to the
    desired temperature thresholds.
"""
def running():
    while True:
        change_dc(get_dc())
        time.sleep(0.1)
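Two pieces of arithmetic in fan.py are easy to misread: period_us(40) corresponds to the 25 kHz PWM frequency mentioned in the header comment, and the value written to the pin is the off ratio, so a higher fan percentage means a smaller number. A tiny check, with hypothetical helper names and no mraa dependency:

```python
def pwm_frequency_hz(period_us):
    """PWM frequency for a period given in microseconds."""
    return 1_000_000 / period_us


def off_ratio(duty_percent):
    """Value written to the PWM pin for a given fan speed in percent."""
    return 1 - (duty_percent / 100)


print(pwm_frequency_hz(40))   # 25000.0
print(off_ratio(25))          # 0.75
print(off_ratio(100))         # 0.0
```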

oled.py

#!/usr/bin/env python3

"""
    Manage the display according to the configuration data and
    the state of the hardware. 
    
    Displayed pages have their values update just prior to display
    and the user can specify a refresh period so that a page that
    stays up a long time can be refreshed to get current state.
    
    I/O rates are for the period since the last update for that
    device's display page, so they are continuous collections.
"""
import time
import misc
import fan
import Adafruit_SSD1306
import multiprocessing as mp

from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont

font = {
    '10': ImageFont.truetype('fonts/DejaVuSansMono-Bold.ttf', 10),
    '11': ImageFont.truetype('fonts/DejaVuSansMono-Bold.ttf', 11),
    '12': ImageFont.truetype('fonts/DejaVuSansMono-Bold.ttf', 12),
    '14': ImageFont.truetype('fonts/DejaVuSansMono-Bold.ttf', 14),
}

def disp_init():
    disp = Adafruit_SSD1306.SSD1306_128_32(rst=None, i2c_bus=7)
    [getattr(disp, x)() for x in ('begin', 'clear', 'display')]
    return disp

""" 
    Mainline functional code to setup environment.
    manager mutable variables are used between processes.
"""
manager = mp.Manager()

refresh_time = manager.list()
refresh_time += [time.time()]

next_time = manager.list()
next_time += [time.time()]

display_lock = mp.Lock()       # lock clean display updates 

"""
    Condition the display when first imported. If we have any
    problems, our exception will signal our inability to handle
    the display.
"""
misc.set_mode(23, 0)
time.sleep(0.2)
misc.set_mode(23, 1)

try:
    disp = disp_init()
except Exception:
    misc.open_pwm_i2c()
    time.sleep(0.2)
    disp = disp_init()

image = Image.new('1', (disp.width, disp.height))
draw = ImageDraw.Draw(image)


def disp_show():
    im = image.rotate(180) if misc.conf['oled']['rotate'] else image
    disp.image(im)
    disp.display()
    draw.rectangle((0, 0, disp.width, disp.height), outline=0, fill=0)


def welcome():
    with display_lock:
        draw.text((0, 0), 'ROCKPi PENTA HAT', font=font['14'], fill=255)
        draw.text((32, 16), 'Loading...', font=font['12'], fill=255)
        disp_show()
    misc.get_disk_io_rates()
    misc.get_interface_io_rates()
    time.sleep(1)

def goodbye():
    with display_lock:
        draw.text((32, 8), 'Good Bye ~', font=font['14'], fill=255)
        disp_show()
    time.sleep(2)
    with display_lock:
        disp_show()  # clear

"""
    A Class to hold page generators. Page generators are
    specialized to generate their specific data, current
    to the time of display.
    
    page_factory will return a list of page objects.
    
    get_page_text will return an empty Display data list
    if we have nothing to display.
"""
class Generated_page:
    """
        Return a list of page objects.
    """
    @staticmethod
    def page_factory():
        return list()
    
    """ 
        Return a list of Display Text for each 
        of the display lines. 
        
        If data is invalid or missing, the list is empty,
        and should be skipped.
    """
    def get_page_text(self):
        return []


""" Generate list of display page 0 - Uptime, CPU Temp, Host network address. """
class General_system_info_page0 (Generated_page):
    
    @staticmethod
    def page_factory():
        return [General_system_info_page0()]
    
    def get_page_text(self):
        return  [{'xy': (0, -2), 'text': misc.get_info('up'), 'fill': 255, 'font': font['11']},
            {'xy': (0, 10), 'text': get_cpu_temp(), 'fill': 255, 'font': font['11']},
            {'xy': (0, 21), 'text': misc.get_info('ip'), 'fill': 255, 'font': font['11']}
        ]

    
""" Generate the list of display page 1 - Fan speed %, CPU use, Memory Use. """
class General_system_info_page1 (Generated_page):
    
    @staticmethod
    def page_factory():
        return [General_system_info_page1()]
    
    def get_page_text(self):
        return [{'xy': (0, -2), 'text': 'Fan speed: ' + str(int(fan.get_dc())) + '%', 'fill': 255, 'font': font['11']},
            {'xy': (0, 10), 'text': misc.get_info('cpu'), 'fill': 255, 'font': font['11']},
            {'xy': (0, 21), 'text': misc.get_info('mem'), 'fill': 255, 'font': font['11']}
        ] 
        
           
""" Generate List of 1 Disk info page. This will show the %full
    of up to the first 4 drives specified, plus the root.
""" 
class Disk_info_page (Generated_page):
        
    @staticmethod
    def page_factory():
        return [Disk_info_page()]
    
    def get_page_text(self):
        k, v = misc.get_disk_used_info()
        text1 = 'Disk: {} {}'.format(k[0], v[0])
        text2 = ''
        text3 = ''

        if len(k) >= 5:     # root plus the first 4 disks if more are configured
            text2 = '{} {}  {} {}'.format(k[1], v[1], k[2], v[2])
            text3 = '{} {}  {} {}'.format(k[3], v[3], k[4], v[4])
        elif len(k) == 4:
            text2 = '{} {}  {} {}'.format(k[1], v[1], k[2], v[2])
            text3 = '{} {}'.format(k[3], v[3])
        elif len(k) == 3:
            text2 = '{} {}  {} {}'.format(k[1], v[1], k[2], v[2])
        elif len(k) == 2:
            text2 = '{} {}'.format(k[1], v[1])
        return [
                {'xy': (0, -2), 'text': text1, 'fill': 255, 'font': font['11']},
                {'xy': (0, 10), 'text': text2, 'fill': 255, 'font': font['11']},
                {'xy': (0, 21), 'text': text3, 'fill': 255, 'font': font['11']},
            ]

""" The generator for a list of display pages, one for each network
    interface whose I/O rate has been requested. The list is empty if no 
    interfaces have had their I/O rates requested.
"""
class Interface_io_page(Generated_page):
    # A page object is generated for each interface.
    def __init__(self, interface_name):
        self.interface_name = interface_name
        
    @staticmethod
    def page_factory():
        interface_list = []
        interfaces = misc.get_interface_list()

        if not interfaces:
            return interface_list
        
        for interface in interfaces:
            interface_list += [Interface_io_page(interface)]
        return interface_list
    
    def get_page_text(self):
        # update the current rate
        interface_rate = misc.get_interface_io_rate(self.interface_name)
        rx = 'Rx:{:10.6f}'.format(interface_rate["rx"]) + " MB/s"
        tx = 'Tx:{:10.6f}'.format(interface_rate["tx"]) + " MB/s"
        return [
            {'xy': (0, -2), 'text': 'Network (' + self.interface_name + '):', 'fill': 255, 'font': font['11']},
            {'xy': (0, 10), 'text': rx, 'fill': 255, 'font': font['11']},
            {'xy': (0, 21), 'text': tx, 'fill': 255, 'font': font['11']}
        ]
        

""" The generator for a list of display pages, one for each disk
    whose I/O rate has been requested. The list is empty if no disks
    have had their I/O rates requested.
"""
class Disk_io_page(Generated_page):
    # A page object is generated for each disk.
    def __init__(self, disk_name):
        self.disk_name = disk_name
    
    @staticmethod
    def page_factory():
        disk_list = []
        disks = misc.get_disk_list('io_usage_mnt_points')

        if not disks:
            return disk_list
        
        for disk_name in disks:
            disk_name = misc.delete_disk_partition_number(disk_name)
            disk_list += [Disk_io_page(disk_name)]
        return disk_list
    
    def get_page_text(self):
        disk_rate = misc.get_disk_io_rate(self.disk_name)
        read =  'R:{:11.6f}'.format(disk_rate["rx"]) + " MB/s"
        write = 'W:{:11.6f}'.format(disk_rate["tx"]) + " MB/s"
    
        return [
            {'xy': (0, -2), 'text': 'Disk (' + self.disk_name + '):', 'fill': 255, 'font': font['11']},
            {'xy': (0, 10), 'text': read, 'fill': 255, 'font': font['11']},
            {'xy': (0, 21), 'text': write, 'fill': 255, 'font': font['11']}
        ]


""" The generator for a list of 1 display page, for the temperatures
    of up to the 1st four /dev/sd* drives. The list is empty if no
    /dev/sd* disks are plugged in.
"""
class Disk_temp_info_page (Generated_page):
    
    @staticmethod
    def page_factory():
        return [Disk_temp_info_page()] if misc.conf['disk']['disks_temp'] else []
    
    
    """ Get the display text list of display records for this entry. """
    def get_page_text(self):
        k, v = misc.get_disk_temp_info()
        self.k = k

        text1 = 'Disk Temps:'
        text2 = ''
        text3 = ''
        if len(k) >= 4:
            text2 = '{} {}  {} {}'.format(k[0], v[0], k[1], v[1])
            text3 = '{} {}  {} {}'.format(k[2], v[2], k[3], v[3])
        elif len(k) == 3:
            text2 = '{} {}  {} {}'.format(k[0], v[0], k[1], v[1])
            text3 = '{} {}'.format(k[2], v[2])
        elif len(k) == 2:
            text2 = '{} {}  {} {}'.format(k[0], v[0], k[1], v[1])
        elif len(k) == 1:
            text2 = '{} {}'.format(k[0], v[0])
            
        return [
            {'xy': (0, -2), 'text': text1, 'fill': 255, 'font': font['11']},
            {'xy': (0, 10), 'text': text2, 'fill': 255, 'font': font['11']},
            {'xy': (0, 21), 'text': text3, 'fill': 255, 'font': font['11']},
        ]
            
"""
    Generate all the display page objects. We will iterate through the
    list and generate their text just before display. If a generator is
    configured to not generate any page its returned list will be empty.
"""
def gen_display_pages_list():
    display_page_list = General_system_info_page0.page_factory()
    display_page_list += General_system_info_page1.page_factory()
    display_page_list += Disk_info_page.page_factory()
    display_page_list += Interface_io_page.page_factory()
    display_page_list += Disk_io_page.page_factory()
    display_page_list += Disk_temp_info_page.page_factory()
    return display_page_list


"""
    Return a string with the current CPU temperature converted
    into the desired scale (f/c), ready for display
"""
def get_cpu_temp():
    t = float(misc.get_info('temp')) / 1000
    if misc.is_temp_farenheit():
        temp = "CPU Temp: {:.0f}°F".format(t * 1.8 + 32)
    else:
        temp = "CPU Temp: {:.1f}°C".format(t)
    return temp


"""
    Update the display on a timed basis, if we are configured as auto.
"""
def auto_slider(display_queue):
    next_time[0] = time.time() + 10
    display_queue.put(True)                 # force an initial display
    while misc.conf['slider']['auto']:      # to allow retry on duration config
        duration = misc.get_slider_sleep_duration()
        if duration:
            next_time[0] = time.time() + duration
            while time.time() < next_time[0]:
                time.sleep(0.1)
            display_queue.put(True)
        else:
            time.sleep(0.1)     # wait for misc to startup and read config
            
"""
    display_process runs to update the display from a Boolean on its
    display_queue. True causes it to display the next page and can
    come from the auto_slider process as it runs through the timer
    or from the button. When a next is performed, it resets the auto
    timer so that a button advance will last as long as an auto advance.
    The refresh timer is reset.
    
    A False means that the display is refreshed with the current page
    and its updated data. It does not change the auto timer.
"""
def display_process(display_queue):
    last_page = [None]
    display_list = []

    misc.get_interface_io_rates()
    misc.get_disk_io_rates()

    """
        Follow the list of pages to be generated and
        displayed and generate a fresh set when exhausted.
    """

    while True:
        action = display_queue.get()            # wait for a display request
        with display_lock:
            if not len(display_list):           # refresh before first display
                display_list += gen_display_pages_list()
            if action:                   # next page and reset time
                next_time[0] = time.time() + misc.get_slider_sleep_duration()
            refresh_time[0] = time.time() + misc.get_refresh_period()

            last_page[0] = display_list[0] if action else last_page[0] # refresh displays the last page
            if last_page[0]:
                for item in last_page[0].get_page_text():
                    draw.text(**item)
                disp_show()
            if action:
                display_list.pop(0)

"""
    We will refresh the display status for the current page. Refresh time
    is updated by the display_process each time a page (new or current) is
    displayed.
"""
def refresh_display(display_queue):
    while misc.get_refresh_period():
        if time.time() > refresh_time[0]:
            refresh_time[0] = time.time() + misc.get_refresh_period()
            display_queue.put(False)
        time.sleep(0.1)

misc.py

The body of the misc.py code exceeds the limit of a single entry, and will follow as a reply to this entry.

misc.py

#!/usr/bin/env python3
"""
    Manage the information about the Penta SATA HAT hardware,
    and provide information as requested by the other parts
    of the SATA HAT service.
"""
import re
import os
import sys
import time
import mraa  # pylint: disable=import-error
import shutil
import subprocess
import multiprocessing as mp
from configparser import ConfigParser
from collections import defaultdict, OrderedDict


cmds = {
    'blk': "lsblk | awk '{print $1}'",
    'up': "echo Up: $(uptime -p | sed 's/ years,/y/g;s/ year,/y/g;s/ months,/m/g;s/ month,/m/g;s/ weeks,/w/g;s/ week,/w/g;s/ days,/d/g;s/ day,/d/g;s/ hours,/h/g;s/ hour,/h/g;s/ minutes/m/g;s/ minute/m/g' | cut -d ' ' -f2-)",
    'temp': "cat /sys/class/thermal/thermal_zone0/temp",
    'ip': "hostname -I | awk '{printf \"IP %s\", $1}'",
    'cpu': "uptime | tr , . | awk '{printf \"CPU Load: %.2f%%\", $(NF-2)}'",
    'mem': "free -m | awk 'NR==2{printf \"Mem: %s/%s MB\", $3,$2}'",
    'disk': "df -h | awk '$NF==\"/\"{printf \"Disk: %d/%d GB %s\", $3,$2,$5}'"
}

""" Fan percent correspondence to temperature levels. """
lv2dc = OrderedDict({'lv3': 100, 'lv2': 75, 'lv1': 50, 'lv0': 25})

# we hold raw data for MB count and second of sample time
raw_interface_io = defaultdict(dict)
raw_disk_io = defaultdict(dict)

# we hold the calculated transfer rates in MB/s
interface_io_rate = defaultdict(dict)
disk_io_rate = defaultdict(dict)

# we hold the drive sector size since linux reports in sectors transferred
disk_sector_sizes = defaultdict(dict)

manager = mp.Manager()
last_fan_poll_time = manager.list()
last_fan_poll_time += [0.0]

fan_poll_delay = manager.list()
fan_poll_delay += [10.0]


"""
    Set a value on a GPIO pin, forcing the pin to being
    an Output Pin. 
    
    If the pin cannot be written, print the exception to
    the log and continue.
"""
def set_mode(pin, mode=1):
    try:
        pin = mraa.Gpio(pin)
        pin.dir(mraa.DIR_OUT)
        pin.write(mode)
    except Exception as ex:
        print(ex)

"""
    Call the Linux shell for this user with the supplied
    command string and return the command output string 
    with leading and trailing white space removed.
"""
def check_output(cmd):
    return subprocess.check_output(cmd, shell=True).decode().strip()


"""
    Call the Linux shell for this user with the supplied
    command string and ignore any possible command output.
"""
def check_call(cmd):
    return subprocess.check_call(cmd, shell=True)

"""
    Call the Linux shell for this user with the specified
    command from the cmd table and return the whitespace trimmed
    result.
"""
def get_info(s):
    return check_output(cmds[s])

"""
    Read the Configuration file and build a dictionary of
    the values specified in it, providing defaults if a
    configuration file does not exist or has missing
    keys/values.
"""
def read_conf():
    global fan_poll_delay
    conf = defaultdict(dict)

    try:
        cfg = ConfigParser()
        cfg.read('/etc/rockpi-penta.conf')
        # fan
        conf['fan']['lv0'] = cfg.getfloat('fan', 'lv0', fallback=35)
        conf['fan']['lv1'] = cfg.getfloat('fan', 'lv1', fallback=40)
        conf['fan']['lv2'] = cfg.getfloat('fan', 'lv2', fallback=45)
        conf['fan']['lv3'] = cfg.getfloat('fan', 'lv3', fallback=50)
        conf['fan']['linear'] = cfg.getboolean('fan', 'linear', fallback=False)
        conf['fan']['temp_disks'] = cfg.getboolean('fan', 'temp_disks', fallback=False)
        # key
        conf['key']['click'] = cfg.get('key', 'click', fallback='slider')
        conf['key']['twice'] = cfg.get('key', 'twice', fallback='switch')
        conf['key']['press'] = cfg.get('key', 'press', fallback='none')
        # time
        conf['time']['twice'] = cfg.getfloat('time', 'twice', fallback=0.7)
        conf['time']['press'] = cfg.getfloat('time', 'press', fallback=1.8)
        # slider
        conf['slider']['auto'] = cfg.getboolean('slider', 'auto', fallback=True)
        conf['slider']['time'] = cfg.getfloat('slider', 'time', fallback=10.0)
        refresh_string = cfg.get('slider', 'refresh', fallback='0.0')
        conf['slider']['refresh'] = 0.0 if not len(refresh_string) else float(refresh_string)
        # oled
        conf['oled']['rotate'] = cfg.getboolean('oled', 'rotate', fallback=False)
        conf['oled']['f-temp'] = cfg.getboolean('oled', 'f-temp', fallback=False)
        # disk
        conf['disk']['space_usage_mnt_points'] = cfg.get('disk', 'space_usage_mnt_points', fallback='').split('|')
        conf['disk']['io_usage_mnt_points'] = cfg.get('disk', 'io_usage_mnt_points', fallback='').split('|')
        conf['disk']['disks_temp'] = cfg.getboolean('disk', 'disks_temp', fallback=False)
        if conf['disk']['disks_temp']:
            fan_poll_delay[0] = conf['slider']['time'] * 16     # allow for a lot of panels
        # network
        conf['network']['interfaces'] = cfg.get('network', 'interfaces', fallback='').split('|')
    except Exception as config_exception:
        print ("Config error:", repr(config_exception))
        # fan
        conf['fan']['lv0'] = 35
        conf['fan']['lv1'] = 40
        conf['fan']['lv2'] = 45
        conf['fan']['lv3'] = 50
        conf['fan']['linear'] = False
        conf['fan']['temp_disks'] = False
        # key
        conf['key']['click'] = 'slider'
        conf['key']['twice'] = 'switch'
        conf['key']['press'] = 'none'
        # time
        conf['time']['twice'] = 0.7  # second
        conf['time']['press'] = 1.8
        # slider
        conf['slider']['auto'] = True
        conf['slider']['time'] = 10.0  # second
        conf['slider']['refresh'] = 0.0
        # oled
        conf['oled']['rotate'] = False
        conf['oled']['f-temp'] = False
        # disk
        conf['disk']['space_usage_mnt_points'] = []
        conf['disk']['io_usage_mnt_points'] = []
        conf['disk']['disks_temp'] = False
        # network
        conf['network']['interfaces'] = []

    return conf

"""
    Read the timed/pattern of input from a top-board pushbutton
    as a GPIO input, according to the supplied pattern.
    If a pattern is matched, return the pattern key.
"""
def read_key(pattern, size):
    s = ''
    while True:
        s = s[-size:] + str(pin11.read())
        for t, p in pattern.items():
            if p.match(s):
                return t
        time.sleep(0.1)

"""
    Process any user input on the top-board button,
    queuing the pattern key when a pattern is detected.
"""
def watch_key(q=None):
    size = int(conf['time']['press'] * 10)
    wait = int(conf['time']['twice'] * 10)
    pattern = {
        'click': re.compile(r'1+0+1{%d,}' % wait),
        'twice': re.compile(r'1+0+1+0+1{3,}'),
        'press': re.compile(r'1+0{%d,}' % size),
    }
    while True:
        action = read_key(pattern, size)
        q.put(action)


"""
    Return the list of interfaces we should monitor for I/O.
"""
def get_interface_list():
    if len(conf['network']['interfaces']) == 1 and conf['network']['interfaces'][0] == '':
        return []

    if len(conf['network']['interfaces']) == 1 and conf['network']['interfaces'][0] == 'auto':
        interfaces = []
        cmd = "ip -o link show | awk '{print $2,$3}'"
        lines = check_output(cmd).split('\n')   # avoid shadowing the built-in list
        for x in lines:
            name_status = x.split(': ')
            if "UP" in name_status[1]:
                interfaces.append(name_status[0])

        interfaces.sort()

    else:
        interfaces = conf['network']['interfaces']

    return interfaces

"""
    Remove all partition number digits from the supplied disk name, 
    which must have "sd" in it.
"""
def delete_disk_partition_number(disk):
    while "sd" in disk and disk[-1].isdigit():
        disk = disk[:-1]
    return disk

"""
    Return a list of conf file specified disk types limited to only 
    mounted drives, sorted by drive partition name.
"""
def get_disk_list(type):
    if len(conf['disk'][type]) == 1 and conf['disk'][type][0] == '':
        return []

    disks = []
    for x in conf['disk'][type]:
        cmd = "df -Bg | awk '$6==\"{}\" {{printf \"%s\", $1}}'".format(x)
        output = check_output(cmd).split('/')[-1]
        if output != '':
            disks.append(output)

    disks.sort()
    return disks

"""
    Return a list of all the sd* drives and their smartctl temperatures,
    sorted by drive name. Drives do not need to be mounted.
"""
def get_disk_temp_info():
    global last_fan_poll_time
    
    disk_temp = 0.0
    disk_temp_average = 0.0
    disk_temp_count = 0     # number of drives that reported a temperature
    disks = sorted(check_output("lsblk -d | egrep ^sd | awk '{print $1}'").split("\n"))
    disks_temp = {}
    for disk in disks:
        if disk:
            cmd = "smartctl -A /dev/" + disk + " | egrep ^194 | awk '{print $10}'"
            cmd_output = check_output(cmd)
            try:
                disk_temp = float(cmd_output)
                if is_temp_farenheit():
                    disk_temp = disk_temp * 1.8 + 32
                    disk_temp_formatted = "{:.0f}°F".format(disk_temp)
                else:
                    disk_temp_formatted = "{:.0f}°C".format(disk_temp)
                disk_temp_average += disk_temp
                disk_temp_count += 1
                disks_temp[disk] = disk_temp_formatted
            except (ValueError, TypeError):
                disks_temp[disk] = '----'   # cannot read a temperature
        else:
            disks_temp[''] = ''     # no sd drives on the system
    # average only over drives that returned a reading, so unreadable
    # drives do not drag the average toward zero
    if disk_temp_count:
        disk_temp_average /= disk_temp_count
    conf['disk_temp_average'].value = disk_temp_average
    last_fan_poll_time[0] = time.time()
    return list(zip(*disks_temp.items()))


""" Return true if temperatures are stated in Fahrenheit. """
def is_temp_farenheit():
    return conf['oled']['f-temp']


"""
    Return the time the last disk temperature poll was done.
"""
def get_last_disk_temp_poll():
    global last_fan_poll_time
    
    return last_fan_poll_time[0]

"""
    Return the %used of the root filesystem and of each configured,
    mounted disk partition. Results are cached for 30 seconds.
"""
def get_disk_used_info(cache={}):
    if not cache.get('time') or time.time() - cache['time'] > 30:
        info = {}
        cmd = "df -h | awk '$NF==\"/\"{printf \"%s\", $5}'"
        info['root'] = check_output(cmd)
        conf['disk']['disks'] = get_disk_list('space_usage_mnt_points')
        for x in conf['disk']['disks']:
            # df lists partitions (e.g. sda1), so match on the full partition name
            cmd = "df -Bg | awk '$1==\"/dev/{}\" {{printf \"%s\", $5}}'".format(x)
            info[x] = check_output(cmd)
        cache['info'] = list(zip(*info.items()))
        cache['time'] = time.time()

    return cache['info']


"""
    Fill in disk_sector_sizes for the drive we will poll.
    Needed to accurately calculate byte rates from sector rates.
"""
def get_sector_size(disk):
    cmd = "cat /sys/block/" + disk + "/queue/hw_sector_size"
    disk_sector_sizes[disk] = int(check_output(cmd))

""" 
    Get the raw network interface transfer count sample and the time of sampling. 
    Raw network transfer values are in bytes.
"""
def get_interface_io(interface):
    cmd = "cat /sys/class/net/" + interface + "/statistics/rx_bytes"
    rx = int(check_output(cmd))
    cmd = "cat /sys/class/net/" + interface + "/statistics/tx_bytes"
    tx = int(check_output(cmd))
    return {"rx": rx, "tx": tx, "time": time.time()}

""" 
    Get the raw disk transfer count sample and the time of sampling. 
    Raw disk transfer values are in sectors for that drive.
"""
def get_disk_io(disk):
    cmd = "cat  /sys/block/" + disk + "/stat"
    output = check_output(cmd)
    columns = output.split()
    return {"rx": int(columns[2]), "tx": int(columns[6]), "time": time.time()}

""" 
    Sample the configured network interfaces and calculate the rates against
    the last raw samples for these devices.
    
    Rates are stored in interface_io_rate in fractional MiB/second.
"""
def get_interface_io_rates():
    interfaces = get_interface_list()
    for interface in interfaces:
        get_interface_io_rate(interface)

""" Update the dict holding I/O rates for all interfaces. """
def get_interface_io_rate(interface):
    raw = get_interface_io(interface)
    # network raw data is in bytes transferred since the last boot
    if interface in raw_interface_io:
        last = raw_interface_io[interface]
        duration = raw["time"] - last["time"]
        interface_io_rate[interface]["rx"] = ((raw["rx"] - last["rx"]) / duration) / 1024 / 1024
        interface_io_rate[interface]["tx"] = ((raw["tx"] - last["tx"]) / duration) / 1024 / 1024
    else:
        interface_io_rate[interface]["rx"] = 0
        interface_io_rate[interface]["tx"] = 0
    raw_interface_io[interface] = raw
    return interface_io_rate[interface]

""" Get updated rates for all the disks. """
def get_disk_io_rates():
    # disk raw data is in per-device sectors transferred since the last boot
    disks = get_disk_list('io_usage_mnt_points')
    for disk in disks:
        get_disk_io_rate(disk)

""" Get the I/O rate for a specific disk. """
def get_disk_io_rate(disk):
    disk = delete_disk_partition_number(disk)
    if disk not in disk_sector_sizes:        # initial sampling if we have no sector byte size for a disk
        get_sector_size(disk)

    raw = get_disk_io(disk)
    if disk in raw_disk_io:
        last = raw_disk_io[disk]
        duration = raw["time"] - last["time"]
        # sectors/second -> KiB/second -> MiB/second
        disk_io_rate[disk]["rx"] = ((raw["rx"] - last["rx"]) / duration) / (1024 / disk_sector_sizes[disk]) / 1024
        disk_io_rate[disk]["tx"] = ((raw["tx"] - last["tx"]) / duration) / (1024 / disk_sector_sizes[disk]) / 1024
    else:
        disk_io_rate[disk]["rx"] = 0
        disk_io_rate[disk]["tx"] = 0
    raw_disk_io[disk] = raw
    return disk_io_rate[disk]


""" Return the IO rates for the specified interface. """
def get_interface_rates(interface):
    return interface_io_rate[interface]

""" return the IO rates for the specified disk. """
def get_disk_rates(disk):
    return disk_io_rate[disk]

def get_slider_sleep_duration():
    return conf['slider']['time']

"""
    Return the fan PWM value from the conf
    correspondence between temperature and fan speed.
    
    If linear fan speed is configured, the speed is interpolated
    for the precise temperature between the lv0 (25%) and
    lv3 (100%) settings.
"""
def fan_temp2dc(temp):
    if conf['fan']['linear']:
        lv0_percent = lv2dc['lv0']
        lv3_percent = lv2dc['lv3']
        base_temp = conf['fan']['lv0']
        denominator = conf['fan']['lv3'] - base_temp
        slope = (lv3_percent - lv0_percent) / denominator if denominator > 0 else 1.0
        dc = min(lv3_percent, max(slope * (temp - base_temp) + lv0_percent, lv0_percent))        
        return dc
    else:
        for lv, dc in lv2dc.items():
            if temp >= conf['fan'][lv]:
                return dc
    return 10

"""
    Toggle the configuration dictionary setting for
    whether the fan should run or not.
"""
def fan_switch():
    conf['run'].value = not conf['run'].value

"""
    Return True if the fan is supposed to be running.
"""
def fan_running():
    return conf['run'].value

def get_func(key):
    return conf['key'].get(key, 'none')

"""
    Return true if we want to include disk temperatures
    with the fan.
"""
def is_fan_cpu_and_disk():
    return conf['fan']['temp_disks']

"""
    The poll delay is large if we normally poll, or
    reasonable if we are not polling.
"""
def get_fan_poll_delay():
    global fan_poll_delay
    
    return fan_poll_delay[0]


"""
    Return the last calculated average disk temperature.
"""
def get_disk_temp_average():
    return conf['disk_temp_average'].value

"""
    Return the refresh period configured.
"""
def get_refresh_period():
    return conf['slider']['refresh']

"""
    Open the PWM/I2C system and ensure that the
    mraa's /boot/hw_intfc.conf last setting is
    backed up.
"""
def open_pwm_i2c():
    def replace(filename, raw_str, new_str):
        with open(filename, 'r') as f:
            content = f.read()

        if raw_str in content:
            shutil.move(filename, filename + '.bak')
            content = content.replace(raw_str, new_str)

            with open(filename, 'w') as f:
                f.write(content)

    replace('/boot/hw_intfc.conf', 'intfc:pwm0=off', 'intfc:pwm0=on')
    replace('/boot/hw_intfc.conf', 'intfc:pwm1=off', 'intfc:pwm1=on')
    replace('/boot/hw_intfc.conf', 'intfc:i2c7=off', 'intfc:i2c7=on')


"""
    Configure the top_board's button GPIO through mraa.
"""
pin11 = mraa.Gpio(11)
pin11.dir(mraa.DIR_IN)
pin11.mode(mraa.MODE_IN_ACTIVE_HIGH)

"""
    Initialize internal variables maintained in the conf dictionary and
    read the system conf file's settings into it.
"""
conf = {'disk': [], 'run': mp.Value('i', 1), 'disk_temp_average': mp.Value('f', 0.0),}
conf.update(read_conf())

if __name__ == '__main__':
    if sys.argv[-1] == 'open_pwm_i2c':
        open_pwm_i2c()
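
The button patterns used by watch_key above can be tried on their own, away from the hardware. This is only a rough sketch: it assumes the button is sampled every 0.1 s, that a 1 means released and a 0 means pressed (my reading of the patterns), and the WAIT and SIZE values and the classify helper are made up for the illustration rather than taken from the conf file.

```python
import re

# Hypothetical settings: with one sample every 0.1 s these stand for
# a 0.7 s double-click window and a 1.8 s long press.
WAIT = 7
SIZE = 18

PATTERNS = {
    'click': re.compile(r'1+0+1{%d,}' % WAIT),
    'twice': re.compile(r'1+0+1+0+1{3,}'),
    'press': re.compile(r'1+0{%d,}' % SIZE),
}

def classify(samples):
    """Return the first pattern name matching the sample string, or None."""
    for name, pattern in PATTERNS.items():
        if pattern.match(samples):
            return name
    return None

# released, pressed briefly, then released for at least WAIT samples
print(classify('11' + '000' + '1' * 7))
# two short presses close together
print(classify('11' + '00' + '11' + '00' + '111'))
# held down for SIZE samples
print(classify('1' + '0' * 18))
```

A single click is only reported once the button has stayed released for WAIT samples, which is what lets a second press within that window count as 'twice' instead.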

Just updated misc.fan_temp2dc to correct the speed.
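
For anyone who wants to check the linear speed against their own temperature settings, here is a small stand-alone sketch of the interpolation. It is not the code in misc.py: linear_fan_percent and its parameters are names invented for this example, the 35 and 55 degree set points are made-up values, and the fallback for a zero or negative temperature span is my own simplification.

```python
def linear_fan_percent(temp, low_temp, high_temp, low_pct=25.0, high_pct=100.0):
    """Interpolate a fan percentage between two temperature set points."""
    span = high_temp - low_temp
    if span <= 0:
        # Degenerate configuration: fall back to a simple threshold.
        return high_pct if temp >= high_temp else low_pct
    pct = low_pct + (high_pct - low_pct) * (temp - low_temp) / span
    # Clamp so the result never leaves the low_pct..high_pct range.
    return min(high_pct, max(low_pct, pct))

# Hypothetical set points: 25% at 35 degrees, 100% at 55 degrees.
for t in (30, 35, 45, 55, 60):
    print(t, linear_fan_percent(t, low_temp=35, high_temp=55))
```

With these example set points a reading halfway between them (45) gives 62.5%, and anything below 35 or above 55 is clamped to 25% or 100%.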

Just updated oled.py to fix the interface and disk rate sampling by moving it to the start of the display cycle. The calls

misc.get_interface_io_rates()
misc.get_disk_io_rates()

moved to display_process from Interface_io_page and Disk_io_page.
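
The idea behind the rate calculation is that the kernel counters only ever grow, so the rate for a page is the counter difference divided by the time since that device was last sampled. A minimal sketch of that idea, with the counters passed in by hand instead of read from /sys (RateTracker is a hypothetical class for illustration, not something in misc.py, and the zero-duration guard is my addition):

```python
class RateTracker:
    """Turn monotonically increasing byte counters into MiB/s rates."""

    def __init__(self):
        self.last = {}      # device name -> (rx_bytes, tx_bytes, timestamp)

    def update(self, name, rx_bytes, tx_bytes, now):
        previous = self.last.get(name)
        self.last[name] = (rx_bytes, tx_bytes, now)
        if previous is None:
            return {"rx": 0.0, "tx": 0.0}    # first sample: nothing to compare with
        duration = now - previous[2]
        if duration <= 0:
            return {"rx": 0.0, "tx": 0.0}    # guard against a zero-length interval
        mib = 1024 * 1024
        return {
            "rx": (rx_bytes - previous[0]) / duration / mib,
            "tx": (tx_bytes - previous[1]) / duration / mib,
        }

tracker = RateTracker()
tracker.update("eth0", 0, 0, now=0.0)
# 10 MiB received and 5 MiB sent over the following 10 seconds
print(tracker.update("eth0", 10 * 1024 * 1024, 5 * 1024 * 1024, now=10.0))
```

Because each update covers the whole interval since the previous one, a short burst of traffic between two display updates still shows up in the average for that interval.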


Hey @oket,

can you give me some pointers for my problem? I run the SATA HAT on my RPi 4 running Ubuntu and use the ZFS file system, so any attempt to get info on the HAT display stops right after the root filesystem.
I’m not familiar enough with Python to change misc.py without help.
I want info displayed on
vol0/shares, which is the ZFS volume I share over Samba …
Looking in misc.py I see that “disk” uses a hard reference to /dev/diskname … which doesn’t work for me, since /dev/sda…/dev/sdd are combined in a raidz-1 (= RAID 5) … so it should point to /vol0/shares instead … and the function to cut off digits is obviously not needed …
Any ideas how I can make the needed changes in misc.py?

Could you provide a clipping of the area of code (the function(s)) in misc.py you are thinking of (or the source of misc.py you are using) and your /etc/rockpi-sata.conf? At the moment there might be a few different versions of the code in play, and I have certainly updated my SATA HAT source, so I want to be sure I am looking at the same code you are.

Could you provide the output of “df -h” as well? I am not running zfs, so what is seen on your system is important.
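
In the meantime, one direction that might work without depending on /dev names at all is to ask Python for the usage of the mount point directly. This is an untested sketch on my side since I am not running ZFS: mount_point_used_percent is a hypothetical helper and not part of misc.py, and the result can differ by a percent or so from what df prints because df rounds differently.

```python
import shutil

def mount_point_used_percent(mount_point):
    """Return the used space of a mounted filesystem as a string such as '42%'."""
    usage = shutil.disk_usage(mount_point)   # named tuple: total, used, free (bytes)
    if usage.total == 0:
        return '0%'
    return "{:.0f}%".format(usage.used / usage.total * 100)

# '/' is used here so the example runs anywhere; on the ZFS system the
# argument would be the mount point of the shared volume instead.
print(mount_point_used_percent('/'))
```

If this gives sensible numbers for your volume, the space usage page could be fed from the configured mount points themselves rather than from the /dev names that df reports.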