编写一个示例程序,展示如何使用 Fork/Join 框架进行并行计算。

示例程序:使用 Fork/Join 框架进行并行计算数组求和

以下示例展示了如何使用 Fork/Join 框架对一个大数组进行并行求和。


程序代码

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSumExample {

    // 自定义任务类,继承 RecursiveTask
    static class ArraySumTask extends RecursiveTask<Long> {
        private final int[] array; // 待计算的数组
        private final int start; // 开始索引
        private final int end; // 结束索引
        private static final int THRESHOLD = 100; // 阈值:小于此值时不再拆分

        // 构造函数
        public ArraySumTask(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            // 如果任务小于等于阈值,直接计算
            if (end - start <= THRESHOLD) {
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                System.out.println(Thread.currentThread().getName() + " computed sum from " + start + " to " + end + " = " + sum);
                return sum;
            }

            // 拆分任务
            int mid = (start + end) / 2;
            ArraySumTask leftTask = new ArraySumTask(array, start, mid);
            ArraySumTask rightTask = new ArraySumTask(array, mid, end);

            // 异步执行子任务
            leftTask.fork();
            rightTask.fork();

            // 合并结果
            long leftResult = leftTask.join();
            long rightResult = rightTask.join();

            return leftResult + rightResult;
        }
    }

    public static void main(String[] args) {
        // 初始化大数组
        int[] array = new int[1000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i + 1; // 数组元素为 1 到 1000
        }

        // 创建 ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        // 提交任务
        ArraySumTask task = new ArraySumTask(array, 0, array.length);
        long result = forkJoinPool.invoke(task);

        // 输出结果
        System.out.println("Final sum: " + result);
    }
}

代码解析

  1. 任务拆分类:ArraySumTask
    • 继承 RecursiveTask<Long>,表示有返回值的任务。
    • 定义了一个 阈值(THRESHOLD),控制任务的最小粒度。
    • 当任务小于等于阈值时,直接计算;否则将任务分成左右两部分递归处理。
  2. Fork/Join 框架核心方法
    • fork():将子任务提交到线程池中,异步执行。
    • join():等待子任务完成,并获取其返回值。
  3. ForkJoinPool
    • ForkJoinPool 是 Fork/Join 框架的核心线程池,用于管理和调度任务。
    • 使用 invoke(task) 提交顶层任务并获取最终结果。
  4. 结果输出
    • 程序输出子任务的计算结果及最终总和,展示任务拆分与合并的过程。

示例输出

ForkJoinPool.commonPool-worker-1 computed sum from 0 to 100 = 5050
ForkJoinPool.commonPool-worker-2 computed sum from 100 to 200 = 15050
ForkJoinPool.commonPool-worker-3 computed sum from 200 to 300 = 25050
...
ForkJoinPool.commonPool-worker-1 computed sum from 900 to 1000 = 95050
Final sum: 500500

适用场景

  • 该程序使用了 Fork/Join 框架的核心功能,非常适合用于大数据集合的并行计算场景。
  • 通过设定阈值(THRESHOLD),我们可以控制任务的拆分粒度,从而优化计算性能。

性能优化建议

  1. 任务粒度控制
    • 阈值 THRESHOLD 应根据任务的规模和系统资源(如 CPU 核心数)进行调整。
    • 粒度过大会导致并行效率降低,过小会增加任务拆分的开销。
  2. 线程池配置
    • 默认使用 ForkJoinPool 的公共线程池(commonPool)。
    • 在资源充足或高性能需求场景下,可创建自定义线程池并调整线程数量。

总结

  • Fork/Join 框架非常适合处理可以递归拆分的任务。
  • 使用该框架时,应根据具体任务特点合理设置阈值和任务拆分策略,以获得最佳性能。
  • 示例程序展示了数组求和任务的并行实现,为理解 Fork/Join 框架的核心思想提供了直观示例。

发表评论

后才能评论