# vscode: ctrl-shift-p > linting enabled > disable from network import LoRa from network import Bluetooth from machine import Pin from machine import SD import socket, ubinascii, pycom, machine, struct, math, os, sys import time # removed in 2.0.8 import pysense from SI7006A20 import SI7006A20 import ledPatterns # from network import WLAN removed in 2.0.8 from microWebCli import MicroWebCli # ------------- Parameters ------------------ firmwareVersionMajor = 2 firmwareVersionMinor = 0 firmwareVersionRev = 8 default_uplinkPortPayload = 100 default_uplinkPortConfig = 10 default_uplinkPortStatistic = 20 default_BTScanTime = 10 default_Sleeptime = 300 default_StatisticUplinkFactor = 10 #uuidWhitelist={'f4ca46d839c7454b84a5d1135b026b27'} # SM / Dml #uuidWhitelist={'f5317fb44ea52c723417ae4b62b0971e'} # NWBC #uuidWhitelist={'f2263aa92df99e533461df1c53b0046a'} # Solconia uuidWhitelist={'f5317fb44ea52c723417ae4b62b0971e','f4ca46d839c7454b84a5d1135b026b27','f2263aa92df99e533461df1c53b0046a'} # all known uuids # WLAN AP for updating firmware updateSSID = "Smartmakers" updatePWD = "#ImYS4Er$2" # for debugging / testing withDeepSleep = True debugON = True withSDlogging = True # global vars / state sendConfigInCurrentIteration = False rejoinLoraInCurrentIteration = False # if rejoin is successfull, sendConfigInCurrentIteration will be set to True automatically # LoRa parameters confirmed = False devEUI = '' #declarations bt = Bluetooth() bt = Bluetooth(antenna=Bluetooth.EXT_ANT) py = pysense.Pysense() si = SI7006A20(py) sd = None def joinLoRa(): global sendConfigInCurrentIteration #limits join time to 10 seconds to avoid infinite join loop writeLogfile("joinLora()") try: ledPatterns.ResetLED() t_join = time.time() + 10 if debugON: print('original board DevEUI: ', devEUI) app_eui = ubinascii.unhexlify('1111111111111111') app_key = ubinascii.unhexlify('11111111111111111111111111111111') if debugON: print('starting join process..') lora.join(activation=LoRa.OTAA, auth=(app_eui, app_key), dr=0, timeout=0) ledPatterns.JoiningPattern() while time.time() < t_join: time.sleep(1) if lora.has_joined(): break if lora.has_joined(): if debugON: print('join: OK') sendConfigInCurrentIteration = True else: if debugON: print('join: failed: waiting 5 sec, then do normal deepsleep, then try again in next iteration..') time.sleep(5) gotoSleep() except Exception as e: ledPatterns.ErrorPattern() if debugON: print('join: failed: waiting 5 sec, then do normal deepsleep, then try again in next iteration..', str(e)) writeErrorFile(e) time.sleep(5) gotoSleep() def bluetoothScan(): writeLogfile("bluetoothScan()") try: ledPatterns.ResetLED() try: currentScanTime = pycom.nvs_get('BTScanTime') except Exception as e: currentScanTime = None if currentScanTime==None: currentScanTime = default_BTScanTime if debugON: print("Scanning for BLE advertisements for seconds: ", currentScanTime) bt.start_scan(currentScanTime) x=[] ledPatterns.BLEScanningPattern() while bt.isscanning(): y=bt.get_adv() if y: x.append(y) return x except Exception as e: ledPatterns.ErrorPattern() x=[] writeErrorFile(e) return x def bluetoothDataFilter(adv_list): writeLogfile("bluetoothDataFilter()") try: ledPatterns.ResetLED() if debugON: print("Filtering Bluetooth advertisements ") rssi_datalist=[] bleDict = {} values = [] if adv_list: if debugON: print(" total number of scanned advertisements: " + str(len(adv_list))) for i in range(len(adv_list)): adv_data = bt.resolve_adv_data(adv_list[i].data, Bluetooth.ADV_MANUFACTURER_DATA) rssi_data = abs(adv_list[i].rssi) if adv_data: mfg_data = ubinascii.hexlify(adv_data).decode("utf8") uuid=mfg_data[8:40] if uuid in uuidWhitelist: minor=mfg_data[44:48] try: values = bleDict[str(minor)] values.append(rssi_data) except: bleDict[str(minor)] = [rssi_data] if debugON: print(bleDict) filtered_beacons = [] for key in bleDict: rssi_min = ubinascii.hexlify(struct.pack('B', min(bleDict.get(key)))).decode("utf8") rssi_max = ubinascii.hexlify(struct.pack('B', max(bleDict.get(key)))).decode("utf8") rssi_avg = ubinascii.hexlify(struct.pack('B', int(sum(bleDict.get(key)) / len(bleDict.get(key))))).decode("utf8") if debugON: print("minor:" + key) if debugON: print(" rssimin:" + str(int(rssi_min,16)) + "=" + str(rssi_min)) if debugON: print(" rssimax:" + str(int(rssi_max,16)) + "=" + str(rssi_max)) if debugON: print(" rssiavg:" + str(int(rssi_avg,16)) + "=" + str(rssi_avg)) filtered_beacons.append(str(key) + rssi_avg) if debugON: print("Total number of received Beacons : ",len(filtered_beacons)) if debugON: print("Ending Filtering operation:", str(filtered_beacons)) return filtered_beacons except Exception as e: ledPatterns.ErrorPattern() filtered_beacons = [] writeErrorFile(e) return filtered_beacons def assembleLoraPayload(filtered_beacons): writeLogfile("assembleLoraPayload()") try: ledPatterns.ResetLED() if debugON: print("Assembling LoRa payload (max. first 50 beacons)") loraPayload=[] received_beacons=len(filtered_beacons) if received_beacons >= 50: len_payload = 50 else: len_payload=received_beacons for a in range(len_payload): loraPayload.append(ubinascii.unhexlify(filtered_beacons[a])) return loraPayload except Exception as e: ledPatterns.ErrorPattern() loraPayload=[] writeErrorFile(e) return loraPayload def readBatteryVoltage(): writeLogfile("readBatteryVoltage()") try: ledPatterns.ResetLED() tmpVolt = py.read_battery_voltage() tmpVolt = int(tmpVolt * 100) if debugON: print("BatteryVoltage mV:" , tmpVolt) return tmpVolt except Exception as e: ledPatterns.ErrorPattern() writeErrorFile(e) return 0 def sendBLEPayload(payload): writeLogfile("sendBLEPayload") try: ledPatterns.ResetLED() ledPatterns.LoRaSendingPattern() s = socket.socket(socket.AF_LORA, socket.SOCK_RAW) s.setsockopt(socket.SOL_LORA, socket.SO_CONFIRMED, 0) s.setsockopt(socket.SOL_LORA, socket.SO_DR, 5 ) # SF7 s.bind(default_uplinkPortPayload) s.setblocking(True) if(payload==[]): payload=[ubinascii.unhexlify('000102030405060708090A0B0C0D0E0F')] # because Pycom does not send empty uplinks if debugON: print(' BeaconData (HEX) : ', ubinascii.hexlify(b''.join(payload))) loraPayloadBytes = b''.join(payload) if debugON: print(' sending: LoraPayload (HEX): ', ubinascii.hexlify(loraPayloadBytes)) rx_beacons = struct.pack('b',len(payload)) #------------Send Lora Uplink----------------------------------------------- try: count = s.send(loraPayloadBytes) uplinkok = True except Exception as e: if debugON: print(' ERROR while sending LoRa uplink: ', e) uplinkok = False s.setblocking(False) if(uplinkok): if debugON: print(' Sent Bytes :', count) s.setblocking(False) increaseUplinkCounter() data = s.recv(64) if(str(data, 'utf8') != ""): processDownlink(data) else: if debugON: print(' no Downlink..') else: # uplink not OK, supposed LoRa not anymore joined.. if debugON: print(' Error, going to re-join..') joinLoRa() except Exception as e: ledPatterns.ErrorPattern() writeErrorFile(e) pass def increaseUplinkCounter(): writeLogfile("increaseUplinkCounter()") try: ledPatterns.ResetLED() try: uplinkCounter = pycom.nvs_get('UplinkCounter') except Exception as e: uplinkCounter = 0 # NVRAM only supports 32bit Int if(uplinkCounter >= 4294967290): uplinkCounter = 0 uplinkCounter +=1 if debugON: print(' new uplink counter :', uplinkCounter) pycom.nvs_set("UplinkCounter",uplinkCounter) except Exception as e: ledPatterns.ErrorPattern() if debugON: print(' ERROR: ', str(e)) writeErrorFile(e) def checkUplinkCounter(): writeLogfile("checkUplinkCounter()") try: if debugON: print(' checking uplink counter') currentStatFactor = 0 uplinkCounter = 1 try: currentStatFactor = pycom.nvs_get('StatisticFactor') except Exception as e: currentStatFactor = 0 if(currentStatFactor==0): if debugON: print(" no StatisticFactor set, using default factor:" + str(default_StatisticUplinkFactor)) currentStatFactor = default_StatisticUplinkFactor try: uplinkCounter = pycom.nvs_get('UplinkCounter') except Exception as e: uplinkCounter = 1 writeLogfile("current Uplink Counter:" + str(uplinkCounter)) if(uplinkCounter%currentStatFactor==0): sendStatisticPayload() except Exception as e: ledPatterns.ErrorPattern() if debugON: print(' ERROR: ', str(e)) writeErrorFile(e) def sendStatisticPayload(): global rejoinLoraInCurrentIteration writeLogfile("sendStatisticPayload()") try: ledPatterns.ResetLED() ledPatterns.LoRaSendingPattern() if debugON: print('Sending statistic uplink..') if debugON: print('---------------------------------------------') s = socket.socket(socket.AF_LORA, socket.SOCK_RAW) s.setsockopt(socket.SOL_LORA, socket.SO_CONFIRMED, 0) s.setsockopt(socket.SOL_LORA, socket.SO_DR, 5 ) # SF7 s.bind(default_uplinkPortStatistic) s.setblocking(True) temp = si.temperature() tempPayload = 10000+int(temp*100) print("Temperature: " + str(tempPayload)) humidPayload = int(si.humid_ambient(temp)*100) print("Humidity : "+ str(humidPayload)) battVolt = readBatteryVoltage() if debugON: print(' Voltage (HEX) : ', "%x" % battVolt) try: uplinkCounter = pycom.nvs_get('UplinkCounter') except: uplinkCounter = 0 print("Uplink Count: "+ str(uplinkCounter)) loraPayloadBytes = struct.pack(">I", uplinkCounter) # unsigned int 4 bytes loraPayloadBytes += struct.pack(">H", battVolt) # 2 unsigned int 2 bytes (short) loraPayloadBytes += struct.pack(">H", tempPayload) loraPayloadBytes += struct.pack(">H", humidPayload) if debugON: print(' sending: Statistic Payload (HEX): ', ubinascii.hexlify(loraPayloadBytes)) #------------Send Lora Uplink----------------------------------------------- try: count = s.send(loraPayloadBytes) uplinkok = True except Exception as e: if debugON: print(' ERROR while sending LoRa uplink: ', e) uplinkok = False s.setblocking(False) if(uplinkok): if debugON: print(' Uplink OK, sent Bytes :', count) increaseUplinkCounter() data = s.recv(64) if(str(data, 'utf8') != ""): processDownlink(data) else: if debugON: print(' no Downlink..') if debugON: print('---------------------------------------------') else: # uplink not OK, supposed LoRa not anymore joined.. if debugON: print(' Error, going to re-join..') rejoinLoraInCurrentIteration = True except Exception as e: ledPatterns.ErrorPattern() if debugON: print(' ERROR: ', str(e)) writeErrorFile(e) def sendConfigPayload(): global sendConfigInCurrentIteration writeLogfile("sendConfigPayload()") try: ledPatterns.ResetLED() ledPatterns.LoRaSendingPattern() if debugON: print('Sending firmware version and config uplink..') s = socket.socket(socket.AF_LORA, socket.SOCK_RAW) s.setsockopt(socket.SOL_LORA, socket.SO_CONFIRMED, 1) s.setsockopt(socket.SOL_LORA, socket.SO_DR, 0 ) # SF12 s.bind(default_uplinkPortConfig) s.setblocking(True) try: currentScanTime = pycom.nvs_get('BTScanTime') except Exception as e: writeErrorFile(e) currentScanTime = None if currentScanTime==None: currentScanTime = default_BTScanTime try: currentSleepTime = pycom.nvs_get('SleepTime') except Exception as e: writeErrorFile(e) currentSleepTime = None if currentSleepTime==None: currentSleepTime = default_Sleeptime try: currentStatFactor = pycom.nvs_get('StatisticFactor') except Exception as e: pycom.nvs_set("StatisticFactor",default_StatisticUplinkFactor) writeErrorFile(e) try: # second try if factor has really been set currentStatFactor = pycom.nvs_get('StatisticFactor') except Exception as e: currentStatFactor = 0 if debugON: print(' Firmware Major (HEX) : ', "%x" % firmwareVersionMajor) if debugON: print(' Firmware Minor (HEX) : ', "%x" % firmwareVersionMinor) if debugON: print(' Firmware Rev (HEX) : ', "%x" % firmwareVersionRev) if debugON: print(' BLE Scan Time (HEX) : ', "%x" % currentScanTime) if debugON: print(' DeepSleep Time (HEX) : ', "%x" % currentSleepTime) if debugON: print(' StatisticFactor (HEX) : ', "%x" % currentStatFactor) loraPayloadBytes = struct.pack(">B", firmwareVersionMajor) # 1 byte unsigned char loraPayloadBytes += struct.pack(">B", firmwareVersionMinor) loraPayloadBytes += struct.pack(">B", firmwareVersionRev) loraPayloadBytes += struct.pack(">B", currentScanTime) loraPayloadBytes += struct.pack(">H", currentSleepTime) # 2 byte unsigned int loraPayloadBytes += struct.pack(">B", currentStatFactor) if debugON: print(' sending: LoraPayload (HEX): ', ubinascii.hexlify(loraPayloadBytes)) #------------Send Lora Uplink----------------------------------------------- try: count = s.send(loraPayloadBytes) uplinkok = True except Exception as e: if debugON: print(' ERROR while sending LoRa uplink: ', e) writeErrorFile(e) uplinkok = False s.setblocking(False) if(uplinkok): sendConfigInCurrentIteration = False if debugON: print(' Uplink OK, sent Bytes :', count) increaseUplinkCounter() data = s.recv(64) if(str(data, 'utf8') != ""): processDownlink(data) else: if debugON: print(' no Downlink..') else: # uplink not OK, supposed LoRa not anymore joined.. if debugON: print(' Error, config uplink was not successfull') sendConfigInCurrentIteration = False # to avoid loops except Exception as e: ledPatterns.ErrorPattern() writeErrorFile(e) sendConfigInCurrentIteration = False # to avoid loops def processDownlink(data): global sendConfigInCurrentIteration writeLogfile("processDownlink()") try: ledPatterns.ResetLED() if debugON: print(' Received Downlink:', ubinascii.hexlify(data).decode("utf-8")) if debugON: print(' ACK RSSI :', math.fabs(lora.stats()[1])) if debugON: print(' ACK SNR :', lora.stats()[2]) if debugON: print(' ACK DR :', lora.stats()[3]) if(int.from_bytes(data[0:1],'big') == 1): if debugON: print(' configuration downlink for bt scan, sleep time and StatisticFactor') if debugON: print(' Scan Time:',int.from_bytes(data[1:2],'big')) pycom.nvs_set("BTScanTime",int.from_bytes(data[1:2],'big')) sleepT = int.from_bytes(data[2:4],'big') if debugON: print(' Sleep Time:',sleepT) pycom.nvs_set("SleepTime",sleepT) if debugON: print(' StatisticFactor:',int.from_bytes(data[4:6],'big')) pycom.nvs_set("StatisticFactor",int.from_bytes(data[4:6],'big')) sendConfigInCurrentIteration = True if(int.from_bytes(data[0:1],'big') == 2): if debugON: print(' action downlink received') if(int.from_bytes(data[1:2],'big') == 2): if debugON: print(' going to update from web..') downloadUpdateFromWeb("{:02d}".format(int.from_bytes(data[2:3],'big'))+"{:02d}".format(int.from_bytes(data[3:4],'big'))+"{:02d}".format(int.from_bytes(data[4:5],'big'))) if(int.from_bytes(data[1:2],'big') == 3): if debugON: print(' going to reset uplink counter..') pycom.nvs_set("UplinkCounter",0) except Exception as e: writeErrorFile(e) def gotoSleep(): global withDeepSleep writeLogfile("gotoSleep()") try: ledPatterns.ResetLED() try: currentSleepTime = pycom.nvs_get('SleepTime') except: currentSleepTime = None if currentSleepTime==None: currentSleepTime = default_Sleeptime if(currentSleepTime == 0): if debugON: print('going to skip sleep and continue with fast-scanning-mode..') withDeepSleep = False time.sleep(0.5) else: if debugON: print('going to deep sleep for seconds: ' + str(currentSleepTime)) withDeepSleep = True ledPatterns.LongSleep() lora.nvram_save() time.sleep(0.5) py.setup_sleep(currentSleepTime) py.go_to_sleep() except Exception as e: ledPatterns.ErrorPattern() if debugON: print(' ERROR: ', str(e)) writeErrorFile(e) time.sleep(1) machine.reset() def downloadUpdateFromWeb(version = ""): writeLogfile("downloadUpdateFromWeb()") try: ledPatterns.ResetLED() if debugON: print('--------------------------------------------') if debugON: print('Connecting to wifi network to check for updates for version: ' + version) Pin('P12', mode=Pin.OUT)(True) wlan = WLAN(mode=WLAN.STA, antenna=WLAN.EXT_ANT, power_save=False) nets = wlan.scan() for net in nets: if debugON: print(' checking wifi network:' + net.ssid) if(net.ssid == updateSSID): if debugON: print(' correct update network found!') wlan.connect(net.ssid, auth=(net.sec, updatePWD), timeout=5000) while not wlan.isconnected(): machine.idle() # save power while waiting if debugON: print(' WLAN connection succeeded!') if debugON: print(' checking update files..') contentType = MicroWebCli.FileRequest('http://downloads.smartmakers.de/beaconscanner/9e35cf44-774a-421c-93d7-a8baab2a6752/main'+ version + '.txt', '/flash/main.py') if debugON: print(' content type: ', contentType) if debugON: print(' File of content type "%s" was saved to "%s"' % (contentType, '/flash/main.py')) break if debugON: print(' disconnecting from WLAN..') wlan.deinit() if debugON: print(' done, going to reboot') time.sleep(2) machine.reset() except Exception as e: ledPatterns.ErrorPattern() if debugON: print(' ERROR: ', str(e)) writeErrorFile(e) time.sleep(1) machine.reset() def writeLogfile(message): try: if withSDlogging: with open('/sd/logfile.txt','a') as f: f.write(message + "\r\n") f.close() except Exception as e: if debugON: print(' ERROR: ', str(e)) def writeErrorFile(exept): try: if withSDlogging: with open('/sd/logfile.txt','a') as f: sys.print_exception(exept, f) f.close() time.sleep(1) except Exception as e: if debugON: print(' ERROR: ', str(e)) # Main Program # ------------------------------------------------------------------------------------------------------- pycom.heartbeat(False) time.sleep(0.5) pycom.rgbled(0x000500) time.sleep(0.5) pycom.rgbled(0x050000) time.sleep(0.5) pycom.rgbled(0x000005) time.sleep(0.5) try: pycom.rgbled(0x000000) sendConfigInCurrentIteration = False rejoinLoraInCurrentIteration = False if withSDlogging: try: sd = machine.SD() os.mount(sd, '/sd') except Exception as e: print("ERROR: SD logging activated but unable to mount SD card : " + str(e)) withSDlogging = False if withSDlogging: with open('/sd/logfile.txt','a') as f: f.write('-------------------------START Lopy----------------------' + "\r\n") f.write('ResetCause : ' + str(machine.reset_cause()) + "\r\n") f.write('WakeupCause : ' + str(py.get_wake_reason()) + "\r\n") f.write('---------------------------------------------------------' + "\r\n") f.close() # os.uname() # get boards unique devEUI lora = LoRa(mode=LoRa.LORAWAN, region=LoRa.EU868 , device_class=LoRa.CLASS_A) devEUI = str(ubinascii.hexlify(lora.mac().decode('ascii')), 'utf-8') print("Pycom booted. Lets go! DevEUI: ", devEUI) writeLogfile("Pycom booted. Lets go! DevEUI: " + str(devEUI)) while True: # loop for fast-scanning-mode if we dont use deepsleep if debugON: print("==================================================================================") if debugON: print('ResetCause : ', machine.reset_cause()) print('WakeupCause : ', py.get_wake_reason()) if(int(machine.reset_cause()) == 0): # machine.PWRON_RESET if(int(py.get_wake_reason()) == 4): print("..woken up from deepsleep") if(int(py.get_wake_reason()) == 2): # machine.RTC_WAKE print("..woken up because of user button press") rejoinLoraInCurrentIteration = True if(int(py.get_wake_reason()) == 0): # machine.PWRON_WAKE print("..woken up after power loss") sendConfigInCurrentIteration = True if(int(machine.reset_cause()) == 2): # machine.WDT_RESET sendConfigInCurrentIteration = True if(int(py.get_wake_reason()) == 2): print("..restart after update from VSCode") if(int(py.get_wake_reason()) == 172): print("..restart after update from VSCode") if(withDeepSleep): # restore last LoRa status (join status, keys, etc.) from NVRam lora.nvram_restore() time.sleep(0.5) if(not lora.has_joined()): if debugON: print('Lora not joined according to NVRam restore !') rejoinLoraInCurrentIteration = True else: if debugON: print('Lora is still joined according to NVRam restore.') if(rejoinLoraInCurrentIteration): joinLoRa() #scan for bluetooth advertisements data_list=bluetoothScan() #filter bluetooth advertisements based on UUID filtered_beacons = bluetoothDataFilter(data_list) #assemble LoRa payload loraPayload = assembleLoraPayload(filtered_beacons) #send LoRa payload sendBLEPayload(loraPayload) checkUplinkCounter() # check if statistic uplink is needed if(sendConfigInCurrentIteration): # final check before deepsleep time.sleep(5) sendConfigPayload() gotoSleep() except Exception as e: ledPatterns.ErrorPattern() print("ERROR: ", str(e)) writeErrorFile(e) time.sleep(10) machine.reset()