001/* 002 * JDrupes Builder 003 * Copyright (C) 2025 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.builder.core; 020 021import com.google.common.flogger.FluentLogger; 022import static com.google.common.flogger.LazyArgs.*; 023import java.util.Iterator; 024import java.util.LinkedList; 025import java.util.List; 026import java.util.Spliterators; 027import java.util.concurrent.ExecutionException; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Future; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.function.Consumer; 032import java.util.stream.Stream; 033import java.util.stream.StreamSupport; 034import org.jdrupes.builder.api.BuildException; 035import org.jdrupes.builder.api.Resource; 036import org.jdrupes.builder.api.ResourceProvider; 037import org.jdrupes.builder.api.ResourceRequest; 038 039/// Evaluate the stream from a provider asynchronously. 040/// 041/// @param <T> the provided resource type 042/// 043public class FutureStream<T extends Resource> { 044 045 private static final FluentLogger logger = FluentLogger.forEnclosingClass(); 046 @SuppressWarnings("PMD.FieldNamingConventions") 047 private static final ScopedValue<AtomicBoolean> providerInvocationAllowed 048 = ScopedValue.newInstance(); 049 @SuppressWarnings("PMD.FieldNamingConventions") 050 private static final ScopedValue<FutureStream<?>> caller 051 = ScopedValue.newInstance(); 052 private final FutureStream<?> initiallyCalledBy; 053 private final FutureStreamCache.Key<?> holding; 054 private final Future<List<T>> values; 055 056 /// Instantiates a new future resources. 057 /// 058 /// @param executor the executor 059 /// @param provider the provider 060 /// @param request the requested 061 /// 062 public FutureStream(ExecutorService executor, ResourceProvider provider, 063 ResourceRequest<T> request) { 064 initiallyCalledBy = caller.isBound() ? caller.get() : null; 065 holding = new FutureStreamCache.Key<>(provider, request); 066 logger.atFiner().log("Evaluating %s → %s", holding.request(), 067 lazy(() -> holding.provider() + (initiallyCalledBy != null 068 ? " requested by " + initiallyCalledBy 069 : ""))); 070 logger.atFinest().log("Call chain: %s", lazy(this::callChain)); 071 values = executor.submit(() -> { 072 return ScopedValue 073 .where(providerInvocationAllowed, new AtomicBoolean(true)) 074 .where(caller, this) 075 .call(() -> ((AbstractProvider) provider).toSpi() 076 .provide(request).toList()); 077 }); 078 } 079 080 private List<FutureStream<?>> callChain() { 081 List<FutureStream<?>> result = new LinkedList<>(); 082 FutureStream<?> cur = this; 083 do { 084 result.add(0, cur); 085 cur = cur.initiallyCalledBy; 086 } while (cur != null); 087 return result; 088 } 089 090 /// Checks if is provider invocation is allowed. Clears the 091 /// allowed flag to also detect nested invocations. 092 /// 093 /// @return true, if is provider invocation allowed 094 /// 095 public static boolean isProviderInvocationAllowed() { 096 return providerInvocationAllowed.isBound() 097 && providerInvocationAllowed.get().getAndSet(false); 098 } 099 100 /// Returns the lazily evaluated stream of resources. 101 /// 102 /// @return the stream 103 /// 104 public Stream<T> stream() { 105 return StreamSupport 106 .stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, 0) { 107 108 private Iterator<T> theIterator; 109 110 private Iterator<T> iterator() { 111 if (theIterator == null) { 112 try { 113 theIterator = values.get().iterator(); 114 } catch (InterruptedException | ExecutionException e) { 115 throw new BuildException().from(holding.provider()) 116 .cause(e); 117 } 118 } 119 return theIterator; 120 } 121 122 @Override 123 public void forEachRemaining(Consumer<? super T> action) { 124 iterator().forEachRemaining(action); 125 } 126 127 @Override 128 public boolean tryAdvance(Consumer<? super T> action) { 129 if (!iterator().hasNext()) { 130 return false; 131 } 132 action.accept(iterator().next()); 133 return true; 134 } 135 136 }, false); 137 } 138 139 @Override 140 public String toString() { 141 return "FutureStream [" + holding.request() + " → " 142 + holding.provider() + "]"; 143 } 144 145}