Java 常见队列的使用与缓冲实现

发布时间:2025-12-09 15:54:28 浏览次数:3

常见队列介绍与缓冲实现

  • 一 常见队列
  • 二 常见队列的方法
    • 1.ArrayBlockingQueue
    • 2.LinkedBlockingQueue
    • 3.LinkedBlockingDeque
    • 4.ConcurrentLinkedQueue
    • 5.SynchronousQueue
    • 6.LinkedTransferQueue
      • 1.定义缓冲队列类
      • 2.定义数据对象
      • 3.测试代码

一 常见队列

队列说明
ArrayBlockingQueue有界
LinkedBlockingQueue有/无界
LinkedBlockingDeque无界
ConcurrentLinkedQueue无界
SynchronousQueue无界
LinkedTransferQueue无界
DelayQueue延迟

二 常见队列的方法

1.ArrayBlockingQueue

public static void arrayBlockingQueue() throws InterruptedException {/*** 有界阻塞队列,初始化必须指定大小*/ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);/*** 阻塞插入,容量不足会阻塞,直到有空闲位置*/queue.put(1);/*** 非阻塞插入,返回插入状态 true/false*/queue.offer(1);/*** 阻塞指定时长插入,阻塞时间内有空闲位置则可插入,返回插入状态 true/false*/queue.offer(1,1, TimeUnit.SECONDS);/*** 异常插入,无空闲位置则抛出异常*///queue.add(1);/*** 非阻塞拉取,取出并移除元素*/queue.poll();/*** 阻塞指定时长拉取,阻塞时间内有数据则直接取数,取出并移除元素*/queue.poll(1,TimeUnit.SECONDS);/*** 非阻塞拉取,队首为空返回NULL,取出不移除元素*/queue.peek();/*** 阻塞拉取,直到有元素*/queue.take();}

2.LinkedBlockingQueue

public static void linkedBlockingQueue() throws InterruptedException {/*** 指定长度即为有界* 不指定长度即为无界*/LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);/*** 阻塞插入,容量不足会阻塞,直到有空闲位置*/queue.put(1);/*** 非阻塞插入,返回插入状态 true/false*/queue.offer(1);/*** 阻塞指定时长插入,阻塞时间内有空闲位置则可插入,返回插入状态 true/false*/queue.offer(1,1, TimeUnit.SECONDS);/*** 异常插入,无空闲位置则抛出异常*///queue.add(1);/*** 非阻塞拉取,取出并移除元素*/queue.poll();/*** 阻塞指定时长拉取,阻塞时间内有数据则直接取数,取出并移除元素*/queue.poll(1,TimeUnit.SECONDS);/*** 非阻塞拉取,队首为空返回NULL,取出不移除元素*/queue.peek();/*** 阻塞拉取,直到有元素*/queue.take();}

3.LinkedBlockingDeque

public static void linkedBlockingDeque() throws InterruptedException {/*** 消费默认从头开始*/LinkedBlockingDeque<Integer> deque = new LinkedBlockingDeque<>(1);/*** 阻塞插入,容量不足会阻塞,直到有空闲位置*/deque.put(1);/*** 非阻塞插入,返回插入状态 true/false*/deque.offer(1);/*** 阻塞指定时长插入,阻塞时间内有空闲位置则可插入,返回插入状态 true/false*/deque.offer(1,1, TimeUnit.SECONDS);/*** 异常插入,无空闲位置则抛出异常*///deque.add(1);/*** 从头非阻塞插入,返回插入状态 true/false*/deque.offerFirst(1);/*** 从头阻塞指定时长插入,阻塞时间内有空闲位置则可插入,返回插入状态 true/false*/deque.offerFirst(1,1,TimeUnit.SECONDS);/*** 从尾非阻塞插入,返回插入状态 true/false*/deque.offerLast(1);/*** 从尾阻塞指定时长插入,阻塞时间内有空闲位置则可插入,返回插入状态 true/false*/deque.offerFirst(1,1,TimeUnit.SECONDS);/*** 非阻塞拉取,取出并移除元素*/deque.poll();/*** 阻塞指定时长拉取,阻塞时间内有数据则直接取数,取出并移除元素*/deque.poll(1,TimeUnit.SECONDS);/*** 非阻塞拉取头元素,取出并移除元素*/deque.pollFirst();/*** 阻塞指定时长拉取头元素,阻塞时间内有数据则直接取数,取出并移除元素*/deque.pollFirst(1,TimeUnit.SECONDS);/*** 非阻塞拉取尾元素,取出并移除元素*/deque.pollLast();/*** 阻塞指定时长拉取尾元素,阻塞时间内有数据则直接取数,取出并移除元素*/deque.pollLast(1,TimeUnit.SECONDS);/*** 非阻塞拉取,队首为空返回NULL,取出不移除元素*/deque.peek();/*** 阻塞拉取,直到有元素*/deque.take();/*** 阻塞拉取,直到头有元素*/deque.takeFirst();/*** 阻塞拉取,直到尾有元素*/deque.takeLast();}

4.ConcurrentLinkedQueue

public static void concurrentLinkedQueue(){/*** 无界,不支持指定大小* 无锁,通过 CAS 控制并发;可能存在读写不一致*/ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();/*** 非阻塞插入,返回插入状态 true/false*/queue.offer(1);/*** 内部调用了 offer 方法*/queue.add(1);/*** 非阻塞拉取,取出并移除元素*/queue.poll();/*** 非阻塞拉取,队首为空返回NULL,取出不移除元素*/queue.peek();}public static void synchronousQueue () throws InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 100,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());/*** 同步队列 不存储数据 匹配生产者和消费者线程*/SynchronousQueue<Integer> queue = new SynchronousQueue<>();/*** 阻塞插入,阻塞直到有消费线程与之匹配*/executor.execute(()->{try {queue.put(1);} catch (InterruptedException e) {throw new RuntimeException(e);}});/*** 非阻塞插入,没有消费线程直接返回 true/false*/queue.offer(1);/*** 阻塞指定时长插入,阻塞时间内有消费线程与之匹配则可插入,返回插入状态 true/false*/queue.offer(1,3, TimeUnit.SECONDS);/*** 异常插入,无消费线程则抛出异常*///queue.add(1);/*** 非阻塞拉取,如果有生产线程,则消费其数据*/System.out.println("First:" + queue.poll());/*** 阻塞指定时长拉取,阻塞时间内有生产线程与之匹配则直接取数,取出并移除元素*/System.out.println("Second:" + queue.poll(3,TimeUnit.SECONDS));/*** 无效方法 直接返回 NULL*/queue.peek();/*** 阻塞插入,阻塞直到有消费线程与之匹配*/executor.execute(()->{try {queue.offer(999999,3, TimeUnit.SECONDS);} catch (InterruptedException e) {throw new RuntimeException(e);}});/*** 阻塞拉取,直到有生产线程与之匹配*/System.out.println("Third:" + queue.take());}

