001/*
002 * JDrupes Builder
003 * Copyright (C) 2025 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.builder.core;
020
021import java.util.Iterator;
022import java.util.List;
023import java.util.Spliterators;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.ExecutorService;
026import java.util.concurrent.Future;
027import java.util.function.Consumer;
028import java.util.stream.Stream;
029import java.util.stream.StreamSupport;
030import org.jdrupes.builder.api.BuildException;
031import org.jdrupes.builder.api.Resource;
032import org.jdrupes.builder.api.ResourceProvider;
033import org.jdrupes.builder.api.ResourceRequest;
034
035/// Evaluate the stream from a provider asynchronously.
036///
037/// @param <T> the provided resource type
038///
039public class FutureStream<T extends Resource> {
040
041    @SuppressWarnings("PMD.FieldNamingConventions")
042    private static final ThreadLocal<Boolean> providerInvocationAllowed
043        = ThreadLocal.withInitial(() -> false);
044    private final Future<List<T>> source;
045
046    /// Instantiates a new future resources.
047    ///
048    /// @param executor the executor
049    /// @param provider the provider
050    /// @param requested the requested
051    ///
052    public FutureStream(ExecutorService executor, ResourceProvider provider,
053            ResourceRequest<T> requested) {
054        source = executor.submit(() -> {
055            try {
056                providerInvocationAllowed.set(true);
057                return provider.provide(requested).toList();
058            } finally {
059                providerInvocationAllowed.set(false);
060            }
061        });
062    }
063
064    /// Checks if is provider invocation is allowed. Clears the
065    /// allowed flag to also detect nested invocations.
066    ///
067    /// @return true, if is provider invocation allowed
068    ///
069    public static boolean isProviderInvocationAllowed() {
070        var allowed = providerInvocationAllowed.get();
071        providerInvocationAllowed.set(false);
072        return allowed;
073    }
074
075    /// Returns the lazily evaluated stream of resources.
076    ///
077    /// @return the stream
078    ///
079    public Stream<T> stream() {
080        return StreamSupport
081            .stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, 0) {
082
083                private Iterator<T> theIterator;
084
085                private Iterator<T> iterator() {
086                    if (theIterator == null) {
087                        try {
088                            theIterator = source.get().iterator();
089                        } catch (InterruptedException | ExecutionException e) {
090                            throw new BuildException(e);
091                        }
092                    }
093                    return theIterator;
094                }
095
096                @Override
097                public void forEachRemaining(Consumer<? super T> action) {
098                    iterator().forEachRemaining(action);
099                }
100
101                @Override
102                public boolean tryAdvance(Consumer<? super T> action) {
103                    if (!iterator().hasNext()) {
104                        return false;
105                    }
106                    action.accept(iterator().next());
107                    return true;
108                }
109
110            }, false);
111    }
112
113}