什么是线程

在一个程序中,多个任务并行执行,就需要线程。例如:一个WX程序,和每一个人的聊天都是一个线程。

关于线程的执行方式如下图:

image-PMGU.png

线程的状态

  • 新建(New):

  • 可运行(Runnable):

  • 阻塞(Blocked):lock

  • 等待(Waiting):condition

  • 计时等待(Timed Waiting):sleep

  • 终止(Terminated):

线程的属性

  • 中断线程:interrupt 状态,使用 sleep 时捕获InterruptException即可,无需判断 isInterrupted 状态

  • 守护线程:setDaemon

  • 线程名称:setName

  • 未捕获异常处理器:setUncaughtExceptionHandler

  • 线程优先级:setPriority

线程的同步 ★

竞态条件

多个线程修改同一个共享资源。

修改资源的步骤:A++

1、共享资源A加载到寄存器

2、A = A + 1

3、return A

线程① 执行了上述步骤的1、2,此时线程② 执行上述步骤1,在切换到线程① 执行步骤3,返回的A的值还是A,并没有+1。

锁对象和条件对象

锁对象 ReentrantLock:在方法中加锁可以保证上述竞态条件的原子性。

条件对象 Condition:在方法中需要满足某些条件方法才可以执行,这个时候用到条件对象阻塞线程,释放锁对象,让别的线程先执行。

锁对象和条件对象使用示例
package 并发;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class 锁与条件 {
    // 共享资源
    private static int count = 20;
    // 锁
    private final static ReentrantLock lock = new ReentrantLock();
    // 条件
    private final static Condition condition = lock.newCondition();

    /**
     * 线程A:count是偶数的时候,count减1
     * 线程B:count是奇数时候,count减1
     */
    public static void main(String[] args) {
        Runnable taskA = () -> {
            lock.lock();
            try {
                while (count > 0){
                    if (count % 2 != 0) condition.await();
                    System.out.println("线程A:" + count);
                    count--;
                    condition.signalAll();
                }
            }catch (Exception e) {
                System.out.println("线程A异常:" + e.getMessage());
            }finally {
                lock.unlock();
            }
        };
        Runnable taskB = () -> {
            lock.lock();
            try {
                while (count > 0){
                    if (count % 2 == 0) condition.await();
                    System.out.println("线程B:" + count);
                    count--;
                    condition.signalAll();
                }
            }catch (Exception e) {
                System.out.println("线程B异常:" + e.getMessage());
            }finally {
                lock.unlock();
            }
        };
        new Thread(taskA).start();
        new Thread(taskB).start();
    }
}

Synchronized关键字

Java中每个对象都有自己的内部锁,当方法声明了 Synchronized 代表对象锁保护该方法,调用该方法需要获取对象的内部锁,等同于ReentrantLock和Condition只不过灵活性不高。

Synchronized 使用示例
package 并发;

public class Synchronized关键字 {
    private static int count = 20;
    private static Synchronized关键字 instance = new Synchronized关键字();
    public static void main(String[] args) {
        Runnable taskA = () -> {

            try {
                instance.m1();
            } catch (InterruptedException e) {
                System.out.println("线程A异常:" + e.getMessage());
            }

        };
        Runnable taskB = () -> {
            try {
                instance.m2();
            } catch (InterruptedException e) {
                System.out.println("线程B异常" + e.getMessage());
            }
        };
        new Thread(taskA, "线程A").start();
        new Thread(taskB, "线程B").start();
    }

    // 偶数减一
    public synchronized void m1() throws InterruptedException {
        while (count > 0) {
            if (count % 2 != 0) wait();
            System.out.println(Thread.currentThread().getName() + " count = " + count);
            count--;
            notifyAll();
        }
    }

    // 奇数减一
    public synchronized void m2() throws InterruptedException {
        while (count > 0) {
            if (count % 2 == 0) wait();
            System.out.println(Thread.currentThread().getName() + " count = " + count);
            count--;
            notifyAll();
        }
    }
}

死锁

情况1:假设有两个线程,线程A因为某些条件挂起了,同时线程B也不满足条件挂起

情况2:signalAll 改成 signal

线程局部变量

线程局部变量:为每个线程构造一个实例,ThreadLocal

为什么需要:

1、如果为一个变量专门加锁系统开销太大了,

2、如果在需要时构造对象太浪费性能了,对象频繁的创建和销毁在高并发情况下会频繁触发GC

