# vscode: ctrl-shift-p > linting enabled > disable from network import LoRa from network import Bluetooth from network import WLAN from machine import Pin from machine import SD import time, socket, ubinascii, pycom, machine, struct, math, os, sys, pysense, ledPatterns from SI7006A20 import SI7006A20 from microWebCli import MicroWebCli from machine import WDT #new 2.0.9 # ------------- Parameters ------------------ firmwareVersionMajor = 2 firmwareVersionMinor = 4 firmwareVersionRev = 0 default_uplinkPortPayload = 100 default_uplinkPortConfig = 10 default_uplinkPortStatistic = 20 default_BTScanTime = 10 default_Sleeptime = 300 default_StatisticUplinkFactor = 10 # Beresa UUID : 36bd2874-979b-4a8d-9d37-c610320f16da # FUOTA UUID : d5b71956-3648-4fb9-8959-fe29498b252c uuidWhitelist={'36bd2874979b4a8d9d37c610320f16da','d5b7195636484fb98959fe29498b252c'} # WLAN AP for updating firmware updateSSID = "SMBE2020" updatePWD = "#h5K8d2Z!" # for debugging / testing withDeepSleep = True debugON = True withSDlogging = False # global vars / state sendConfigInCurrentIteration = False rejoinLoraInCurrentIteration = False # if rejoin is successfull, sendConfigInCurrentIteration will be set to True automatically # LoRa parameters confirmed = False devEUI = '' #declarations bt = Bluetooth(antenna=Bluetooth.EXT_ANT) py = pysense.Pysense() si = SI7006A20(py) sd = None def joinLoRa(): global sendConfigInCurrentIteration, rejoinLoraInCurrentIteration #limits join time to 10 seconds to avoid infinite join loop writeLogfile("joinLora()") try: ledPatterns.ResetLED() t_join = time.time() + 10 if debugON: print('original board DevEUI: ', devEUI) app_eui = ubinascii.unhexlify('2ab3d5499da00f6c') app_key = ubinascii.unhexlify('a81758fffe035f8b70b3d5e75e00537f') if debugON: print('starting join process..') lora.join(activation=LoRa.OTAA, auth=(app_eui, app_key), dr=0, timeout=0) ledPatterns.JoiningPattern() while time.time() < t_join: time.sleep(1) if lora.has_joined(): break if lora.has_joined(): if debugON: print('join: OK') sendConfigInCurrentIteration = True rejoinLoraInCurrentIteration = False # new in 2.4.0 else: if debugON: print('join: failed: waiting 5 sec, then do normal deepsleep, then try again in next iteration..') time.sleep(5) gotoSleep() except Exception as e: ledPatterns.ErrorPattern() if debugON: print('join: failed: waiting 5 sec, then do normal deepsleep, then try again in next iteration..', str(e)) writeErrorFile(e) time.sleep(5) gotoSleep() def bluetoothScan(): writeLogfile("bluetoothScan()") try: ledPatterns.ResetLED() try: currentScanTime = pycom.nvs_get('BTScanTime') except Exception as e: currentScanTime = None if currentScanTime==None: currentScanTime = default_BTScanTime if debugON: print("Scanning for BLE advertisements for seconds: ", currentScanTime) bt.start_scan(currentScanTime) x=[] ledPatterns.BLEScanningPattern() while bt.isscanning(): y=bt.get_adv() if y: x.append(y) bt.deinit() # new 2.0.9 return x except Exception as e: ledPatterns.ErrorPattern() x=[] writeErrorFile(e) return x def bluetoothDataFilter(adv_list): writeLogfile("bluetoothDataFilter()") try: ledPatterns.ResetLED() if debugON: print("Filtering Bluetooth advertisements ") rssi_datalist=[] bleDict = {} values = [] if adv_list: if debugON: print(" total number of scanned advertisements: " + str(len(adv_list))) for i in range(len(adv_list)): adv_data = bt.resolve_adv_data(adv_list[i].data, Bluetooth.ADV_MANUFACTURER_DATA) rssi_data = abs(adv_list[i].rssi) if adv_data: mfg_data = ubinascii.hexlify(adv_data).decode("utf8") uuid=mfg_data[8:40] if uuid in uuidWhitelist: minor=mfg_data[44:48] if uuid == "d5b7195636484fb98959fe29498b252c": major=mfg_data[40:44] if debugON: print(" FUOTA Update Beacon scanned. Going to start WLAN Update for version: " + "{:02d}".format(int(major,16)) + "{:02d}".format(int(minor,16))+"00") if int(major,16) != firmwareVersionMajor or int(minor,16) != firmwareVersionMinor: downloadUpdateFromWeb("{:02d}".format(int(major,16))+"{:02d}".format(int(minor,16)) + "00") return["FFFF"] else: if debugON: print(" skip FUOTA Update because same version is already running..") try: values = bleDict[str(minor)] values.append(rssi_data) except: bleDict[str(minor)] = [rssi_data] if debugON: print(bleDict) filtered_beacons = [] for key in bleDict: rssi_min = ubinascii.hexlify(struct.pack('B', min(bleDict.get(key)))).decode("utf8") rssi_max = ubinascii.hexlify(struct.pack('B', max(bleDict.get(key)))).decode("utf8") rssi_avg = ubinascii.hexlify(struct.pack('B', int(sum(bleDict.get(key)) / len(bleDict.get(key))))).decode("utf8") if debugON: print("minor:" + key) if debugON: print(" rssimin:" + str(int(rssi_min,16)) + "=" + str(rssi_min)) if debugON: print(" rssimax:" + str(int(rssi_max,16)) + "=" + str(rssi_max)) if debugON: print(" rssiavg:" + str(int(rssi_avg,16)) + "=" + str(rssi_avg)) filtered_beacons.append(str(key) + rssi_avg) if debugON: print("Total number of received Beacons : ",len(filtered_beacons)) if debugON: print("Ending Filtering operation:", str(filtered_beacons)) return filtered_beacons except Exception as e: ledPatterns.ErrorPattern() filtered_beacons = [] writeErrorFile(e) if debugON: print("ERROR BLEDataFilter:", str(e)) return filtered_beacons def assembleLoraPayload(filtered_beacons): writeLogfile("assembleLoraPayload()") try: ledPatterns.ResetLED() if debugON: print("Assembling LoRa payload (max. first 70 beacons)") loraPayload=[] received_beacons=len(filtered_beacons) if received_beacons >= 70: len_payload = 70 else: len_payload=received_beacons for a in range(len_payload): loraPayload.append(ubinascii.unhexlify(filtered_beacons[a])) return loraPayload except Exception as e: ledPatterns.ErrorPattern() loraPayload=[] writeErrorFile(e) return loraPayload def readBatteryVoltage(): writeLogfile("readBatteryVoltage()") try: ledPatterns.ResetLED() tmpVolt = py.read_battery_voltage() tmpVolt = int(tmpVolt * 100) if debugON: print("BatteryVoltage mV:" , tmpVolt) return tmpVolt except Exception as e: ledPatterns.ErrorPattern() writeErrorFile(e) return 0 def sendBLEPayload(payload): writeLogfile("sendBLEPayload") try: ledPatterns.ResetLED() ledPatterns.LoRaSendingPattern() s = socket.socket(socket.AF_LORA, socket.SOCK_RAW) s.setsockopt(socket.SOL_LORA, socket.SO_CONFIRMED, 0) s.setsockopt(socket.SOL_LORA, socket.SO_DR, 5 ) # SF7 s.bind(default_uplinkPortPayload) s.setblocking(True) if(payload==[]): payload=[ubinascii.unhexlify('000102030405060708090A0B0C0D0E0F')] # because Pycom does not send empty uplinks if debugON: print(' BeaconData (HEX) : ', ubinascii.hexlify(b''.join(payload))) loraPayloadBytes = b''.join(payload) if debugON: print(' sending: LoraPayload (HEX): ', ubinascii.hexlify(loraPayloadBytes)) rx_beacons = struct.pack('b',len(payload)) #------------Send Lora Uplink----------------------------------------------- try: count = s.send(loraPayloadBytes) uplinkok = True except Exception as e: if debugON: print(' ERROR while sending LoRa uplink: ', e) uplinkok = False s.setblocking(False) if(uplinkok): if debugON: print(' Sent Bytes :', count) s.setblocking(False) increaseUplinkCounter() data = s.recv(64) if(str(data, 'utf8') != ""): processDownlink(data) else: if debugON: print(' no Downlink..') else: # uplink not OK, supposed LoRa not anymore joined.. if debugON: print(' Error, going to re-join..') joinLoRa() except Exception as e: ledPatterns.ErrorPattern() writeErrorFile(e) pass def increaseUplinkCounter(): writeLogfile("increaseUplinkCounter()") try: ledPatterns.ResetLED() try: uplinkCounter = pycom.nvs_get('UplinkCounter') except Exception as e: uplinkCounter = 0 # NVRAM only supports 32bit Int if(uplinkCounter >= 4294967290): uplinkCounter = 0 uplinkCounter +=1 if debugON: print(' new uplink counter :', uplinkCounter) pycom.nvs_set("UplinkCounter",uplinkCounter) except Exception as e: ledPatterns.ErrorPattern() if debugON: print(' ERROR: ', str(e)) writeErrorFile(e) def checkUplinkCounter(): writeLogfile("checkUplinkCounter()") try: if debugON: print(' checking uplink counter') currentStatFactor = 0 uplinkCounter = 1 try: currentStatFactor = pycom.nvs_get('StatisticFactor') except Exception as e: currentStatFactor = 0 if(currentStatFactor==0): if debugON: print(" no StatisticFactor set, using default factor:" + str(default_StatisticUplinkFactor)) currentStatFactor = default_StatisticUplinkFactor try: uplinkCounter = pycom.nvs_get('UplinkCounter') except Exception as e: uplinkCounter = 1 writeLogfile("current Uplink Counter:" + str(uplinkCounter)) if(uplinkCounter%currentStatFactor==0): sendStatisticPayload() except Exception as e: ledPatterns.ErrorPattern() if debugON: print(' ERROR: ', str(e)) writeErrorFile(e) def sendStatisticPayload(): global rejoinLoraInCurrentIteration writeLogfile("sendStatisticPayload()") try: ledPatterns.ResetLED() ledPatterns.LoRaSendingPattern() if debugON: print('Sending statistic uplink..') if debugON: print('---------------------------------------------') s = socket.socket(socket.AF_LORA, socket.SOCK_RAW) s.setsockopt(socket.SOL_LORA, socket.SO_CONFIRMED, 1) s.setsockopt(socket.SOL_LORA, socket.SO_DR, 5 ) # SF7 s.bind(default_uplinkPortStatistic) s.setblocking(True) temp = si.temperature() tempPayload = 10000+int(temp*100) print("Temperature: " + str(tempPayload)) humidPayload = int(si.humid_ambient(temp)*100) print("Humidity : "+ str(humidPayload)) battVolt = readBatteryVoltage() if debugON: print(' Voltage (HEX) : ', "%x" % battVolt) try: uplinkCounter = pycom.nvs_get('UplinkCounter') except: uplinkCounter = 0 print("Uplink Count: "+ str(uplinkCounter)) loraPayloadBytes = struct.pack(">I", uplinkCounter) # unsigned int 4 bytes loraPayloadBytes += struct.pack(">H", battVolt) # 2 unsigned int 2 bytes (short) loraPayloadBytes += struct.pack(">H", tempPayload) loraPayloadBytes += struct.pack(">H", humidPayload) if debugON: print(' sending: Statistic Payload (HEX): ', ubinascii.hexlify(loraPayloadBytes)) #------------Send Lora Uplink----------------------------------------------- try: count = s.send(loraPayloadBytes) uplinkok = True except Exception as e: if debugON: print(' ERROR while sending LoRa uplink: ', e) uplinkok = False s.setblocking(False) if(uplinkok): if debugON: print(' Uplink OK, sent Bytes :', count) increaseUplinkCounter() data = s.recv(64) if(str(data, 'utf8') != ""): processDownlink(data) else: if debugON: print(' no Downlink..') if debugON: print('---------------------------------------------') else: # uplink not OK, supposed LoRa not anymore joined.. if debugON: print(' ERROR, going to re-join..') rejoinLoraInCurrentIteration = True except Exception as e: ledPatterns.ErrorPattern() if debugON: print(' ERROR: ', str(e)) writeErrorFile(e) def sendConfigPayload(): global sendConfigInCurrentIteration writeLogfile("sendConfigPayload()") try: ledPatterns.ResetLED() ledPatterns.LoRaSendingPattern() if debugON: print('Sending firmware version and config uplink..') s = socket.socket(socket.AF_LORA, socket.SOCK_RAW) s.setsockopt(socket.SOL_LORA, socket.SO_CONFIRMED, 1) s.setsockopt(socket.SOL_LORA, socket.SO_DR, 0 ) # SF12 s.bind(default_uplinkPortConfig) s.setblocking(True) try: currentScanTime = pycom.nvs_get('BTScanTime') except Exception as e: writeErrorFile(e) currentScanTime = None if currentScanTime==None: currentScanTime = default_BTScanTime try: currentSleepTime = pycom.nvs_get('SleepTime') except Exception as e: writeErrorFile(e) currentSleepTime = None if currentSleepTime==None: currentSleepTime = default_Sleeptime try: currentStatFactor = pycom.nvs_get('StatisticFactor') except Exception as e: pycom.nvs_set("StatisticFactor",default_StatisticUplinkFactor) writeErrorFile(e) try: # second try if factor has really been set currentStatFactor = pycom.nvs_get('StatisticFactor') except Exception as e: currentStatFactor = 0 if debugON: print(' Firmware Major (HEX) : ', "%x" % firmwareVersionMajor) if debugON: print(' Firmware Minor (HEX) : ', "%x" % firmwareVersionMinor) if debugON: print(' Firmware Rev (HEX) : ', "%x" % firmwareVersionRev) if debugON: print(' BLE Scan Time (HEX) : ', "%x" % currentScanTime) if debugON: print(' DeepSleep Time (HEX) : ', "%x" % currentSleepTime) if debugON: print(' StatisticFactor (HEX) : ', "%x" % currentStatFactor) loraPayloadBytes = struct.pack(">B", firmwareVersionMajor) # 1 byte unsigned char loraPayloadBytes += struct.pack(">B", firmwareVersionMinor) loraPayloadBytes += struct.pack(">B", firmwareVersionRev) loraPayloadBytes += struct.pack(">B", currentScanTime) loraPayloadBytes += struct.pack(">H", currentSleepTime) # 2 byte unsigned int loraPayloadBytes += struct.pack(">B", currentStatFactor) if debugON: print(' sending: LoraPayload (HEX): ', ubinascii.hexlify(loraPayloadBytes)) #------------Send Lora Uplink----------------------------------------------- try: count = s.send(loraPayloadBytes) uplinkok = True except Exception as e: if debugON: print(' ERROR while sending LoRa uplink: ', e) writeErrorFile(e) uplinkok = False s.setblocking(False) if(uplinkok): sendConfigInCurrentIteration = False if debugON: print(' Uplink OK, sent Bytes :', count) increaseUplinkCounter() data = s.recv(64) if(str(data, 'utf8') != ""): processDownlink(data) else: if debugON: print(' no Downlink..') else: # uplink not OK, supposed LoRa not anymore joined.. if debugON: print(' Error, config uplink was not successfull') sendConfigInCurrentIteration = False # to avoid loops except Exception as e: ledPatterns.ErrorPattern() writeErrorFile(e) sendConfigInCurrentIteration = False # to avoid loops def processDownlink(data): global sendConfigInCurrentIteration writeLogfile("processDownlink()") try: ledPatterns.ResetLED() if debugON: print(' Received Downlink:', ubinascii.hexlify(data).decode("utf-8")) if debugON: print(' ACK RSSI :', math.fabs(lora.stats()[1])) if debugON: print(' ACK SNR :', lora.stats()[2]) if debugON: print(' ACK DR :', lora.stats()[3]) if(int.from_bytes(data[0:1],'big') == 1): if debugON: print(' configuration downlink for bt scan, sleep time and StatisticFactor') if debugON: print(' Scan Time:',int.from_bytes(data[1:2],'big')) pycom.nvs_set("BTScanTime",int.from_bytes(data[1:2],'big')) sleepT = int.from_bytes(data[2:4],'big') if debugON: print(' Sleep Time:',sleepT) pycom.nvs_set("SleepTime",sleepT) if debugON: print(' StatisticFactor:',int.from_bytes(data[4:6],'big')) pycom.nvs_set("StatisticFactor",int.from_bytes(data[4:6],'big')) sendConfigInCurrentIteration = True if(int.from_bytes(data[0:1],'big') == 2): if debugON: print(' action downlink received') if(int.from_bytes(data[1:2],'big') == 2): if debugON: print(' going to update from web..') downloadUpdateFromWeb("{:02d}".format(int.from_bytes(data[2:3],'big'))+"{:02d}".format(int.from_bytes(data[3:4],'big'))+"{:02d}".format(int.from_bytes(data[4:5],'big'))) if(int.from_bytes(data[1:2],'big') == 3): if debugON: print(' going to reset uplink counter..') pycom.nvs_set("UplinkCounter",0) if(int.from_bytes(data[1:2],'big') == 4): if debugON: print(' immediate sleep downlink received..') sleepT = int.from_bytes(data[2:4],'big') if debugON: print(' sleep time sec:', sleepT) if sleepT > 10: doDeepSleep(sleepT) except Exception as e: writeErrorFile(e) def doDeepSleep(seconds): try: writeLogfile("doDeepSleep(): " + str(seconds)) ledPatterns.ManualSleep() lora.nvram_save() time.sleep(0.5) py.setup_sleep(seconds) py.go_to_sleep() except Exception as e: ledPatterns.ErrorPattern() if debugON: print(' ERROR: ', str(e)) writeErrorFile(e) time.sleep(1) machine.reset() def gotoSleep(): global withDeepSleep try: writeLogfile("gotoSleep()") ledPatterns.ResetLED() try: currentSleepTime = pycom.nvs_get('SleepTime') except: currentSleepTime = None if currentSleepTime==None: currentSleepTime = default_Sleeptime if(currentSleepTime == 0): if debugON: print('going to skip sleep and continue with fast-scanning-mode..') withDeepSleep = False time.sleep(0.5) else: if debugON: print('going to deep sleep for seconds: ' + str(currentSleepTime)) withDeepSleep = True ledPatterns.LongSleep() lora.nvram_save() time.sleep(0.5) py.setup_sleep(currentSleepTime) py.go_to_sleep() except Exception as e: ledPatterns.ErrorPattern() if debugON: print(' ERROR: ', str(e)) writeErrorFile(e) time.sleep(1) machine.reset() def downloadUpdateFromWeb(version = ""): writeLogfile("downloadUpdateFromWeb()") try: ledPatterns.ResetLED() if debugON: print('--------------------------------------------') if debugON: print('Connecting to wifi network to check for updates for version: ' + version) Pin('P12', mode=Pin.OUT)(True) wlan = WLAN(mode=WLAN.STA, antenna=WLAN.EXT_ANT, power_save=False) nets = wlan.scan() for net in nets: if debugON: print(' checking wifi network:' + net.ssid) if(net.ssid == updateSSID): if debugON: print(' correct update network found!') wlan.connect(net.ssid, auth=(net.sec, updatePWD), timeout=5000) while not wlan.isconnected(): machine.idle() # save power while waiting if debugON: print(' WLAN connection succeeded!') if debugON: print(' checking update files..') contentType = MicroWebCli.FileRequest('http://downloads.smartmakers.de/beaconscanner/9e35cf44-774a-421c-93d7-a8baab2a6752/main'+ version + '.txt', '/flash/main.py') if debugON: print(' content type: ', contentType) if debugON: print(' File of content type "%s" was saved to "%s"' % (contentType, '/flash/main.py')) break if debugON: print(' disconnecting from WLAN..') wlan.deinit() if debugON: print(' done, going to reboot') time.sleep(2) machine.reset() except Exception as e: ledPatterns.ErrorPattern() if debugON: print(' ERROR Fuota: ', str(e)) writeErrorFile(e) time.sleep(1) machine.reset() def writeLogfile(message): try: if withSDlogging: with open('/sd/logfile.txt','a') as f: f.write(message + "\r\n") f.close() except Exception as e: if debugON: print(' ERROR: ', str(e)) def writeErrorFile(exept): try: if withSDlogging: with open('/sd/logfile.txt','a') as f: sys.print_exception(exept, f) f.close() time.sleep(1) except Exception as e: if debugON: print(' ERROR: ', str(e)) # Main Program # ------------------------------------------------------------------------------------------------------- pycom.heartbeat(False) time.sleep(0.5) pycom.rgbled(0x000500) time.sleep(0.5) pycom.rgbled(0x050000) time.sleep(0.5) pycom.rgbled(0x000005) time.sleep(0.5) try: pycom.rgbled(0x000000) # off if pycom.wifi_on_boot() == True: print("############# setting wifi_on_boot() flag to false #################") pycom.wifi_on_boot(False) else: print("## setting wifi_on_boot() = false") if pycom.wdt_on_boot() == False: print("############# setting wdt_on_boot() flag to true #################") pycom.wdt_on_boot(True) pycom.wdt_on_boot_timeout(90000) else: print("## setting wdt_on_boot() = true") sendConfigInCurrentIteration = False rejoinLoraInCurrentIteration = False if withSDlogging: try: sd = machine.SD() os.mount(sd, '/sd') except Exception as e: print("ERROR: SD logging activated but unable to mount SD card : " + str(e)) withSDlogging = False if withSDlogging: with open('/sd/logfile.txt','a') as f: f.write('-------------------------START Lopy----------------------' + "\r\n") f.write('ResetCause : ' + str(machine.reset_cause()) + "\r\n") f.write('WakeupCause : ' + str(py.get_wake_reason()) + "\r\n") f.write('---------------------------------------------------------' + "\r\n") f.close() # os.uname() # get boards unique devEUI lora = LoRa(mode=LoRa.LORAWAN, region=LoRa.EU868 , device_class=LoRa.CLASS_A) devEUI = str(ubinascii.hexlify(lora.mac().decode('ascii')), 'utf-8') print("Pycom booted. Lets go! DevEUI: ", devEUI) writeLogfile("Pycom booted. Lets go! DevEUI: " + str(devEUI)) while True: # loop for fast-scanning-mode if we dont use deepsleep if debugON: print("==================================================================================") if debugON: print('ResetCause : ', machine.reset_cause()) print('WakeupCause : ', py.get_wake_reason()) if(int(machine.reset_cause()) == 0): # machine.PWRON_RESET if(int(py.get_wake_reason()) == 4): print("..woken up from deepsleep") if(int(py.get_wake_reason()) == 2): # machine.RTC_WAKE print("..woken up because of user button press") rejoinLoraInCurrentIteration = True if(int(py.get_wake_reason()) == 0): # machine.PWRON_WAKE print("..woken up after power loss") sendConfigInCurrentIteration = True if(int(machine.reset_cause()) == 2): # machine.WDT_RESET sendConfigInCurrentIteration = True if(int(py.get_wake_reason()) == 2): print("..restart after update from VSCode") if(int(py.get_wake_reason()) == 172): print("..restart after update from VSCode") if(withDeepSleep): # restore last LoRa status (join status, keys, etc.) from NVRam lora.nvram_restore() time.sleep(0.5) if(not lora.has_joined()): if debugON: print('Lora not joined according to NVRam restore !') rejoinLoraInCurrentIteration = True else: if debugON: print('Lora is still joined according to NVRam restore.') if(rejoinLoraInCurrentIteration): joinLoRa() #scan for bluetooth advertisements data_list=bluetoothScan() #filter bluetooth advertisements based on UUID filtered_beacons = bluetoothDataFilter(data_list) #assemble LoRa payload if filtered_beacons != ["FFFF"]: # FUOTA identifier loraPayload = assembleLoraPayload(filtered_beacons) #send LoRa payload sendBLEPayload(loraPayload) checkUplinkCounter() # check if statistic uplink is needed if(sendConfigInCurrentIteration): # final check before deepsleep time.sleep(5) # to avoid duty cycle / too fast uplinks after each other sendConfigPayload() if(rejoinLoraInCurrentIteration): #new 2.4.0 because statistic or config uplink (confirmed) may indicate that re-join is needed.. joinLoRa() time.sleep(5) gotoSleep() except Exception as e: ledPatterns.ErrorPattern() print("ERROR: ", str(e)) writeErrorFile(e) time.sleep(10) machine.reset()