001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.binstore; 021 022import ca.uhn.fhir.i18n.Msg; 023import ca.uhn.fhir.jpa.binary.api.StoredDetails; 024import ca.uhn.fhir.jpa.binary.svc.BaseBinaryStorageSvcImpl; 025import ca.uhn.fhir.jpa.dao.data.IBinaryStorageEntityDao; 026import ca.uhn.fhir.jpa.model.entity.BinaryStorageEntity; 027import ca.uhn.fhir.rest.api.server.RequestDetails; 028import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; 029import com.google.common.hash.HashingInputStream; 030import com.google.common.io.ByteStreams; 031import jakarta.annotation.Nonnull; 032import jakarta.persistence.EntityManager; 033import jakarta.persistence.PersistenceContext; 034import jakarta.persistence.PersistenceContextType; 035import org.apache.commons.io.IOUtils; 036import org.apache.commons.io.input.CountingInputStream; 037import org.hibernate.LobHelper; 038import org.hibernate.Session; 039import org.hl7.fhir.instance.model.api.IIdType; 040import org.springframework.beans.factory.annotation.Autowired; 041import org.springframework.transaction.annotation.Propagation; 042import org.springframework.transaction.annotation.Transactional; 043 044import java.io.ByteArrayInputStream; 045import java.io.IOException; 046import java.io.InputStream; 047import java.io.OutputStream; 048import java.sql.Blob; 049import java.sql.SQLException; 050import java.util.Date; 051import java.util.Optional; 052 053@Transactional 054public class DatabaseBinaryContentStorageSvcImpl extends BaseBinaryStorageSvcImpl { 055 056 @PersistenceContext(type = PersistenceContextType.TRANSACTION) 057 private EntityManager myEntityManager; 058 059 @Autowired 060 private IBinaryStorageEntityDao myBinaryStorageEntityDao; 061 062 @Nonnull 063 @Override 064 @Transactional(propagation = Propagation.REQUIRED) 065 public StoredDetails storeBinaryContent( 066 IIdType theResourceId, 067 String theBinaryContentIdOrNull, 068 String theContentType, 069 InputStream theInputStream, 070 RequestDetails theRequestDetails) 071 throws IOException { 072 073 /* 074 * Note on transactionality: This method used to have a propagation value of SUPPORTS and then do the actual 075 * write in a new transaction.. I don't actually get why that was the original design, but it causes 076 * connection pool deadlocks under load! 077 */ 078 079 Date publishedDate = new Date(); 080 081 HashingInputStream hashingInputStream = createHashingInputStream(theInputStream); 082 CountingInputStream countingInputStream = createCountingInputStream(hashingInputStream); 083 084 BinaryStorageEntity entity = new BinaryStorageEntity(); 085 entity.setResourceId(theResourceId.toUnqualifiedVersionless().getValue()); 086 entity.setContentType(theContentType); 087 entity.setPublished(publishedDate); 088 089 Session session = (Session) myEntityManager.getDelegate(); 090 LobHelper lobHelper = session.getLobHelper(); 091 092 byte[] loadedStream = IOUtils.toByteArray(countingInputStream); 093 String id = super.provideIdForNewBinaryContent( 094 theBinaryContentIdOrNull, loadedStream, theRequestDetails, theContentType); 095 096 entity.setContentId(id); 097 entity.setStorageContentBin(loadedStream); 098 099 // TODO: remove writing Blob in a future release 100 Blob dataBlob = lobHelper.createBlob(loadedStream); 101 entity.setBlob(dataBlob); 102 103 // Update the entity with the final byte count and hash 104 long bytes = countingInputStream.getByteCount(); 105 String hash = hashingInputStream.hash().toString(); 106 entity.setSize(bytes); 107 entity.setHash(hash); 108 109 // Save the entity 110 myEntityManager.persist(entity); 111 112 return new StoredDetails() 113 .setBinaryContentId(id) 114 .setBytes(bytes) 115 .setPublished(publishedDate) 116 .setHash(hash) 117 .setContentType(theContentType); 118 } 119 120 @Override 121 public StoredDetails fetchBinaryContentDetails(IIdType theResourceId, String theBinaryContentId) { 122 123 Optional<BinaryStorageEntity> entityOpt = myBinaryStorageEntityDao.findByIdAndResourceId( 124 theBinaryContentId, theResourceId.toUnqualifiedVersionless().getValue()); 125 if (entityOpt.isEmpty()) { 126 return null; 127 } 128 129 BinaryStorageEntity entity = entityOpt.get(); 130 return new StoredDetails() 131 .setBinaryContentId(theBinaryContentId) 132 .setContentType(entity.getContentType()) 133 .setHash(entity.getHash()) 134 .setPublished(entity.getPublished()) 135 .setBytes(entity.getSize()); 136 } 137 138 @Override 139 public boolean writeBinaryContent(IIdType theResourceId, String theBinaryContentId, OutputStream theOutputStream) 140 throws IOException { 141 Optional<BinaryStorageEntity> entityOpt = myBinaryStorageEntityDao.findByIdAndResourceId( 142 theBinaryContentId, theResourceId.toUnqualifiedVersionless().getValue()); 143 if (entityOpt.isEmpty()) { 144 return false; 145 } 146 147 copyBinaryContentToOutputStream(theOutputStream, entityOpt.get()); 148 149 return true; 150 } 151 152 @Override 153 public void expungeBinaryContent(IIdType theResourceId, String theBinaryContentId) { 154 Optional<BinaryStorageEntity> entityOpt = myBinaryStorageEntityDao.findByIdAndResourceId( 155 theBinaryContentId, theResourceId.toUnqualifiedVersionless().getValue()); 156 entityOpt.ifPresent( 157 theBinaryStorageEntity -> myBinaryStorageEntityDao.deleteByPid(theBinaryStorageEntity.getContentId())); 158 } 159 160 @Override 161 public byte[] fetchBinaryContent(IIdType theResourceId, String theBinaryContentId) throws IOException { 162 BinaryStorageEntity entityOpt = myBinaryStorageEntityDao 163 .findByIdAndResourceId( 164 theBinaryContentId, 165 theResourceId.toUnqualifiedVersionless().getValue()) 166 .orElseThrow(() -> new ResourceNotFoundException( 167 "Unknown BinaryContent ID: " + theBinaryContentId + " for resource ID " + theResourceId)); 168 169 return copyBinaryContentToByteArray(entityOpt); 170 } 171 172 void copyBinaryContentToOutputStream(OutputStream theOutputStream, BinaryStorageEntity theEntity) 173 throws IOException { 174 175 try (InputStream inputStream = getBinaryContent(theEntity)) { 176 IOUtils.copy(inputStream, theOutputStream); 177 } catch (SQLException e) { 178 throw new IOException(Msg.code(1341) + e); 179 } 180 } 181 182 byte[] copyBinaryContentToByteArray(BinaryStorageEntity theEntity) throws IOException { 183 byte[] retVal; 184 185 try (InputStream inputStream = getBinaryContent(theEntity)) { 186 retVal = ByteStreams.toByteArray(inputStream); 187 } catch (SQLException e) { 188 throw new IOException(Msg.code(1342) + e); 189 } 190 191 return retVal; 192 } 193 194 /** 195 * 196 * The caller is responsible for closing the returned stream. 197 * 198 * @param theEntity 199 * @return 200 * @throws SQLException 201 */ 202 private InputStream getBinaryContent(BinaryStorageEntity theEntity) throws SQLException { 203 InputStream retVal; 204 205 if (theEntity.hasStorageContent()) { 206 retVal = new ByteArrayInputStream(theEntity.getStorageContentBin()); 207 } else if (theEntity.hasBlob()) { 208 retVal = theEntity.getBlob().getBinaryStream(); 209 } else { 210 retVal = new ByteArrayInputStream(new byte[0]); 211 } 212 213 return retVal; 214 } 215}