001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.submit.interceptor;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.interceptor.api.Hook;
024import ca.uhn.fhir.interceptor.api.HookParams;
025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
026import ca.uhn.fhir.interceptor.api.Interceptor;
027import ca.uhn.fhir.interceptor.api.Pointcut;
028import ca.uhn.fhir.interceptor.model.RequestPartitionId;
029import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
030import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage;
031import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
032import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
033import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
034import ca.uhn.fhir.rest.api.server.RequestDetails;
035import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
036import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
037import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
038import org.hl7.fhir.instance.model.api.IBaseResource;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041import org.springframework.beans.factory.annotation.Autowired;
042import org.springframework.messaging.MessageDeliveryException;
043
044import static java.util.Objects.isNull;
045import static org.apache.commons.lang3.StringUtils.isBlank;
046
047/**
048 *
049 * This interceptor is responsible for submitting operations on resources to the subscription pipeline.
050 *
051 */
052@Interceptor
053public class SubscriptionMatcherInterceptor {
054        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class);
055
056        @Autowired
057        private FhirContext myFhirContext;
058
059        @Autowired
060        private IInterceptorBroadcaster myInterceptorBroadcaster;
061
062        @Autowired
063        private SubscriptionSettings mySubscriptionSettings;
064
065        @Autowired
066        private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
067
068        @Autowired
069        private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
070
071        @Autowired
072        private IResourceModifiedConsumer myResourceModifiedConsumer;
073
074        /**
075         * Constructor
076         */
077        public SubscriptionMatcherInterceptor() {
078                super();
079        }
080
081        @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED)
082        public void resourceCreated(IBaseResource theResource, RequestDetails theRequest) {
083
084                processResourceModifiedEvent(theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE, theRequest);
085        }
086
087        @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_DELETED)
088        public void resourceDeleted(IBaseResource theResource, RequestDetails theRequest) {
089
090                processResourceModifiedEvent(theResource, ResourceModifiedMessage.OperationTypeEnum.DELETE, theRequest);
091        }
092
093        @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED)
094        public void resourceUpdated(IBaseResource theOldResource, IBaseResource theNewResource, RequestDetails theRequest) {
095                boolean dontTriggerSubscriptionWhenVersionsAreTheSame =
096                                !mySubscriptionSettings.isTriggerSubscriptionsForNonVersioningChanges();
097                boolean resourceVersionsAreTheSame = isSameResourceVersion(theOldResource, theNewResource);
098
099                if (dontTriggerSubscriptionWhenVersionsAreTheSame && resourceVersionsAreTheSame) {
100                        return;
101                }
102
103                processResourceModifiedEvent(theNewResource, ResourceModifiedMessage.OperationTypeEnum.UPDATE, theRequest);
104        }
105
106        /**
107         * This is an internal API - Use with caution!
108         *
109         * This method will create a {@link ResourceModifiedMessage}, persist it and arrange for its delivery to the
110         * subscription pipeline after the resource was committed.  The message is persisted to provide asynchronous submission
111         * in the event where submission would fail.
112         */
113        protected void processResourceModifiedEvent(
114                        IBaseResource theNewResource,
115                        ResourceModifiedMessage.OperationTypeEnum theOperationType,
116                        RequestDetails theRequest) {
117
118                ResourceModifiedMessage msg = createResourceModifiedMessage(theNewResource, theOperationType, theRequest);
119
120                // Interceptor call: SUBSCRIPTION_RESOURCE_MODIFIED
121                IInterceptorBroadcaster compositeBroadcaster =
122                                CompositeInterceptorBroadcaster.newCompositeBroadcaster(myInterceptorBroadcaster, theRequest);
123                if (compositeBroadcaster.hasHooks(Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED)) {
124                        HookParams params = new HookParams().add(ResourceModifiedMessage.class, msg);
125                        boolean outcome = compositeBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED, params);
126
127                        if (!outcome) {
128                                return;
129                        }
130                }
131
132                processResourceModifiedMessage(msg);
133        }
134
135        protected void processResourceModifiedMessage(ResourceModifiedMessage theResourceModifiedMessage) {
136                // Persist the message for async submission to the processing pipeline.
137                // see {@link AsyncResourceModifiedProcessingSchedulerSvc}
138                // If enabled in {@link JpaStorageSettings} the subscription will be handled immediately.
139
140                if (mySubscriptionSettings.isSubscriptionChangeQueuedImmediately()
141                                && theResourceModifiedMessage.hasPayloadType(myFhirContext, "Subscription")) {
142                        try {
143                                myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage);
144                                return;
145                        } catch (MessageDeliveryException exception) {
146                                String payloadId = theResourceModifiedMessage.getPayloadId();
147                                String subscriptionId = theResourceModifiedMessage.getSubscriptionId();
148                                ourLog.error(
149                                                "Channel submission failed for resource with id {} matching subscription with id {}.  Further attempts will be performed at later time.",
150                                                payloadId,
151                                                subscriptionId,
152                                                exception);
153                        }
154                }
155
156                IPersistedResourceModifiedMessage persistedResourceModifiedMessage =
157                                myResourceModifiedMessagePersistenceSvc.persist(theResourceModifiedMessage);
158        }
159
160        protected ResourceModifiedMessage createResourceModifiedMessage(
161                        IBaseResource theNewResource,
162                        BaseResourceMessage.OperationTypeEnum theOperationType,
163                        RequestDetails theRequest) {
164                // Even though the resource is being written, the subscription will be interacting with it by effectively
165                // "reading" it so we set the RequestPartitionId as a read request
166                RequestPartitionId requestPartitionId = myRequestPartitionHelperSvc.determineReadPartitionForRequestForRead(
167                                theRequest, theNewResource.getIdElement());
168                return new ResourceModifiedMessage(
169                                myFhirContext, theNewResource, theOperationType, theRequest, requestPartitionId);
170        }
171
172        private boolean isSameResourceVersion(IBaseResource theOldResource, IBaseResource theNewResource) {
173                if (isNull(theOldResource) || isNull(theNewResource)) {
174                        return false;
175                }
176
177                String oldVersion = theOldResource.getIdElement().getVersionIdPart();
178                String newVersion = theNewResource.getIdElement().getVersionIdPart();
179
180                if (isBlank(oldVersion) || isBlank(newVersion)) {
181                        return false;
182                }
183
184                return oldVersion.equals(newVersion);
185        }
186
187        public void setFhirContext(FhirContext theCtx) {
188                myFhirContext = theCtx;
189        }
190}