# vscode: ctrl-shift-p > linter enabled > disable from network import LoRa from network import Bluetooth from machine import Pin from machine import SD import socket, time, ubinascii, pycom, machine, struct, math, os import pysense #from LIS2HH12 import LIS2HH12 import ledPatterns from network import WLAN from microWebCli import MicroWebCli # ------------- Parameters ------------------ firmwareVersionMajor = 1 firmwareVersionMinor = 0 firmwareVersionRev = 6 uplinkPortConfig = 2 uplinkPortPayload = 10 default_BTScanTime = 5 default_Sleeptime = 300 #uuidWhitelist={'f4ca46d839c7454b84a5d1135b026b27'} #SM / Dml uuidWhitelist={'f5317fb44ea52c723417ae4b62b0971e'} # NWBC # WLAN AP for updating firmware updateSSID = 'assetscannerap' updatePWD = '4-K2=48i1P=6' rebootFile = 'reboot.now' default_WLANAPTimeout = 600 # for debugging / testing withDeepSleep = True debugON = False # LoRa parameters confirmed = False devEUI = '' #declarations bt = Bluetooth() bt = Bluetooth(antenna=Bluetooth.EXT_ANT) py = pysense.Pysense() # LoRa join procedure # ------------------------------------------------------------------------------------------------------- def joinLoRa(): global devEUI #limits join time to 10 seconds to avoid infinite join loop t_join = time.time() + 10 devEUI = str(ubinascii.hexlify(lora.mac().decode('ascii')), 'utf-8') if debugON: print('original board DevEUI: ', devEUI) app_eui = ubinascii.unhexlify('1111111111111111') app_key = ubinascii.unhexlify('11111111111111111111111111111111') if debugON: print('starting join process..') lora.join(activation=LoRa.OTAA, auth=(app_eui, app_key), dr=0, timeout=0) while time.time() < t_join: time.sleep(1) ledPatterns.JoiningPattern() if lora.has_joined(): break if lora.has_joined(): if debugON: print('join: OK') else: if debugON: print('join: failed, going to sleep for 300sec, reset the board and try again..') time.sleep(300) machine.reset() # Scanning for BLE advertizements # ------------------------------------------------------------------------------------------------------- def bluetoothScan(): try: currentScanTime = pycom.nvs_get('BTScanTime') except Exception as e: currentScanTime = None if currentScanTime==None: currentScanTime = default_BTScanTime if debugON: print("Scanning for BLE advertisements for seconds: ", currentScanTime) bt.start_scan(currentScanTime) x=[] ledPatterns.BLEScanningPattern() while bt.isscanning(): y=bt.get_adv() if y: x.append(y) ledPatterns.BLEScanningPattern() return x # Filter advertisements by UUID # ------------------------------------------------------------------------------------------------------- def bluetoothDataFilter(adv_list): # bledatafilter if debugON: print("Filtering Bluetooth advertisements ") major_minor_data=[] rssi_datalist=[] bleDict = {} values = [] # if adv_list: if debugON: print(" total number of scanned advertisements: " + str(len(adv_list))) for i in range(len(adv_list)): adv_data = bt.resolve_adv_data(adv_list[i].data, Bluetooth.ADV_MANUFACTURER_DATA) rssi_data = abs(adv_list[i].rssi) if adv_data: mfg_data = ubinascii.hexlify(adv_data).decode("utf8") uuid=mfg_data[8:40] if uuid in uuidWhitelist: maj_min=mfg_data[40:48] try: values = bleDict[str(maj_min)] values.append(rssi_data) except: bleDict[str(maj_min)] = [rssi_data] if debugON: print(bleDict) filtered_beacons = [] for key in bleDict: rssi_min = ubinascii.hexlify(struct.pack('B', min(bleDict.get(key)))).decode("utf8") rssi_max = ubinascii.hexlify(struct.pack('B', max(bleDict.get(key)))).decode("utf8") rssi_avg = ubinascii.hexlify(struct.pack('B', int(sum(bleDict.get(key)) / len(bleDict.get(key))))).decode("utf8") print("maj-min:" + key) print(" rssimin:" + str(int(rssi_min,16)) + "=" + str(rssi_min)) print(" rssimax:" + str(int(rssi_max,16)) + "=" + str(rssi_max)) print(" rssiavg:" + str(int(rssi_avg,16)) + "=" + str(rssi_avg)) filtered_beacons.append(str(key) + rssi_min + rssi_max + rssi_avg) if debugON: print("Total number of received Beacons : ",len(filtered_beacons)) if debugON: print("Ending Filtering operation:", str(filtered_beacons)) return filtered_beacons def assembleLoraPayload(filtered_beacons): if debugON: print("Assembling LoRa payload") received_beacons=len(filtered_beacons) #--------------convert list to lora payload -------------------------------- loraPayload=[] #12 due to limitations in the lora payload capacity if received_beacons >= 12: len_payload = 12 else: len_payload=received_beacons for a in range(len_payload): loraPayload.append(ubinascii.unhexlify(filtered_beacons[a])) return loraPayload def readBatteryVoltage(): # read Battery Voltage from Expansion board #battVolt = smart_adc.measureBattery(True) tmpVolt = py.read_battery_voltage() tmpVolt = int(tmpVolt * 100) if debugON: print("BatteryVoltage mV:" , tmpVolt) return tmpVolt def sendScannerPayload(voltage, payload): s = socket.socket(socket.AF_LORA, socket.SOCK_RAW) if(confirmed): s.setsockopt(socket.SOL_LORA, socket.SO_CONFIRMED, 1) else: s.setsockopt(socket.SOL_LORA, socket.SO_CONFIRMED, 0) s.setsockopt(socket.SOL_LORA, socket.SO_DR, 5 ) # SF7 s.bind(uplinkPortPayload) s.setblocking(True) if debugON: print(' Voltage (HEX) : ', "%x" % voltage) loraPayloadBytes = struct.pack(">H", voltage) # 2 bytes from int if debugON: print(' BeaconData (HEX) : ', ubinascii.hexlify(b''.join(payload))) loraPayloadBytes += b''.join(payload) if debugON: print(' sending: LoraPayload (HEX): ', ubinascii.hexlify(loraPayloadBytes)) rx_beacons = struct.pack('b',len(payload)) #------------Send Lora Uplink----------------------------------------------- try: count = s.send(loraPayloadBytes) uplinkok = True except Exception as e: if debugON: print(' ERROR while sending LoRa uplink: ', e) uplinkok = False s.setblocking(False) if(uplinkok): if debugON: print(' Sent Bytes :', count) s.setblocking(False) data = s.recv(64) if(str(data, 'utf8') != ""): processDownlink(data) else: if debugON: print(' no Downlink / no Ack..') else: # uplink not OK, supposed LoRa not anymore joined.. if debugON: print(' Error, going to re-join..') joinLoRa() def sendConfigPayload(): if debugON: print('Sending firmware version and config uplink..') s = socket.socket(socket.AF_LORA, socket.SOCK_RAW) s.setsockopt(socket.SOL_LORA, socket.SO_CONFIRMED, 1) s.setsockopt(socket.SOL_LORA, socket.SO_DR, 0 ) # SF12 s.bind(uplinkPortConfig) s.setblocking(True) try: currentScanTime = pycom.nvs_get('BTScanTime') except Exception as e: currentScanTime = None if currentScanTime==None: currentScanTime = default_BTScanTime try: currentSleepTime = pycom.nvs_get('SleepTime') except Exception as e: currentSleepTime = None if currentSleepTime==None: currentSleepTime = default_Sleeptime if debugON: print(' Firmware Major (HEX) : ', "%x" % firmwareVersionMajor) if debugON: print(' Firmware Minor (HEX) : ', "%x" % firmwareVersionMinor) if debugON: print(' Firmware Rev (HEX) : ', "%x" % firmwareVersionRev) if debugON: print(' BLE Scan Time (HEX) : ', "%x" % currentScanTime) if debugON: print(' DeepSleep Time (HEX) : ', "%x" % currentSleepTime) loraPayloadBytes = struct.pack(">B", firmwareVersionMajor) # 1 byte unsigned char loraPayloadBytes += struct.pack(">B", firmwareVersionMinor) loraPayloadBytes += struct.pack(">B", firmwareVersionRev) loraPayloadBytes += struct.pack(">B", currentScanTime) loraPayloadBytes += struct.pack(">H", currentSleepTime) # 2 byte unsigned int if debugON: print(' sending: LoraPayload (HEX): ', ubinascii.hexlify(loraPayloadBytes)) #------------Send Lora Uplink----------------------------------------------- try: count = s.send(loraPayloadBytes) uplinkok = True except Exception as e: if debugON: print(' ERROR while sending LoRa uplink: ', e) uplinkok = False s.setblocking(False) if(uplinkok): if debugON: print(' Sent Bytes :', count) s.setblocking(False) data = s.recv(64) if(str(data, 'utf8') != ""): processDownlink(data) else: if debugON: print(' no Downlink / no Ack..') else: # uplink not OK, supposed LoRa not anymore joined.. if debugON: print(' Error, going to re-join..') joinLoRa() def processDownlink(data): if debugON: print(' Received Downlink:', ubinascii.hexlify(data).decode("utf-8")) if debugON: print(' ACK RSSI :', math.fabs(lora.stats()[1])) if debugON: print(' ACK SNR :', lora.stats()[2]) if debugON: print(' ACK DR :', lora.stats()[3]) if(int.from_bytes(data[0:1],'big') == 1): if debugON: print(' configuration downlink for bt scan and sleep time') if debugON: print(' Scan Time:',int.from_bytes(data[1:2],'big')) pycom.nvs_set("BTScanTime",int.from_bytes(data[1:2],'big')) if debugON: print(' Sleep Time:',int.from_bytes(data[2:4],'big')) pycom.nvs_set("SleepTime",int.from_bytes(data[2:4],'big')) sendConfigPayload() if(int.from_bytes(data[0:1],'big') == 2): if debugON: print(' action downlink received') if(int.from_bytes(data[1:2],'big') == 1): if debugON: print(' going to create WLAN AP..') pycom.nvs_set("APTimeout",int.from_bytes(data[3:5],'big')) createWLANAP() if(int.from_bytes(data[1:2],'big') == 2): if debugON: print(' going to update from web..') downloadUpdateFromWeb("{:02d}".format(int.from_bytes(data[2:3],'big'))+"{:02d}".format(int.from_bytes(data[3:4],'big'))+"{:02d}".format(int.from_bytes(data[4:5],'big'))) def gotoSleep(): global withDeepSleep try: currentSleepTime = pycom.nvs_get('SleepTime') except Exception as e: currentSleepTime = None if currentSleepTime==None: currentSleepTime = default_Sleeptime if(currentSleepTime == 0): if debugON: print('going to skip sleep and continue with fast-scanning-operation..') withDeepSleep = False time.sleep(0.5) else: if debugON: print('going to deep sleep for seconds: ' + str(currentSleepTime)) withDeepSleep = True ledPatterns.LongSleep() lora.nvram_save() time.sleep(0.5) py.setup_sleep(currentSleepTime) py.go_to_sleep() def createWLANAP(): sleepTimeout = 5 sleepTimeOver = 0 if debugON: print('..creating WLAN AP: ' + devEUI[-4:]) try: f = open('/flash/'+ rebootFile, "r") exists = True f.close() except Exception as e: exists = False time.sleep(1) if(exists): os.remove('/flash/'+ rebootFile) try: currentAPTimeout = pycom.nvs_get('APTimeout') except Exception as e: currentAPTimeout = None if currentAPTimeout==None: currentAPTimeout = default_WLANAPTimeout wlan = WLAN() Pin('P12', mode=Pin.OUT)(True) wlan.init(mode=WLAN.AP, ssid='SAP-' + devEUI[-4:], auth=(WLAN.WPA2,'assetscanner'), channel=7, antenna=WLAN.EXT_ANT,power_save=False, hidden=False) if debugON: print('WLAN AP created, watching for: ' + rebootFile + ' file for ' + str(currentAPTimeout) + 'sec') waitTime = time.time() + currentAPTimeout while time.time() < waitTime: if debugON: print(' watching for: ' + str(currentAPTimeout-sleepTimeOver) + 'sec') if(rebootFile in os.listdir('/flash')): if debugON: print(' reboot file found, perform a machine.reset() in 2 sec..') time.sleep(2) machine.reset() time.sleep(sleepTimeout) sleepTimeOver += sleepTimeout if debugON: print('..waiting time is over, close AP and reboot..') wlan.deinit() time.sleep(2) machine.reset() def downloadUpdateFromWeb(version = ""): if debugON: print('--------------------------------------------') if debugON: print('Connecting to wifi network to check for updates for version: ' + version) try: Pin('P12', mode=Pin.OUT)(True) wlan = WLAN(mode=WLAN.STA, antenna=WLAN.EXT_ANT, power_save=False) nets = wlan.scan() for net in nets: if debugON: print(' checking wifi network:' + net.ssid) if(net.ssid == updateSSID): if debugON: print(' correct update network found!') wlan.connect(net.ssid, auth=(net.sec, updatePWD), timeout=5000) while not wlan.isconnected(): machine.idle() # save power while waiting if debugON: print(' WLAN connection succeeded!') if debugON: print(' checking update files..') contentType = MicroWebCli.FileRequest('http://downloads.smartmakers.de/beaconscanner/9e35cf44-774a-421c-93d7-a8baab2a6752/main'+ version + '.txt', '/flash/main.py') if debugON: print(' content type: ', contentType) if debugON: print(' File of content type "%s" was saved to "%s"' % (contentType, '/flash/main.py')) break if debugON: print(' disconnecting from WLAN..') wlan.deinit() if debugON: print(' done, going to reboot') time.sleep(2) machine.reset() except Exception as e: if debugON: print(' ERROR: ', str(e)) # Main Program # ------------------------------------------------------------------------------------------------------- sendConfig = False pycom.heartbeat(False) time.sleep(0.5) pycom.rgbled(0x000500) time.sleep(0.5) pycom.rgbled(0x050000) time.sleep(0.5) pycom.rgbled(0x000005) while True: time.sleep(0.5) if debugON: print("==================================================================================") if debugON: print('ResetCause: ', machine.reset_cause()) if debugON: print('WakeupCause: ', py.get_wake_reason()) if(int(machine.reset_cause()) == 0): if(int(py.get_wake_reason()) == 4): if debugON: print("..woken up from deepsleep") if(int(py.get_wake_reason()) == 2): if debugON: print("..woken up because of user button press") sendConfig = True if(int(py.get_wake_reason()) == 0): if debugON: print("..woken up after power loss") sendConfig = True if(int(machine.reset_cause()) == 2): if(int(py.get_wake_reason()) == 2): if debugON: print("..restart after update from VSCode") if(withDeepSleep): lora = LoRa(mode=LoRa.LORAWAN, region=LoRa.EU868 , device_class=LoRa.CLASS_A) lora.nvram_restore() time.sleep(0.5) if(not lora.has_joined()): if debugON: print('Lora not joined!') sendConfig = True joinLoRa() else: if debugON: print('Lora is still joined.') if(sendConfig): sendConfigPayload() else: battVolt = readBatteryVoltage() #scan for bluetooth advertisements data_list=bluetoothScan() #filter bluetooth advertisements based on UUID filtered_beacons = bluetoothDataFilter(data_list) #Assemble LoRa payload loraPayload = assembleLoraPayload(filtered_beacons) #Transmit LoRa payload sendScannerPayload(battVolt, loraPayload) gotoSleep() #downloadUpdateFromWeb() #createWLANAP()