001/* 002 * JDrupes Builder 003 * Copyright (C) 2025 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.builder.core; 020 021import com.google.common.flogger.FluentLogger; 022import static com.google.common.flogger.LazyArgs.*; 023import java.util.Iterator; 024import java.util.LinkedList; 025import java.util.List; 026import java.util.Spliterators; 027import java.util.concurrent.ExecutionException; 028import java.util.concurrent.Future; 029import java.util.concurrent.atomic.AtomicBoolean; 030import java.util.function.Consumer; 031import java.util.stream.Stream; 032import java.util.stream.StreamSupport; 033import org.jdrupes.builder.api.BuildException; 034import org.jdrupes.builder.api.Resource; 035import org.jdrupes.builder.api.ResourceProvider; 036import org.jdrupes.builder.api.ResourceRequest; 037import org.jdrupes.builder.api.StatusLine; 038 039/// Evaluate the stream from a provider asynchronously. 040/// 041/// @param <T> the provided resource type 042/// 043public class FutureStream<T extends Resource> { 044 045 private static final FluentLogger logger = FluentLogger.forEnclosingClass(); 046 @SuppressWarnings("PMD.FieldNamingConventions") 047 private static final ScopedValue<FutureStream<?>> caller 048 = ScopedValue.newInstance(); 049 /* default */@SuppressWarnings("PMD.FieldNamingConventions") 050 static final ScopedValue<StatusLine> statusLine 051 = ScopedValue.newInstance(); // <T> 052 private final FutureStream<?> initiallyCalledBy; 053 private final FutureStreamCache.Key<?> holding; 054 private final Future<List<T>> values; 055 056 /// Instantiates a new future resources. 057 /// 058 /// @param context the context 059 /// @param scopedBuildContext the scoped build context for re-binding 060 /// @param providerInvocationAllowed scoped provider invocation allowed 061 /// for re-binding 062 /// @param provider the provider 063 /// @param request the requested 064 /// 065 public FutureStream(DefaultBuildContext context, 066 ScopedValue<DefaultBuildContext> scopedBuildContext, 067 ScopedValue<AtomicBoolean> providerInvocationAllowed, 068 ResourceProvider provider, ResourceRequest<T> request) { 069 initiallyCalledBy = caller.isBound() ? caller.get() : null; 070 holding = new FutureStreamCache.Key<>(provider, request); 071 logger.atFiner().log("Evaluating %s → %s", holding.request(), 072 lazy(() -> holding.provider() + (initiallyCalledBy != null 073 ? " requested by " + initiallyCalledBy 074 : ""))); 075 logger.atFinest().log("Call chain: %s", lazy(this::callChain)); 076 boolean isAllowed = DefaultBuildContext.isProviderInvocationAllowed(); 077 values = context.executor().submit(() -> { 078 var origThreadName = Thread.currentThread().getName(); 079 try (var _ = context.executingFutureStreams().acquire(); 080 var statusLine = context.console().statusLine()) { 081 Thread.currentThread().setName( 082 provider + " ← " + request.type()); 083 statusLine.update(provider + " evaluating " + request); 084 return ScopedValue 085 .where(scopedBuildContext, context) 086 .where(providerInvocationAllowed, 087 new AtomicBoolean(isAllowed)) 088 .where(caller, this) 089 .where(FutureStream.statusLine, statusLine) 090 .call(() -> ((AbstractProvider) provider).toSpi() 091 .provide(request).toList()); 092 } finally { 093 Thread.currentThread().setName(origThreadName); 094 } 095 }); 096 } 097 098 private List<FutureStream<?>> callChain() { 099 List<FutureStream<?>> result = new LinkedList<>(); 100 FutureStream<?> cur = this; 101 do { 102 result.add(0, cur); 103 cur = cur.initiallyCalledBy; 104 } while (cur != null); 105 return result; 106 } 107 108 /// Returns the lazily evaluated stream of resources. 109 /// 110 /// @return the stream 111 /// 112 public Stream<T> stream() { 113 return StreamSupport.stream( 114 new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, 0) { 115 116 private Iterator<T> theIterator; 117 118 private Iterator<T> iterator() { 119 if (theIterator == null) { 120 try { 121 theIterator = values.get().iterator(); 122 } catch (InterruptedException | ExecutionException e) { 123 throw new BuildException().from(holding.provider()) 124 .cause(e); 125 } 126 } 127 return theIterator; 128 } 129 130 @Override 131 public void forEachRemaining(Consumer<? super T> action) { 132 iterator().forEachRemaining(action); 133 } 134 135 @Override 136 public boolean tryAdvance(Consumer<? super T> action) { 137 if (!iterator().hasNext()) { 138 return false; 139 } 140 action.accept(iterator().next()); 141 return true; 142 } 143 144 }, false); 145 } 146 147 @Override 148 public String toString() { 149 return "FutureStream [" + holding.request() + " → " 150 + holding.provider() + "]"; 151 } 152 153}