/*
 * JDrupes Builder
 * Copyright (C) 2025 Michael N. Lipp
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.jdrupes.builder.core;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

/// A [StreamCollector] allows the user to combine several [Stream]s
/// into one. Sources may be added until [#stream] is called for the
/// first time; after that, [#add] throws an [IllegalStateException].
///
/// If the collector is cached, the first invocation of [#stream]
/// drains all sources into a list. [#stream] can then be invoked
/// several times and each invocation returns a new Stream with the
/// collected content.
///
/// If the collector is un-cached, [#stream] returns a lazy
/// concatenation of the sources and may only be invoked once. Any
/// further invocation throws an [IllegalStateException].
///
/// Note that cached collectors are implemented in a straightforward
/// manner by putting all contents in a list and then returning a
/// stream for the list.
///
/// @param <T> the type of the collected items
///
public class StreamCollector<T> {

    // Accessed only while holding the monitor of `this`. Set to null
    // by stream() to mark the collector as "closed for additions".
    private List<Stream<T>> sources = new ArrayList<>();
    // Read without holding the lock in stream(), hence volatile.
    private volatile List<T> cache;
    private final boolean cached;

    /// Instantiates a new collector.
    ///
    /// @param cached determines if the contents is cached
    ///
    public StreamCollector(boolean cached) {
        this.cached = cached;
    }

    /// Use all given streams as sources.
    ///
    /// @param sources the sources
    /// @return this stream collector
    /// @throws IllegalStateException if [#stream] has already been called
    ///
    @SuppressWarnings({ "unchecked", "PMD.AvoidSynchronizedStatement",
        "PMD.CloseResource" })
    @SafeVarargs
    public final StreamCollector<T> add(Stream<? extends T>... sources) {
        synchronized (this) {
            if (this.sources == null) {
                throw new IllegalStateException(
                    "Cannot add sources after stream() has been called.");
            }
            for (var src : sources) {
                // The cast is safe because the streams are only ever
                // read from (a Stream is a pure producer of T).
                this.sources.add((Stream<T>) src);
            }
        }
        return this;
    }

    /// Convenience method for adding enumerated items.
    ///
    /// @param items the items
    /// @return this stream collector
    /// @throws IllegalStateException if [#stream] has already been called
    ///
    @SafeVarargs
    public final StreamCollector<T> add(T... items) {
        add(Arrays.stream(items));
        return this;
    }

    /// Provide the contents from the stream(s).
    ///
    /// For a cached collector, the first invocation drains all sources
    /// into a list and every invocation returns a new stream over
    /// that list. For an un-cached collector, the returned stream is a
    /// lazy concatenation of the sources.
    ///
    /// @return a stream with the collected contents
    /// @throws IllegalStateException if the collector is un-cached and
    /// this method has already been called
    ///
    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
    public Stream<T> stream() {
        // Fast path without locking; a single volatile read.
        List<T> snapshot = cache;
        if (snapshot != null) {
            return snapshot.stream();
        }
        synchronized (this) {
            // Re-check, another thread may have filled the cache while
            // we were waiting for the lock.
            snapshot = cache;
            if (snapshot != null) {
                return snapshot.stream();
            }
            if (sources == null) {
                // Only reachable for un-cached collectors: the sources
                // have been handed out by a previous invocation.
                throw new IllegalStateException(
                    "stream() can only be called once"
                        + " on an un-cached collector.");
            }
            Stream<T> result;
            if (cached) {
                snapshot = sources.stream().flatMap(s -> s).toList();
                cache = snapshot;
                result = snapshot.stream();
            } else {
                result = sources.stream().flatMap(s -> s);
            }
            // From now on, add() rejects further sources.
            sources = null;
            return result;
        }
    }

    /// Create a cached collector.
    ///
    /// @param <T> the type of the collected items
    /// @return the new cached collector
    ///
    public static <T> StreamCollector<T> cached() {
        return new StreamCollector<>(true);
    }

    /// Create a cached collector initially containing a single source stream.
    ///
    /// @param <T> the type of the collected items
    /// @param source the source
    /// @return the new cached collector
    ///
    public static <T> StreamCollector<T> cached(Stream<T> source) {
        var result = new StreamCollector<T>(true);
        result.add(source);
        return result;
    }

    /// Create an un-cached collector.
    ///
    /// @param <T> the type of the collected items
    /// @return the new un-cached collector
    ///
    public static <T> StreamCollector<T> uncached() {
        return new StreamCollector<>(false);
    }

    /// Create an un-cached collector initially containing a single
    /// source stream.
    ///
    /// @param <T> the type of the collected items
    /// @param source the source
    /// @return the new un-cached collector
    ///
    public static <T> StreamCollector<T> uncached(Stream<T> source) {
        var result = new StreamCollector<T>(false);
        result.add(source);
        return result;
    }
}