5.SynchronousQueue

public static void synchronousQueue () throws InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 100,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());/*** 同步队列 不存储数据 匹配生产者和消费者线程*/SynchronousQueue<Integer> queue = new SynchronousQueue<>();/*** 阻塞插入,阻塞直到有消费线程与之匹配*/executor.execute(()->{try {queue.put(1);} catch (InterruptedException e) {throw new RuntimeException(e);}});/*** 非阻塞插入,没有消费线程直接返回 true/false*/queue.offer(1);/*** 阻塞指定时长插入,阻塞时间内有消费线程与之匹配则可插入,返回插入状态 true/false*/queue.offer(1,3, TimeUnit.SECONDS);/*** 异常插入,无消费线程则抛出异常*///queue.add(1);/*** 非阻塞拉取,如果有生产线程,则消费其数据*/System.out.println("First:" + queue.poll());/*** 阻塞指定时长拉取,阻塞时间内有生产线程与之匹配则直接取数,取出并移除元素*/System.out.println("Second:" + queue.poll(3,TimeUnit.SECONDS));/*** 无效方法 直接返回 NULL*/queue.peek();/*** 阻塞插入,阻塞直到有消费线程与之匹配*/executor.execute(()->{try {queue.offer(999999,3, TimeUnit.SECONDS);} catch (InterruptedException e) {throw new RuntimeException(e);}});/*** 阻塞拉取,直到有生产线程与之匹配*/System.out.println("Third:" + queue.take());}

6.LinkedTransferQueue

public static void linkedTransferQueue() throws InterruptedException {/*** 无界非阻塞队列 类似 LinkedBlockingQueue + SynchronousQueue* 可以存储实体并实现生产者和消费者线程匹配*/LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();/*** 非阻塞插入:实际都是调用 xfer(e, true, ASYNC, 0L)*/queue.put(1);queue.offer(2);queue.offer(3,1,TimeUnit.SECONDS);queue.add(4);/*** 非阻塞获取,并移除元素*/System.out.println(queue.poll());/*** 阻塞指定时长获取,并移除元素*/System.out.println(queue.poll(1, TimeUnit.SECONDS));/*** 非阻塞获取,不移除元素*/System.out.println(queue.peek());/*** 阻塞获取*/System.out.println(queue.take());}

三 缓冲队列实现

队列常用语 FIFO 场景的数据处理,同时如多线程生产单线程消费、多线程生产多线程消费和单线程生产多线程消费

