# vscode: ctrl-shift-p > linting enabled > disable
#
# BLE beacon scanner firmware for a Pycom board on a Pysense shield.
# Each cycle: scan for BLE advertisements, keep those whose UUID is in
# uuidWhitelist, send one LoRaWAN uplink with (minor, average RSSI) per beacon,
# periodically send a statistic uplink, then sleep.
#
# NOTE(review): this file was reformatted from a whitespace-collapsed source;
# block nesting was reconstructed from statement order - confirm against the
# deployed firmware before flashing.
from network import LoRa
from network import Bluetooth
from machine import Pin
from machine import SD
import socket
import time
import ubinascii
import pycom
import machine
import struct
import math
import os
import sys
import pysense
from SI7006A20 import SI7006A20
import ledPatterns
from network import WLAN
from microWebCli import MicroWebCli

# ------------- Parameters ------------------
firmwareVersionMajor = 2
firmwareVersionMinor = 0
firmwareVersionRev = 4
default_uplinkPortPayload = 100
default_uplinkPortConfig = 10
default_uplinkPortStatistic = 20
default_BTScanTime = 10
default_Sleeptime = 300
default_StatisticUplinkFactor = 10

#uuidWhitelist={'f4ca46d839c7454b84a5d1135b026b27'} # SM / Dml
#uuidWhitelist={'f5317fb44ea52c723417ae4b62b0971e'} # NWBC
#uuidWhitelist={'f2263aa92df99e533461df1c53b0046a'} # Solconia
uuidWhitelist={'f5317fb44ea52c723417ae4b62b0971e','f4ca46d839c7454b84a5d1135b026b27','f2263aa92df99e533461df1c53b0046a'} # all known uuids

# WLAN AP for updating firmware
# NOTE(review): credential is hard-coded in source - consider provisioning it
# outside the firmware image.
updateSSID = "Smartmakers"
updatePWD = "#ImYS4Er$2"

# for debugging / testing
withDeepSleep = True
debugON = True

# LoRa parameters
confirmed = False
devEUI = ''

#declarations
# NOTE(review): Bluetooth is constructed twice; the second call selects the
# external antenna. Left as in the original - confirm the first call is needed.
bt = Bluetooth()
bt = Bluetooth(antenna=Bluetooth.EXT_ANT)
py = pysense.Pysense()
si = SI7006A20(py)

# try:
#     1/0
# except Exception as e:
#     import sys
#     with open("error.log", "a") as f:
#         sys.print_exception(e, f)


def _nvsGet(key, default):
    """Read `key` from NVRAM; return `default` if the read raises or yields None.

    The original code guarded against both outcomes in some call sites only;
    this helper applies the same guard everywhere.
    """
    try:
        value = pycom.nvs_get(key)
    except Exception:
        return default
    if value is None:
        return default
    return value


def _hexByte(value):
    """Return `value` packed as one unsigned byte, rendered as a hex string."""
    return ubinascii.hexlify(struct.pack('B', value)).decode("utf8")


def _sendUplink(port, payloadBytes, confirmedUplink, dataRate):
    """Send one LoRa uplink and handle the result.

    Opens a raw LoRa socket bound to `port`, sends `payloadBytes` with the
    given SO_CONFIRMED / SO_DR options, increments the uplink counter on
    success and reads up to 64 bytes of downlink. The socket is always closed
    before any follow-up action, so repeated uplinks (fast-scanning mode) and
    nested sends triggered by a downlink do not accumulate open sockets.

    On a failed send joinLoRa() is called. A non-empty downlink is passed to
    processDownlink().

    Returns True if the uplink was sent, False otherwise.
    """
    data = b''
    s = socket.socket(socket.AF_LORA, socket.SOCK_RAW)
    try:
        s.setsockopt(socket.SOL_LORA, socket.SO_CONFIRMED, confirmedUplink)
        s.setsockopt(socket.SOL_LORA, socket.SO_DR, dataRate)
        s.bind(port)
        s.setblocking(True)
        #------------Send Lora Uplink-----------------------------------------------
        try:
            count = s.send(payloadBytes)
            uplinkok = True
        except Exception as e:
            if debugON: print(' ERROR while sending LoRa uplink: ', e)
            uplinkok = False
        s.setblocking(False)
        if uplinkok:
            if debugON: print(' Sent Bytes :', count)
            increaseUplinkCounter()
            data = s.recv(64)
    finally:
        s.close()

    if not uplinkok:
        # uplink not OK, supposed LoRa not anymore joined..
        if debugON: print(' Error, going to re-join..')
        joinLoRa()
        return False

    # Test the raw bytes; decoding them as UTF-8 first could fail on binary
    # downlink content and hide the downlink.
    if data:
        processDownlink(data)
    else:
        if debugON: print(' no Downlink..')
    return True


