001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.submit.interceptor;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.interceptor.api.Hook;
024import ca.uhn.fhir.interceptor.api.HookParams;
025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
026import ca.uhn.fhir.interceptor.api.Interceptor;
027import ca.uhn.fhir.interceptor.api.Pointcut;
028import ca.uhn.fhir.interceptor.model.RequestPartitionId;
029import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
030import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage;
031import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
032import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
033import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
034import ca.uhn.fhir.rest.api.server.RequestDetails;
035import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
036import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
037import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
038import org.hl7.fhir.instance.model.api.IBaseResource;
039import org.hl7.fhir.instance.model.api.IIdType;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042import org.springframework.beans.factory.annotation.Autowired;
043import org.springframework.messaging.MessageDeliveryException;
044
045import static java.util.Objects.isNull;
046import static org.apache.commons.lang3.StringUtils.isBlank;
047
048/**
049 *
050 * This interceptor is responsible for submitting operations on resources to the subscription pipeline.
051 *
052 */
053@Interceptor
054public class SubscriptionMatcherInterceptor {
055        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class);
056
057        @Autowired
058        private FhirContext myFhirContext;
059
060        @Autowired
061        private IInterceptorBroadcaster myInterceptorBroadcaster;
062
063        @Autowired
064        private SubscriptionSettings mySubscriptionSettings;
065
066        @Autowired
067        private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
068
069        @Autowired
070        private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
071
072        @Autowired
073        private IResourceModifiedConsumer myResourceModifiedConsumer;
074
075        /**
076         * Constructor
077         */
078        public SubscriptionMatcherInterceptor() {
079                super();
080        }
081
082        @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED)
083        public void resourceCreated(IBaseResource theResource, RequestDetails theRequest) {
084
085                processResourceModifiedEvent(theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE, theRequest);
086        }
087
088        @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_DELETED)
089        public void resourceDeleted(IBaseResource theResource, RequestDetails theRequest, IIdType previousId) {
090
091                // When matching for deleted resources, we want to match for the version before it was deleted because
092                // the resource version record for the deleted resource doesn't contain a body.
093                theResource.setId(previousId);
094                processResourceModifiedEvent(theResource, ResourceModifiedMessage.OperationTypeEnum.DELETE, theRequest);
095        }
096
097        @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED)
098        public void resourceUpdated(IBaseResource theOldResource, IBaseResource theNewResource, RequestDetails theRequest) {
099                boolean dontTriggerSubscriptionWhenVersionsAreTheSame =
100                                !mySubscriptionSettings.isTriggerSubscriptionsForNonVersioningChanges();
101                boolean resourceVersionsAreTheSame = isSameResourceVersion(theOldResource, theNewResource);
102
103                if (dontTriggerSubscriptionWhenVersionsAreTheSame && resourceVersionsAreTheSame) {
104                        return;
105                }
106
107                processResourceModifiedEvent(theNewResource, ResourceModifiedMessage.OperationTypeEnum.UPDATE, theRequest);
108        }
109
110        /**
111         * This is an internal API - Use with caution!
112         *
113         * This method will create a {@link ResourceModifiedMessage}, persist it and arrange for its delivery to the
114         * subscription pipeline after the resource was committed.  The message is persisted to provide asynchronous submission
115         * in the event where submission would fail.
116         */
117        protected void processResourceModifiedEvent(
118                        IBaseResource theNewResource,
119                        ResourceModifiedMessage.OperationTypeEnum theOperationType,
120                        RequestDetails theRequest) {
121
122                ResourceModifiedMessage msg = createResourceModifiedMessage(theNewResource, theOperationType, theRequest);
123
124                // Interceptor call: SUBSCRIPTION_RESOURCE_MODIFIED
125                IInterceptorBroadcaster compositeBroadcaster =
126                                CompositeInterceptorBroadcaster.newCompositeBroadcaster(myInterceptorBroadcaster, theRequest);
127                if (compositeBroadcaster.hasHooks(Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED)) {
128                        HookParams params = new HookParams().add(ResourceModifiedMessage.class, msg);
129                        boolean outcome = compositeBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED, params);
130
131                        if (!outcome) {
132                                return;
133                        }
134                }
135
136                processResourceModifiedMessage(msg);
137        }
138
139        protected void processResourceModifiedMessage(ResourceModifiedMessage theResourceModifiedMessage) {
140                // Persist the message for async submission to the processing pipeline.
141                // see {@link AsyncResourceModifiedProcessingSchedulerSvc}
142                // If enabled in {@link JpaStorageSettings} the subscription will be handled immediately.
143
144                if (mySubscriptionSettings.isSubscriptionChangeQueuedImmediately()
145                                && theResourceModifiedMessage.hasResourceType(myFhirContext, "Subscription")) {
146                        try {
147                                myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage);
148                                return;
149                        } catch (MessageDeliveryException exception) {
150                                String payloadId = theResourceModifiedMessage.getPayloadId();
151                                String subscriptionId = theResourceModifiedMessage.getSubscriptionId();
152                                ourLog.error(
153                                                "Channel submission failed for resource with id {} matching subscription with id {}.  Further attempts will be performed at later time.",
154                                                payloadId,
155                                                subscriptionId,
156                                                exception);
157                        }
158                }
159
160                IPersistedResourceModifiedMessage persistedResourceModifiedMessage =
161                                myResourceModifiedMessagePersistenceSvc.persist(theResourceModifiedMessage);
162        }
163
164        protected ResourceModifiedMessage createResourceModifiedMessage(
165                        IBaseResource theNewResource,
166                        BaseResourceMessage.OperationTypeEnum theOperationType,
167                        RequestDetails theRequest) {
168                // Even though the resource is being written, the subscription will be interacting with it by effectively
169                // "reading" it so we set the RequestPartitionId as a read request
170                RequestPartitionId requestPartitionId = myRequestPartitionHelperSvc.determineReadPartitionForRequestForRead(
171                                theRequest, theNewResource.getIdElement());
172                return new ResourceModifiedMessage(
173                                myFhirContext, theNewResource, theOperationType, theRequest, requestPartitionId);
174        }
175
176        private boolean isSameResourceVersion(IBaseResource theOldResource, IBaseResource theNewResource) {
177                if (isNull(theOldResource) || isNull(theNewResource)) {
178                        return false;
179                }
180
181                String oldVersion = theOldResource.getIdElement().getVersionIdPart();
182                String newVersion = theNewResource.getIdElement().getVersionIdPart();
183
184                if (isBlank(oldVersion) || isBlank(newVersion)) {
185                        return false;
186                }
187
188                return oldVersion.equals(newVersion);
189        }
190
191        public void setFhirContext(FhirContext theCtx) {
192                myFhirContext = theCtx;
193        }
194}