001package ca.uhn.fhir.jpa.bulk.export.job;
002
003/*-
004 * #%L
005 * HAPI FHIR JPA Server
006 * %%
007 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.context.RuntimeSearchParam;
024import ca.uhn.fhir.interceptor.model.RequestPartitionId;
025import ca.uhn.fhir.jpa.batch.log.Logs;
026import ca.uhn.fhir.jpa.dao.IResultIterator;
027import ca.uhn.fhir.jpa.dao.ISearchBuilder;
028import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
029import ca.uhn.fhir.jpa.dao.index.IdHelperService;
030import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
031import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
032import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
033import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
034import ca.uhn.fhir.jpa.util.QueryChunker;
035import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
036import ca.uhn.fhir.model.primitive.IdDt;
037import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
038import ca.uhn.fhir.rest.param.ReferenceOrListParam;
039import ca.uhn.fhir.rest.param.ReferenceParam;
040import org.hl7.fhir.instance.model.api.IBaseResource;
041import org.hl7.fhir.instance.model.api.IIdType;
042import org.hl7.fhir.instance.model.api.IPrimitiveType;
043import org.slf4j.Logger;
044import org.springframework.batch.item.ItemReader;
045import org.springframework.beans.factory.annotation.Autowired;
046import org.springframework.beans.factory.annotation.Value;
047
048import java.util.ArrayList;
049import java.util.HashMap;
050import java.util.HashSet;
051import java.util.Iterator;
052import java.util.List;
053import java.util.Map;
054import java.util.Optional;
055import java.util.Set;
056import java.util.stream.Collectors;
057
058/**
059 * Bulk Item reader for the Group Bulk Export job.
060 * Instead of performing a normal query on the resource type using type filters, we instead
061 *
062 * 1. Get the group ID defined for this job
063 * 2. Expand its membership so we get references to all patients in the group
064 * 3. Optionally further expand that into all MDM-matched Patients (including golden resources)
065 * 4. Then perform normal bulk export, filtered so that only results that refer to members are returned.
066 */
067public class GroupBulkItemReader extends BaseJpaBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
068        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
069        public static final int QUERY_CHUNK_SIZE = 100;
070
071        @Value("#{jobParameters['" + BulkExportJobConfig.GROUP_ID_PARAMETER + "']}")
072        private String myGroupId;
073        @Value("#{jobParameters['" + BulkExportJobConfig.EXPAND_MDM_PARAMETER+ "'] ?: false}")
074        private boolean myMdmEnabled;
075
076        @Autowired
077        private IdHelperService myIdHelperService;
078        @Autowired
079        private IMdmLinkDao myMdmLinkDao;
080        @Autowired
081        private MdmExpansionCacheSvc myMdmExpansionCacheSvc;
082
083        @Override
084        protected Iterator<ResourcePersistentId> getResourcePidIterator() {
085                Set<ResourcePersistentId> myReadPids = new HashSet<>();
086
087                //Short circuit out if we detect we are attempting to extract patients
088                if (myResourceType.equalsIgnoreCase("Patient")) {
089                        return getExpandedPatientIterator();
090                }
091
092                //First lets expand the group so we get a list of all patient IDs of the group, and MDM-matched patient IDs of the group.
093                Set<String> expandedMemberResourceIds = expandAllPatientPidsFromGroup();
094                if (ourLog.isDebugEnabled()) {
095                        ourLog.debug("Group/{} has been expanded to members:[{}]", myGroupId, String.join(",", expandedMemberResourceIds));
096                }
097
098                //Next, let's search for the target resources, with their correct patient references, chunked.
099                //The results will be jammed into myReadPids
100                QueryChunker<String> queryChunker = new QueryChunker<>();
101                queryChunker.chunk(new ArrayList<>(expandedMemberResourceIds), QUERY_CHUNK_SIZE, (idChunk) -> {
102                        queryResourceTypeWithReferencesToPatients(myReadPids, idChunk);
103                });
104
105                if (ourLog.isDebugEnabled()) {
106                        ourLog.debug("Resource PIDs to be Bulk Exported: [{}]", myReadPids.stream().map(ResourcePersistentId::toString).collect(Collectors.joining(",")));
107                }
108                return myReadPids.iterator();
109        }
110
111        /**
112         * In case we are doing a Group Bulk Export and resourceType `Patient` is requested, we can just return the group members,
113         * possibly expanded by MDM, and don't have to go and fetch other resource DAOs.
114         */
115        private Iterator<ResourcePersistentId> getExpandedPatientIterator() {
116                List<String> members = getMembers();
117                List<IIdType> ids = members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList());
118                List<Long> pidsOrThrowException = myIdHelperService.getPidsOrThrowException(ids);
119                Set<Long> patientPidsToExport = new HashSet<>(pidsOrThrowException);
120
121                if (myMdmEnabled) {
122                        SystemRequestDetails srd = SystemRequestDetails.newSystemRequestAllPartitions();
123                        IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId), srd);
124                        Long pidOrNull = myIdHelperService.getPidOrNull(group);
125                        List<IMdmLinkDao.MdmPidTuple> goldenPidSourcePidTuple = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
126                        goldenPidSourcePidTuple.forEach(tuple -> {
127                                patientPidsToExport.add(tuple.getGoldenPid());
128                                patientPidsToExport.add(tuple.getSourcePid());
129                        });
130                        populateMdmResourceCache(goldenPidSourcePidTuple);
131                }
132                List<ResourcePersistentId> resourcePersistentIds = patientPidsToExport
133                        .stream()
134                        .map(ResourcePersistentId::new)
135                        .collect(Collectors.toList());
136                return resourcePersistentIds.iterator();
137        }
138
139        /**
140         * @param thePidTuples
141         */
142        private void populateMdmResourceCache(List<IMdmLinkDao.MdmPidTuple> thePidTuples) {
143                if (myMdmExpansionCacheSvc.hasBeenPopulated()) {
144                        return;
145                }
146                //First, convert this zipped set of tuples to a map of
147                //{
148                //   patient/gold-1 -> [patient/1, patient/2]
149                //   patient/gold-2 -> [patient/3, patient/4]
150                //}
151                Map<Long, Set<Long>> goldenResourceToSourcePidMap = new HashMap<>();
152                extract(thePidTuples, goldenResourceToSourcePidMap);
153
154                //Next, lets convert it to an inverted index for fast lookup
155                // {
156                //   patient/1 -> patient/gold-1
157                //   patient/2 -> patient/gold-1
158                //   patient/3 -> patient/gold-2
159                //   patient/4 -> patient/gold-2
160                // }
161                Map<String, String> sourceResourceIdToGoldenResourceIdMap = new HashMap<>();
162                goldenResourceToSourcePidMap.forEach((key, value) -> {
163                        String goldenResourceId = myIdHelperService.translatePidIdToForcedIdWithCache(new ResourcePersistentId(key)).orElse(key.toString());
164                        Map<Long, Optional<String>> pidsToForcedIds = myIdHelperService.translatePidsToForcedIds(value);
165
166                        Set<String> sourceResourceIds = pidsToForcedIds.entrySet().stream()
167                                .map(ent -> ent.getValue().isPresent() ? ent.getValue().get() : ent.getKey().toString())
168                                .collect(Collectors.toSet());
169
170                        sourceResourceIds
171                                .forEach(sourceResourceId -> sourceResourceIdToGoldenResourceIdMap.put(sourceResourceId, goldenResourceId));
172                });
173
174                //Now that we have built our cached expansion, store it.
175                myMdmExpansionCacheSvc.setCacheContents(sourceResourceIdToGoldenResourceIdMap);
176        }
177
178        /**
179         * Given the local myGroupId, read this group, and find all members' patient references.
180         * @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"]
181         */
182        private List<String> getMembers() {
183                SystemRequestDetails requestDetails = SystemRequestDetails.newSystemRequestAllPartitions();
184                IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId), requestDetails);
185                List<IPrimitiveType> evaluate = myContext.newFhirPath().evaluate(group, "member.entity.reference", IPrimitiveType.class);
186                return  evaluate.stream().map(IPrimitiveType::getValueAsString).collect(Collectors.toList());
187        }
188
189        /**
190         * Given the local myGroupId, perform an expansion to retrieve all resource IDs of member patients.
191         * if myMdmEnabled is set to true, we also reach out to the IMdmLinkDao to attempt to also expand it into matched
192         * patients.
193         *
194         * @return a Set of Strings representing the resource IDs of all members of a group.
195         */
196        private Set<String> expandAllPatientPidsFromGroup() {
197                Set<String> expandedIds = new HashSet<>();
198                SystemRequestDetails requestDetails = SystemRequestDetails.newSystemRequestAllPartitions();
199                IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(myGroupId), requestDetails);
200                Long pidOrNull = myIdHelperService.getPidOrNull(group);
201
202                //Attempt to perform MDM Expansion of membership
203                if (myMdmEnabled) {
204                        List<IMdmLinkDao.MdmPidTuple> goldenPidTargetPidTuples = myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
205                        //Now lets translate these pids into resource IDs
206                        Set<Long> uniquePids = new HashSet<>();
207                        goldenPidTargetPidTuples.forEach(tuple -> {
208                                uniquePids.add(tuple.getGoldenPid());
209                                uniquePids.add(tuple.getSourcePid());
210                        });
211                        Map<Long, Optional<String>> pidToForcedIdMap = myIdHelperService.translatePidsToForcedIds(uniquePids);
212
213                        Map<Long, Set<Long>> goldenResourceToSourcePidMap = new HashMap<>();
214                        extract(goldenPidTargetPidTuples, goldenResourceToSourcePidMap);
215                        populateMdmResourceCache(goldenPidTargetPidTuples);
216
217                        //If the result of the translation is an empty optional, it means there is no forced id, and we can use the PID as the resource ID.
218                        Set<String> resolvedResourceIds = pidToForcedIdMap.entrySet().stream()
219                                .map(entry -> entry.getValue().isPresent() ? entry.getValue().get() : entry.getKey().toString())
220                                .collect(Collectors.toSet());
221
222                        expandedIds.addAll(resolvedResourceIds);
223                }
224
225                //Now manually add the members of the group (its possible even with mdm expansion that some members dont have MDM matches,
226                //so would be otherwise skipped
227                expandedIds.addAll(getMembers());
228
229                return expandedIds;
230        }
231
232        private void extract(List<IMdmLinkDao.MdmPidTuple> theGoldenPidTargetPidTuples, Map<Long, Set<Long>> theGoldenResourceToSourcePidMap) {
233                for (IMdmLinkDao.MdmPidTuple goldenPidTargetPidTuple : theGoldenPidTargetPidTuples) {
234                        Long goldenPid = goldenPidTargetPidTuple.getGoldenPid();
235                        Long sourcePid = goldenPidTargetPidTuple.getSourcePid();
236                        theGoldenResourceToSourcePidMap.computeIfAbsent(goldenPid, key -> new HashSet<>()).add(sourcePid);
237                }
238        }
239
240        private void queryResourceTypeWithReferencesToPatients(Set<ResourcePersistentId> myReadPids, List<String> idChunk) {
241                //Build SP map
242                //First, inject the _typeFilters and _since from the export job
243                List<SearchParameterMap> expandedSpMaps = createSearchParameterMapsForResourceType();
244                for (SearchParameterMap expandedSpMap: expandedSpMaps) {
245
246                        //Since we are in a bulk job, we have to ensure the user didn't jam in a patient search param, since we need to manually set that.
247                        validateSearchParameters(expandedSpMap);
248
249                        // Now, further filter the query with patient references defined by the chunk of IDs we have.
250                        filterSearchByResourceIds(idChunk, expandedSpMap);
251
252                        // Fetch and cache a search builder for this resource type
253                        ISearchBuilder searchBuilder = getSearchBuilderForLocalResourceType();
254
255                        //Execute query and all found pids to our local iterator.
256                        IResultIterator resultIterator = searchBuilder.createQuery(expandedSpMap, new SearchRuntimeDetails(null, myJobUUID), null, RequestPartitionId.allPartitions());
257                        while (resultIterator.hasNext()) {
258                                myReadPids.add(resultIterator.next());
259                        }
260                }
261        }
262
263        private void filterSearchByResourceIds(List<String> idChunk, SearchParameterMap expandedSpMap) {
264                ReferenceOrListParam orList =  new ReferenceOrListParam();
265                idChunk.forEach(id -> orList.add(new ReferenceParam(id)));
266                expandedSpMap.add(getPatientSearchParamForCurrentResourceType().getName(), orList);
267        }
268
269        private RuntimeSearchParam validateSearchParameters(SearchParameterMap expandedSpMap) {
270                RuntimeSearchParam runtimeSearchParam = getPatientSearchParamForCurrentResourceType();
271                if (expandedSpMap.get(runtimeSearchParam.getName()) != null) {
272                        throw new IllegalArgumentException(String.format("Group Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!", runtimeSearchParam.getName()));
273                }
274                return runtimeSearchParam;
275        }
276}