def joinLoRa():
    """Start an OTAA join and poll for up to 10 seconds for it to complete.

    The join time is limited to avoid an infinite join loop. On success the
    config uplink is sent; otherwise gotoSleep() is called. Any exception
    shows the error LED pattern and resets the board.
    """
    try:
        ledPatterns.ResetLED()
        t_join = time.time() + 10
        if debugON: print('original board DevEUI: ', devEUI)
        app_eui = ubinascii.unhexlify('1111111111111111')
        app_key = ubinascii.unhexlify('11111111111111111111111111111111')
        if debugON: print('starting join process..')
        lora.join(activation=LoRa.OTAA, auth=(app_eui, app_key), dr=0, timeout=0)
        ledPatterns.JoiningPattern()
        while time.time() < t_join:
            time.sleep(1)
            if lora.has_joined():
                break
        if lora.has_joined():
            if debugON: print('join: OK')
            sendConfigPayload()
        else:
            if debugON: print('join: failed, going to reset the board and try again..')
            time.sleep(0.5)
            gotoSleep()
    except Exception as e:
        ledPatterns.ErrorPattern()
        if debugON: print(' ERROR: ', str(e))
        time.sleep(1)
        machine.reset()


def bluetoothScan():
    """Scan for BLE advertisements and return the collected list.

    The scan duration is the NVRAM value 'BTScanTime', or default_BTScanTime
    when that is not available. Returns an empty list on any error.
    """
    try:
        ledPatterns.ResetLED()
        currentScanTime = _nvsGet('BTScanTime', default_BTScanTime)
        if debugON: print("Scanning for BLE advertisements for seconds: ", currentScanTime)
        bt.start_scan(currentScanTime)
        advertisements = []
        ledPatterns.BLEScanningPattern()
        while bt.isscanning():
            adv = bt.get_adv()
            if adv:
                advertisements.append(adv)
        return advertisements
    except Exception as e:
        ledPatterns.ErrorPattern()
        if debugON: print(' ERROR: ', str(e))
        return []


def bluetoothDataFilter(adv_list):
    """Reduce raw advertisements to one entry per whitelisted beacon.

    For every advertisement whose manufacturer data carries a UUID (hex chars
    8..40) contained in uuidWhitelist, the absolute RSSI is grouped under the
    beacon's minor (hex chars 44..48).

    Returns a list of hex strings, each the 4-char minor followed by the
    2-char average RSSI. Returns an empty list on any error.
    """
    try:
        ledPatterns.ResetLED()
        if debugON: print("Filtering Bluetooth advertisements ")
        bleDict = {}
        if debugON: print(" total number of scanned advertisements: " + str(len(adv_list)))
        for adv in adv_list:
            adv_data = bt.resolve_adv_data(adv.data, Bluetooth.ADV_MANUFACTURER_DATA)
            if not adv_data:
                continue
            mfg_data = ubinascii.hexlify(adv_data).decode("utf8")
            if mfg_data[8:40] not in uuidWhitelist:
                continue
            minor = mfg_data[44:48]
            # A truncated minor would later make unhexlify fail and discard the
            # whole payload, so skip such advertisements here.
            if len(minor) != 4:
                continue
            bleDict.setdefault(minor, []).append(abs(adv.rssi))
        if debugON: print(bleDict)

        filtered_beacons = []
        for key in bleDict:
            rssi_values = bleDict[key]
            rssi_min = _hexByte(min(rssi_values))
            rssi_max = _hexByte(max(rssi_values))
            rssi_avg = _hexByte(int(sum(rssi_values) / len(rssi_values)))
            if debugON: print("minor:" + key)
            if debugON: print(" rssimin:" + str(int(rssi_min,16)) + "=" + str(rssi_min))
            if debugON: print(" rssimax:" + str(int(rssi_max,16)) + "=" + str(rssi_max))
            if debugON: print(" rssiavg:" + str(int(rssi_avg,16)) + "=" + str(rssi_avg))
            filtered_beacons.append(str(key) + rssi_avg)
        if debugON: print("Total number of received Beacons : ",len(filtered_beacons))
        if debugON: print("Ending Filtering operation:", str(filtered_beacons))
        return filtered_beacons
    except Exception as e:
        ledPatterns.ErrorPattern()
        if debugON: print(' ERROR: ', str(e))
        return []


