Le Logiciel Raspberry Car – Tâche d’envoi des données à Salesforce (4/5)
En utilisant les techniques de la partie précédente, le code de la tache d’envoi des données à Salesforce effectuant le travail suivant :
- Essaye de se connecter à Salesforce
- Si cela réussit :
- Poser le verrou DB : bloque l’accès à la base SQ
- Action sécurisée : Il lit tout ce qui est dans la base SQL pour être envoyé
- Débloquer le verrou DB (cela a duré presque 1 seconde, car on écrit sur le disque)
- S’il y a des nouvelles données à envoyer à Salesforce
- Envoyer les informations à Salesforce. Cela peut durer plusieurs secondes : temps de réaction de Salesforce, la vitesse du réseau, etc.
- Poser le verrou
DB : bloque l’accès à la base SQ
- Action sécurisée : Il marque ou supprime les données déjà envoyées
- Débloquer le verrou DB (cela a duré presque 1 seconde, car on écrit sur le disque)
- Poser le verrou DB : bloque l’accès à la base SQ
- Il attend les nouvelles données (pause) produites par autres programmes.
Ressemble à
def send_data_to_sf(delay):
# if mode shared DB : cree la connexion une fois
if MODE_SHARED_DB_CONNECTION == True:
permanentDBConnectionA = sqlite3.connect(CAR_DB, isolation_level=None);
while True:
# on demande l'acces exclusif à la DB
threadDbLock.acquire()
print(C_SF+"SF : Got DB lock", ENDC);
try:
# if mode shared DB : crée connexion à chaque fois
if MODE_SHARED_DB_CONNECTION == True:
conn = permanentDBConnectionA
else :
print(C_SF+"SF : Connect to DB", ENDC);
conn = sqlite3.connect(CAR_DB, isolation_level=None);
print(C_SF+"SF : Look for new Status", ENDC);
# on demande un curseur sur la DB
cursor = conn.cursor()
# on lit les lignes qu'on n'a pas encore envoyée ( where sent != 1)
cursor.execute("SELECT rowid, statusdate, gps_longitude , gps_latitude, gps_speed, gps_elevation, obd_speed, obd_fuellevel, obd_rpm, gps_measure_time, obd_measure_time, gps_time FROM car_status where sent != 1 order by statusdate asc")
rows = cursor.fetchall()
# on referme le curseur
cursor.close()
# on regarde combien de mesure on a eu
count = 0;
for row in rows:
count = count + 1
# si il y a des lignes, on va les mettre dans SF
if count>0 :
print(C_SF+"SF : Status to transfer to SF :" , count, ENDC);
print(C_SF+'SF : Connect to SF', ENDC)
#on se connecte à SF
sf = Salesforce(
);
# on cree :
# - un tableau new_positions avec toutes les positions au format SF
# - un tableau rowsToMark avec les lignes à marquer comme lues
print(C_SF+"SF : Prepare new position objects", ENDC);
new_positions = []
last_positions = []
rowsToMark = []
for row in rows:
# conversion de format pour les dates
datestring = datetime.utcfromtimestamp(row[1]).strftime('%Y-%m-%dT%H:%M:%SZ')
gmtime = datetime.utcfromtimestamp(row[9]).strftime('%Y-%m-%dT%H:%M:%SZ')
omtime = datetime.utcfromtimestamp(row[10]).strftime('%Y-%m-%dT%H:%M:%SZ')
gtime = datetime.utcfromtimestamp(row[11]).strftime('%Y-%m-%dT%H:%M:%SZ')
new_positions.append({
'Tracker_ID__c': RASPBERRY_NUMBER,
'Requested_On__c': datestring,
'GPS_Longitude__c' : row[2],
'GPS_Latitude__c' : row[3],
'GPS_Speed__c' : row[4],
'GPS_Elevation__c' : row[5],
'OBD_Speed__c' : row[6],
'OBD_Fuel_Level__c' : row[7],
'OBD_RPM__c' : row[8],
'GPS_Measure_Time__c' : gmtime,
'OBD_Measure_Time__c' : omtime,
'GPS_Time__c' : gtime,
})
rowsToMark.append((1,row[0]));
# on demande à SF d'inserer toutes les nouvelles lignes
print(C_SF+'SF : Send Big Object Positions', ENDC)
print(C_SF+'SF : All Positions are : ',new_positions, ENDC)
createdPositions1 = sf.bulk.Car_Monitoring_Data__b.insert(new_positions)
# on dit à la DB que toutes ces lignes ont éte utilisées
print(C_SF+'SF : Update database records', ENDC)
print(C_SF+'SF : Records to update are : ',rowsToMark, ENDC)
cursor = conn.cursor()
cursor.executemany('UPDATE car_status SET sent=? WHERE rowid=?', (rowsToMark) )
cursor.close()
# on demande à la DB de valider la sauvearde des lignes
print(C_SF+'SF : Commit changes in DB', ENDC)
conn.commit
print(C_SF+'SF : Transfer to SF done', ENDC)
else :
print(C_SF+'SF : Nothing to transfer to SF', ENDC)
# close DB connection si one est en mode de connection non permanente
if MODE_SHARED_DB_CONNECTION == False:
conn.close()
print("SF : DB Connection closed");
except Exception as e:
print(C_ERROR+"SF : Error: unable to send data to SF", ENDC)
print(C_ERROR+str(e), ENDC)
# on libere l'accès à la DB pour qu'elle puisse etre mise à jour par les autres taches
threadDbLock.release()
print(C_SF+"SF : DB lock released", ENDC);
# on attend le delai demandé
print(C_SF+'SF : Wait', ENDC)
time.sleep(delay)