/*-
 * #%L
 * HAPI FHIR Storage api
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.subscription.channel.impl;

import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import jakarta.annotation.Nonnull;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.ExecutorSubscribableChannel;

import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;

/**
 * An {@link ExecutorSubscribableChannel} that serves as both an {@link IChannelProducer}
 * and an {@link IChannelReceiver}.
 * <p>
 * Every handler passed to {@link #subscribe(MessageHandler)} is wrapped in a
 * {@link RetryingMessageHandlerWrapper} before it is registered, so the lookups in
 * {@link #hasSubscription(MessageHandler)} and {@link #unsubscribe(MessageHandler)}
 * compare against the wrapped handler rather than the registered subscriber itself.
 */
public class LinkedBlockingChannel extends ExecutorSubscribableChannel implements IChannelProducer, IChannelReceiver {

	private final String myName;
	private final Supplier<Integer> myQueueSizeSupplier;

	/**
	 * @param theName              the name reported by {@link #getName()}
	 * @param theExecutor          the executor handed to the parent channel
	 * @param theQueueSizeSupplier source of the value reported by {@link #getQueueSizeForUnitTest()}
	 */
	public LinkedBlockingChannel(String theName, Executor theExecutor, Supplier<Integer> theQueueSizeSupplier) {
		super(theExecutor);
		myQueueSizeSupplier = theQueueSizeSupplier;
		myName = theName;
	}

	/**
	 * @return the value currently produced by the queue size supplier, or {@code 0} if it produces {@code null}
	 */
	public int getQueueSizeForUnitTest() {
		Integer reportedSize = myQueueSizeSupplier.get();
		return defaultIfNull(reportedSize, 0);
	}

	/**
	 * Replaces the interceptors configured on this channel with an empty list.
	 */
	public void clearInterceptorsForUnitTest() {
		setInterceptors(new ArrayList<>());
	}

	/**
	 * @return the name this channel was constructed with
	 */
	@Override
	public String getName() {
		return myName;
	}

	/**
	 * @param handler the handler to look for, compared by identity against each wrapped handler
	 * @return {@code true} if a current subscriber wraps exactly this handler instance
	 */
	@Override
	public boolean hasSubscription(@Nonnull MessageHandler handler) {
		return findWrapperFor(handler).isPresent();
	}

	/**
	 * Registers the handler, wrapped in a {@link RetryingMessageHandlerWrapper} created with this channel's name.
	 *
	 * @param theHandler the handler to register
	 * @return the result of the parent channel's {@code subscribe} call
	 */
	@Override
	public boolean subscribe(@Nonnull MessageHandler theHandler) {
		RetryingMessageHandlerWrapper wrapper = new RetryingMessageHandlerWrapper(theHandler, getName());
		return super.subscribe(wrapper);
	}

	/**
	 * Removes the first subscriber that wraps the given handler instance, if there is one.
	 *
	 * @param handler the handler to remove, compared by identity against each wrapped handler
	 * @return {@code true} if a matching subscriber was found, {@code false} otherwise
	 */
	@Override
	public boolean unsubscribe(@Nonnull MessageHandler handler) {
		Optional<RetryingMessageHandlerWrapper> wrapper = findWrapperFor(handler);
		if (!wrapper.isPresent()) {
			return false;
		}
		// The return value reflects only that a match was found, not what the parent call reports.
		super.unsubscribe(wrapper.get());
		return true;
	}

	/**
	 * Intentionally a no-op.
	 */
	@Override
	public void destroy() {
		// nothing
	}

	/**
	 * Creates a synchronous channel, mostly intended for testing. The channel is built with
	 * no executor and a queue size supplier that always reports {@code 0}.
	 */
	public static LinkedBlockingChannel newSynchronous(String theName) {
		return new LinkedBlockingChannel(theName, null, () -> 0);
	}

	/**
	 * Walks the current subscribers in iteration order and returns the first one whose
	 * wrapped handler is the very same instance as the given handler.
	 */
	private Optional<RetryingMessageHandlerWrapper> findWrapperFor(MessageHandler theHandler) {
		for (MessageHandler subscriber : getSubscribers()) {
			RetryingMessageHandlerWrapper candidate = (RetryingMessageHandlerWrapper) subscriber;
			if (candidate.getWrappedHandler() == theHandler) {
				return Optional.of(candidate);
			}
		}
		return Optional.empty();
	}
}