001package ca.uhn.fhir.jpa.binstore;
002
003/*-
004 * #%L
005 * HAPI FHIR JPA Server
006 * %%
007 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.jpa.api.config.DaoConfig;
024import ca.uhn.fhir.jpa.dao.data.IBinaryStorageEntityDao;
025import ca.uhn.fhir.jpa.model.entity.BinaryStorageEntity;
026import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
027import com.google.common.hash.HashingInputStream;
028import org.apache.commons.io.IOUtils;
029import org.apache.commons.io.input.CountingInputStream;
030import org.hibernate.LobHelper;
031import org.hibernate.Session;
032import org.hl7.fhir.instance.model.api.IIdType;
033import org.springframework.beans.factory.annotation.Autowired;
034import org.springframework.transaction.PlatformTransactionManager;
035import org.springframework.transaction.TransactionDefinition;
036import org.springframework.transaction.support.TransactionTemplate;
037
038import javax.persistence.EntityManager;
039import javax.persistence.PersistenceContext;
040import javax.persistence.PersistenceContextType;
041import javax.transaction.Transactional;
042import java.io.IOException;
043import java.io.InputStream;
044import java.io.OutputStream;
045import java.sql.Blob;
046import java.sql.SQLException;
047import java.util.Date;
048import java.util.Optional;
049
050@Transactional
051public class DatabaseBlobBinaryStorageSvcImpl extends BaseBinaryStorageSvcImpl {
052
053        @PersistenceContext(type = PersistenceContextType.TRANSACTION)
054        private EntityManager myEntityManager;
055        @Autowired
056        private IBinaryStorageEntityDao myBinaryStorageEntityDao;
057        @Autowired
058        private PlatformTransactionManager myPlatformTransactionManager;
059        @Autowired
060        private DaoConfig myDaoConfig;
061
062        @Override
063        @Transactional(Transactional.TxType.REQUIRED)
064        public StoredDetails storeBlob(IIdType theResourceId, String theBlobIdOrNull, String theContentType, InputStream theInputStream) throws IOException {
065
066                /*
067                 * Note on transactionality: This method used to have a propagation value of SUPPORTS and then do the actual
068                 * write in a new transaction.. I don't actually get why that was the original design, but it causes
069                 * connection pool deadlocks under load!
070                 */
071
072                Date publishedDate = new Date();
073
074                HashingInputStream hashingInputStream = createHashingInputStream(theInputStream);
075                CountingInputStream countingInputStream = createCountingInputStream(hashingInputStream);
076
077                String id = super.provideIdForNewBlob(theBlobIdOrNull);
078
079                BinaryStorageEntity entity = new BinaryStorageEntity();
080                entity.setResourceId(theResourceId.toUnqualifiedVersionless().getValue());
081                entity.setBlobId(id);
082                entity.setBlobContentType(theContentType);
083                entity.setPublished(publishedDate);
084
085                Session session = (Session) myEntityManager.getDelegate();
086                LobHelper lobHelper = session.getLobHelper();
087                byte[] loadedStream = IOUtils.toByteArray(countingInputStream);
088                Blob dataBlob = lobHelper.createBlob(loadedStream);
089                entity.setBlob(dataBlob);
090
091                // Update the entity with the final byte count and hash
092                long bytes = countingInputStream.getCount();
093                String hash = hashingInputStream.hash().toString();
094                entity.setSize((int) bytes);
095                entity.setHash(hash);
096
097                // Save the entity
098                myEntityManager.persist(entity);
099
100                return new StoredDetails()
101                        .setBlobId(id)
102                        .setBytes(bytes)
103                        .setPublished(publishedDate)
104                        .setHash(hash)
105                        .setContentType(theContentType);
106        }
107
108        @Override
109        public StoredDetails fetchBlobDetails(IIdType theResourceId, String theBlobId) {
110
111                Optional<BinaryStorageEntity> entityOpt = myBinaryStorageEntityDao.findByIdAndResourceId(theBlobId, theResourceId.toUnqualifiedVersionless().getValue());
112                if (entityOpt.isPresent() == false) {
113                        return null;
114                }
115
116                BinaryStorageEntity entity = entityOpt.get();
117                return new StoredDetails()
118                        .setBlobId(theBlobId)
119                        .setContentType(entity.getBlobContentType())
120                        .setHash(entity.getHash())
121                        .setPublished(entity.getPublished())
122                        .setBytes(entity.getSize());
123        }
124
125        @Override
126        public boolean writeBlob(IIdType theResourceId, String theBlobId, OutputStream theOutputStream) throws IOException {
127                Optional<BinaryStorageEntity> entityOpt = myBinaryStorageEntityDao.findByIdAndResourceId(theBlobId, theResourceId.toUnqualifiedVersionless().getValue());
128                if (entityOpt.isPresent() == false) {
129                        return false;
130                }
131
132                copyBlobToOutputStream(theOutputStream, entityOpt.get());
133
134                return true;
135        }
136
137        @Override
138        public void expungeBlob(IIdType theResourceId, String theBlobId) {
139                Optional<BinaryStorageEntity> entityOpt = myBinaryStorageEntityDao.findByIdAndResourceId(theBlobId, theResourceId.toUnqualifiedVersionless().getValue());
140                entityOpt.ifPresent(theBinaryStorageEntity -> myBinaryStorageEntityDao.deleteByPid(theBinaryStorageEntity.getBlobId()));
141        }
142
143        @Override
144        public byte[] fetchBlob(IIdType theResourceId, String theBlobId) throws IOException {
145                BinaryStorageEntity entityOpt = myBinaryStorageEntityDao
146                        .findByIdAndResourceId(theBlobId, theResourceId.toUnqualifiedVersionless().getValue())
147                        .orElseThrow(() -> new ResourceNotFoundException("Unknown blob ID: " + theBlobId + " for resource ID " + theResourceId));
148
149                return copyBlobToByteArray(entityOpt);
150        }
151
152        void copyBlobToOutputStream(OutputStream theOutputStream, BinaryStorageEntity theEntity) throws IOException {
153                try (InputStream inputStream = theEntity.getBlob().getBinaryStream()) {
154                        IOUtils.copy(inputStream, theOutputStream);
155                } catch (SQLException e) {
156                        throw new IOException(e);
157                }
158        }
159
160        byte[] copyBlobToByteArray(BinaryStorageEntity theEntity) throws IOException {
161                int size = theEntity.getSize();
162                try {
163                        return IOUtils.toByteArray(theEntity.getBlob().getBinaryStream(), size);
164                } catch (SQLException e) {
165                        throw new IOException(e);
166                }
167        }
168}