001/* 002 * JDrupes Builder 003 * Copyright (C) 2025 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.builder.core; 020 021import java.util.Iterator; 022import java.util.List; 023import java.util.Spliterators; 024import java.util.concurrent.ExecutionException; 025import java.util.concurrent.ExecutorService; 026import java.util.concurrent.Future; 027import java.util.function.Consumer; 028import java.util.stream.Stream; 029import java.util.stream.StreamSupport; 030import org.jdrupes.builder.api.BuildException; 031import org.jdrupes.builder.api.Resource; 032import org.jdrupes.builder.api.ResourceProvider; 033import org.jdrupes.builder.api.ResourceRequest; 034 035/// Evaluate the stream from a provider asynchronously. 036/// 037/// @param <T> the provided resource type 038/// 039public class FutureStream<T extends Resource> { 040 041 @SuppressWarnings("PMD.FieldNamingConventions") 042 private static final ThreadLocal<Boolean> providerInvocationAllowed 043 = ThreadLocal.withInitial(() -> false); 044 private final Future<List<T>> source; 045 046 /// Instantiates a new future resources. 047 /// 048 /// @param executor the executor 049 /// @param provider the provider 050 /// @param requested the requested 051 /// 052 public FutureStream(ExecutorService executor, ResourceProvider provider, 053 ResourceRequest<T> requested) { 054 source = executor.submit(() -> { 055 try { 056 providerInvocationAllowed.set(true); 057 return provider.provide(requested).toList(); 058 } finally { 059 providerInvocationAllowed.set(false); 060 } 061 }); 062 } 063 064 /// Checks if is provider invocation is allowed. Clears the 065 /// allowed flag to also detect nested invocations. 066 /// 067 /// @return true, if is provider invocation allowed 068 /// 069 public static boolean isProviderInvocationAllowed() { 070 var allowed = providerInvocationAllowed.get(); 071 providerInvocationAllowed.set(false); 072 return allowed; 073 } 074 075 /// Returns the lazily evaluated stream of resources. 076 /// 077 /// @return the stream 078 /// 079 public Stream<T> stream() { 080 return StreamSupport 081 .stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, 0) { 082 083 private Iterator<T> theIterator; 084 085 private Iterator<T> iterator() { 086 if (theIterator == null) { 087 try { 088 theIterator = source.get().iterator(); 089 } catch (InterruptedException | ExecutionException e) { 090 throw new BuildException(e); 091 } 092 } 093 return theIterator; 094 } 095 096 @Override 097 public void forEachRemaining(Consumer<? super T> action) { 098 iterator().forEachRemaining(action); 099 } 100 101 @Override 102 public boolean tryAdvance(Consumer<? super T> action) { 103 if (!iterator().hasNext()) { 104 return false; 105 } 106 action.accept(iterator().next()); 107 return true; 108 } 109 110 }, false); 111 } 112 113}