I have run into the same need several times, and I would like to hear other thoughts on the right way to structure the solution. I need to perform some operation on many elements across many threads, without requiring all of the elements to be in memory at once, only those currently being processed. Something like Iterables.partition is not enough, because it brings all of the elements into memory up front.
Expressed in code, I want to write BulkCalc2, which does the same thing as BulkCalc1, but in parallel. Below is example code that illustrates my best attempt. I am not satisfied with it because it is big and ugly, but it does seem to accomplish my goals: keeping the threads in use until the work is done, propagating any exceptions thrown during the calculation, and requiring no more than numThreads BigThing instances in memory at once.
I will accept the answer that meets the stated goals in the most concise way, whether it is an improvement to my BulkCalc2 or a completely different solution.
/**
 * A unit of work for the bulk calculators in this file.
 *
 * <p>NOTE(review): the name suggests instances are expensive to hold in
 * memory; nothing in this interface enforces or depends on that.
 */
interface BigThing {

    /**
     * Returns the identifier of this item. The bulk calculators use it as the
     * key under which the item's computed value is stored.
     */
    int getId();

    /**
     * Returns the string payload of this item. {@code Calc} derives its random
     * seed from this string's hash code.
     */
    String getString();
}
/**
 * Per-item calculation used by the bulk calculators; it does a fixed amount
 * of CPU work for every item.
 */
class Calc {

    /**
     * Sums 100000 pseudo-random doubles drawn from a {@link Random} seeded
     * with the hash code of {@code bigThing.getString()}, so the result
     * depends only on that string.
     *
     * @param bigThing the item to compute a value for; its string must not be
     *                 {@code null}
     * @return the accumulated sum
     */
    double calc(BigThing bigThing) {
        final int seed = bigThing.getString().hashCode();
        final Random generator = new Random(seed);

        double total = 0;
        int remaining = 100000;
        while (remaining-- > 0) {
            total += generator.nextDouble();
        }
        return total;
    }
}
/**
 * Sequential bulk calculator: applies a {@code Calc} to each item in turn on
 * the calling thread.
 */
class BulkCalc1 {

    /** Calculation applied to every item. */
    final Calc calc;

    /**
     * @param calc the calculation to run for each item
     */
    public BulkCalc1(Calc calc) {
        this.calc = calc;
    }

    /**
     * Drains the iterator, computing one value per item.
     *
     * @param in source of items; consumed until {@code hasNext()} is false
     * @return the computed values keyed by item id, in ascending id order; if
     *         two items share an id, the later one wins
     */
    public TreeMap<Integer, Double> calc(Iterator<BigThing> in) {
        final TreeMap<Integer, Double> valuesById = new TreeMap<Integer, Double>();
        for (Iterator<BigThing> cursor = in; cursor.hasNext(); ) {
            final BigThing item = cursor.next();
            // Read the id before running the calculation, as the key is
            // evaluated first.
            final int id = item.getId();
            final double value = calc.calc(item);
            valuesById.put(id, value);
        }
        return valuesById;
    }
}
/**
 * Wraps an {@link Iterator} so that several threads can pull elements from it.
 * The has-next check and the fetch happen together under this object's
 * monitor, so no caller can slip in between them.
 *
 * @param <T> element type of the wrapped iterator
 */
class SafeIterator<T> {

    /** The wrapped iterator; only touched while holding this object's lock. */
    final Iterator<T> in;

    /**
     * @param in the iterator to hand elements out from
     */
    SafeIterator(Iterator<T> in) {
        this.in = in;
    }

    /**
     * Returns the next element, or {@code null} once the wrapped iterator is
     * exhausted. A {@code null} element produced by the wrapped iterator is
     * therefore indistinguishable from exhaustion.
     */
    synchronized T nextOrNull() {
        return in.hasNext() ? in.next() : null;
    }
}
/**
 * Parallel bulk calculator: {@code numThreads} workers pull items one at a
 * time from a shared iterator and apply a {@code Calc} to each, so each worker
 * holds at most one item it has pulled but not yet finished.
 */
class BulkCalc2 {

    /** Calculation applied to every item; called concurrently from the workers. */
    final Calc calc;

    /** Number of worker threads (and worker tasks) used per {@link #calc} call. */
    final int numThreads;

    /**
     * @param calc       the calculation to run for each item
     * @param numThreads size of the worker pool; passed unchanged to
     *                   {@link Executors#newFixedThreadPool(int)}
     */
    public BulkCalc2(Calc calc, int numThreads) {
        this.calc = calc;
        this.numThreads = numThreads;
    }

    /**
     * Drains the iterator using {@code numThreads} workers and collects one
     * value per item.
     *
     * <p>A worker stops as soon as the shared iterator yields {@code null},
     * which also happens if the source itself contains a {@code null} element.
     *
     * @param in source of items; only accessed through a synchronized wrapper
     * @return the computed values keyed by item id, in ascending id order
     * @throws RuntimeException if a worker fails (the worker's own unchecked
     *         exception is rethrown; a checked cause is wrapped), or if the
     *         calling thread is interrupted while waiting, in which case the
     *         interrupt status is restored before throwing
     */
    public TreeMap<Integer, Double> calc(Iterator<BigThing> in) {
        final ExecutorService e = Executors.newFixedThreadPool(numThreads);
        try {
            final Map<Integer, Double> results =
                    new MapMaker().concurrencyLevel(numThreads).makeMap();
            final SafeIterator<BigThing> it = new SafeIterator<BigThing>(in);
            final List<Future<?>> futures = Lists.newArrayListWithCapacity(numThreads);

            for (int i = 0; i < numThreads; i++) {
                futures.add(e.submit(new Runnable() {
                    @Override
                    public void run() {
                        // The interrupt check lets shutdownNow() stop a worker
                        // after its current item instead of draining the source.
                        while (!Thread.currentThread().isInterrupted()) {
                            BigThing o = it.nextOrNull();
                            if (o == null) {
                                return;
                            }
                            results.put(o.getId(), calc.calc(o));
                        }
                    }
                }));
            }
            // No further tasks will be submitted; workers exit when the source is empty.
            e.shutdown();

            for (Future<?> future : futures) {
                try {
                    future.get();
                } catch (InterruptedException ex) {
                    // Do not swallow the interrupt: restore the flag and abort
                    // rather than keep blocking or return partial results.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(
                            "Interrupted while waiting for bulk calculation workers", ex);
                } catch (ExecutionException ex) {
                    throw Throwables.propagate(ex.getCause());
                }
            }
            return new TreeMap<Integer, Double>(results);
        } finally {
            // Harmless after a normal run; on failure it interrupts the
            // remaining workers and ensures the pool threads are released.
            e.shutdownNow();
        }
    }
}
source
share