public interface Collector<T, A, R> {
* A function that creates and returns a new mutable result container.
* @return a function which returns a new, mutable result container
* 容器提供者
Supplier<A> supplier();
* A function that folds a value into a mutable result container.
* @return a function which folds a value into a mutable result container
* 累加操作
BiConsumer<A, T> accumulator();
* A function that accepts two partial results and merges them. The
* combiner function may fold state from one argument into the other and
* return that, or may return a new result container.
* @return a function which combines two partial results into a combined
* result
* 并发的情况将每个线程的中间容器A合并
BinaryOperator<A> combiner();
* Perform the final transformation from the intermediate accumulation type
* {@code A} to the final result type {@code R}.
* <p>If the characteristic {@code IDENTITY_TRANSFORM} is
* set, this function may be presumed to be an identity transform with an
* unchecked cast from {@code A} to {@code R}.
* @return a function which transforms the intermediate result to the final
* result
* 终止操作
Function<A, R> finisher();
* Returns a {@code Set} of {@code Collector.Characteristics} indicating
* the characteristics of this Collector. This set should be immutable.
* @return an immutable set of collector characteristics
* 收集器特性
Set<Characteristics> characteristics();
* Returns a new {@code Collector} described by the given {@code supplier},
* {@code accumulator}, and {@code combiner} functions. The resulting
* {@code Collector} has the {@code Collector.Characteristics.IDENTITY_FINISH}
* characteristic.
* @param supplier The supplier function for the new collector
* @param accumulator The accumulator function for the new collector
* @param combiner The combiner function for the new collector
* @param characteristics The collector characteristics for the new
* collector
* @param <T> The type of input elements for the new collector
* @param <R> The type of intermediate accumulation result, and final result,
* for the new collector
* @throws NullPointerException if any argument is null
* @return the new {@code Collector}
public static<T, R> Collector<T, R, R> of(Supplier<R> supplier,
BiConsumer<R, T> accumulator,
BinaryOperator<R> combiner,
Characteristics... characteristics) {
Set<Characteristics> cs = (characteristics.length == 0)
? Collectors.CH_ID
: Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH,
return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, cs);
* Returns a new {@code Collector} described by the given {@code supplier},
* {@code accumulator}, {@code combiner}, and {@code finisher} functions.
* @param supplier The supplier function for the new collector
* @param accumulator The accumulator function for the new collector
* @param combiner The combiner function for the new collector
* @param finisher The finisher function for the new collector
* @param characteristics The collector characteristics for the new
* collector
* @param <T> The type of input elements for the new collector
* @param <A> The intermediate accumulation type of the new collector
* @param <R> The final result type of the new collector
* @throws NullPointerException if any argument is null
* @return the new {@code Collector}
public static<T, A, R> Collector<T, A, R> of(Supplier<A> supplier,
BiConsumer<A, T> accumulator,
BinaryOperator<A> combiner,
Function<A, R> finisher,
Characteristics... characteristics) {
Set<Characteristics> cs = Collectors.CH_NOID;
if (characteristics.length > 0) {
cs = EnumSet.noneOf(Characteristics.class);
Collections.addAll(cs, characteristics);
cs = Collections.unmodifiableSet(cs);
return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, finisher, cs);
* Characteristics indicating properties of a {@code Collector}, which can
* be used to optimize reduction implementations.
enum Characteristics {
* Indicates that this collector is <em>concurrent</em>, meaning that
* the result container can support the accumulator function being
* called concurrently with the same result container from multiple
* threads.
* <p>If a {@code CONCURRENT} collector is not also {@code UNORDERED},
* then it should only be evaluated concurrently if applied to an
* unordered data source.
* Indicates that the collection operation does not commit to preserving
* the encounter order of input elements. (This might be true if the
* result container has no intrinsic order, such as a {@link Set}.)
* Indicates that the finisher function is the identity function and
* can be elided. If set, it must be the case that an unchecked cast
* from A to R will succeed.
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
A {@code Collector} is specified by four functions that work together to
* accumulate entries into a mutable result container, and optionally perform
* a final transform on the result. They are:
*- creation of a new result container ({@link #supplier()})
- *
- incorporating a new data element into a result container ({@link #accumulator()})
- *
- combining two result containers into one ({@link #combiner()})
- *
- performing an optional final transform on the container ({@link #finisher()})
- *
supplier:是一个容器提供者,提供容器A,比如:List list = new ArrayList();
combiner:用于在多线程并发的情况下,每个线程都有一个supplier和,如果有N个线程那么就有N个supplier提供的容器A,执行的是类似List.addAll(listB)这样的操作,只有在characteristics没有被设置成CONCURRENT并且是并发的情况下 才会被调用。ps:characteristics被设置成CONCURRENT时,整个收集器只有一个容器,而不是每个线程都有一个容器,此时combiner()方法不会被调用,这种情况会出现java.util.ConcurrentModificationException异常,此时需要使用线程安全的容器作为supplier返回的对象。
* Characteristics indicating properties of a {@code Collector}, which can
* be used to optimize reduction implementations.
enum Characteristics {
* Indicates that this collector is <em>concurrent</em>, meaning that
* the result container can support the accumulator function being
* called concurrently with the same result container from multiple
* threads.
*如果一个收集器被标记为concurrent特性,那么accumulator 方法可以被多线程并发的的调用,并且只使用一个容器A
* <p>If a {@code CONCURRENT} collector is not also {@code UNORDERED},
* then it should only be evaluated concurrently if applied to an
* unordered data source.
* 如果收集器被标记为concurrent,但是要操作的集合失败有序的,那么最终得到的结果不能保证原来的顺序
* Indicates that the collection operation does not commit to preserving
* the encounter order of input elements. (This might be true if the
* result container has no intrinsic order, such as a {@link Set}.)
* 适用于无序的集合
* Indicates that the finisher function is the identity function and
* can be elided. If set, it must be the case that an unchecked cast
* from A to R will succeed.
* 如果收集器特性被设置IDENTITY_FINISH,那么会强制将中间容器A类型转换为结果类型R
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
package com.ceaser.jdk8lambda.stream2;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import static java.util.stream.Collector.Characteristics.CONCURRENT;
import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;
import static java.util.stream.Collector.Characteristics.UNORDERED;
* Created by CeaserWang on 2017/2/27.
* 此收集器的作用是将Set集合转换为Map
public class MyCollectorA<T> implements Collector<T,Set<T>,Map<T,T>> {
public Supplier<Set<T>> supplier() {
System.out.println("supplier invoked...");
return HashSet::new;//实例化一个存放中间结果的集合Set
public BiConsumer<Set<T>,T> accumulator() {
System.out.println("accumulator invoked...");
return (item1,item2) -> {
* * A a1 = supplier.get();
* accumulator.accept(a1, t1);
* accumulator.accept(a1, t2);
* R r1 = finisher.apply(a1); // result without splitting
* A a2 = supplier.get();
* accumulator.accept(a2, t1);
* A a3 = supplier.get();
* accumulator.accept(a3, t2);
* R r2 = finisher.apply(combiner.apply(a2, a3)); // result with splitting
// System.out.println("current thread : "+item1+" , "+Thread.currentThread().getName());
public BinaryOperator<Set<T>> combiner() {
System.out.println("combiner invoked...");
return (item1,item2) -> {
return item1;
public Function<Set<T>,Map<T,T>> finisher() {
System.out.println("finisher invoked...");
return (item1) ->{
Map<T,T> rm = new HashMap<T,T>();
item1.stream(). forEach( (bean) -> rm.put(bean,bean) );//将Set集合的每个元素加入到新的map之中
return rm;
public Set<Characteristics> characteristics() {
System.out.println("characteristics invoked...");
return Collections.unmodifiableSet(EnumSet.of(UNORDERED,CONCURRENT));//支持并发操作,并且是不能保证原始集合的顺序。
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
* Created by Ceaser Wang on 2017/2/27.
public class MyCollectorATest {
public static void main(String[] args) {
List<String> list = Arrays.asList("hello","world","welcome","helloworld","helloworldA");
Set<String> set = new HashSet<>();
Map<String,String> maped = set.parallelStream().collect(new MyCollectorA<>());
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
characteristics invoked...
supplier invoked...
accumulator invoked...
characteristics invoked...
finisher invoked...
- 1
- 2
- 3
- 4
- 5
- 1
System.out.println("current thread : "+item1+" , "+Thread.currentThread().getName());
- 1
Exception in thread "main" java.util.ConcurrentModificationException: java.util.ConcurrentModificationException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:496)
at com.ceaser.jdk8lambda.stream2.MyCollectorATest.main(MyCollectorATest.java:17)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
at java.util.HashMap$KeyIterator.next(HashMap.java:1461)
at java.util.AbstractCollection.toString(AbstractCollection.java:461)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:131)
at com.ceaser.jdk8lambda.stream2.MyCollectorA.lambda$accumulator$0(MyCollectorA.java:43)
at java.util.stream.ReferencePipeline.lambda$collect$1(ReferencePipeline.java:496)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1548)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
at java.util.AbstractCollection.toString(AbstractCollection.java:461)
- 1
public String toString() {
Iterator<E> it = iterator();
if (! it.hasNext())
return "[]";
StringBuilder sb = new StringBuilder();
for (;;) {
**E e = it.next();**//这样代码是对Set集合进行遍历
sb.append(e == this ? "(this Collection)" : e);
if (! it.hasNext())
return sb.append(']').toString();
sb.append(',').append(' ');
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
E e = it.next();
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
A container;
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
else {
container = evaluate(ReduceOps.makeRef(collector));
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18