001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.reindex; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.context.RuntimeResourceDefinition; 024import ca.uhn.fhir.i18n.Msg; 025import ca.uhn.fhir.interceptor.model.RequestPartitionId; 026import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 027import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 028import ca.uhn.fhir.jpa.api.pid.IResourcePidList; 029import ca.uhn.fhir.jpa.api.pid.IResourcePidStream; 030import ca.uhn.fhir.jpa.api.pid.StreamTemplate; 031import ca.uhn.fhir.jpa.api.pid.TypedResourcePid; 032import ca.uhn.fhir.jpa.api.pid.TypedResourceStream; 033import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc; 034import ca.uhn.fhir.jpa.dao.data.IResourceTableDao; 035import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; 036import ca.uhn.fhir.jpa.model.dao.JpaPid; 037import ca.uhn.fhir.jpa.searchparam.MatchUrlService; 038import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 039import ca.uhn.fhir.rest.api.Constants; 040import ca.uhn.fhir.rest.api.SortSpec; 041import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 042import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 043import ca.uhn.fhir.util.DateRangeUtil; 044import jakarta.annotation.Nonnull; 045import jakarta.annotation.Nullable; 046import org.apache.commons.lang3.Validate; 047 048import java.util.Date; 049import java.util.function.Supplier; 050import java.util.stream.Stream; 051 052public class Batch2DaoSvcImpl implements IBatch2DaoSvc { 053 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(Batch2DaoSvcImpl.class); 054 055 private final IResourceTableDao myResourceTableDao; 056 057 private final MatchUrlService myMatchUrlService; 058 059 private final DaoRegistry myDaoRegistry; 060 061 private final FhirContext myFhirContext; 062 063 private final IHapiTransactionService myTransactionService; 064 065 @Override 066 public boolean isAllResourceTypeSupported() { 067 return true; 068 } 069 070 public Batch2DaoSvcImpl( 071 IResourceTableDao theResourceTableDao, 072 MatchUrlService theMatchUrlService, 073 DaoRegistry theDaoRegistry, 074 FhirContext theFhirContext, 075 IHapiTransactionService theTransactionService) { 076 myResourceTableDao = theResourceTableDao; 077 myMatchUrlService = theMatchUrlService; 078 myDaoRegistry = theDaoRegistry; 079 myFhirContext = theFhirContext; 080 myTransactionService = theTransactionService; 081 } 082 083 @Override 084 public IResourcePidStream fetchResourceIdStream( 085 Date theStart, Date theEnd, RequestPartitionId theRequestPartitionId, String theUrl) { 086 if (theUrl == null) { 087 return makeStreamResult( 088 theRequestPartitionId, () -> streamResourceIdsNoUrl(theStart, theEnd, theRequestPartitionId)); 089 } else { 090 return makeStreamResult( 091 theRequestPartitionId, 092 () -> streamResourceIdsWithUrl(theStart, theEnd, theUrl, theRequestPartitionId)); 093 } 094 } 095 096 private Stream<TypedResourcePid> streamResourceIdsWithUrl( 097 Date theStart, Date theEnd, String theUrl, RequestPartitionId theRequestPartitionId) { 098 validateUrl(theUrl); 099 100 SearchParameterMap searchParamMap = parseQuery(theUrl); 101 searchParamMap.setLastUpdated(DateRangeUtil.narrowDateRange(searchParamMap.getLastUpdated(), theStart, theEnd)); 102 103 String resourceType = theUrl.substring(0, theUrl.indexOf('?')); 104 IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceType); 105 106 SystemRequestDetails request = new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId); 107 108 return dao.searchForIdStream(searchParamMap, request, null).map(pid -> new TypedResourcePid(resourceType, pid)); 109 } 110 111 private static TypedResourcePid typedPidFromQueryArray(Object[] thePidTypeDateArray) { 112 String resourceType = (String) thePidTypeDateArray[1]; 113 Long pid = (Long) thePidTypeDateArray[0]; 114 return new TypedResourcePid(resourceType, JpaPid.fromId(pid)); 115 } 116 117 @Nonnull 118 private TypedResourceStream makeStreamResult( 119 RequestPartitionId theRequestPartitionId, Supplier<Stream<TypedResourcePid>> streamSupplier) { 120 121 IHapiTransactionService.IExecutionBuilder txSettings = 122 myTransactionService.withSystemRequest().withRequestPartitionId(theRequestPartitionId); 123 124 StreamTemplate<TypedResourcePid> streamTemplate = 125 StreamTemplate.fromSupplier(streamSupplier).withTransactionAdvice(txSettings); 126 127 return new TypedResourceStream(theRequestPartitionId, streamTemplate); 128 } 129 130 @Nonnull 131 private Stream<TypedResourcePid> streamResourceIdsNoUrl( 132 Date theStart, Date theEnd, RequestPartitionId theRequestPartitionId) { 133 Stream<Object[]> rowStream; 134 if (theRequestPartitionId == null || theRequestPartitionId.isAllPartitions()) { 135 ourLog.debug("Search for resources - all partitions"); 136 rowStream = myResourceTableDao.streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldest( 137 theStart, theEnd); 138 } else if (theRequestPartitionId.isDefaultPartition()) { 139 ourLog.debug("Search for resources - default partition"); 140 rowStream = 141 myResourceTableDao 142 .streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForDefaultPartition( 143 theStart, theEnd); 144 } else { 145 ourLog.debug("Search for resources - partition {}", theRequestPartitionId); 146 rowStream = 147 myResourceTableDao 148 .streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForPartitionIds( 149 theStart, theEnd, theRequestPartitionId.getPartitionIds()); 150 } 151 152 return rowStream.map(Batch2DaoSvcImpl::typedPidFromQueryArray); 153 } 154 155 @Deprecated(since = "6.11", forRemoval = true) // delete once the default method in the interface is gone. 156 @Override 157 public IResourcePidList fetchResourceIdsPage( 158 Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theUrl) { 159 Validate.isTrue(false, "Unimplemented"); 160 return null; 161 } 162 163 private static void validateUrl(@Nonnull String theUrl) { 164 if (!theUrl.contains("?")) { 165 throw new InternalErrorException(Msg.code(2422) + "this should never happen: URL is missing a '?'"); 166 } 167 } 168 169 @Nonnull 170 private SearchParameterMap parseQuery(String theUrl) { 171 String resourceType = theUrl.substring(0, theUrl.indexOf('?')); 172 RuntimeResourceDefinition def = myFhirContext.getResourceDefinition(resourceType); 173 174 SearchParameterMap searchParamMap = myMatchUrlService.translateMatchUrl(theUrl, def); 175 // this matches idx_res_type_del_updated 176 searchParamMap.setSort(new SortSpec(Constants.PARAM_LASTUPDATED).setChain(new SortSpec(Constants.PARAM_PID))); 177 // TODO this limits us to 2G resources. 178 searchParamMap.setLoadSynchronousUpTo(Integer.MAX_VALUE); 179 return searchParamMap; 180 } 181}