package it.cnr.isti.energia.control;
import java.util.Date;
import java.util.HashMap;
import java.util.Observable;
import org.richfaces.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCursor;
import com.mongodb.util.JSON;
import it.cnr.isti.energia.localization.LiveAlgorithmManager;
import it.cnr.isti.energia.util.SensedValue;
/**
* @author Antonino Crivello
* @author Giancarlo Riolo
*
* DataPusher estende Runnable e si occupa di notificare al server le
* notifiche in arrivo dal tailable cursor, cioè dai nuovi dati in
* arrivo sul db e notificati dal mongodb
*
*/
public class DataPusher extends Observable implements Runnable {
Logger log = LoggerFactory.getLogger(DataPusher.class);
private DBCursor cursor;
private boolean exit = false;
private String room;
private boolean unattended = false;
public DataPusher(String room) {
this.room = room;
}
public DataPusher(String room, boolean unattended) {
this.room = room;
this.unattended = unattended;
}
public DataPusher() {
this.room = "debug";
}
public void setCursor(DBCursor cursore) {
this.cursor = cursore;
cursore.close();
}
public void done() {
synchronized (this) {
this.exit = true;
}
}
public boolean isDone() {
synchronized (this) {
return this.exit == true;
}
}
/**
* Send messages
*/
@Override
public void run() {
log.info("DataPusher {}", room);
String message = "";
String id;
Date timestamp;
long timestampdb;
LiveAlgorithmManager manager = new LiveAlgorithmManager(room);
// +2 hour
// int gmt = 120*60*1000;
try {
while (cursor.hasNext() && !isDone()) {
id = (String) cursor.next().get("serviceId");
timestamp = (Date) cursor.curr().get("timestamp");
// timestampdb = timestamp.getTime() + gmt;
timestampdb = timestamp.getTime();
// cursor.curr().get("timestamp").toString();
BasicDBObject values = (BasicDBObject) cursor.curr().get("values");
SensedValue value = new SensedValue(id, timestampdb, values);
manager.addCurrentData(value);
HashMap results = manager.getResults();
message = id.toString() + ";" + timestampdb + ";" + values.toString() + ";" + results.get("stigma")
+ ";" + results.get("threshold") + ";";
setChanged();
log.info("Sono {} dell observable", id, this.toString());
notifyObservers(message);
}
} catch (Exception e) {
e.printStackTrace();
cursor.close();
}
cursor.close();
}
}