def assembleLoraPayload(filtered_beacons):
    """Convert the first 50 hex strings of `filtered_beacons` to bytes objects.

    Returns a list of bytes objects, or an empty list on any error.
    """
    try:
        ledPatterns.ResetLED()
        if debugON: print("Assembling LoRa payload (max. first 50 beacons)")
        return [ubinascii.unhexlify(beacon) for beacon in filtered_beacons[:50]]
    except Exception as e:
        ledPatterns.ErrorPattern()
        if debugON: print(' ERROR: ', str(e))
        return []


def readBatteryVoltage():
    """Return the Pysense battery reading multiplied by 100 as int, 0 on error."""
    try:
        ledPatterns.ResetLED()
        tmpVolt = py.read_battery_voltage()
        tmpVolt = int(tmpVolt * 100)
        if debugON: print("BatteryVoltage mV:" , tmpVolt)
        return tmpVolt
    except Exception as e:
        ledPatterns.ErrorPattern()
        return 0


def sendBLEPayload(payload):
    """Send the beacon payload as an unconfirmed uplink on the payload port.

    `payload` is a list of bytes objects. When it is empty a fixed 16-byte
    dummy payload is sent instead, because Pycom does not send empty uplinks.
    Errors are shown on the LED and logged, never raised.
    """
    try:
        ledPatterns.ResetLED()
        ledPatterns.LoRaSendingPattern()
        if not payload:
            payload=[ubinascii.unhexlify('000102030405060708090A0B0C0D0E0F')] # because Pycom does not send empty uplinks
        loraPayloadBytes = b''.join(payload)
        if debugON: print(' BeaconData (HEX) : ', ubinascii.hexlify(loraPayloadBytes))
        if debugON: print(' sending: LoraPayload (HEX): ', ubinascii.hexlify(loraPayloadBytes))
        _sendUplink(default_uplinkPortPayload, loraPayloadBytes, 0, 5) # SF7
    except Exception as e:
        ledPatterns.ErrorPattern()
        if debugON: print(' ERROR: ', str(e))


def increaseUplinkCounter():
    """Increment the 'UplinkCounter' NVRAM value, wrapping before 32-bit overflow."""
    ledPatterns.ResetLED()
    uplinkCounter = _nvsGet('UplinkCounter', 0)
    # NVRAM only supports 32bit Int
    if(uplinkCounter >= 4294967290):
        uplinkCounter = 0
    uplinkCounter +=1
    if debugON: print(' current uplink counter :', uplinkCounter)
    pycom.nvs_set("UplinkCounter",uplinkCounter)


def checkUplinkCounter():
    """Send a statistic uplink whenever the uplink counter is a multiple of the factor.

    The factor is the NVRAM value 'StatisticFactor'; a missing or zero value
    falls back to default_StatisticUplinkFactor (this also avoids a modulo by
    zero).
    """
    if debugON: print(' checking uplink counter')
    currentStatFactor = _nvsGet('StatisticFactor', 0)
    if(currentStatFactor==0):
        if debugON: print(" no StatisticFactor set, using default factor:" + str(default_StatisticUplinkFactor))
        currentStatFactor = default_StatisticUplinkFactor
    uplinkCounter = _nvsGet('UplinkCounter', 1)
    if(uplinkCounter%currentStatFactor==0):
        sendStatisticPayload()


