001/*
002 * JDrupes Builder
003 * Copyright (C) 2025 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.builder.core;
020
021import com.google.common.flogger.FluentLogger;
022import static com.google.common.flogger.LazyArgs.lazy;
023import java.util.Collection;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.Future;
026import java.util.concurrent.atomic.AtomicInteger;
027import java.util.stream.Collectors;
028import java.util.stream.Stream;
029import org.jdrupes.builder.api.BuildException;
030import org.jdrupes.builder.api.ConfigurationException;
031import org.jdrupes.builder.api.Resource;
032import org.jdrupes.builder.api.StatusLine;
033
034/// Evaluate the stream from a provider asynchronously.
035///
036/// @param <T> the provided resource type
037///
038public class FutureStream<T extends Resource> {
039
040    private static final FluentLogger logger = FluentLogger.forEnclosingClass();
041    @SuppressWarnings("PMD.FieldNamingConventions")
042    private static final AtomicInteger futureCount = new AtomicInteger(0);
043    private final DefaultBuildContext context;
044    /* default */@SuppressWarnings("PMD.FieldNamingConventions")
045    static final ScopedValue<StatusLine> statusLine
046        = ScopedValue.newInstance();
047    private final ProviderInvocation<?> invocation;
048    private final Future<Collection<T>> values;
049    private final int id = futureCount.getAndIncrement();
050
051    static {
052        ScopedValueContext.add(statusLine);
053    }
054
055    /// Instantiates a new future stream of resources.
056    ///
057    /// @param invocation the invocation
058    ///
059    public FutureStream(ProviderInvocation<T> invocation) {
060        context = LauncherBase.context();
061        this.invocation = invocation;
062        values = ScopedValueContext.submitTo(context.executor(), () -> {
063            var origThreadName = Thread.currentThread().getName();
064            try (var _ = context.executingFutureStreams().acquire();
065                    var statusLine = context.console().statusLine()) {
066                final var provider = invocation.provider();
067                final var request = invocation.request();
068                Thread.currentThread().setName(
069                    provider + " ← " + request.type());
070                logger.atFiner().log(
071                    "Task [%s] evaluating", Thread.currentThread().getName());
072                // Wait for the build-project to be fully constructed
073                context.buildProject().get();
074                statusLine.update(
075                    provider + " providing " + request.toRequestedString());
076                return context.inScopeForProviderCall()
077                    .where(FutureStream.statusLine, statusLine)
078                    .call(() -> ((AbstractProvider) provider).toSpi()
079                        .provide(request));
080            } finally {
081                logger.atFiner().log(
082                    "Task [%s] terminated", Thread.currentThread().getName());
083                Thread.currentThread().setName(origThreadName);
084            }
085        });
086    }
087
088    /// Returns the lazily evaluated stream of resources.
089    ///
090    /// @return the stream
091    ///
092    public Stream<T> stream() {
093        return LazyCollectionStream.of(() -> {
094            if (!context.buildProject().isDone()) {
095                throw new ConfigurationException().from(invocation.provider())
096                    .message("Attempt to consume resource stream"
097                        + " while constructing the build project.");
098            }
099            try {
100                logger.atFiner().log("%s awaiting result, request chain: %s",
101                    this, lazy(() -> context.requestChain().stream()
102                        .map(ProviderInvocation::toString)
103                        .collect(Collectors.joining(" ≪ "))));
104                return values.get();
105            } catch (InterruptedException | ExecutionException e) {
106                throw new BuildException()
107                    .from(invocation.provider()).cause(e);
108            } finally {
109                logger.atFiner().log("%s is done", this);
110            }
111        });
112    }
113
114    @Override
115    public String toString() {
116        return "FutureStream#" + id + " [" + invocation.provider() + " ← "
117            + invocation.request().toRequestedString() + "]";
118    }
119
120}