001package ca.uhn.fhir.jpa.subscription;
002
003/*-
004 * #%L
005 * HAPI FHIR JPA Server
006 * %%
007 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.context.ConfigurationException;
024import ca.uhn.fhir.context.FhirContext;
025import ca.uhn.fhir.i18n.Msg;
026import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
027import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
028import ca.uhn.fhir.jpa.dao.data.IResourceModifiedDao;
029import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
030import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage;
031import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessagePK;
032import ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK;
033import ca.uhn.fhir.jpa.model.entity.ResourceModifiedEntity;
034import ca.uhn.fhir.jpa.subscription.async.AsyncResourceModifiedSubmitterSvc;
035import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
036import ca.uhn.fhir.model.primitive.IdDt;
037import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
038import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
039import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
040import com.fasterxml.jackson.core.JsonProcessingException;
041import com.fasterxml.jackson.databind.ObjectMapper;
042import org.hl7.fhir.instance.model.api.IBaseResource;
043import org.hl7.fhir.instance.model.api.IIdType;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046import org.springframework.data.domain.Page;
047import org.springframework.data.domain.Pageable;
048
049import java.util.Date;
050import java.util.Optional;
051
052import static ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK.with;
053
054/**
055 * This implementer provides the capability to persist subscription messages for asynchronous submission
056 * to the subscription processing pipeline with the purpose of offering a retry mechanism
057 * upon submission failure (see @link {@link AsyncResourceModifiedSubmitterSvc}).
058 */
059public class ResourceModifiedMessagePersistenceSvcImpl implements IResourceModifiedMessagePersistenceSvc {
060
061        private final FhirContext myFhirContext;
062
063        private final IResourceModifiedDao myResourceModifiedDao;
064
065        private final DaoRegistry myDaoRegistry;
066
067        private final ObjectMapper myObjectMapper;
068
069        private final HapiTransactionService myHapiTransactionService;
070
071        private static final Logger ourLog = LoggerFactory.getLogger(ResourceModifiedMessagePersistenceSvcImpl.class);
072
073        public ResourceModifiedMessagePersistenceSvcImpl(
074                        FhirContext theFhirContext,
075                        IResourceModifiedDao theResourceModifiedDao,
076                        DaoRegistry theDaoRegistry,
077                        HapiTransactionService theHapiTransactionService) {
078                myFhirContext = theFhirContext;
079                myResourceModifiedDao = theResourceModifiedDao;
080                myDaoRegistry = theDaoRegistry;
081                myHapiTransactionService = theHapiTransactionService;
082                myObjectMapper = new ObjectMapper();
083        }
084
085        @Override
086        public Page<IPersistedResourceModifiedMessage> findAllOrderedByCreatedTime(Pageable thePageable) {
087                return myHapiTransactionService.withSystemRequest().execute(() -> {
088                        return myResourceModifiedDao.findAllOrderedByCreatedTime(thePageable);
089                });
090        }
091
092        @Override
093        public IPersistedResourceModifiedMessage persist(ResourceModifiedMessage theMsg) {
094                ResourceModifiedEntity resourceModifiedEntity = createEntityFrom(theMsg);
095                return myResourceModifiedDao.save(resourceModifiedEntity);
096        }
097
098        @Override
099        public ResourceModifiedMessage inflatePersistedResourceModifiedMessage(
100                        ResourceModifiedMessage theResourceModifiedMessage) {
101
102                return inflateResourceModifiedMessageFromEntity(createEntityFrom(theResourceModifiedMessage));
103        }
104
105        @Override
106        public Optional<ResourceModifiedMessage> inflatePersistedResourceModifiedMessageOrNull(
107                        ResourceModifiedMessage theResourceModifiedMessage) {
108                ResourceModifiedMessage inflatedResourceModifiedMessage = null;
109
110                try {
111                        inflatedResourceModifiedMessage = inflatePersistedResourceModifiedMessage(theResourceModifiedMessage);
112                } catch (ResourceNotFoundException e) {
113                        IdDt idDt = new IdDt(
114                                        theResourceModifiedMessage.getPayloadType(myFhirContext),
115                                        theResourceModifiedMessage.getPayloadId(),
116                                        theResourceModifiedMessage.getPayloadVersion());
117
118                        ourLog.warn("Scheduled submission will be ignored since resource {} cannot be found", idDt.getIdPart(), e);
119                } catch (Exception ex) {
120                        ourLog.error("Unknown error encountered on inflation of resources.", ex);
121                }
122
123                return Optional.ofNullable(inflatedResourceModifiedMessage);
124        }
125
126        @Override
127        public ResourceModifiedMessage createResourceModifiedMessageFromEntityWithoutInflation(
128                        IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) {
129                ResourceModifiedMessage resourceModifiedMessage = getPayloadLessMessageFromString(
130                                ((ResourceModifiedEntity) thePersistedResourceModifiedMessage).getSummaryResourceModifiedMessage());
131
132                IdDt resourceId =
133                                createIdDtFromResourceModifiedEntity((ResourceModifiedEntity) thePersistedResourceModifiedMessage);
134                resourceModifiedMessage.setPayloadId(resourceId);
135
136                return resourceModifiedMessage;
137        }
138
139        @Override
140        public long getMessagePersistedCount() {
141                return myResourceModifiedDao.count();
142        }
143
144        @Override
145        public boolean deleteByPK(IPersistedResourceModifiedMessagePK theResourceModifiedPK) {
146                int removedCount =
147                                myResourceModifiedDao.removeById((PersistedResourceModifiedMessageEntityPK) theResourceModifiedPK);
148
149                return removedCount == 1;
150        }
151
152        protected ResourceModifiedMessage inflateResourceModifiedMessageFromEntity(
153                        ResourceModifiedEntity theResourceModifiedEntity) {
154                String resourceType = theResourceModifiedEntity.getResourceType();
155                ResourceModifiedMessage retVal =
156                                getPayloadLessMessageFromString(theResourceModifiedEntity.getSummaryResourceModifiedMessage());
157                SystemRequestDetails systemRequestDetails =
158                                new SystemRequestDetails().setRequestPartitionId(retVal.getPartitionId());
159
160                IdDt resourceIdDt = createIdDtFromResourceModifiedEntity(theResourceModifiedEntity);
161                IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceType);
162
163                IBaseResource iBaseResource = dao.read(resourceIdDt, systemRequestDetails, true);
164
165                retVal.setNewPayload(myFhirContext, iBaseResource);
166
167                return retVal;
168        }
169
170        ResourceModifiedEntity createEntityFrom(ResourceModifiedMessage theMsg) {
171                IIdType theMsgId = theMsg.getPayloadId(myFhirContext);
172
173                ResourceModifiedEntity resourceModifiedEntity = new ResourceModifiedEntity();
174                resourceModifiedEntity.setResourceModifiedEntityPK(with(theMsgId.getIdPart(), theMsgId.getVersionIdPart()));
175
176                String partialModifiedMessage = getPayloadLessMessageAsString(theMsg);
177                resourceModifiedEntity.setSummaryResourceModifiedMessage(partialModifiedMessage);
178                resourceModifiedEntity.setResourceType(theMsgId.getResourceType());
179                resourceModifiedEntity.setCreatedTime(new Date());
180
181                return resourceModifiedEntity;
182        }
183
184        private ResourceModifiedMessage getPayloadLessMessageFromString(String thePayloadLessMessage) {
185                try {
186                        return myObjectMapper.readValue(thePayloadLessMessage, ResourceModifiedMessage.class);
187                } catch (JsonProcessingException e) {
188                        throw new ConfigurationException(Msg.code(2334) + "Failed to json deserialize payloadless  message", e);
189                }
190        }
191
192        private String getPayloadLessMessageAsString(ResourceModifiedMessage theMsg) {
193                ResourceModifiedMessage tempMessage = new PayloadLessResourceModifiedMessage(theMsg);
194
195                try {
196                        return myObjectMapper.writeValueAsString(tempMessage);
197                } catch (JsonProcessingException e) {
198                        throw new ConfigurationException(Msg.code(2335) + "Failed to serialize empty ResourceModifiedMessage", e);
199                }
200        }
201
202        private IdDt createIdDtFromResourceModifiedEntity(ResourceModifiedEntity theResourceModifiedEntity) {
203                String resourcePid =
204                                theResourceModifiedEntity.getResourceModifiedEntityPK().getResourcePid();
205                String resourceVersion =
206                                theResourceModifiedEntity.getResourceModifiedEntityPK().getResourceVersion();
207                String resourceType = theResourceModifiedEntity.getResourceType();
208
209                return new IdDt(resourceType, resourcePid, resourceVersion);
210        }
211
212        private static class PayloadLessResourceModifiedMessage extends ResourceModifiedMessage {
213
214                public PayloadLessResourceModifiedMessage(ResourceModifiedMessage theMsg) {
215                        this.myPayloadId = theMsg.getPayloadId();
216                        this.myPayloadVersion = theMsg.getPayloadVersion();
217                        setSubscriptionId(theMsg.getSubscriptionId());
218                        setMediaType(theMsg.getMediaType());
219                        setOperationType(theMsg.getOperationType());
220                        setPartitionId(theMsg.getPartitionId());
221                        setTransactionId(theMsg.getTransactionId());
222                        setMessageKey(theMsg.getMessageKeyOrNull());
223                        copyAdditionalPropertiesFrom(theMsg);
224                }
225        }
226}