def sendStatisticPayload():
    """Send uplink counter, battery, temperature and humidity on the statistic port.

    Payload layout (big-endian): 4-byte counter, then three 2-byte values:
    battery reading, 10000 + temperature*100, humidity*100.
    Errors are shown on the LED and logged, never raised.
    """
    try:
        ledPatterns.ResetLED()
        ledPatterns.LoRaSendingPattern()
        if debugON: print('Sending statistic uplink..')
        if debugON: print('---------------------------------------------')
        temp = si.temperature()
        tempPayload = 10000+int(temp*100)
        print("Temperature: " + str(tempPayload))
        humidPayload = int(si.humid_ambient(temp)*100)
        print("Humidity : "+ str(humidPayload))
        battVolt = readBatteryVoltage()
        if debugON: print(' Voltage (HEX) : ', "%x" % battVolt)
        uplinkCounter = _nvsGet('UplinkCounter', 0)
        print("Uplink Count: "+ str(uplinkCounter))
        loraPayloadBytes = struct.pack(">I", uplinkCounter) # unsigned int 4 bytes
        loraPayloadBytes += struct.pack(">H", battVolt) # 2 unsigned int 2 bytes (short)
        loraPayloadBytes += struct.pack(">H", tempPayload)
        loraPayloadBytes += struct.pack(">H", humidPayload)
        if debugON: print(' sending: Statistic Payload (HEX): ', ubinascii.hexlify(loraPayloadBytes))
        if _sendUplink(default_uplinkPortStatistic, loraPayloadBytes, 0, 5): # SF7
            if debugON: print('---------------------------------------------')
    except Exception as e:
        # The original handler was a bare `except:` that still referenced `e`,
        # which raised NameError inside the handler itself.
        ledPatterns.ErrorPattern()
        if debugON: print(' ERROR: ', str(e))


def sendConfigPayload():
    """Send firmware version and current configuration as a confirmed uplink.

    Payload layout (big-endian): major, minor, rev, BLE scan time (1 byte
    each), sleep time (2 bytes), statistic factor (1 byte). If no statistic
    factor is stored yet the default is written to NVRAM first.
    Errors are shown on the LED and logged, never raised.
    """
    try:
        ledPatterns.ResetLED()
        ledPatterns.LoRaSendingPattern()
        if debugON: print('Sending firmware version and config uplink..')
        currentScanTime = _nvsGet('BTScanTime', default_BTScanTime)
        currentSleepTime = _nvsGet('SleepTime', default_Sleeptime)
        currentStatFactor = _nvsGet('StatisticFactor', None)
        if currentStatFactor is None:
            pycom.nvs_set("StatisticFactor",default_StatisticUplinkFactor)
            # second try if factor has really been set
            currentStatFactor = _nvsGet('StatisticFactor', 0)
        if debugON: print(' Firmware Major (HEX) : ', "%x" % firmwareVersionMajor)
        if debugON: print(' Firmware Minor (HEX) : ', "%x" % firmwareVersionMinor)
        if debugON: print(' Firmware Rev (HEX) : ', "%x" % firmwareVersionRev)
        if debugON: print(' BLE Scan Time (HEX) : ', "%x" % currentScanTime)
        if debugON: print(' DeepSleep Time (HEX) : ', "%x" % currentSleepTime)
        if debugON: print(' EnvironmentalFactor : ', "%x" % currentStatFactor)
        loraPayloadBytes = struct.pack(">B", firmwareVersionMajor) # 1 byte unsigned char
        loraPayloadBytes += struct.pack(">B", firmwareVersionMinor)
        loraPayloadBytes += struct.pack(">B", firmwareVersionRev)
        loraPayloadBytes += struct.pack(">B", currentScanTime)
        loraPayloadBytes += struct.pack(">H", currentSleepTime) # 2 byte unsigned int
        # NOTE(review): processDownlink stores a 2-byte factor but only one
        # byte is reported here - confirm the intended range.
        loraPayloadBytes += struct.pack(">B", currentStatFactor)
        if debugON: print(' sending: LoraPayload (HEX): ', ubinascii.hexlify(loraPayloadBytes))
        _sendUplink(default_uplinkPortConfig, loraPayloadBytes, 1, 0) # SF12
    except Exception as e:
        ledPatterns.ErrorPattern()
        if debugON: print(' ERROR: ', str(e))


