001/*
002 * JDrupes Builder
003 * Copyright (C) 2025 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.builder.core;
020
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.List;
024import java.util.stream.Stream;
025
026/// A [StreamCollector] allows the user to combine several [Stream]s
027/// into one. The collected streams are only consumed when
028/// [#stream] is called. If the collector is cached, [#stream] can
029/// be invoked several times and each invocation returns a new Stream
030/// with the collected content.
031///
032/// Note that cached collectors are implemented in a straightforward
033/// manner by putting all contents in a list and then returning a
034/// stream for the list.
035///
036/// @param <T> the generic type
037///
038public class StreamCollector<T> {
039
040    private List<Stream<T>> sources = new ArrayList<>();
041    private List<T> cache;
042    private final boolean cached;
043
044    /// Instantiates a new collector.
045    ///
046    /// @param cached determines if contents is cached
047    ///
048    public StreamCollector(boolean cached) {
049        this.cached = cached;
050    }
051
052    /// Use all given streams as sources.
053    ///
054    /// @param sources the sources
055    /// @return the stream collector
056    ///
057    @SuppressWarnings({ "unchecked", "PMD.AvoidSynchronizedStatement",
058        "PMD.CloseResource" })
059    @SafeVarargs
060    public final StreamCollector<T> add(Stream<? extends T>... sources) {
061        synchronized (this) {
062            if (this.sources == null) {
063                throw new IllegalStateException(
064                    "Cannot add sources after stream() has been called.");
065            }
066            for (var src : sources) {
067                this.sources.add((Stream<T>) src);
068            }
069        }
070        return this;
071    }
072
073    /// Convenience method for adding a enumerated items.
074    ///
075    /// @param items the item
076    /// @return the stream collector
077    ///
078    @SafeVarargs
079    public final StreamCollector<T> add(T... items) {
080        add(Arrays.stream(items));
081        return this;
082    }
083
084    /// Provide the contents from the stream(s).
085    ///
086    /// @return the stream<? extends t>
087    ///
088    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
089    public Stream<T> stream() {
090        if (cache != null) {
091            return cache.stream();
092        }
093        synchronized (this) {
094            if (cache != null) {
095                return cache.stream();
096            }
097            Stream<T> result;
098            if (cached) {
099                cache = sources.stream().flatMap(s -> s).toList();
100                result = cache.stream();
101            } else {
102                result = sources.stream().flatMap(s -> s);
103            }
104            sources = null;
105            return result;
106        }
107    }
108
109    /// Create a cached collector.
110    ///
111    /// @param <T> the generic type
112    /// @return the cached stream
113    ///
114    public static <T> StreamCollector<T> cached() {
115        return new StreamCollector<>(true);
116    }
117
118    /// Create a cached collector initially containing a single source stream.
119    ///
120    /// @param <T> the generic type
121    /// @param source the source
122    /// @return the cached stream
123    ///
124    public static <T> StreamCollector<T> cached(Stream<T> source) {
125        var result = new StreamCollector<T>(true);
126        result.add(source);
127        return result;
128    }
129
130    /// Create an un-cached collector.
131    ///
132    /// @param <T> the generic type
133    /// @return the cached stream
134    ///
135    public static <T> StreamCollector<T> uncached() {
136        return new StreamCollector<>(false);
137    }
138
139    /// Create an un-cached collector initially containing a single
140    /// source stream.
141    ///
142    /// @param <T> the generic type
143    /// @param source the source
144    /// @return the cached stream
145    ///
146    public static <T> StreamCollector<T> uncached(Stream<T> source) {
147        var result = new StreamCollector<T>(false);
148        result.add(source);
149        return result;
150    }
151}