CustomBlockingQueue
package com.gunjan;

import java.util.concurrent.BlockingQueue;

/**
 * Base class for a {@link BlockingQueue} wrapper whose plain {@link #offer(Object)} always
 * reports failure, while {@link #customOffer(Object)} performs the real insertion into the
 * wrapped queue.
 *
 * <p>Intended use (see {@code RejectedExecutionHandlerImpl}): a
 * {@code java.util.concurrent.ThreadPoolExecutor} hands a task to its work queue through
 * {@code offer}; because that call always fails here, the executor tries to start another
 * worker thread instead of queueing. Only when the executor rejects the task does the
 * rejection handler enqueue it through {@code customOffer}.
 *
 * <p>All other {@link BlockingQueue} operations are left abstract for subclasses.
 *
 * @param <E> the type of elements held in the queue
 */
public abstract class CustomBlockingQueue<E> implements BlockingQueue<E> {

    /** The wrapped queue that actually stores the elements. */
    public BlockingQueue<E> blockingQueue;

    /**
     * Creates a wrapper around the given queue.
     *
     * @param blockingQueue the queue that stores the elements; must not be {@code null}
     * @throws NullPointerException if {@code blockingQueue} is {@code null}
     */
    public CustomBlockingQueue(BlockingQueue<E> blockingQueue) {
        // Fail at construction time rather than on the first customOffer call.
        if (blockingQueue == null) {
            throw new NullPointerException("blockingQueue");
        }
        this.blockingQueue = blockingQueue;
    }

    /**
     * Always refuses the element without touching the wrapped queue.
     *
     * @param e the element that would have been inserted (ignored)
     * @return always {@code false}
     */
    @Override
    public final boolean offer(E e) {
        return false;
    }

    /**
     * Offers the element to the wrapped queue.
     *
     * @param e the element to insert
     * @return whatever the wrapped queue's {@code offer} returns: {@code true} if the element
     *         was added, {@code false} otherwise
     */
    public final boolean customOffer(E e) {
        return blockingQueue.offer(e);
    }
}
ThreadPoolBlockingQueue
package com.gunjan;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Concrete {@link CustomBlockingQueue} that forwards every operation declared here to the
 * wrapped {@code blockingQueue}.
 *
 * <p>The single-argument {@code offer(E)} is not declared in this class; it is inherited from
 * {@link CustomBlockingQueue}.
 *
 * @param <E> the type of elements held in the queue
 */
public class ThreadPoolBlockingQueue<E> extends CustomBlockingQueue<E> {

    /**
     * Creates a queue that delegates to the given backing queue.
     *
     * @param blockingQueue the queue that stores the elements
     */
    public ThreadPoolBlockingQueue(BlockingQueue<E> blockingQueue) {
        super(blockingQueue);
    }

    @Override
    public E remove() {
        return blockingQueue.remove();
    }

    @Override
    public E poll() {
        return blockingQueue.poll();
    }

    @Override
    public E element() {
        return blockingQueue.element();
    }

    @Override
    public E peek() {
        return blockingQueue.peek();
    }

    @Override
    public int size() {
        return blockingQueue.size();
    }

    @Override
    public boolean isEmpty() {
        return blockingQueue.isEmpty();
    }

    @Override
    public Iterator<E> iterator() {
        return blockingQueue.iterator();
    }

    @Override
    public Object[] toArray() {
        return blockingQueue.toArray();
    }

    @Override
    public <T> T[] toArray(T[] a) {
        return blockingQueue.toArray(a);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return blockingQueue.containsAll(c);
    }

    @Override
    public boolean addAll(Collection<? extends E> c) {
        return blockingQueue.addAll(c);
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        return blockingQueue.removeAll(c);
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        return blockingQueue.retainAll(c);
    }

    @Override
    public void clear() {
        blockingQueue.clear();
    }

    @Override
    public boolean add(E e) {
        return blockingQueue.add(e);
    }

    @Override
    public void put(E e) throws InterruptedException {
        blockingQueue.put(e);
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        return blockingQueue.offer(e, timeout, unit);
    }