def processDownlink(data):
    """Interpret a received downlink.

    Byte 0 selects the type:
      1 - configuration: byte 1 = BLE scan time, bytes 2-3 = sleep time
          (values below 10 are stored as 1), bytes 4-5 = statistic factor.
          The new values are stored in NVRAM and a config uplink is sent.
      2 - action: byte 1 == 2 starts a firmware download for the version given
          by bytes 2, 3, 4 (each formatted as two decimal digits);
          byte 1 == 3 resets the uplink counter.
    Errors are shown on the LED and logged, never raised.
    """
    try:
        ledPatterns.ResetLED()
        if debugON: print(' Received Downlink:', ubinascii.hexlify(data).decode("utf-8"))
        if debugON: print(' ACK RSSI :', math.fabs(lora.stats()[1]))
        if debugON: print(' ACK SNR :', lora.stats()[2])
        if debugON: print(' ACK DR :', lora.stats()[3])
        downlinkType = int.from_bytes(data[0:1],'big')
        if(downlinkType == 1):
            if debugON: print(' configuration downlink for bt scan, sleep time and EnvironmentalFactor')
            scanT = int.from_bytes(data[1:2],'big')
            if debugON: print(' Scan Time:',scanT)
            pycom.nvs_set("BTScanTime",scanT)
            sleepT = int.from_bytes(data[2:4],'big')
            if debugON: print(' Sleep Time:',sleepT)
            if(sleepT < 10): # to avoid NVRAM problem that keys which are not stored yet return 0 if being read
                sleepT = 1
            pycom.nvs_set("SleepTime",sleepT)
            statFactor = int.from_bytes(data[4:6],'big')
            if debugON: print(' StatisticFactor:',statFactor)
            pycom.nvs_set("StatisticFactor",statFactor)
            sendConfigPayload()
        if(downlinkType == 2):
            if debugON: print(' action downlink received')
            action = int.from_bytes(data[1:2],'big')
            if(action == 2):
                if debugON: print(' going to update from web..')
                downloadUpdateFromWeb("{:02d}".format(int.from_bytes(data[2:3],'big'))+"{:02d}".format(int.from_bytes(data[3:4],'big'))+"{:02d}".format(int.from_bytes(data[4:5],'big')))
            if(action == 3):
                if debugON: print(' going to reset uplink counter..')
                pycom.nvs_set("UplinkCounter",0)
    except Exception as e:
        ledPatterns.ErrorPattern()
        if debugON: print(' ERROR: ', str(e))


def gotoSleep():
    """Deep-sleep for the configured time, or return at once in fast-scanning mode.

    The sleep time is the NVRAM value 'SleepTime' or default_Sleeptime. A value
    of 1 means "do not sleep": withDeepSleep is cleared and the function
    returns after a short pause. Otherwise the LoRa state is saved to NVRAM
    and the Pysense is put to sleep. Any exception resets the board.
    """
    global withDeepSleep
    try:
        ledPatterns.ResetLED()
        currentSleepTime = _nvsGet('SleepTime', default_Sleeptime)
        if(currentSleepTime == 1):
            if debugON: print('going to skip sleep and continue with fast-scanning-operation..')
            withDeepSleep = False
            time.sleep(0.5)
        else:
            if debugON: print('going to deep sleep for seconds: ' + str(currentSleepTime))
            withDeepSleep = True
            ledPatterns.LongSleep()
            lora.nvram_save()
            time.sleep(0.5)
            py.setup_sleep(currentSleepTime)
            py.go_to_sleep()
    except Exception as e:
        ledPatterns.ErrorPattern()
        if debugON: print(' ERROR: ', str(e))
        time.sleep(1)
        machine.reset()