1.定义缓冲队列类

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.util.CollectionUtils;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.Semaphore;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.function.Function;/*** 任务缓冲队列 用于批量处理 或 顺序缓冲处理** @author * @date 2023-05-16 19:05* @since 1.8*/public class TaskQueue<T,R> {/*** 日志打印*/private static final Logger logger = LoggerFactory.getLogger(TaskQueue.class);/*** 任务批量大小*/private int taskSize = 300;/*** 延迟处理时间设置 默认 1 MIN*/private long handDelayTime = 1000 * 60 ;/*** 缓冲队列大小 默认 1W*/private int capacity = 10000;/*** 无界阻塞队列,用于数据/任务缓冲*/private LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(capacity);/*** 待处理数据/任务集合*/private List<T> taskList = new ArrayList<>(taskSize);/*** 单线程处理*/private Semaphore handSemaphore = new Semaphore(1);/*** 是否强制处理*/private boolean isForceDeal = false;/*** 最迟处理超时时间(MS) 默认 3 * 60 * 1000*/private int lastDealTTL = 180000;/*** 上次处理时间*/private long lastDealTime = 0;/*** 阻塞队列阻塞超时时间(MS) 默认 3 * 60 * 1000*/private int queueTTL = 180000;/*** 同步信号量阻塞超时(MS) 默认 1MIN*/private int semaphoreTTL = 6000;/*** 定义消费线程池 仅通过核心线程循环消费队列数据即可*/private ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(1));/*** 自定义处理方法*/private Function<List<T>,R> function;/*** 无参*/public TaskQueue(){}/*** 初始化队列大小* @param capacity*/public TaskQueue(int capacity){this.capacity = capacity;}/*** 初始化锁 用于线程检查*/private static final Integer INIT_LOCK = 0;/*** 消费方法* @return*/private void taskHandlerThread(){executor.execute(()->{//定义任务/数据对象T temp;//循环消费while (true){try {//取数据temp = queue.poll(queueTTL,TimeUnit.MILLISECONDS);//判断是否为 NULL 或是否超过最迟处理时间if (temp == null || isNeedDeal()){//阻塞等待后仍没有新数据则直接处理taskHandler();//阻塞直到有新数据进来if (temp == null){temp = queue.take();}}//填充到集合taskList.add(temp);// 判断是否需要发送if (taskList.size() >= taskSize){taskHandler();}} catch (InterruptedException e) {logger.error("Take Consumer InterruptedException:",e);} catch (Exception e){logger.error("Take Consumer Exception:",e);}}});}/*** 是否需要处理(针对超时情况)* 避免数据迟迟无法处理,否则最迟情况可能为 queueTTL * taskSize 时长无法处理* @return*/private boolean isNeedDeal(){return isForceDeal && System.currentTimeMillis() > lastDealTime;}/*** 数据/任务集合处理方法* Spring 管理下可用 @PreDestroy 在实例销毁前触发一次处理*/private void taskHandler(){boolean acq = false;try {//本次许可超时则等待下次触发(1MIN)acq = handSemaphore.tryAcquire(semaphoreTTL,TimeUnit.MILLISECONDS);//如果成功获取许可才发送if (acq){if (!CollectionUtils.isEmpty(taskList)){deal(taskList);}//重置时间lastDealTime = System.currentTimeMillis() + lastDealTTL;//处理完成清空集合taskList.clear();}} catch (Exception e) {logger.error("Task List Handle Exception:",e);} finally {if (acq){handSemaphore.release();}}}/*** 检查线程池是否就绪*/private boolean isHandlerReady(){//如果变量为 nullif (executor == null ){synchronized (INIT_LOCK){if (executor == null ){{executor = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(1));}}}taskHandlerThread();return false;} else {//如果活动任务数小于 1 重新添加任务int count = executor.getActiveCount();if (count < 1){synchronized (INIT_LOCK){count = executor.getActiveCount();if (count < 1){taskHandlerThread();}}return false;} else {//否则线程池健康 则返回 True 允许使用return true;}}}/*** 入队,失败则直接处理* @param t*/private boolean put(T t) {try {return queue.offer(t);} catch (Exception e) {logger.error("Offer Queue Exception:",e);return false;}}/*** 填充到阻塞队列,如果失败就立即发送* @param t*/private void handleByQueue(T t){boolean putSuccess = this.put(t);if (!putSuccess){//否则直接处理this.deal(Collections.singletonList(t));}}/*** 前置过滤,校验消费者状态* @param t*/public void taskHandlerPre(T t){// 默认使用连接池if (isHandlerReady()){this.handleByQueue(t);} else {//否则直接处理deal(Collections.singletonList(t));}}/*** 设置 Function* @param function*/public void setFunction(Function<List<T>,R> function){this.function = function;}/*** 重写处理方法* @param list*/public R deal(List<T> list) {if (null == function){return null;}return function.apply(list);}}

2.定义数据对象

public class User {private String code;private String name;private int age;public User(int age,String name){this.age = age;this.name = name;}public User(String code,String name,int age){this.code = code;this.name = name;this.age = age;}public String getCode() {return code;}public void setCode(String code) {this.code = code;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString(){return "{\"Code\":\"" + code + "\",\"Name\":\"" + name + "\",\"age\":" + age + "}";}}

3.测试代码

public void task(){/*** 初始化延迟队列* User 数据/任务类* String 返回值类型* 其他初始化参数自行修改或支持外部修改*/TaskQueue<User,String> queue = new TaskQueue<>();/*** 批量发送* 批量入库* 批量计算*/queue.setFunction(t->{t.forEach(k->{System.out.println(k);});return "";});/*** 模拟发送数据到缓冲队列*/for (int i=0;i<302;i++){queue.taskHandlerPre(new User(1,"Alycia:" + i));}}
需要做网站?需要网络推广?欢迎咨询客户经理 13272073477