001/*
002 * JDrupes Builder
003 * Copyright (C) 2025 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.builder.core;
020
021import com.google.common.flogger.FluentLogger;
022import static com.google.common.flogger.LazyArgs.*;
023import java.util.Iterator;
024import java.util.LinkedList;
025import java.util.List;
026import java.util.Spliterators;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.Future;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.function.Consumer;
031import java.util.stream.Stream;
032import java.util.stream.StreamSupport;
033import org.jdrupes.builder.api.BuildException;
034import org.jdrupes.builder.api.Resource;
035import org.jdrupes.builder.api.ResourceProvider;
036import org.jdrupes.builder.api.ResourceRequest;
037import org.jdrupes.builder.api.StatusLine;
038
039/// Evaluate the stream from a provider asynchronously.
040///
041/// @param <T> the provided resource type
042///
043public class FutureStream<T extends Resource> {
044
045    private static final FluentLogger logger = FluentLogger.forEnclosingClass();
046    @SuppressWarnings("PMD.FieldNamingConventions")
047    private static final ScopedValue<FutureStream<?>> caller
048        = ScopedValue.newInstance();
049    /* default */@SuppressWarnings("PMD.FieldNamingConventions")
050    static final ScopedValue<StatusLine> statusLine
051        = ScopedValue.newInstance(); // <T>
052    private final FutureStream<?> initiallyCalledBy;
053    private final FutureStreamCache.Key<?> holding;
054    private final Future<List<T>> values;
055
056    /// Instantiates a new future resources.
057    ///
058    /// @param context the context
059    /// @param scopedBuildContext the scoped build context for re-binding
060    /// @param providerInvocationAllowed scoped provider invocation allowed
061    /// for re-binding
062    /// @param provider the provider
063    /// @param request the requested
064    ///
065    public FutureStream(DefaultBuildContext context,
066            ScopedValue<DefaultBuildContext> scopedBuildContext,
067            ScopedValue<AtomicBoolean> providerInvocationAllowed,
068            ResourceProvider provider, ResourceRequest<T> request) {
069        initiallyCalledBy = caller.isBound() ? caller.get() : null;
070        holding = new FutureStreamCache.Key<>(provider, request);
071        logger.atFiner().log("Evaluating %s → %s", holding.request(),
072            lazy(() -> holding.provider() + (initiallyCalledBy != null
073                ? " requested by " + initiallyCalledBy
074                : "")));
075        logger.atFinest().log("Call chain: %s", lazy(this::callChain));
076        boolean isAllowed = DefaultBuildContext.isProviderInvocationAllowed();
077        values = context.executor().submit(() -> {
078            var origThreadName = Thread.currentThread().getName();
079            try (var _ = context.executingFutureStreams().acquire();
080                    var statusLine = context.console().statusLine()) {
081                Thread.currentThread().setName(
082                    provider + " ← " + request.type());
083                statusLine.update(provider + " evaluating " + request);
084                return ScopedValue
085                    .where(scopedBuildContext, context)
086                    .where(providerInvocationAllowed,
087                        new AtomicBoolean(isAllowed))
088                    .where(caller, this)
089                    .where(FutureStream.statusLine, statusLine)
090                    .call(() -> ((AbstractProvider) provider).toSpi()
091                        .provide(request).toList());
092            } finally {
093                Thread.currentThread().setName(origThreadName);
094            }
095        });
096    }
097
098    private List<FutureStream<?>> callChain() {
099        List<FutureStream<?>> result = new LinkedList<>();
100        FutureStream<?> cur = this;
101        do {
102            result.add(0, cur);
103            cur = cur.initiallyCalledBy;
104        } while (cur != null);
105        return result;
106    }
107
108    /// Returns the lazily evaluated stream of resources.
109    ///
110    /// @return the stream
111    ///
112    public Stream<T> stream() {
113        return StreamSupport.stream(
114            new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, 0) {
115
116                private Iterator<T> theIterator;
117
118                private Iterator<T> iterator() {
119                    if (theIterator == null) {
120                        try {
121                            theIterator = values.get().iterator();
122                        } catch (InterruptedException | ExecutionException e) {
123                            throw new BuildException().from(holding.provider())
124                                .cause(e);
125                        }
126                    }
127                    return theIterator;
128                }
129
130                @Override
131                public void forEachRemaining(Consumer<? super T> action) {
132                    iterator().forEachRemaining(action);
133                }
134
135                @Override
136                public boolean tryAdvance(Consumer<? super T> action) {
137                    if (!iterator().hasNext()) {
138                        return false;
139                    }
140                    action.accept(iterator().next());
141                    return true;
142                }
143
144            }, false);
145    }
146
147    @Override
148    public String toString() {
149        return "FutureStream [" + holding.request() + " → "
150            + holding.provider() + "]";
151    }
152
153}