def downloadUpdateFromWeb(version = ""):
    """Fetch main<version>.txt over WLAN into /flash/main.py, then reset the board.

    Scans for the update network `updateSSID`; if found, connects, downloads
    the file and stops searching. The board is reset afterwards whether or not
    the network was found. Any exception shows the error LED pattern and also
    resets the board.

    NOTE(review): the download uses plain HTTP and writes directly over
    /flash/main.py without an integrity check - consider verifying the file
    before replacing the running firmware.
    """
    try:
        ledPatterns.ResetLED()
        if debugON: print('--------------------------------------------')
        if debugON: print('Connecting to wifi network to check for updates for version: ' + version)
        Pin('P12', mode=Pin.OUT)(True)
        wlan = WLAN(mode=WLAN.STA, antenna=WLAN.EXT_ANT, power_save=False)
        nets = wlan.scan()
        for net in nets:
            if debugON: print(' checking wifi network:' + net.ssid)
            if(net.ssid == updateSSID):
                if debugON: print(' correct update network found!')
                wlan.connect(net.ssid, auth=(net.sec, updatePWD), timeout=5000)
                # Bound the wait so a failed connection cannot hang the device
                # forever; the deadline is twice the connect timeout above.
                connectDeadline = time.time() + 10
                while not wlan.isconnected():
                    if time.time() > connectDeadline:
                        raise Exception('WLAN connection timed out')
                    machine.idle() # save power while waiting
                if debugON: print(' WLAN connection succeeded!')
                if debugON: print(' checking update files..')
                contentType = MicroWebCli.FileRequest('http://downloads.smartmakers.de/beaconscanner/9e35cf44-774a-421c-93d7-a8baab2a6752/main'+ version + '.txt', '/flash/main.py')
                if debugON: print(' content type: ', contentType)
                if debugON: print(' File of content type "%s" was saved to "%s"' % (contentType, '/flash/main.py'))
                break
        if debugON: print(' disconnecting from WLAN..')
        wlan.deinit()
        if debugON: print(' done, going to reboot')
        time.sleep(2)
        machine.reset()
    except Exception as e:
        ledPatterns.ErrorPattern()
        if debugON: print(' ERROR: ', str(e))
        time.sleep(1)
        machine.reset()


# TODO:
#  - confirmation for uplink counter reset
#  - also evaluate downlinks received with the environment uplink

# Main Program
# -------------------------------------------------------------------------------------------------------
try:
    pycom.heartbeat(False)
    pycom.rgbled(0x000200)
    time.sleep(1.5)
    pycom.rgbled(0x000000)
    sendConfig = False

    # get boards unique devEUI
    lora = LoRa(mode=LoRa.LORAWAN, region=LoRa.EU868 , device_class=LoRa.CLASS_A)
    # Hexlify the raw MAC bytes first, then decode the resulting ASCII hex
    # digits; decoding the raw bytes as text is not valid for arbitrary MACs.
    devEUI = ubinascii.hexlify(lora.mac()).decode('ascii')
    print("Pycom booted. Lets go! DevEUI: ", devEUI)

    #data_list=bluetoothScan()
    #bluetoothDataFilter(data_list)
    #sys.exit()

    while True: # for fast scanning mode if we dont use deepsleep
        time.sleep(0.5)
        if debugON: print("==================================================================================")
        if debugON: print('ResetCause : ', machine.reset_cause())
        print('WakeupCause : ', py.get_wake_reason())

        if(int(machine.reset_cause()) == 0): # machine.PWRON_RESET
            if(int(py.get_wake_reason()) == 4):
                print("..woken up from deepsleep")
            if(int(py.get_wake_reason()) == 2): # machine.RTC_WAKE
                print("..woken up because of user button press")
                joinLoRa()
                sendConfig = True
            if(int(py.get_wake_reason()) == 0): # machine.PWRON_WAKE
                print("..woken up after power loss")
                sendConfig = True

        if(int(machine.reset_cause()) == 2): # machine.WDT_RESET
            if(int(py.get_wake_reason()) == 2):
                print("..restart after update from VSCode")
            if(int(py.get_wake_reason()) == 172):
                print("..restart after update from VSCode")

        if(withDeepSleep):
            lora.nvram_restore()
            time.sleep(0.5)

        if(not lora.has_joined()):
            if debugON: print('Lora not joined!')
            # joinLoRa() sends the config uplink itself after a successful join
            sendConfig = False
            joinLoRa()
        else:
            if debugON: print('assuming Lora is still joined.')
            if(sendConfig):
                sendConfigPayload()
            else:
                #scan for bluetooth advertisements
                data_list=bluetoothScan()
                #filter bluetooth advertisements based on UUID
                filtered_beacons = bluetoothDataFilter(data_list)
                #Assemble LoRa payload
                loraPayload = assembleLoraPayload(filtered_beacons)
                #Transmit LoRa payload
                sendBLEPayload(loraPayload)
                checkUplinkCounter()

        gotoSleep()
except Exception as e:
    ledPatterns.ErrorPattern()
    print("ERROR: ", str(e))
    time.sleep(2)
    machine.reset()