/*
 * JDrupes Builder
 * Copyright (C) 2025 Michael N. Lipp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.jdrupes.builder.core;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

/// A [StreamCollector] allows the user to combine several [Stream]s
/// into one. Sources can be added until [#stream] is called for the
/// first time; any later attempt to add a source fails with an
/// [IllegalStateException].
///
/// If the collector is cached, the first invocation of [#stream]
/// drains all sources into a list, and every invocation (including
/// the first) returns a new stream over that list. If the collector
/// is un-cached, [#stream] returns a lazily evaluated concatenation
/// of the sources and can therefore be invoked only once.
///
/// Note that cached collectors are implemented in a straightforward
/// manner by putting all contents in a list and then returning a
/// stream for the list.
///
/// The `add` methods and [#stream] synchronize on the collector
/// instance.
///
/// @param <T> the type of the collected items
///
public class StreamCollector<T> {

    /// The sources added so far; `null` once [#stream] has consumed them.
    private List<Stream<T>> sources = new ArrayList<>();
    /// The drained contents of a cached collector; `null` until the
    /// first call of [#stream] (and always `null` if un-cached).
    private List<T> cache;
    private final boolean cached;

    /// Instantiates a new collector.
    ///
    /// @param cached determines if the contents is cached
    ///
    public StreamCollector(boolean cached) {
        this.cached = cached;
    }

    /// Use all given streams as sources.
    ///
    /// @param sources the sources
    /// @return this stream collector
    /// @throws IllegalStateException if [#stream] has already been called
    ///
    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
    @SafeVarargs
    public final StreamCollector<T> add(Stream<? extends T>... sources) {
        // Same monitor as stream(), which replaces the list with null.
        synchronized (this) {
            if (this.sources == null) {
                throw new IllegalStateException(
                    "Cannot add sources after stream() has been called.");
            }
            // Safe: the streams are only ever read as producers of T.
            @SuppressWarnings("unchecked")
            List<Stream<T>> added
                = (List<Stream<T>>) (Object) Arrays.asList(sources);
            this.sources.addAll(added);
        }
        return this;
    }

    /// Convenience method for adding enumerated items.
    ///
    /// @param items the items
    /// @return this stream collector
    /// @throws IllegalStateException if [#stream] has already been called
    ///
    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
    @SafeVarargs
    public final StreamCollector<T> add(T... items) {
        // Same monitor as stream(), which replaces the list with null.
        synchronized (this) {
            if (sources == null) {
                throw new IllegalStateException(
                    "Cannot add sources after stream() has been called.");
            }
            sources.add(Arrays.stream(items));
        }
        return this;
    }

    /// Provide the contents from the stream(s).
    ///
    /// For a cached collector, the first call drains the sources into
    /// a list and each call returns a new stream over that list. For
    /// an un-cached collector, the call returns the concatenation of
    /// the sources and may be made only once.
    ///
    /// @return a stream with the collected contents
    /// @throws IllegalStateException if the collector is un-cached and
    /// this method has already been called
    ///
    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
    public Stream<T> stream() {
        synchronized (this) {
            if (cache != null) {
                return cache.stream();
            }
            if (sources == null) {
                // Un-cached and already handed out: fail with a clear
                // message instead of a NullPointerException.
                throw new IllegalStateException(
                    "stream() can only be called once on an un-cached"
                        + " collector.");
            }
            Stream<T> result = sources.stream().flatMap(s -> s);
            if (cached) {
                cache = result.toList();
                result = cache.stream();
            }
            // From now on, add(...) is rejected.
            sources = null;
            return result;
        }
    }

    /// Create a cached collector.
    ///
    /// @param <T> the type of the collected items
    /// @return the cached collector
    ///
    public static <T> StreamCollector<T> cached() {
        return new StreamCollector<>(true);
    }

    /// Create a cached collector initially containing a single source stream.
    ///
    /// @param <T> the type of the collected items
    /// @param source the source
    /// @return the cached collector
    ///
    public static <T> StreamCollector<T> cached(Stream<T> source) {
        return new StreamCollector<T>(true).add(source);
    }

    /// Create an un-cached collector.
    ///
    /// @param <T> the type of the collected items
    /// @return the un-cached collector
    ///
    public static <T> StreamCollector<T> uncached() {
        return new StreamCollector<>(false);
    }

    /// Create an un-cached collector initially containing a single
    /// source stream.
    ///
    /// @param <T> the type of the collected items
    /// @param source the source
    /// @return the un-cached collector
    ///
    public static <T> StreamCollector<T> uncached(Stream<T> source) {
        return new StreamCollector<T>(false).add(source);
    }
}