The Quad SATA HAT (Raspberry Pi) software has had recent significant updates and upgrades by @RayMondDrakon.
The Penata SATA HAT (Rockpi 4B) software is lacking this additional utility and I have merged in the updates from the Quad SATA HAT software, and done additional modifications.
I used Armbian_20.05.02_Rockpi-4b_buster_legacy_4.4.213.img with openmediavault-omvextraorg_latest_all5.deb a Rockpi 4B+.
The Penata SATA HAT codebase came from: https://github.com/akgnah/rockpi-penta
Display
There are the three basic display pages:
And three optional pages that will display depending on /etc/rockpi-penta.conf settings:
-
[network]
-
interfaces = eth0
-
-
[disk]
-
space_usage_mnt_points = /srv/dev-disk-by-uuid-7adfa17c-a353-46e7-8c9a-eec58f9c86e3
-
io_usage_mnt_points = /srv/dev-disk-by-uuid-7adfa17c-a353-46e7-8c9a-eec58f9c86e3
-
disks_temp = true
-
Display Updating and Data Sampling
Each individual page’s statistics are determined just prior to its display. This keeps the displayed data up-to-date. Additionally the user can configure display refreshing so that a page’s data and display may be refreshed and remain current even if the page is displayed for a long duration.
-
[slider]
-
time = 10
-
refresh = 1.0
-
Data Transfer is fully captured
The data for Network and Disk I/O pages are continuous accumulation, rather than sampled for a short time. Each update of an I/O page will be the total transfers since the last update (display or refresh) of that page. This means that transient loads are not missed.
Fan Speed
The Quad HAT code based the fan speed on the CPU, and SATA board temperature sensor, and like the original code it had four speed steps (25%, 50%, 75% and 100%) whose temperatures could be configured.
The Penta HAT fan speed control has been updated so that it can also be configured to be linear between the 25% and 100% temperature settings, and does not have to be a limited number of steps, so it more closely matches the current system temperature(s).
The new code also allows the user to use disk temperatures in fan control, where the speed will be dependent upon the maximum of the CPU temperature and average physical disk temperatures. So if the disks get hot, the fan speed will increase.
-
[fan]
-
linear = true
-
temp_disks = true
-
Temperature Specification
As with the Quad HAT code the temperature scale (Fahrenheit or Celsius) can be configured. This setting also covers the specification of temperatures for setting fan speed. So if you use Fahrenheit, the fan temperature levels would also be specified in Fahrenheit. Previously they were specified only in Celsius.
Control Switch changes
The top-board switch (button) is used to control manual page advance, turning the fan on or off and shutdown or reset. This switch was not detected on my system, even with code to enable a passive pullup. This may be a limit of the Armbian release I used.
This meant that I had to solder a passive pullup between the top-board’s switch and 3.3V, as seen in the following picture. The code to enable the Rockpi’s pullup is still in the code and may work for you.
I was getting gpio146: mode: Failed to open 'drive' for writing: No such file or directory, when starting the service, though it still ran with the pullup soldered on. I used a 1K resistor, as the first one I grabbed, a 10K is probably a better value.
Config
The config file has a number of sections and keys. Config processing now defaults what it does not find in the config file, so you would only need to enter the unique or changed values. If you make a mistake in a value, such as text in a numeric field, a message will be logged.
The defaults can be seen in misc.py source.
[fan]
# When the temperature is above lv0, the fan at 25% power,
# and lv1 at 50% power, lv2 at 75% power, lv3 at 100% power.
# When the temperature is below lv0, the fan is essentially turned off.
# If linear is true, the fan speed is continuously maintained between
# the temperature limits of lv0 (25%) and lv3 (100%) and lv1/lv2 are ignored.
# If temp_disks is true, the average temperature of the SD disks is used,
# with the CPU temperature (max between cpu/disk) to set fan speed.
# You can change these values if necessary.
lv0 = 25
lv1 = 40
lv2 = 45
lv3 = 50
linear = true
temp_disks = true
[key]
# You can customize the function of the key, currently available functions are
# slider: oled display next page
# switch: fan turn on/off switch
# reboot, poweroff
# If you have any good suggestions for key functions,
# please add an issue on https://setq.me/rockpi-sata
click = slider
twice = switch
press = poweroff
[time]
# twice: maximum time between double clicking (seconds)
# press: long press time (seconds)
twice = 0.7
press = 3
[slider]
# Whether the oled auto display next page and the time interval (seconds)
# If refresh is non-zero it will be the period between successive refreshes
# of the data in the currently displayed page.
auto = true
time = 4
refresh = 1.0
[oled]
# Whether rotate the text of oled 180 degrees, whether all temps are in Fahrenheit
# or Celsius, including fan lv# temperatures.
rotate = true
f-temp = false
[network]
# Name of the interfaces which should be measured (separated with |)
# Leave it blank (after the =) if you don't want to use it
# Option 'auto' means select them automatically by their link status
# (every interface with link status is UP)
interfaces = eth0
[disk]
# Mount points for non-root disks to show space usage (separated with |)
# Mount points found through df -h, rather than drive names which change
space_usage_mnt_points = /srv/dev-disk-by-uuid-7adfa17c-a353-46e7-8c9a-eec58f9c86e3
# Mount points for disks to show space usage (separated with |)
# Leave it blank (after the =) if you don't want to use it
# Mount point name is from "df -Bg" command.
io_usage_mnt_points = /srv/dev-disk-by-uuid-7adfa17c-a353-46e7-8c9a-eec58f9c86e3
# Detect Automatically the disks and show their temperatures
disks_temp = true
main.py
#!/usr/bin/env python3
import sys
import time
import fan
import misc
import multiprocessing as mp
""" Conditionally import the oled functions and flag if
the top_board seems to exist.
Log any exception generated.
"""
try:
import oled
top_board = 1
except Exception as ex:
top_board = 0
print (ex)
q = mp.Queue() # communication on watch_key/receive_key
display_queue = mp.Queue() # communication in the display processor
refresh_theshold = 0.1 # we will not refresh if period is less than this (seconds)
action = {
'none': lambda: 'nothing',
'slider': lambda: display_queue.put(True),
'switch': lambda: misc.fan_switch(),
'reboot': lambda: misc.check_call('reboot'),
'poweroff': lambda: misc.check_call('poweroff --halt'),
}
"""
Receive a user input from the queue as a 'key' value
to get the processing function and then run the action
for it.
"""
def receive_key(q):
while True:
func = misc.get_func(q.get())
action[func]()
time.sleep(0.1)
def main():
if sys.argv[-1] == 'on':
if top_board:
oled.welcome()
elif sys.argv[-1] == 'off':
if top_board:
oled.goodbye()
exit(0)
if __name__ == '__main__':
main()
if top_board:
p_key_processor = mp.Process(target=receive_key, args=(q,), name='Receive Key')
p_key_processor.start()
p_Key_decoder = mp.Process(target=misc.watch_key, args=(q,), name='Watch Key')
p_Key_decoder.start()
p_display_mamager = mp.Process(target=oled.auto_slider, name='Auto Slider', args=(display_queue,))
p_display_mamager.start()
p_display_process = mp.Process(target=oled.display_process, name='Display Process', args=(display_queue,))
p_display_process.start()
refresh_period = misc.get_refresh_period()
if (refresh_period > refresh_theshold):
p_refresh_display = mp.Process(target=oled.refresh_display, name='Refresh Display', args=(display_queue,))
p_refresh_display.start()
p_fan = mp.Process(target=fan.running, name='Fan')
p_fan.start()
p_fan.join()
fan.py
#!/usr/bin/env python3
"""
Control the PENTA HAT top fan according to the temperature of the RockPi.
We really should read the top hat temperature (as with the SATA HAT code).
Uses the mraa GPIO library for PWM.
: https://iotdk.intel.com/docs/master/mraa/python/mraa.html#pwm
PWM ratio is in range of 0.0 to 1.0 - it is the off ratio, rather than on ratio
Fan PWN frequency is 25KHz, according to Noctua fan white paper
: https://noctua.at/pub/media/wysiwyg/Noctua_PWM_specifications_white_paper.pdf
"""
import time
import mraa # pylint: disable=import-error
import misc
fan_pin = 13
pin13 = mraa.Pwm(fan_pin)
pin13.period_us(40)
pin13.enable(True)
"""
Read the CPU temperature and include disks if we want
to use their temperature as well. This means that we
have to be capturing disk temperatures (auto display or polled).
"""
def read_cpu_temp():
with open('/sys/class/thermal/thermal_zone0/temp') as f:
t_cpu = float(f.read().strip()) / 1000.0
if misc.is_temp_farenheit():
t_cpu = t_cpu *1.8 + 32
if misc.is_fan_cpu_and_disk():
if (misc.get_last_disk_temp_poll() + misc.get_fan_poll_delay()) < time.time(): # poll disk temps
misc.get_disk_temp_info()
t_disk = misc.get_disk_temp_average()
else:
t_disk = 0.0
cpu_temp = max(t_cpu, t_disk)
return cpu_temp
"""
Return the speed % the top_board fan.
"""
def get_dc(cache={}):
if not(misc.fan_running()):
return 0.1 # 0.0% can make fan run faster
# limit the update rate to once every 5 seconds
if time.time() - cache.get('time', 0) > 5:
cache['time'] = time.time()
cache['dc'] = misc.fan_temp2dc(read_cpu_temp())
return cache['dc']
"""
Change the PWM off ratio for the fan if it changed.
We receive a percent and need a 0.0 to 1.0 off ratio.
"""
def change_dc(dc, cache={}):
if dc != cache.get('dc'):
cache['dc'] = dc
pin13.write(1 - (dc / 100))
"""
Main loop updating the fan's speed according to the
desired temperature thresholds.
"""
def running():
while True:
change_dc(get_dc())
time.sleep(0.1)
oled.py
#!/usr/bin/env python3
"""
Manage the display according to the configuration data and
the state of the hardware.
Displayed pages have their values update just prior to display
and the user can specify a refresh period so that a page that
stays up a long time can be refreshed to get current state.
I/O rates are for the period since the last update for that
device's display page, so they are continuous collections.
"""
from pickle import NONE
import time
from weakref import ref
import misc
import fan
import Adafruit_SSD1306
import multiprocessing as mp
from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont
font = {
'10': ImageFont.truetype('fonts/DejaVuSansMono-Bold.ttf', 10),
'11': ImageFont.truetype('fonts/DejaVuSansMono-Bold.ttf', 11),
'12': ImageFont.truetype('fonts/DejaVuSansMono-Bold.ttf', 12),
'14': ImageFont.truetype('fonts/DejaVuSansMono-Bold.ttf', 14),
}
def disp_init():
disp = Adafruit_SSD1306.SSD1306_128_32(rst=None, i2c_bus=7)
[getattr(disp, x)() for x in ('begin', 'clear', 'display')]
return disp
"""
Mainline functional code to setup environment.
manager mutable variables are used between processes.
"""
manager = mp.Manager()
refresh_time = manager.list()
refresh_time += [time.time()]
next_time = manager.list()
next_time += [time.time()]
display_lock = mp.Lock() # lock clean display updates
"""
Condition the display when first imported. If we have any
problems, our exception will signal our inability to handle
the display.
"""
misc.set_mode(23, 0)
time.sleep(0.2)
misc.set_mode(23, 1)
try:
disp = disp_init()
except Exception:
misc.open_pwm_i2c()
time.sleep(0.2)
disp = disp_init()
image = Image.new('1', (disp.width, disp.height))
draw = ImageDraw.Draw(image)
def disp_show():
im = image.rotate(180) if misc.conf['oled']['rotate'] else image
disp.image(im)
disp.display()
draw.rectangle((0, 0, disp.width, disp.height), outline=0, fill=0)
def welcome():
with display_lock:
draw.text((0, 0), 'ROCKPi PENTA HAT', font=font['14'], fill=255)
draw.text((32, 16), 'Loading...', font=font['12'], fill=255)
disp_show()
misc.get_disk_io_rates()
misc.get_interface_io_rates()
time.sleep(1)
def goodbye():
with display_lock:
draw.text((32, 8), 'Good Bye ~', font=font['14'], fill=255)
disp_show()
time.sleep(2)
with display_lock:
disp_show() # clear
"""
A Class to hold page generators. Page generators are
specialized to generate their specific data, current
to the time of display.
page_factory will return a list of page objects.
get_page_text will return an empty Display data list
if we have nothing to display.
"""
class Generated_page:
"""
Return a list of page objects.
"""
@staticmethod
def page_factory(self):
return list()
"""
Return a list of Display Text for each
of the display lines.
If data is invalid or missing, the list is empty,
and should be skipped.
"""
def get_page_text():
return {'line 1', 'line 2', 'line 3'}
""" Generate list of display page 0 - Uptime, CPU Temp, Host network address. """
class General_system_info_page0 (Generated_page):
@staticmethod
def page_factory():
return [General_system_info_page0()]
def get_page_text(self):
return [{'xy': (0, -2), 'text': misc.get_info('up'), 'fill': 255, 'font': font['11']},
{'xy': (0, 10), 'text': get_cpu_temp(), 'fill': 255, 'font': font['11']},
{'xy': (0, 21), 'text': misc.get_info('ip'), 'fill': 255, 'font': font['11']}
]
""" Generate the list of display page 1 - Fan speed %, CPU use, Memory Use. """
class General_system_info_page1 (Generated_page):
@staticmethod
def page_factory():
return [General_system_info_page1()]
def get_page_text(self):
return [{'xy': (0, -2), 'text': 'Fan speed: ' + str(int(fan.get_dc())) + '%', 'fill': 255, 'font': font['11']},
{'xy': (0, 10), 'text': misc.get_info('cpu'), 'fill': 255, 'font': font['11']},
{'xy': (0, 21), 'text': misc.get_info('mem'), 'fill': 255, 'font': font['11']}
]
""" Generate List of 1 Disk info page. This will show the %full
of up to the first 4 drives specified, plus the root.
"""
class Disk_info_page (Generated_page):
@staticmethod
def page_factory():
return [Disk_info_page()]
def get_page_text(self):
k, v = misc.get_disk_used_info()
text1 = 'Disk: {} {}'.format(k[0], v[0])
text2 = ''
text3 = ''
if len(k) >= 5: # take first 4 if more than 3 disks
text2 = '{} {} {} {}'.format(k[1], v[1], k[2], v[2])
text3 = '{} {} {} {}'.format(k[3], v[3], k[4], v[4])
elif len(k) == 4:
text2 = '{} {} {} {}'.format(k[1], v[1], k[2], v[2])
text3 = '{} {}'.format(k[3], v[3])
elif len(k) == 3:
text2 = '{} {} {} {}'.format(k[1], v[1], k[2], v[2])
elif len(k) == 2:
text2 = '{} {}'.format(k[1], v[1])
return [
{'xy': (0, -2), 'text': text1, 'fill': 255, 'font': font['11']},
{'xy': (0, 10), 'text': text2, 'fill': 255, 'font': font['11']},
{'xy': (0, 21), 'text': text3, 'fill': 255, 'font': font['11']},
]
""" The generator for a list of display pages, one for each network
interface whose I/O rate has been requested. The list is empty if no
interfaces have had their I/O rates requested.
"""
class Interface_io_page(Generated_page):
# A page object is generated for each interface.
def __init__(self, interface_name):
self.interface_name = interface_name
@staticmethod
def page_factory():
interface_list = []
interfaces = misc.get_interface_list()
if not interfaces:
return interface_list
for interface in interfaces:
interface_list += [Interface_io_page(interface)]
return interface_list
def get_page_text(self):
# update the current rate
interface_rate = misc.get_interface_io_rate(self.interface_name)
rx = 'Rx:{:10.6f}'.format(interface_rate["rx"]) + " MB/s"
tx = 'Tx:{:10.6f}'.format(interface_rate["tx"]) + " MB/s"
return [
{'xy': (0, -2), 'text': 'Network (' + self.interface_name + '):', 'fill': 255, 'font': font['11']},
{'xy': (0, 10), 'text': rx, 'fill': 255, 'font': font['11']},
{'xy': (0, 21), 'text': tx, 'fill': 255, 'font': font['11']}
]
""" The generator for a list of display pages, one for each disk
whose I/O rate has been requested. The list is empty if no disks
have had their I/O rates requested.
"""
class Disk_io_page(Generated_page):
# A page object is generated for each disk.
def __init__(self, disk_name):
self.disk_name = disk_name
@staticmethod
def page_factory():
disk_list = []
disks = misc.get_disk_list('io_usage_mnt_points')
if not disks:
return disk_list
for disk_name in disks:
disk_name = misc.delete_disk_partition_number(disk_name)
disk_list += [Disk_io_page(disk_name)]
return disk_list
def get_page_text(self):
disk_rate = misc.get_disk_io_rate(self.disk_name)
read = 'R:{:11.6f}'.format(disk_rate["rx"]) + " MB/s"
write = 'W:{:11.6f}'.format(disk_rate["tx"]) + " MB/s"
return [
{'xy': (0, -2), 'text': 'Disk (' + self.disk_name + '):', 'fill': 255, 'font': font['11']},
{'xy': (0, 10), 'text': read, 'fill': 255, 'font': font['11']},
{'xy': (0, 21), 'text': write, 'fill': 255, 'font': font['11']}
]
""" The generator for a list of 1 display page, for the temperatures
of up to the 1st four /dev/sd* drives. The list is empty if no
/dev/sd* disks are plugged in.
"""
class Disk_temp_info_page (Generated_page):
@staticmethod
def page_factory():
return [Disk_temp_info_page()] if misc.conf['disk']['disks_temp'] else []
""" Get the display text list of display records for this entry. """
def get_page_text(self):
k, v = misc.get_disk_temp_info()
self.k = k
text1 = 'Disk Temps:'
text2 = ''
text3 = ''
if len(k) >= 4:
text2 = '{} {} {} {}'.format(k[0], v[0], k[1], v[1])
text3 = '{} {} {} {}'.format(k[2], v[2], k[3], v[3])
elif len(k) == 3:
text2 = '{} {} {} {}'.format(k[0], v[0], k[1], v[1])
text3 = '{} {}'.format(k[2], v[2])
elif len(k) == 2:
text2 = '{} {} {} {}'.format(k[0], v[0], k[1], v[1])
elif len(k) == 1:
text2 = '{}'.format(k[0], v[0])
return [
{'xy': (0, -2), 'text': text1, 'fill': 255, 'font': font['11']},
{'xy': (0, 10), 'text': text2, 'fill': 255, 'font': font['11']},
{'xy': (0, 21), 'text': text3, 'fill': 255, 'font': font['11']},
]
"""
Generate all the display page objects. We will iterate through the
list and generate their text just before display. If a generator is
configured to not generate any page its returned list will be empty.
"""
def gen_display_pages_list():
display_page_list = General_system_info_page0.page_factory()
display_page_list += General_system_info_page1.page_factory()
display_page_list += Disk_info_page.page_factory()
display_page_list += Interface_io_page.page_factory()
display_page_list += Disk_io_page.page_factory()
display_page_list += Disk_temp_info_page.page_factory()
return display_page_list
"""
Return a string with the current CPU temperature converted
into the desired scale (f/c), ready for display
"""
def get_cpu_temp():
t = float(misc.get_info('temp')) / 1000
if misc.is_temp_farenheit():
temp = "CPU Temp: {:.0f}°F".format(t * 1.8 + 32)
else:
temp = "CPU Temp: {:.1f}°C".format(t)
return temp
"""
Update the display on a timed basis, if we are configured as auto.
"""
def auto_slider(display_queue):
next_time[0] = time.time() + 10
display_queue.put(True) # force an initial display
while misc.conf['slider']['auto']: # to allow retry on duration config
duration = misc.get_slider_sleep_duration()
if duration:
next_time[0] = time.time() + duration
while time.time() < next_time[0]:
time.sleep(0.1)
display_queue.put(True)
else:
time.sleep(0.1) # wait for misc to startup and read config
"""
display_process runs to update the display from a Boolean on its
display_queue. True causes it to display the next page and can
come from the auto_slider process as it runs through the timer
or from the button. When a next is performed, it resets the auto
timer so that a button advance will last as long as an auto advance.
The refresh timer is reset.
A False means that the display is refreshed with the current page
and its updated data. It does not change the auto timer.
"""
def display_process(display_queue):
last_page = [None]
display_list = []
misc.get_interface_io_rates()
misc.get_disk_io_rates()
"""
Follow the list of pages to be generated and
displayed and generate a fresh set when exhausted.
"""
while True:
action = display_queue.get() # wait for an display request
with display_lock:
if not len(display_list): # refresh before first display
display_list += gen_display_pages_list()
if action: # next page and reset time
next_time[0] = time.time() + misc.get_slider_sleep_duration()
refresh_time[0] = time.time() + misc.get_refresh_period()
last_page[0] = display_list[0] if action else last_page[0] # refresh displays the last page
if last_page[0]:
for item in last_page[0].get_page_text():
draw.text(**item)
disp_show()
if action:
display_list.pop(0)
"""
We will refresh the display status for the current page. Refresh time
is updated by the display_process each time a page (new or current) is
displayed.
"""
def refresh_display(display_queue):
while misc.get_refresh_period():
if time.time() > refresh_time[0]:
refresh_time[0] = time.time() + misc.get_refresh_period()
display_queue.put(False)
time.sleep(0.1)
misc.py
The body of the misc.py code exceeds the limit of a single entry, and will follow as a reply to this entry.