001/*
002 * JDrupes Builder
003 * Copyright (C) 2025 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.builder.core;
020
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.List;
024import java.util.stream.Stream;
025
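/// A [StreamCollector] allows the user to combine several [Stream]s
/// into one. Sources can be added until [#stream] is called for the
/// first time; adding sources afterwards results in an
/// [IllegalStateException].
///
/// If the collector is cached, the collected streams are terminated
/// when [#stream] is called for the first time. [#stream] can then be
/// invoked several times and each invocation returns a new Stream
/// with the collected content. If the collector is not cached,
/// [#stream] may be invoked only once and the collected streams are
/// consumed lazily by the returned stream.
///
/// Note that cached collectors are implemented in a straightforward
/// manner by putting all contents in a list and then returning a
/// stream for the list.
///
/// @param <T> the type of the collected elements
///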
public class StreamCollector<T> {

    // The collected source streams. Set to null by the first successful
    // call to stream(), which marks the collector as closed.
    private List<Stream<? extends T>> sources = new ArrayList<>();
    // The materialized contents; only ever assigned for cached collectors.
    private List<T> cache;
    private final boolean cached;

    /// Instantiates a new collector.
    ///
    /// @param cached determines if the contents are cached, i.e. whether
    /// [#stream] may be invoked more than once
    ///
    public StreamCollector(boolean cached) {
        this.cached = cached;
    }

    /// Use all given streams as sources.
    ///
    /// @param sources the sources
    /// @return this stream collector
    /// @throws IllegalStateException if [#stream] has already been called
    ///
    @SafeVarargs
    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
    public final StreamCollector<T> add(Stream<? extends T>... sources) {
        // Use the same monitor as stream() so that the state check and
        // the modification of the source list are atomic.
        synchronized (this) {
            requireOpen();
            this.sources.addAll(Arrays.asList(sources));
            return this;
        }
    }

    /// Convenience method for adding enumerated items.
    ///
    /// @param items the items
    /// @return this stream collector
    /// @throws IllegalStateException if [#stream] has already been called
    ///
    @SafeVarargs
    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
    public final StreamCollector<T> add(T... items) {
        synchronized (this) {
            requireOpen();
            sources.add(Arrays.stream(items));
            return this;
        }
    }

    /// Provide the contents from the stream(s).
    ///
    /// For a cached collector, the first invocation drains all sources
    /// into a list; this and every subsequent invocation return a new
    /// stream over that list. For an un-cached collector, the returned
    /// stream lazily concatenates the sources and the method may be
    /// invoked only once.
    ///
    /// @return a stream with the collected contents
    /// @throws IllegalStateException if the collector is un-cached and
    /// this method has already been called
    ///
    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
    public Stream<T> stream() {
        synchronized (this) {
            if (cache != null) {
                return cache.stream();
            }
            if (sources == null) {
                // Only reachable for un-cached collectors: the sources
                // have been handed out and cannot be replayed.
                throw new IllegalStateException("stream() has already"
                    + " been called on this un-cached collector.");
            }
            Stream<T> result = sources.stream().<T> flatMap(s -> s);
            if (cached) {
                cache = result.toList();
                result = cache.stream();
            }
            // Close the collector; from now on add(...) is rejected.
            sources = null;
            return result;
        }
    }

    /// Create a cached collector.
    ///
    /// @param <T> the type of the collected elements
    /// @return the cached collector
    ///
    public static <T> StreamCollector<T> cached() {
        return new StreamCollector<>(true);
    }

    /// Create a cached collector initially containing a single source stream.
    ///
    /// @param <T> the type of the collected elements
    /// @param source the source
    /// @return the cached collector
    ///
    public static <T> StreamCollector<T> cached(Stream<? extends T> source) {
        var result = new StreamCollector<T>(true);
        result.add(source);
        return result;
    }

    /// Create an un-cached collector.
    ///
    /// @param <T> the type of the collected elements
    /// @return the un-cached collector
    ///
    public static <T> StreamCollector<T> uncached() {
        return new StreamCollector<>(false);
    }

    /// Create an un-cached collector initially containing a single
    /// source stream.
    ///
    /// @param <T> the type of the collected elements
    /// @param source the source
    /// @return the un-cached collector
    ///
    public static <T> StreamCollector<T>
            uncached(Stream<? extends T> source) {
        var result = new StreamCollector<T>(false);
        result.add(source);
        return result;
    }

    /// Ensures that sources may still be added.
    private void requireOpen() {
        if (sources == null) {
            throw new IllegalStateException(
                "Cannot add sources after stream() has been called.");
        }
    }
}