001package ca.uhn.fhir.jpa.subscription.submit.interceptor;
002
003import ca.uhn.fhir.context.FhirContext;
004import ca.uhn.fhir.interceptor.api.Hook;
005import ca.uhn.fhir.interceptor.api.HookParams;
006import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
007import ca.uhn.fhir.interceptor.api.Interceptor;
008import ca.uhn.fhir.interceptor.api.Pointcut;
009import ca.uhn.fhir.jpa.api.config.DaoConfig;
010import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
011import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
012import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
013import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber;
014import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
015import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
016import ca.uhn.fhir.rest.api.server.RequestDetails;
017import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
018import com.google.common.annotations.VisibleForTesting;
019import org.apache.commons.lang3.Validate;
020import org.hl7.fhir.instance.model.api.IBaseResource;
021import org.slf4j.Logger;
022import org.slf4j.LoggerFactory;
023import org.springframework.beans.factory.annotation.Autowired;
024import org.springframework.context.event.ContextRefreshedEvent;
025import org.springframework.context.event.EventListener;
026import org.springframework.messaging.MessageChannel;
027import org.springframework.transaction.support.TransactionSynchronizationAdapter;
028import org.springframework.transaction.support.TransactionSynchronizationManager;
029
030import static org.apache.commons.lang3.StringUtils.isNotBlank;
031
032/*-
033 * #%L
034 * HAPI FHIR Subscription Server
035 * %%
036 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
037 * %%
038 * Licensed under the Apache License, Version 2.0 (the "License");
039 * you may not use this file except in compliance with the License.
040 * You may obtain a copy of the License at
041 *
042 *      http://www.apache.org/licenses/LICENSE-2.0
043 *
044 * Unless required by applicable law or agreed to in writing, software
045 * distributed under the License is distributed on an "AS IS" BASIS,
046 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
047 * See the License for the specific language governing permissions and
048 * limitations under the License.
049 * #L%
050 */
051
052@Interceptor
053public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer {
054        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class);
055        @Autowired
056        private FhirContext myFhirContext;
057        @Autowired
058        private IInterceptorBroadcaster myInterceptorBroadcaster;
059        @Autowired
060        private SubscriptionChannelFactory mySubscriptionChannelFactory;
061        @Autowired
062        private DaoConfig myDaoConfig;
063
064        private volatile MessageChannel myMatchingChannel;
065
066        /**
067         * Constructor
068         */
069        public SubscriptionMatcherInterceptor() {
070                super();
071        }
072
073        @EventListener(classes = {ContextRefreshedEvent.class})
074        public void startIfNeeded() {
075                if (myDaoConfig.getSupportedSubscriptionTypes().isEmpty()) {
076                        ourLog.debug("Subscriptions are disabled on this server.  Skipping {} channel creation.", SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME);
077                        return;
078                }
079                if (myMatchingChannel == null) {
080                        myMatchingChannel = mySubscriptionChannelFactory.newMatchingSendingChannel(SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME, null);
081                }
082        }
083
084        @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED)
085        public void resourceCreated(IBaseResource theResource, RequestDetails theRequest) {
086                startIfNeeded();
087                submitResourceModified(theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE, theRequest);
088        }
089
090        @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_DELETED)
091        public void resourceDeleted(IBaseResource theResource, RequestDetails theRequest) {
092                startIfNeeded();
093                submitResourceModified(theResource, ResourceModifiedMessage.OperationTypeEnum.DELETE, theRequest);
094        }
095
096        @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED)
097        public void resourceUpdated(IBaseResource theOldResource, IBaseResource theNewResource, RequestDetails theRequest) {
098                startIfNeeded();
099                if (!myDaoConfig.isTriggerSubscriptionsForNonVersioningChanges()) {
100                        if (theOldResource != null && theNewResource != null) {
101                                String oldVersion = theOldResource.getIdElement().getVersionIdPart();
102                                String newVersion = theNewResource.getIdElement().getVersionIdPart();
103                                if (isNotBlank(oldVersion) && isNotBlank(newVersion) && oldVersion.equals(newVersion)) {
104                                        return;
105                                }
106                        }
107                }
108
109                submitResourceModified(theNewResource, ResourceModifiedMessage.OperationTypeEnum.UPDATE, theRequest);
110        }
111
112        /**
113         * This is an internal API - Use with caution!
114         */
115        @Override
116        public void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType, RequestDetails theRequest) {
117                ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theNewResource, theOperationType, theRequest);
118
119                // Interceptor call: SUBSCRIPTION_RESOURCE_MODIFIED
120                HookParams params = new HookParams()
121                        .add(ResourceModifiedMessage.class, msg);
122                boolean outcome = CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequest, Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED, params);
123                if (!outcome) {
124                        return;
125                }
126
127                submitResourceModified(msg);
128        }
129
130        /**
131         * This is an internal API - Use with caution!
132         */
133        @Override
134        public void submitResourceModified(final ResourceModifiedMessage theMsg) {
135                /*
136                 * We only want to submit the message to the processing queue once the
137                 * transaction is committed. We do this in order to make sure that the
138                 * data is actually in the DB, in case it's the database matcher.
139                 */
140                if (TransactionSynchronizationManager.isSynchronizationActive()) {
141                        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
142                                @Override
143                                public int getOrder() {
144                                        return 0;
145                                }
146
147                                @Override
148                                public void afterCommit() {
149                                        sendToProcessingChannel(theMsg);
150                                }
151                        });
152                } else {
153                        sendToProcessingChannel(theMsg);
154                }
155        }
156
157        protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
158                ourLog.trace("Sending resource modified message to processing channel");
159                Validate.notNull(myMatchingChannel, "A SubscriptionMatcherInterceptor has been registered without calling start() on it.");
160                myMatchingChannel.send(new ResourceModifiedJsonMessage(theMessage));
161        }
162
163        public void setFhirContext(FhirContext theCtx) {
164                myFhirContext = theCtx;
165        }
166
167        @VisibleForTesting
168        public LinkedBlockingChannel getProcessingChannelForUnitTest() {
169                return (LinkedBlockingChannel) myMatchingChannel;
170        }
171}