博客
关于我
强烈建议你试试无所不能的 ChatGPT,快点击我
HBase Thrift 访问队列
阅读量:5360 次
发布时间:2019-06-15

本文共 4678 字,大约阅读时间需要 15 分钟。

public class CallQueue implements BlockingQueue<Runnable> {
  private static Log LOG = LogFactory.getLog(CallQueue.class);
  private final BlockingQueue<Call> underlyingQueue;
  private final ThriftMetrics metrics;
  public CallQueue(BlockingQueue<Call> underlyingQueue,
                   ThriftMetrics metrics) {
    this.underlyingQueue = underlyingQueue;
    this.metrics = metrics;
  }
  private static long now() {
    return System.nanoTime();
  }

  //访问线程

  public static class Call implements Runnable {
    final long startTime;
    final Runnable underlyingRunnable;
    Call(Runnable underlyingRunnable) {
      this.underlyingRunnable = underlyingRunnable;
      this.startTime = now();
    }
    @Override
    public void run() {
      underlyingRunnable.run();
    }
    public long timeInQueue() {
      return now() - startTime;
    }
    @Override
    public boolean equals(Object other) {
      if (other instanceof Call) {
        Call otherCall = (Call)(other);
        return this.underlyingRunnable.equals(otherCall.underlyingRunnable);
      } else if (other instanceof Runnable) {
        return this.underlyingRunnable.equals(other);
      }
      return false;
    }
    @Override
    public int hashCode() {
      return this.underlyingRunnable.hashCode();
    }
  }
//在队列中获取默认Runnable
  @Override
  public Runnable poll() {
    Call result = underlyingQueue.poll();
    updateMetrics(result);
    return result;
  }
  private void updateMetrics(Call result) {
    if (result == null) {
      return;
    }
    metrics.incTimeInQueue(result.timeInQueue());
    metrics.setCallQueueLen(this.size());
  }
//在队列中获取Runnable
  @Override
  public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
    Call result = underlyingQueue.poll(timeout, unit);
    updateMetrics(result);
    return result;
  }
 //队列中删除runnable
  @Override
  public Runnable remove() {
    Call result = underlyingQueue.remove();
    updateMetrics(result);
    return result;
  }
  @Override
  public Runnable take() throws InterruptedException {
    Call result = underlyingQueue.take();
    updateMetrics(result);
    return result;
  }

//添加到队列中

  @Override
  public int drainTo(Collection<? super Runnable> destination) {
    return drainTo(destination, Integer.MAX_VALUE);
  }

//添加到队列中

  @Override
  public int drainTo(Collection<? super Runnable> destination,
                     int maxElements) {
    if (destination == this) {
      throw new IllegalArgumentException(
          "A BlockingQueue cannot drain to itself.");
    }
    List<Call> drained = new ArrayList<Call>();
    underlyingQueue.drainTo(drained, maxElements);
    for (Call r : drained) {
      updateMetrics(r);
    }
    destination.addAll(drained);
    int sz = drained.size();
    LOG.info("Elements drained: " + sz);
    return sz;
  }

//队列中是否能提供call

  @Override
  public boolean offer(Runnable element) {
    return underlyingQueue.offer(new Call(element));
  }
  @Override
  public boolean offer(Runnable element, long timeout, TimeUnit unit)
      throws InterruptedException {
    return underlyingQueue.offer(new Call(element), timeout, unit);
  }

 

 @Override

  public void put(Runnable element) throws InterruptedException {
    underlyingQueue.put(new Call(element));
  }
  @Override
  public boolean add(Runnable element) {
    return underlyingQueue.add(new Call(element));
  }
  @Override
  public boolean addAll(Collection<? extends Runnable> elements) {
    int added = 0;
    for (Runnable r : elements) {
      added += underlyingQueue.add(new Call(r)) ? 1 : 0;
    }
    return added != 0;
  }
  @Override
  public Runnable element() {
    return underlyingQueue.element();
  }
  @Override
  public Runnable peek() {
    return underlyingQueue.peek();
  }
//清空队列
  @Override
  public void clear() {
    underlyingQueue.clear();
  }
  @Override
  public boolean containsAll(Collection<?> elements) {
    return underlyingQueue.containsAll(elements);
  }
  @Override
  public boolean isEmpty() {
    return underlyingQueue.isEmpty();
  }
  @Override
  public Iterator<Runnable> iterator() {
    return new Iterator<Runnable>() {
      final Iterator<Call> underlyingIterator = underlyingQueue.iterator();
      @Override
      public Runnable next() {
        return underlyingIterator.next();
      }
      @Override
      public boolean hasNext() {
        return underlyingIterator.hasNext();
      }
      @Override
      public void remove() {
        underlyingIterator.remove();
      }
    };
  }
  @Override
  public boolean removeAll(Collection<?> elements) {
    return underlyingQueue.removeAll(elements);
  }
  @Override
  public boolean retainAll(Collection<?> elements) {
    return underlyingQueue.retainAll(elements);
  }
  @Override
  public int size() {
    return underlyingQueue.size();
  }
  @Override
  public Object[] toArray() {
    return underlyingQueue.toArray();
  }
  @Override
  public <T> T[] toArray(T[] array) {
    return underlyingQueue.toArray(array);
  }
  @Override
  public boolean contains(Object element) {
    return underlyingQueue.contains(element);
  }
  @Override
  public int remainingCapacity() {
    return underlyingQueue.remainingCapacity();
  }
  @Override
  public boolean remove(Object element) {
    return underlyingQueue.remove(element);
  }
}

转载于:https://www.cnblogs.com/cl1024cl/p/6205122.html

你可能感兴趣的文章
Linux内核分析第十八章读书笔记
查看>>
软工课后作业01 P18第四题
查看>>
MyBatis 详解(一对一,一对多,多对多)
查看>>
软件架构师的工作过程
查看>>
判断设备
查看>>
搞清楚基本问题
查看>>
教你如何一步步将项目部署到Github
查看>>
关于Android圆形图片的一种优化方案(可以显示网络图片)
查看>>
android ui定义自己的dialog(项目框架搭建时就写好,之后事半功倍)
查看>>
Android应用程序请求SurfaceFlinger服务渲染Surface的过程分析
查看>>
JDK内置工具--jvisualvm
查看>>
Windows路由表详解
查看>>
Linux at命令详解
查看>>
AI学习---TensorFlow框架介绍[图+会话+张量+变量OP+API]
查看>>
Fragment中启动一个新的Activity
查看>>
.NET性能优化方面的总结
查看>>
Windows下文件夹扩展名
查看>>
今天早上6:00起来,每天晚上回来6点多已经天黑
查看>>
debian开启cgroup memory子系统
查看>>
信息收集
查看>>