    @Override
    public E take() throws InterruptedException {
        return blockingQueue.take();
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return blockingQueue.poll(timeout, unit);
    }

    @Override
    public int remainingCapacity() {
        return blockingQueue.remainingCapacity();
    }

    @Override
    public boolean remove(Object o) {
        return blockingQueue.remove(o);
    }

    @Override
    public boolean contains(Object o) {
        return blockingQueue.contains(o);
    }

    @Override
    public int drainTo(Collection<? super E> c) {
        return blockingQueue.drainTo(c);
    }

    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        return blockingQueue.drainTo(c, maxElements);
    }
}
RejectedExecutionHandlerImpl
package com.gunjan;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Rejection handler that gives a rejected task a second chance by inserting it into the
 * executor's {@link CustomBlockingQueue} through {@link CustomBlockingQueue#customOffer}.
 *
 * <p>The task is rejected for real, with a {@link RejectedExecutionException}, when the
 * executor has been shut down, when its work queue is not a {@link CustomBlockingQueue}, or
 * when the queue refuses the task.
 */
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    /**
     * Tries to enqueue the rejected task on the executor's work queue.
     *
     * @param r        the task the executor could not accept
     * @param executor the executor that rejected the task
     * @throws RejectedExecutionException if the executor is shut down, its work queue is not a
     *                                    {@link CustomBlockingQueue}, or the queue did not
     *                                    accept the task
     */
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // A shut-down executor also routes new tasks here; queueing them would accept work
        // the caller was promised would be refused.
        if (executor.isShutdown()) {
            throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + executor + ": executor has been shut down");
        }

        BlockingQueue<Runnable> workQueue = executor.getQueue();
        if (!(workQueue instanceof CustomBlockingQueue)) {
            throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + executor
                            + ": work queue is not a CustomBlockingQueue");
        }

        boolean inserted = ((CustomBlockingQueue<Runnable>) workQueue).customOffer(r);
        if (!inserted) {
            throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + executor + ": work queue is full");
        }

        // Re-check after queueing: if shutdown began in the meantime and the task can still be
        // pulled back out, reject it; if remove fails a worker already took it and it will run.
        if (executor.isShutdown() && executor.remove(r)) {
            throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + executor + ": executor has been shut down");
        }
    }
}
CustomThreadPoolExecutorTest
package com.gunjan;

import java.util.concurrent.*;

/**
 * Manual demo that drives a {@link ThreadPoolExecutor} configured with a
 * {@link ThreadPoolBlockingQueue} and a {@link RejectedExecutionHandlerImpl}.
 *
 * <p>It submits more tasks than the pool's maximum thread count plus the queue capacity, prints
 * the executor's state from inside each task, and prints the stack trace of every rejection.
 */
public class CustomThreadPoolExecutorTest {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 100;
    private static final long KEEP_ALIVE_SECONDS = 60;
    private static final int QUEUE_CAPACITY = 500;
    private static final int TASK_COUNT = 750;
    private static final long TASK_SLEEP_MILLIS = 1000;

    /**
     * Runs the demo and blocks until every accepted task has finished.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting for the
     *                              executor to terminate
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> linkedBlockingQueue =
                new LinkedBlockingQueue<Runnable>(QUEUE_CAPACITY);
        CustomBlockingQueue<Runnable> customLinkedBlockingQueue =
                new ThreadPoolBlockingQueue<Runnable>(linkedBlockingQueue);
        final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                customLinkedBlockingQueue,
                new RejectedExecutionHandlerImpl());

        for (int i = 0; i < TASK_COUNT; i++) {
            try {
                threadPoolExecutor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Thread.sleep(TASK_SLEEP_MILLIS);
                            System.out.println(threadPoolExecutor);
                        } catch (InterruptedException e) {
                            // Restore the flag so the worker thread still sees the interrupt.
                            Thread.currentThread().interrupt();
                            e.printStackTrace();
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                // Expected once both the pool and the queue are saturated; keep submitting.
                e.printStackTrace();
            }
        }

        threadPoolExecutor.shutdown();
        threadPoolExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MINUTES);
        System.out.println(threadPoolExecutor);
    }
}
source share