//访问线程
public static class Call implements Runnable { final long startTime; final Runnable underlyingRunnable; Call(Runnable underlyingRunnable) { this.underlyingRunnable = underlyingRunnable; this.startTime = now(); } @Override public void run() { underlyingRunnable.run(); } public long timeInQueue() { return now() - startTime; } @Override public boolean equals(Object other) { if (other instanceof Call) { Call otherCall = (Call)(other); return this.underlyingRunnable.equals(otherCall.underlyingRunnable); } else if (other instanceof Runnable) { return this.underlyingRunnable.equals(other); } return false; } @Override public int hashCode() { return this.underlyingRunnable.hashCode(); } } //在队列中获取默认Runnable @Override public Runnable poll() { Call result = underlyingQueue.poll(); updateMetrics(result); return result; } private void updateMetrics(Call result) { if (result == null) { return; } metrics.incTimeInQueue(result.timeInQueue()); metrics.setCallQueueLen(this.size()); }//在队列中获取Runnable @Override public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { Call result = underlyingQueue.poll(timeout, unit); updateMetrics(result); return result; } //队列中删除runnable @Override public Runnable remove() { Call result = underlyingQueue.remove(); updateMetrics(result); return result; } @Override public Runnable take() throws InterruptedException { Call result = underlyingQueue.take(); updateMetrics(result); return result; }//添加到队列中
/**
 * Drains queued calls into {@code destination} with no practical limit, by
 * delegating to {@link #drainTo(Collection, int)} with
 * {@link Integer#MAX_VALUE} as the element cap.
 *
 * @param destination collection that receives the drained calls
 * @return the number of calls transferred
 */
@Override
public int drainTo(Collection<? super Runnable> destination) {
  final int noLimit = Integer.MAX_VALUE;
  return drainTo(destination, noLimit);
}
// Bounded variant: drains at most maxElements calls into the destination.
/**
 * Moves up to {@code maxElements} calls from the underlying queue into
 * {@code destination}, records metrics for each drained call, and logs how
 * many were moved.
 *
 * @param destination collection that receives the drained calls
 * @param maxElements upper bound passed to the underlying queue's drain
 * @return the number of calls transferred
 * @throws NullPointerException if {@code destination} is {@code null}
 * @throws IllegalArgumentException if {@code destination} is this queue
 */
@Override
public int drainTo(Collection<? super Runnable> destination, int maxElements) {
  if (destination == null) {
    // Fail before touching the queue: otherwise the calls would already have
    // been removed from the underlying queue when addAll() throws, and be lost.
    throw new NullPointerException("destination");
  }
  if (destination == this) {
    throw new IllegalArgumentException(
        "A BlockingQueue cannot drain to itself.");
  }
  // Drain into a typed staging list first so metrics can be recorded per call.
  List<Call> drained = new ArrayList<Call>();
  underlyingQueue.drainTo(drained, maxElements);
  for (Call r : drained) {
    updateMetrics(r);
  }
  destination.addAll(drained);
  int sz = drained.size();
  LOG.info("Elements drained: " + sz);
  return sz;
}
// Offers a runnable to the queue, wrapped in a Call.
/**
 * Wraps {@code element} in a new {@link Call} and offers it to the
 * underlying queue.
 *
 * @return whatever the underlying queue's {@code offer} returns
 */
@Override
public boolean offer(Runnable element) {
  final Call wrapped = new Call(element);
  return underlyingQueue.offer(wrapped);
}

/**
 * Wraps {@code element} in a new {@link Call} and offers it to the
 * underlying queue, passing the timeout through unchanged.
 *
 * @return whatever the underlying queue's timed {@code offer} returns
 * @throws InterruptedException propagated from the underlying queue
 */
@Override
public boolean offer(Runnable element, long timeout, TimeUnit unit)
    throws InterruptedException {
  final Call wrapped = new Call(element);
  return underlyingQueue.offer(wrapped, timeout, unit);
}
@Override
public void put(Runnable element) throws InterruptedException { underlyingQueue.put(new Call(element)); } @Override public boolean add(Runnable element) { return underlyingQueue.add(new Call(element)); } @Override public boolean addAll(Collection<? extends Runnable> elements) { int added = 0; for (Runnable r : elements) { added += underlyingQueue.add(new Call(r)) ? 1 : 0; } return added != 0; } @Override public Runnable element() { return underlyingQueue.element(); } @Override public Runnable peek() { return underlyingQueue.peek(); } //清空队列 @Override public void clear() { underlyingQueue.clear(); } @Override public boolean containsAll(Collection<?> elements) { return underlyingQueue.containsAll(elements); } @Override public boolean isEmpty() { return underlyingQueue.isEmpty(); } @Override public Iterator<Runnable> iterator() { return new Iterator<Runnable>() { final Iterator<Call> underlyingIterator = underlyingQueue.iterator(); @Override public Runnable next() { return underlyingIterator.next(); } @Override public boolean hasNext() { return underlyingIterator.hasNext(); } @Override public void remove() { underlyingIterator.remove(); } }; } @Override public boolean removeAll(Collection<?> elements) { return underlyingQueue.removeAll(elements); } @Override public boolean retainAll(Collection<?> elements) { return underlyingQueue.retainAll(elements); } @Override public int size() { return underlyingQueue.size(); } @Override public Object[] toArray() { return underlyingQueue.toArray(); } @Override public <T> T[] toArray(T[] array) { return underlyingQueue.toArray(array); } @Override public boolean contains(Object element) { return underlyingQueue.contains(element); } @Override public int remainingCapacity() { return underlyingQueue.remainingCapacity(); } @Override public boolean remove(Object element) { return underlyingQueue.remove(element); } }