3、例如 SimpleDateFormat 是线程不安全的,直接作为类的变量使用,会有线程安全问题

ThreadLocal 使用
package 并发;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocal的使用 {

    // 线程不安全,多个线程同时访问时会破坏内部数据结构
    private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    // 线程安全
    private static final ThreadLocal<SimpleDateFormat> dateFormatThreadLocal = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"));

    public static void main(String[] args) throws InterruptedException {
        System.out.println("========== 不使用ThreadLocal(线程不安全)==========");
        testWithoutThreadLocal();
        
        System.out.println("\n========== 使用ThreadLocal(线程安全)==========");
        testWithThreadLocal();
    }

    /**
     * 不使用ThreadLocal - 会出现错误结果或异常
     */
    private static void testWithoutThreadLocal() throws InterruptedException {
        int threadCount = 20;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final long timestamp = System.currentTimeMillis() + i * 1000;
            executor.submit(() -> {
                try {
                    Date date = new Date(timestamp);
                    String result = simpleDateFormat.format(date);
                    
                    // 验证结果是否正确
                    Date parsedDate = simpleDateFormat.parse(result);
                    if (!result.equals(simpleDateFormat.format(parsedDate))) {
                        System.err.println("❌ 数据错乱! 线程: " + Thread.currentThread().getName() + 
                                         ", 原始日期: " + date + 
                                         ", 格式化结果: " + result + 
                                         ", 解析后: " + parsedDate);
                    } else {
                        System.out.println("✅ 线程: " + Thread.currentThread().getName() + 
                                         ", 结果: " + result);
                    }
                } catch (Exception e) {
                    System.err.println("❌ 异常! 线程: " + Thread.currentThread().getName() + 
                                     ", 错误: " + e.getClass().getSimpleName() + ": " + e.getMessage());
                } finally {
                    latch.countDown();
                }
            });
        }

        latch.await();
        executor.shutdown();
    }

    /**
     * 使用ThreadLocal - 所有结果都正确
     */
    private static void testWithThreadLocal() throws InterruptedException {
        int threadCount = 20;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final long timestamp = System.currentTimeMillis() + i * 1000;
            executor.submit(() -> {
                try {
                    Date date = new Date(timestamp);
                    String result = dateFormatThreadLocal.get().format(date);
                    
                    // 验证结果是否正确
                    Date parsedDate = dateFormatThreadLocal.get().parse(result);
                    if (!result.equals(dateFormatThreadLocal.get().format(parsedDate))) {
                        System.err.println("❌ 数据错乱! 线程: " + Thread.currentThread().getName());
                    } else {
                        System.out.println("✅ 线程: " + Thread.currentThread().getName() + 
                                         ", 结果: " + result);
                    }
                } catch (Exception e) {
                    System.err.println("❌ 异常! 线程: " + Thread.currentThread().getName() + 
                                     ", 错误: " + e.getClass().getSimpleName());
                } finally {
                    latch.countDown();
                }
            });
        }

        latch.await();
        executor.shutdown();
    }
}

线程安全的集合 ★

阻塞队列:

ArrayBlockingQueue:

LinkedBlockingQueue:

PriorityBlockingQueue:

高效的映射、集和队列:

Map:

ConcurrentHashMap:

ConcurrentSkipListMap:

Set:

ConcurrentSkipListSet:

Queue:

ConcurrentLinkedQueue:

任务和线程池 ★

构建一个新的线程系统开销大,所以尽量使用线程池。

Runnable:没有参数和返回值的异步方法

Callable:有返回值的异步方法

Future:保存异步计算结果

执行器

Executors 执行器存在多个静态工厂方法,用来构造线程池

newCachedThreadPool:必要时创建线程,空闲线程保留 60 秒

newFixedThreadPool:池中包含固定数目的线程,空闲线程会一直保留

newWorkStealingPool:适合“fork-join”任务的线程池,复杂任务会分解为简单任务,空闲线程“密取”简单任务

newSingleThreadExecutor:只有一个线程的池,顺序执行提交的任务

newScheduledThreadPool:用于调度执行的固定线程池

newSingleThreadScheduledExecutor:用于调度执行的单线程池

异步计算

Future 的 get方法会阻塞线程,等待结果返回。

CompletableFuture 提供当结果可用时,利用结果调用回调。