001/* 002 * JDrupes Builder 003 * Copyright (C) 2025 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.builder.core; 020 021import com.google.common.flogger.FluentLogger; 022import static com.google.common.flogger.LazyArgs.lazy; 023import java.util.Collection; 024import java.util.concurrent.ExecutionException; 025import java.util.concurrent.Future; 026import java.util.concurrent.atomic.AtomicInteger; 027import java.util.stream.Collectors; 028import java.util.stream.Stream; 029import org.jdrupes.builder.api.BuildException; 030import org.jdrupes.builder.api.ConfigurationException; 031import org.jdrupes.builder.api.Resource; 032import org.jdrupes.builder.api.StatusLine; 033 034/// Evaluate the stream from a provider asynchronously. 035/// 036/// @param <T> the provided resource type 037/// 038public class FutureStream<T extends Resource> { 039 040 private static final FluentLogger logger = FluentLogger.forEnclosingClass(); 041 @SuppressWarnings("PMD.FieldNamingConventions") 042 private static final AtomicInteger futureCount = new AtomicInteger(0); 043 private final DefaultBuildContext context; 044 /* default */@SuppressWarnings("PMD.FieldNamingConventions") 045 static final ScopedValue<StatusLine> statusLine 046 = ScopedValue.newInstance(); 047 private final ProviderInvocation<?> invocation; 048 private final Future<Collection<T>> values; 049 private final int id = futureCount.getAndIncrement(); 050 051 static { 052 ScopedValueContext.add(statusLine); 053 } 054 055 /// Instantiates a new future stream of resources. 056 /// 057 /// @param invocation the invocation 058 /// 059 public FutureStream(ProviderInvocation<T> invocation) { 060 context = LauncherBase.context(); 061 this.invocation = invocation; 062 values = ScopedValueContext.submitTo(context.executor(), () -> { 063 var origThreadName = Thread.currentThread().getName(); 064 try (var _ = context.executingFutureStreams().acquire(); 065 var statusLine = context.console().statusLine()) { 066 final var provider = invocation.provider(); 067 final var request = invocation.request(); 068 Thread.currentThread().setName( 069 provider + " ← " + request.type()); 070 logger.atFiner().log( 071 "Task [%s] evaluating", Thread.currentThread().getName()); 072 // Wait for the build-project to be fully constructed 073 context.buildProject().get(); 074 statusLine.update( 075 provider + " providing " + request.toRequestedString()); 076 return context.inScopeForProviderCall() 077 .where(FutureStream.statusLine, statusLine) 078 .call(() -> ((AbstractProvider) provider).toSpi() 079 .provide(request)); 080 } finally { 081 logger.atFiner().log( 082 "Task [%s] terminated", Thread.currentThread().getName()); 083 Thread.currentThread().setName(origThreadName); 084 } 085 }); 086 } 087 088 /// Returns the lazily evaluated stream of resources. 089 /// 090 /// @return the stream 091 /// 092 public Stream<T> stream() { 093 return LazyCollectionStream.of(() -> { 094 if (!context.buildProject().isDone()) { 095 throw new ConfigurationException().from(invocation.provider()) 096 .message("Attempt to consume resource stream" 097 + " while constructing the build project."); 098 } 099 try { 100 logger.atFiner().log("%s awaiting result, request chain: %s", 101 this, lazy(() -> context.requestChain().stream() 102 .map(ProviderInvocation::toString) 103 .collect(Collectors.joining(" ≪ ")))); 104 return values.get(); 105 } catch (InterruptedException | ExecutionException e) { 106 throw new BuildException() 107 .from(invocation.provider()).cause(e); 108 } finally { 109 logger.atFiner().log("%s is done", this); 110 } 111 }); 112 } 113 114 @Override 115 public String toString() { 116 return "FutureStream#" + id + " [" + invocation.provider() + " ← " 117 + invocation.request().toRequestedString() + "]"; 118 } 119 120}