001package ca.uhn.fhir.jpa.subscription.channel.impl;
002
003/*-
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
024import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
025import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
026import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
027import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
028import ca.uhn.fhir.jpa.subscription.channel.api.IChannelSettings;
029import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer;
030import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
031import ca.uhn.fhir.util.StopWatch;
032import org.apache.commons.lang3.concurrent.BasicThreadFactory;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import javax.annotation.PreDestroy;
037import java.util.Collections;
038import java.util.HashMap;
039import java.util.Map;
040import java.util.concurrent.LinkedBlockingQueue;
041import java.util.concurrent.RejectedExecutionException;
042import java.util.concurrent.RejectedExecutionHandler;
043import java.util.concurrent.ThreadFactory;
044import java.util.concurrent.ThreadPoolExecutor;
045import java.util.concurrent.TimeUnit;
046
047public class LinkedBlockingChannelFactory implements IChannelFactory {
048
049        private static final Logger ourLog = LoggerFactory.getLogger(LinkedBlockingChannelFactory.class);
050        private Map<String, LinkedBlockingChannel> myChannels = Collections.synchronizedMap(new HashMap<>());
051        private final IChannelNamer myChannelNamer;
052
053        public LinkedBlockingChannelFactory(IChannelNamer theChannelNamer) {
054                myChannelNamer = theChannelNamer;
055        }
056
057        @Override
058        public IChannelReceiver getOrCreateReceiver(String theChannelName, Class<?> theMessageType, ChannelConsumerSettings theChannelSettings) {
059                return getOrCreateChannel(theChannelName, theChannelSettings.getConcurrentConsumers(), theChannelSettings);
060        }
061
062        @Override
063        public IChannelProducer getOrCreateProducer(String theChannelName, Class<?> theMessageType, ChannelProducerSettings theChannelSettings) {
064                return getOrCreateChannel(theChannelName, theChannelSettings.getConcurrentConsumers(), theChannelSettings);
065        }
066
067        @Override
068        public IChannelNamer getChannelNamer() {
069                return myChannelNamer;
070        }
071
072        private LinkedBlockingChannel getOrCreateChannel(String theChannelName, int theConcurrentConsumers, IChannelSettings theChannelSettings) {
073                final String channelName = myChannelNamer.getChannelName(theChannelName, theChannelSettings);
074
075                return myChannels.computeIfAbsent(channelName, t -> {
076
077                        String threadNamingPattern = channelName + "-%d";
078
079                        ThreadFactory threadFactory = new BasicThreadFactory.Builder()
080                                .namingPattern(threadNamingPattern)
081                                .daemon(false)
082                                .priority(Thread.NORM_PRIORITY)
083                                .build();
084
085                        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE);
086                        RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
087                                ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", queue.size());
088                                StopWatch sw = new StopWatch();
089                                try {
090                                        queue.put(theRunnable);
091                                } catch (InterruptedException e) {
092                                        Thread.currentThread().interrupt();
093                                        throw new RejectedExecutionException("Task " + theRunnable.toString() +
094                                                " rejected from " + e.toString());
095                                }
096                                ourLog.info("Slot become available after {}ms", sw.getMillis());
097                        };
098                        ThreadPoolExecutor executor = new ThreadPoolExecutor(
099                                1,
100                                theConcurrentConsumers,
101                                0L,
102                                TimeUnit.MILLISECONDS,
103                                queue,
104                                threadFactory,
105                                rejectedExecutionHandler);
106                        return new LinkedBlockingChannel(channelName, executor, queue);
107
108                });
109        }
110
111
112        @PreDestroy
113        public void stop() {
114                myChannels.clear();
115        }
116
117}