package it.cnr.isti.energia.control;

import java.util.Date;
import java.util.HashMap;
import java.util.Observable;

import org.richfaces.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.BasicDBObject;
import com.mongodb.DBCursor;
import com.mongodb.util.JSON;

import it.cnr.isti.energia.localization.LiveAlgorithmManager;
import it.cnr.isti.energia.util.SensedValue;

/**
 * Runnable that reads documents from a MongoDB cursor and pushes one message
 * per document to its registered observers.
 *
 * <p>
 * According to the original authors' notes the cursor is a tailable cursor,
 * i.e. it delivers the new data arriving in the database as MongoDB notifies
 * them.
 *
 * <p>
 * Typical usage: construct, call {@link #setCursor(DBCursor)}, register
 * observers, then execute {@link #run()} on a thread. Call {@link #done()} to
 * ask the loop to stop.
 *
 * @author Antonino Crivello
 * @author Giancarlo Riolo
 */
public class DataPusher extends Observable implements Runnable {

	Logger log = LoggerFactory.getLogger(DataPusher.class);

	/** Cursor consumed by {@link #run()}; closed by {@code run()} when it ends. */
	private DBCursor cursor;

	/** Stop flag; read and written only while holding this object's monitor. */
	private boolean exit = false;

	/** Room name passed to the {@link LiveAlgorithmManager} and logged at start. */
	private String room;

	/** Stored from the constructor; not read anywhere in this class. */
	private boolean unattended = false;

	/**
	 * Creates a pusher for the given room.
	 *
	 * @param room room name handed to the algorithm manager
	 */
	public DataPusher(String room) {
		this.room = room;
	}

	/**
	 * Creates a pusher for the given room, recording the unattended flag.
	 *
	 * @param room       room name handed to the algorithm manager
	 * @param unattended value stored in the {@code unattended} field
	 */
	public DataPusher(String room, boolean unattended) {
		this.room = room;
		this.unattended = unattended;
	}

	/** Creates a pusher bound to the room named {@code "debug"}. */
	public DataPusher() {
		this.room = "debug";
	}

	/**
	 * Sets the cursor that {@link #run()} will consume.
	 *
	 * <p>
	 * The cursor is stored as-is and must stay open: {@code run()} iterates it
	 * and closes it when the loop terminates.
	 *
	 * @param cursore the cursor to read documents from
	 */
	public void setCursor(DBCursor cursore) {
		// Do not close here: run() still has to iterate this cursor.
		this.cursor = cursore;
	}

	/** Requests termination of the loop in {@link #run()}. */
	public void done() {
		synchronized (this) {
			this.exit = true;
		}
	}

	/**
	 * Tells whether {@link #done()} has been called.
	 *
	 * @return {@code true} once termination has been requested
	 */
	public boolean isDone() {
		synchronized (this) {
			return this.exit;
		}
	}

	/**
	 * Sends messages.
	 *
	 * <p>
	 * For every document returned by the cursor, until the cursor has no more
	 * documents or {@link #done()} is called: reads {@code serviceId},
	 * {@code timestamp} and {@code values}, feeds them to a
	 * {@link LiveAlgorithmManager} as a {@link SensedValue}, and notifies the
	 * observers with the string
	 * {@code id;timestampMillis;values;stigma;threshold;}.
	 *
	 * <p>
	 * Any exception raised while processing is logged and ends the loop. The
	 * cursor is always closed before this method returns.
	 */
	@Override
	public void run() {
		log.info("DataPusher {}", room);

		if (cursor == null) {
			// setCursor() was never called: nothing to read and nothing to close.
			log.error("DataPusher {} started without a cursor", room);
			return;
		}

		LiveAlgorithmManager manager = new LiveAlgorithmManager(room);
		try {
			while (cursor.hasNext() && !isDone()) {
				// next() advances the cursor; curr() re-reads the same document.
				String id = (String) cursor.next().get("serviceId");
				Date timestamp = (Date) cursor.curr().get("timestamp");
				long timestampdb = timestamp.getTime();
				BasicDBObject values = (BasicDBObject) cursor.curr().get("values");

				SensedValue value = new SensedValue(id, timestampdb, values);
				manager.addCurrentData(value);
				HashMap<?, ?> results = manager.getResults();

				String message = id.toString() + ";" + timestampdb + ";" + values.toString() + ";"
						+ results.get("stigma") + ";" + results.get("threshold") + ";";

				setChanged();
				log.info("Sono {} dell observable {}", id, this);
				notifyObservers(message);
			}
		} catch (Exception e) {
			// Boundary of the worker thread: log with stack trace and stop.
			log.error("DataPusher {} stopped because of an error", room, e);
		} finally {
			cursor.close();
		}
	}
}