001package ca.uhn.fhir.jpa.subscription.submit.interceptor;
002
003import ca.uhn.fhir.context.FhirContext;
004import ca.uhn.fhir.interceptor.api.Hook;
005import ca.uhn.fhir.interceptor.api.HookParams;
006import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
007import ca.uhn.fhir.interceptor.api.Interceptor;
008import ca.uhn.fhir.interceptor.api.Pointcut;
009import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
010import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
011import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
012import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber;
013import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
014import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
015import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster;
016import ca.uhn.fhir.rest.api.server.RequestDetails;
017import com.google.common.annotations.VisibleForTesting;
018import org.apache.commons.lang3.Validate;
019import org.hl7.fhir.instance.model.api.IBaseResource;
020import org.slf4j.Logger;
021import org.slf4j.LoggerFactory;
022import org.springframework.beans.factory.annotation.Autowired;
023import org.springframework.context.event.ContextRefreshedEvent;
024import org.springframework.context.event.EventListener;
025import org.springframework.messaging.MessageChannel;
026import org.springframework.transaction.support.TransactionSynchronizationAdapter;
027import org.springframework.transaction.support.TransactionSynchronizationManager;
028
029/*-
030 * #%L
031 * HAPI FHIR Subscription Server
032 * %%
033 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
034 * %%
035 * Licensed under the Apache License, Version 2.0 (the "License");
036 * you may not use this file except in compliance with the License.
037 * You may obtain a copy of the License at
038 *
039 *      http://www.apache.org/licenses/LICENSE-2.0
040 *
041 * Unless required by applicable law or agreed to in writing, software
042 * distributed under the License is distributed on an "AS IS" BASIS,
043 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
044 * See the License for the specific language governing permissions and
045 * limitations under the License.
046 * #L%
047 */
048
049@Interceptor
050public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer {
051        private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class);
052        @Autowired
053        private FhirContext myFhirContext;
054        @Autowired
055        private IInterceptorBroadcaster myInterceptorBroadcaster;
056        @Autowired
057        private SubscriptionChannelFactory mySubscriptionChannelFactory;
058
059        private volatile MessageChannel myMatchingChannel;
060
061        /**
062         * Constructor
063         */
064        public SubscriptionMatcherInterceptor() {
065                super();
066        }
067
068        @EventListener(classes = {ContextRefreshedEvent.class})
069        public void startIfNeeded() {
070                if (myMatchingChannel == null) {
071                        myMatchingChannel = mySubscriptionChannelFactory.newMatchingSendingChannel(SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME, null);
072                }
073        }
074
075        @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED)
076        public void resourceCreated(IBaseResource theResource, RequestDetails theRequest) {
077                startIfNeeded();
078                submitResourceModified(theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE, theRequest);
079        }
080
081        @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_DELETED)
082        public void resourceDeleted(IBaseResource theResource, RequestDetails theRequest) {
083                startIfNeeded();
084                submitResourceModified(theResource, ResourceModifiedMessage.OperationTypeEnum.DELETE, theRequest);
085        }
086
087        @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED)
088        public void resourceUpdated(IBaseResource theOldResource, IBaseResource theNewResource, RequestDetails theRequest) {
089                startIfNeeded();
090                submitResourceModified(theNewResource, ResourceModifiedMessage.OperationTypeEnum.UPDATE, theRequest);
091        }
092
093        /**
094         * This is an internal API - Use with caution!
095         */
096        @Override
097        public void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType, RequestDetails theRequest) {
098                ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theNewResource, theOperationType, theRequest);
099
100                // Interceptor call: SUBSCRIPTION_RESOURCE_MODIFIED
101                HookParams params = new HookParams()
102                        .add(ResourceModifiedMessage.class, msg);
103                boolean outcome = JpaInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequest, Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED, params);
104                if (!outcome) {
105                        return;
106                }
107
108                submitResourceModified(msg);
109        }
110
111        /**
112         * This is an internal API - Use with caution!
113         */
114        @Override
115        public void submitResourceModified(final ResourceModifiedMessage theMsg) {
116                /*
117                 * We only want to submit the message to the processing queue once the
118                 * transaction is committed. We do this in order to make sure that the
119                 * data is actually in the DB, in case it's the database matcher.
120                 */
121                if (TransactionSynchronizationManager.isSynchronizationActive()) {
122                        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
123                                @Override
124                                public int getOrder() {
125                                        return 0;
126                                }
127
128                                @Override
129                                public void afterCommit() {
130                                        sendToProcessingChannel(theMsg);
131                                }
132                        });
133                } else {
134                        sendToProcessingChannel(theMsg);
135                }
136        }
137
138        protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
139                ourLog.trace("Sending resource modified message to processing channel");
140                Validate.notNull(myMatchingChannel, "A SubscriptionMatcherInterceptor has been registered without calling start() on it.");
141                myMatchingChannel.send(new ResourceModifiedJsonMessage(theMessage));
142        }
143
144        public void setFhirContext(FhirContext theCtx) {
145                myFhirContext = theCtx;
146        }
147
148        @VisibleForTesting
149        public LinkedBlockingChannel getProcessingChannelForUnitTest() {
150                return (LinkedBlockingChannel) myMatchingChannel;
151        }
152}