001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.batch2; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.context.RuntimeResourceDefinition; 024import ca.uhn.fhir.i18n.Msg; 025import ca.uhn.fhir.interceptor.model.RequestPartitionId; 026import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 027import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 028import ca.uhn.fhir.jpa.api.pid.IResourcePidList; 029import ca.uhn.fhir.jpa.api.pid.IResourcePidStream; 030import ca.uhn.fhir.jpa.api.pid.StreamTemplate; 031import ca.uhn.fhir.jpa.api.pid.TypedResourcePid; 032import ca.uhn.fhir.jpa.api.pid.TypedResourceStream; 033import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc; 034import ca.uhn.fhir.jpa.dao.data.IResourceTableDao; 035import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; 036import ca.uhn.fhir.jpa.model.dao.JpaPid; 037import ca.uhn.fhir.jpa.searchparam.MatchUrlService; 038import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 039import ca.uhn.fhir.rest.api.Constants; 040import ca.uhn.fhir.rest.api.SortSpec; 041import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 042import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 043import ca.uhn.fhir.util.DateRangeUtil; 044import ca.uhn.fhir.util.Logs; 045import jakarta.annotation.Nonnull; 046import jakarta.annotation.Nullable; 047import org.apache.commons.lang3.StringUtils; 048import org.apache.commons.lang3.Validate; 049 050import java.util.Date; 051import java.util.function.Supplier; 052import java.util.stream.Stream; 053 054public class Batch2DaoSvcImpl implements IBatch2DaoSvc { 055 private static final org.slf4j.Logger ourLog = Logs.getBatchTroubleshootingLog(); 056 057 private final IResourceTableDao myResourceTableDao; 058 059 private final MatchUrlService myMatchUrlService; 060 061 private final DaoRegistry myDaoRegistry; 062 063 private final FhirContext myFhirContext; 064 065 private final IHapiTransactionService myTransactionService; 066 067 @Override 068 public boolean isAllResourceTypeSupported() { 069 return true; 070 } 071 072 public Batch2DaoSvcImpl( 073 IResourceTableDao theResourceTableDao, 074 MatchUrlService theMatchUrlService, 075 DaoRegistry theDaoRegistry, 076 FhirContext theFhirContext, 077 IHapiTransactionService theTransactionService) { 078 myResourceTableDao = theResourceTableDao; 079 myMatchUrlService = theMatchUrlService; 080 myDaoRegistry = theDaoRegistry; 081 myFhirContext = theFhirContext; 082 myTransactionService = theTransactionService; 083 } 084 085 @Override 086 public IResourcePidStream fetchResourceIdStream( 087 Date theStart, Date theEnd, RequestPartitionId theRequestPartitionId, String theUrl) { 088 if (StringUtils.isBlank(theUrl)) { 089 return makeStreamResult( 090 theRequestPartitionId, () -> streamResourceIdsNoUrl(theStart, theEnd, theRequestPartitionId)); 091 } else { 092 return makeStreamResult( 093 theRequestPartitionId, 094 () -> streamResourceIdsWithUrl(theStart, theEnd, theUrl, theRequestPartitionId)); 095 } 096 } 097 098 private Stream<TypedResourcePid> streamResourceIdsWithUrl( 099 Date theStart, Date theEnd, String theUrl, RequestPartitionId theRequestPartitionId) { 100 validateUrl(theUrl); 101 102 SearchParameterMap searchParamMap = parseQuery(theUrl); 103 searchParamMap.setLastUpdated(DateRangeUtil.narrowDateRange(searchParamMap.getLastUpdated(), theStart, theEnd)); 104 105 String resourceType = theUrl.substring(0, theUrl.indexOf('?')); 106 IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceType); 107 108 SystemRequestDetails request = new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId); 109 110 return dao.searchForIdStream(searchParamMap, request, null).map(pid -> new TypedResourcePid(resourceType, pid)); 111 } 112 113 private static TypedResourcePid typedPidFromQueryArray(Object[] thePidTypeDateArray) { 114 String resourceType = (String) thePidTypeDateArray[1]; 115 Long pid = (Long) thePidTypeDateArray[0]; 116 return new TypedResourcePid(resourceType, JpaPid.fromId(pid)); 117 } 118 119 @Nonnull 120 private TypedResourceStream makeStreamResult( 121 RequestPartitionId theRequestPartitionId, Supplier<Stream<TypedResourcePid>> streamSupplier) { 122 123 IHapiTransactionService.IExecutionBuilder txSettings = 124 myTransactionService.withSystemRequest().withRequestPartitionId(theRequestPartitionId); 125 126 StreamTemplate<TypedResourcePid> streamTemplate = 127 StreamTemplate.fromSupplier(streamSupplier).withTransactionAdvice(txSettings); 128 129 return new TypedResourceStream(theRequestPartitionId, streamTemplate); 130 } 131 132 /** 133 * At the moment there is no use-case for this method. 134 * This can be cleaned up at a later point in time if there is no use for it. 135 */ 136 @Nonnull 137 private Stream<TypedResourcePid> streamResourceIdsNoUrl( 138 Date theStart, Date theEnd, RequestPartitionId theRequestPartitionId) { 139 Stream<Object[]> rowStream; 140 if (theRequestPartitionId == null || theRequestPartitionId.isAllPartitions()) { 141 ourLog.debug("Search for resources - all partitions"); 142 rowStream = myResourceTableDao.streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldest( 143 theStart, theEnd); 144 } else if (theRequestPartitionId.isDefaultPartition()) { 145 ourLog.debug("Search for resources - default partition"); 146 rowStream = 147 myResourceTableDao 148 .streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForDefaultPartition( 149 theStart, theEnd); 150 } else { 151 ourLog.debug("Search for resources - partition {}", theRequestPartitionId); 152 rowStream = 153 myResourceTableDao 154 .streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForPartitionIds( 155 theStart, theEnd, theRequestPartitionId.getPartitionIds()); 156 } 157 158 return rowStream.map(Batch2DaoSvcImpl::typedPidFromQueryArray); 159 } 160 161 @Deprecated(since = "6.11", forRemoval = true) // delete once the default method in the interface is gone. 162 @Override 163 public IResourcePidList fetchResourceIdsPage( 164 Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theUrl) { 165 Validate.isTrue(false, "Unimplemented"); 166 return null; 167 } 168 169 private static void validateUrl(@Nonnull String theUrl) { 170 if (!theUrl.contains("?")) { 171 throw new InternalErrorException(Msg.code(2422) + "this should never happen: URL is missing a '?'"); 172 } 173 } 174 175 @Nonnull 176 private SearchParameterMap parseQuery(String theUrl) { 177 String resourceType = theUrl.substring(0, theUrl.indexOf('?')); 178 RuntimeResourceDefinition def = myFhirContext.getResourceDefinition(resourceType); 179 180 SearchParameterMap searchParamMap = myMatchUrlService.translateMatchUrl(theUrl, def); 181 // this matches idx_res_type_del_updated 182 searchParamMap.setSort(new SortSpec(Constants.PARAM_LASTUPDATED).setChain(new SortSpec(Constants.PARAM_PID))); 183 // TODO this limits us to 2G resources. 184 searchParamMap.setLoadSynchronousUpTo(Integer.MAX_VALUE); 185 return searchParamMap; 186 } 187}