En utilisant les techniques de la partie précédente, le code de la tache d’envoi des données à Salesforce effectuant le travail suivant :

  • Essaye de se connecter à Salesforce
  • Si cela réussit : 
    • Poser le verrou DB : bloque l’accès à la base SQ
      • Action sécurisée : Il lit tout ce qui est dans la base SQL pour être envoyé
    • Débloquer le verrou DB (cela a duré presque 1 seconde, car on écrit sur le disque)
    • S’il y a des nouvelles données à envoyer à Salesforce
      • Envoyer les informations à Salesforce. Cela peut durer plusieurs secondes : temps de réaction de Salesforce, la vitesse du réseau, etc.
      • Poser le verrou DB : bloque l’accès à la base SQ
        • Action sécurisée : Il marque ou supprime les données déjà envoyées
      • Débloquer le verrou DB (cela a duré presque 1 seconde, car on écrit sur le disque)
  • Il attend les nouvelles données (pause) produites par autres programmes.

Ressemble à

def send_data_to_sf(delay):

    # if mode shared DB : cree la connexion une fois
    if MODE_SHARED_DB_CONNECTION == True:
        permanentDBConnectionA = sqlite3.connect(CAR_DB, isolation_level=None);

    while True:
        # on demande l'acces exclusif à la DB
        threadDbLock.acquire()
        print(C_SF+"SF : Got DB lock", ENDC);

        try:
            # if mode shared DB : crée connexion à chaque fois
            if MODE_SHARED_DB_CONNECTION == True:
                conn = permanentDBConnectionA
            else :
                print(C_SF+"SF : Connect to DB", ENDC);
                conn = sqlite3.connect(CAR_DB, isolation_level=None);

            print(C_SF+"SF : Look for new Status", ENDC);
            
            # on demande un curseur sur la DB
            cursor = conn.cursor()
            # on lit les lignes qu'on n'a pas encore envoyée (  where sent != 1)
            cursor.execute("SELECT rowid, statusdate, gps_longitude , gps_latitude, gps_speed, gps_elevation, obd_speed, obd_fuellevel, obd_rpm,  gps_measure_time, obd_measure_time, gps_time FROM car_status where sent != 1 order by statusdate asc")
            rows = cursor.fetchall()
            # on referme le curseur
            cursor.close()

            # on regarde combien de mesure on a eu
            count = 0;
            for row in rows:
                count = count + 1

            # si il y a des lignes, on va les mettre dans SF
            if count>0 :
                print(C_SF+"SF : Status to transfer to SF :" , count, ENDC);

                print(C_SF+'SF : Connect to SF', ENDC)

                #on se connecte à SF
                sf = Salesforce(
  
                );

                # on cree :
                # - un tableau new_positions avec toutes les positions au format SF
                # - un tableau rowsToMark avec les lignes à marquer comme lues
                print(C_SF+"SF : Prepare new position objects", ENDC);
                new_positions = []
                last_positions = []
                rowsToMark = []
                for row in rows:
                    # conversion de format pour les dates
                    datestring = datetime.utcfromtimestamp(row[1]).strftime('%Y-%m-%dT%H:%M:%SZ')
                    gmtime = datetime.utcfromtimestamp(row[9]).strftime('%Y-%m-%dT%H:%M:%SZ')
                    omtime = datetime.utcfromtimestamp(row[10]).strftime('%Y-%m-%dT%H:%M:%SZ')
                    gtime = datetime.utcfromtimestamp(row[11]).strftime('%Y-%m-%dT%H:%M:%SZ')
                    
                    new_positions.append({
                        'Tracker_ID__c': RASPBERRY_NUMBER,
                        'Requested_On__c': datestring,
                        'GPS_Longitude__c' : row[2],
                        'GPS_Latitude__c' : row[3],
                        'GPS_Speed__c' : row[4],
                        'GPS_Elevation__c' : row[5],                
                        'OBD_Speed__c' : row[6],                
                        'OBD_Fuel_Level__c' : row[7],                
                        'OBD_RPM__c' : row[8],
                        'GPS_Measure_Time__c' : gmtime,
                        'OBD_Measure_Time__c' : omtime,
                        'GPS_Time__c' : gtime,
                        
                    })
                        
                    rowsToMark.append((1,row[0]));
                
                # on demande à SF d'inserer toutes les nouvelles lignes
                print(C_SF+'SF : Send Big Object Positions', ENDC)
                print(C_SF+'SF : All Positions are : ',new_positions, ENDC)
                createdPositions1 = sf.bulk.Car_Monitoring_Data__b.insert(new_positions)
                
                # on dit à la DB que toutes ces lignes ont éte utilisées
                print(C_SF+'SF : Update database records', ENDC)
                print(C_SF+'SF : Records to update are : ',rowsToMark, ENDC)
                cursor = conn.cursor()
                cursor.executemany('UPDATE car_status SET sent=? WHERE rowid=?', (rowsToMark) )
                cursor.close()

                # on demande à la DB de valider la sauvearde des lignes
                print(C_SF+'SF : Commit changes in DB', ENDC)
                conn.commit

                print(C_SF+'SF : Transfer to SF done', ENDC)
            else :
                print(C_SF+'SF : Nothing to transfer to SF', ENDC)

            # close DB connection si one est en mode de connection non permanente
            if MODE_SHARED_DB_CONNECTION == False:
                conn.close()
                print("SF : DB Connection closed");

        except Exception as e:
            print(C_ERROR+"SF : Error: unable to send data to SF", ENDC)
            print(C_ERROR+str(e), ENDC)

        # on libere l'accès à la DB pour qu'elle puisse etre mise à jour par les autres taches
        threadDbLock.release()
        print(C_SF+"SF : DB lock released", ENDC);

        # on attend le delai demandé
        print(C_SF+'SF : Wait', ENDC)
        time.sleep(delay)