001/*
002 * JDrupes Builder
003 * Copyright (C) 2025 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.builder.core;
020
021import com.google.common.flogger.FluentLogger;
022import static com.google.common.flogger.LazyArgs.*;
023import java.util.Iterator;
024import java.util.LinkedList;
025import java.util.List;
026import java.util.Spliterators;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Future;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.function.Consumer;
032import java.util.stream.Stream;
033import java.util.stream.StreamSupport;
034import org.jdrupes.builder.api.BuildException;
035import org.jdrupes.builder.api.Resource;
036import org.jdrupes.builder.api.ResourceProvider;
037import org.jdrupes.builder.api.ResourceRequest;
038
039/// Evaluate the stream from a provider asynchronously.
040///
041/// @param <T> the provided resource type
042///
043public class FutureStream<T extends Resource> {
044
045    private static final FluentLogger logger = FluentLogger.forEnclosingClass();
046    @SuppressWarnings("PMD.FieldNamingConventions")
047    private static final ScopedValue<AtomicBoolean> providerInvocationAllowed
048        = ScopedValue.newInstance();
049    @SuppressWarnings("PMD.FieldNamingConventions")
050    private static final ScopedValue<FutureStream<?>> caller
051        = ScopedValue.newInstance();
052    private final FutureStream<?> initiallyCalledBy;
053    private final FutureStreamCache.Key<?> holding;
054    private final Future<List<T>> values;
055
056    /// Instantiates a new future resources.
057    ///
058    /// @param executor the executor
059    /// @param provider the provider
060    /// @param request the requested
061    ///
062    public FutureStream(ExecutorService executor, ResourceProvider provider,
063            ResourceRequest<T> request) {
064        initiallyCalledBy = caller.isBound() ? caller.get() : null;
065        holding = new FutureStreamCache.Key<>(provider, request);
066        logger.atFiner().log("Evaluating %s → %s", holding.request(),
067            lazy(() -> holding.provider() + (initiallyCalledBy != null
068                ? " requested by " + initiallyCalledBy
069                : "")));
070        logger.atFinest().log("Call chain: %s", lazy(this::callChain));
071        values = executor.submit(() -> {
072            return ScopedValue
073                .where(providerInvocationAllowed, new AtomicBoolean(true))
074                .where(caller, this)
075                .call(() -> ((AbstractProvider) provider).toSpi()
076                    .provide(request).toList());
077        });
078    }
079
080    private List<FutureStream<?>> callChain() {
081        List<FutureStream<?>> result = new LinkedList<>();
082        FutureStream<?> cur = this;
083        do {
084            result.add(0, cur);
085            cur = cur.initiallyCalledBy;
086        } while (cur != null);
087        return result;
088    }
089
090    /// Checks if is provider invocation is allowed. Clears the
091    /// allowed flag to also detect nested invocations.
092    ///
093    /// @return true, if is provider invocation allowed
094    ///
095    public static boolean isProviderInvocationAllowed() {
096        return providerInvocationAllowed.isBound()
097            && providerInvocationAllowed.get().getAndSet(false);
098    }
099
100    /// Returns the lazily evaluated stream of resources.
101    ///
102    /// @return the stream
103    ///
104    public Stream<T> stream() {
105        return StreamSupport
106            .stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, 0) {
107
108                private Iterator<T> theIterator;
109
110                private Iterator<T> iterator() {
111                    if (theIterator == null) {
112                        try {
113                            theIterator = values.get().iterator();
114                        } catch (InterruptedException | ExecutionException e) {
115                            throw new BuildException().from(holding.provider())
116                                .cause(e);
117                        }
118                    }
119                    return theIterator;
120                }
121
122                @Override
123                public void forEachRemaining(Consumer<? super T> action) {
124                    iterator().forEachRemaining(action);
125                }
126
127                @Override
128                public boolean tryAdvance(Consumer<? super T> action) {
129                    if (!iterator().hasNext()) {
130                        return false;
131                    }
132                    action.accept(iterator().next());
133                    return true;
134                }
135
136            }, false);
137    }
138
139    @Override
140    public String toString() {
141        return "FutureStream [" + holding.request() + " → "
142            + holding.provider() + "]";
143    }
144
145}