Storm Bolt Caching

I'm trying to cache some data in a Storm bolt, but I'm not sure whether this is the right way to do it. In the class below, employee IDs and employee names are cached in a HashMap. To fill it, a database query selects all employees from the Employee table and populates the map in the prepare method (is this the right place to initialize the map?).

After adding some logging, I can see that when the topology runs, it opens several connections to the database and initializes the map several times. I want to avoid this by caching the result so that the database is not queried every time. How can I do this?

    public class TestBolt extends BaseRichBolt {
        private static final long serialVersionUID = 2946379346389650348L;
        private OutputCollector collector;
        private Map<String, String> employeeIdToNameMap;
        private static final Logger LOG = Logger.getLogger(TestBolt.class);

        @Override
        public void execute(Tuple tuple) {
            String employeeId = tuple.getStringByField("employeeId");
            String employeeName = employeeIdToNameMap.get(employeeId);
            collector.emit(tuple, new Values(employeeId, employeeName));
            collector.ack(tuple);
        }

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            // TODO Auto-generated method stub
            this.collector = collector;
            try {
                employeeIdToNameMap = createEmployeIdToNameMap();
            } catch (SQLException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields(/*some fields*/));
        }

        private Map<String, String> createEmployeIdToNameMap() throws SQLException {
            final Map<String, String> employeeIdToNameMap = new HashMap<>();
            final DatabaseManager dbm = new PostgresManager();
            final String query = "select id, name from employee;";
            final Connection conn = dbm.createDefaultConnection();
            final ResultSet result = dbm.executeSelectQuery(conn, query);
            while (result.next()) {
                String employeId = result.getString("id");
                String name = result.getString("name");
                employeeIdToNameMap.put(employeId, name);
            }
            conn.close();
            return employeeIdToNameMap;
        }
    }
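The repeated initialization comes from how Storm runs a bolt: each task gets its own copy of the bolt object and prepare() is called on every copy, so an instance field such as employeeIdToNameMap is loaded once per task. The following is a minimal, Storm-free simulation of that behavior, assuming four tasks; SimulatedBolt and loadFromDatabase() are hypothetical stand-ins for TestBolt and createEmployeIdToNameMap(), and no real database is used.

```java
import java.util.HashMap;
import java.util.Map;

class PerTaskLoadDemo {
    // Counts how many times the simulated database query runs.
    static int queryCount = 0;

    // Hypothetical stand-in for createEmployeIdToNameMap(); returns fixed data.
    static Map<String, String> loadFromDatabase() {
        queryCount++;
        Map<String, String> rows = new HashMap<>();
        rows.put("42", "Alice");
        return rows;
    }

    // Hypothetical stand-in for TestBolt: the map is an instance field filled in prepare().
    static class SimulatedBolt {
        private Map<String, String> employeeIdToNameMap;

        void prepare() {
            employeeIdToNameMap = loadFromDatabase();
        }
    }

    // One bolt object per task, each prepared separately; returns the query count.
    static int simulate(int tasks) {
        queryCount = 0;
        for (int i = 0; i < tasks; i++) {
            new SimulatedBolt().prepare();
        }
        return queryCount;
    }

    public static void main(String[] args) {
        System.out.println("queries: " + simulate(4)); // prints "queries: 4"
    }
}
```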

SOLUTION: I created a synchronized map, and it works great for me:

    private static Map<String, String> employeeIdToNameMap =
            Collections.synchronizedMap(new HashMap<String, String>());
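A synchronized map makes each individual call (get, isEmpty, putAll) thread-safe, but a "check whether empty, then fill" sequence in prepare() is not atomic on its own, so two tasks could still both query the database. The sketch below, assuming the fill happens in prepare(), guards that sequence with the map's own lock, which is the lock Collections.synchronizedMap uses internally. loadFromDatabase() is a hypothetical stand-in for createEmployeIdToNameMap(), and the thread pool only simulates concurrent tasks.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SynchronizedMapCache {
    // Shared by every bolt instance in the same JVM, as in the SOLUTION above.
    static final Map<String, String> employeeIdToNameMap =
            Collections.synchronizedMap(new HashMap<String, String>());
    static final AtomicInteger queryCount = new AtomicInteger();

    // Hypothetical stand-in for createEmployeIdToNameMap(); returns fixed data.
    private static Map<String, String> loadFromDatabase() {
        queryCount.incrementAndGet();
        Map<String, String> rows = new HashMap<>();
        rows.put("42", "Alice");
        return rows;
    }

    // What prepare() would call: check-then-fill under the map's own lock.
    // Note: if the table were empty, every task would query again.
    static void ensureLoaded() {
        synchronized (employeeIdToNameMap) {
            if (employeeIdToNameMap.isEmpty()) {
                employeeIdToNameMap.putAll(loadFromDatabase());
            }
        }
    }

    // Simulates `tasks` bolt tasks calling prepare() concurrently; returns the query count.
    static int simulate(int tasks) {
        employeeIdToNameMap.clear();
        queryCount.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(SynchronizedMapCache::ensureLoaded);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queryCount.get();
    }

    public static void main(String[] args) {
        System.out.println("queries: " + simulate(8)); // prints "queries: 1"
    }
}
```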
1 answer

Since you have several bolt tasks, you can mark employeeIdToNameMap as static and volatile, and initialize the map in the prepare method as follows:

    try {
        synchronized (TestBolt.class) {
            if (null == employeeIdToNameMap) {
                employeeIdToNameMap = createEmployeIdToNameMap();
            }
        }
    } catch (SQLException e) {
        ...
    }
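The answer's version takes the class lock on every prepare() call, which is cheap because prepare() runs once per task. A common refinement is double-checked locking: an unsynchronized null check first, then a re-check under the lock, which is only correct because the field is volatile. The sketch below is a Storm-free simulation under that assumption; SharedCacheBolt and loadFromDatabase() are hypothetical stand-ins for TestBolt and createEmployeIdToNameMap(). Keep in mind that a static field is shared only by tasks in the same worker JVM, so each worker process still runs the query once.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SharedCacheBolt {
    // static: one map per JVM; volatile: a map published by one thread is safely visible to all.
    private static volatile Map<String, String> employeeIdToNameMap;
    static final AtomicInteger queryCount = new AtomicInteger();

    // Hypothetical stand-in for createEmployeIdToNameMap(); returns fixed data.
    private static Map<String, String> loadFromDatabase() {
        queryCount.incrementAndGet();
        Map<String, String> rows = new HashMap<>();
        rows.put("42", "Alice");
        return rows;
    }

    // Body of prepare(): double-checked locking, so only the first task runs the query.
    void prepare() {
        if (employeeIdToNameMap == null) {              // fast path without the lock
            synchronized (SharedCacheBolt.class) {
                if (employeeIdToNameMap == null) {      // re-check while holding the lock
                    employeeIdToNameMap = loadFromDatabase();
                }
            }
        }
    }

    // Body of execute(): the map is never modified after publication, so no lock is needed.
    String lookup(String employeeId) {
        return employeeIdToNameMap.get(employeeId);
    }

    // Runs prepare() on `tasks` bolt instances concurrently; returns how many queries ran.
    static int simulate(int tasks) {
        employeeIdToNameMap = null;
        queryCount.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> new SharedCacheBolt().prepare());
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queryCount.get();
    }

    public static void main(String[] args) {
        System.out.println("queries: " + simulate(8)); // prints "queries: 1"
    }
}
```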

Source: https://habr.com/ru/post/1242462/

