001package ca.uhn.fhir.jpa.subscription; 002 003/*- 004 * #%L 005 * HAPI FHIR JPA Server 006 * %% 007 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 008 * %% 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 * #L% 021 */ 022 023import ca.uhn.fhir.context.ConfigurationException; 024import ca.uhn.fhir.context.FhirContext; 025import ca.uhn.fhir.i18n.Msg; 026import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 027import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 028import ca.uhn.fhir.jpa.dao.data.IResourceModifiedDao; 029import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 030import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage; 031import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessagePK; 032import ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK; 033import ca.uhn.fhir.jpa.model.entity.ResourceModifiedEntity; 034import ca.uhn.fhir.jpa.subscription.async.AsyncResourceModifiedSubmitterSvc; 035import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 036import ca.uhn.fhir.model.primitive.IdDt; 037import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 038import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; 039import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; 040import com.fasterxml.jackson.core.JsonProcessingException; 041import com.fasterxml.jackson.databind.ObjectMapper; 042import org.hl7.fhir.instance.model.api.IBaseResource; 043import org.hl7.fhir.instance.model.api.IIdType; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046import org.springframework.data.domain.Page; 047import org.springframework.data.domain.Pageable; 048 049import java.util.Date; 050import java.util.Optional; 051 052import static ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK.with; 053 054/** 055 * This implementer provides the capability to persist subscription messages for asynchronous submission 056 * to the subscription processing pipeline with the purpose of offering a retry mechanism 057 * upon submission failure (see @link {@link AsyncResourceModifiedSubmitterSvc}). 058 */ 059public class ResourceModifiedMessagePersistenceSvcImpl implements IResourceModifiedMessagePersistenceSvc { 060 061 private final FhirContext myFhirContext; 062 063 private final IResourceModifiedDao myResourceModifiedDao; 064 065 private final DaoRegistry myDaoRegistry; 066 067 private final ObjectMapper myObjectMapper; 068 069 private final HapiTransactionService myHapiTransactionService; 070 071 private static final Logger ourLog = LoggerFactory.getLogger(ResourceModifiedMessagePersistenceSvcImpl.class); 072 073 public ResourceModifiedMessagePersistenceSvcImpl( 074 FhirContext theFhirContext, 075 IResourceModifiedDao theResourceModifiedDao, 076 DaoRegistry theDaoRegistry, 077 HapiTransactionService theHapiTransactionService) { 078 myFhirContext = theFhirContext; 079 myResourceModifiedDao = theResourceModifiedDao; 080 myDaoRegistry = theDaoRegistry; 081 myHapiTransactionService = theHapiTransactionService; 082 myObjectMapper = new ObjectMapper(); 083 } 084 085 @Override 086 public Page<IPersistedResourceModifiedMessage> findAllOrderedByCreatedTime(Pageable thePageable) { 087 return myHapiTransactionService.withSystemRequest().execute(() -> { 088 return myResourceModifiedDao.findAllOrderedByCreatedTime(thePageable); 089 }); 090 } 091 092 @Override 093 public IPersistedResourceModifiedMessage persist(ResourceModifiedMessage theMsg) { 094 ResourceModifiedEntity resourceModifiedEntity = createEntityFrom(theMsg); 095 return myResourceModifiedDao.save(resourceModifiedEntity); 096 } 097 098 @Override 099 public ResourceModifiedMessage inflatePersistedResourceModifiedMessage( 100 ResourceModifiedMessage theResourceModifiedMessage) { 101 102 return inflateResourceModifiedMessageFromEntity(createEntityFrom(theResourceModifiedMessage)); 103 } 104 105 @Override 106 public Optional<ResourceModifiedMessage> inflatePersistedResourceModifiedMessageOrNull( 107 ResourceModifiedMessage theResourceModifiedMessage) { 108 ResourceModifiedMessage inflatedResourceModifiedMessage = null; 109 110 try { 111 inflatedResourceModifiedMessage = inflatePersistedResourceModifiedMessage(theResourceModifiedMessage); 112 } catch (ResourceNotFoundException e) { 113 IdDt idDt = new IdDt( 114 theResourceModifiedMessage.getPayloadType(myFhirContext), 115 theResourceModifiedMessage.getPayloadId(), 116 theResourceModifiedMessage.getPayloadVersion()); 117 118 ourLog.warn("Scheduled submission will be ignored since resource {} cannot be found", idDt.getIdPart(), e); 119 } catch (Exception ex) { 120 ourLog.error("Unknown error encountered on inflation of resources.", ex); 121 } 122 123 return Optional.ofNullable(inflatedResourceModifiedMessage); 124 } 125 126 @Override 127 public ResourceModifiedMessage createResourceModifiedMessageFromEntityWithoutInflation( 128 IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) { 129 ResourceModifiedMessage resourceModifiedMessage = getPayloadLessMessageFromString( 130 ((ResourceModifiedEntity) thePersistedResourceModifiedMessage).getSummaryResourceModifiedMessage()); 131 132 IdDt resourceId = 133 createIdDtFromResourceModifiedEntity((ResourceModifiedEntity) thePersistedResourceModifiedMessage); 134 resourceModifiedMessage.setPayloadId(resourceId); 135 136 return resourceModifiedMessage; 137 } 138 139 @Override 140 public long getMessagePersistedCount() { 141 return myResourceModifiedDao.count(); 142 } 143 144 @Override 145 public boolean deleteByPK(IPersistedResourceModifiedMessagePK theResourceModifiedPK) { 146 int removedCount = 147 myResourceModifiedDao.removeById((PersistedResourceModifiedMessageEntityPK) theResourceModifiedPK); 148 149 return removedCount == 1; 150 } 151 152 protected ResourceModifiedMessage inflateResourceModifiedMessageFromEntity( 153 ResourceModifiedEntity theResourceModifiedEntity) { 154 String resourceType = theResourceModifiedEntity.getResourceType(); 155 ResourceModifiedMessage retVal = 156 getPayloadLessMessageFromString(theResourceModifiedEntity.getSummaryResourceModifiedMessage()); 157 SystemRequestDetails systemRequestDetails = 158 new SystemRequestDetails().setRequestPartitionId(retVal.getPartitionId()); 159 160 IdDt resourceIdDt = createIdDtFromResourceModifiedEntity(theResourceModifiedEntity); 161 IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceType); 162 163 IBaseResource iBaseResource = dao.read(resourceIdDt, systemRequestDetails, true); 164 165 retVal.setNewPayload(myFhirContext, iBaseResource); 166 167 return retVal; 168 } 169 170 ResourceModifiedEntity createEntityFrom(ResourceModifiedMessage theMsg) { 171 IIdType theMsgId = theMsg.getPayloadId(myFhirContext); 172 173 ResourceModifiedEntity resourceModifiedEntity = new ResourceModifiedEntity(); 174 resourceModifiedEntity.setResourceModifiedEntityPK(with(theMsgId.getIdPart(), theMsgId.getVersionIdPart())); 175 176 String partialModifiedMessage = getPayloadLessMessageAsString(theMsg); 177 resourceModifiedEntity.setSummaryResourceModifiedMessage(partialModifiedMessage); 178 resourceModifiedEntity.setResourceType(theMsgId.getResourceType()); 179 resourceModifiedEntity.setCreatedTime(new Date()); 180 181 return resourceModifiedEntity; 182 } 183 184 private ResourceModifiedMessage getPayloadLessMessageFromString(String thePayloadLessMessage) { 185 try { 186 return myObjectMapper.readValue(thePayloadLessMessage, ResourceModifiedMessage.class); 187 } catch (JsonProcessingException e) { 188 throw new ConfigurationException(Msg.code(2334) + "Failed to json deserialize payloadless message", e); 189 } 190 } 191 192 private String getPayloadLessMessageAsString(ResourceModifiedMessage theMsg) { 193 ResourceModifiedMessage tempMessage = new PayloadLessResourceModifiedMessage(theMsg); 194 195 try { 196 return myObjectMapper.writeValueAsString(tempMessage); 197 } catch (JsonProcessingException e) { 198 throw new ConfigurationException(Msg.code(2335) + "Failed to serialize empty ResourceModifiedMessage", e); 199 } 200 } 201 202 private IdDt createIdDtFromResourceModifiedEntity(ResourceModifiedEntity theResourceModifiedEntity) { 203 String resourcePid = 204 theResourceModifiedEntity.getResourceModifiedEntityPK().getResourcePid(); 205 String resourceVersion = 206 theResourceModifiedEntity.getResourceModifiedEntityPK().getResourceVersion(); 207 String resourceType = theResourceModifiedEntity.getResourceType(); 208 209 return new IdDt(resourceType, resourcePid, resourceVersion); 210 } 211 212 private static class PayloadLessResourceModifiedMessage extends ResourceModifiedMessage { 213 214 public PayloadLessResourceModifiedMessage(ResourceModifiedMessage theMsg) { 215 this.myPayloadId = theMsg.getPayloadId(); 216 this.myPayloadVersion = theMsg.getPayloadVersion(); 217 setSubscriptionId(theMsg.getSubscriptionId()); 218 setMediaType(theMsg.getMediaType()); 219 setOperationType(theMsg.getOperationType()); 220 setPartitionId(theMsg.getPartitionId()); 221 setTransactionId(theMsg.getTransactionId()); 222 setMessageKey(theMsg.getMessageKeyOrNull()); 223 copyAdditionalPropertiesFrom(theMsg); 224 } 225 } 226}