001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.search.lastn; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.i18n.Msg; 024import ca.uhn.fhir.jpa.dao.TolerantJsonParser; 025import ca.uhn.fhir.jpa.model.config.PartitionSettings; 026import ca.uhn.fhir.jpa.search.lastn.json.ObservationJson; 027import ca.uhn.fhir.parser.IParser; 028import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; 029import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 030import co.elastic.clients.elasticsearch.ElasticsearchClient; 031import co.elastic.clients.elasticsearch._types.FieldValue; 032import co.elastic.clients.elasticsearch.core.SearchRequest; 033import co.elastic.clients.elasticsearch.core.SearchResponse; 034import co.elastic.clients.elasticsearch.core.search.Hit; 035import co.elastic.clients.elasticsearch.indices.ExistsRequest; 036import com.google.common.annotations.VisibleForTesting; 037import jakarta.annotation.Nullable; 038import org.hl7.fhir.instance.model.api.IBaseResource; 039import org.springframework.beans.factory.annotation.Autowired; 040 041import java.io.BufferedReader; 042import java.io.IOException; 043import java.io.InputStreamReader; 044import java.io.StringReader; 045import java.util.Collection; 046import java.util.List; 047import java.util.stream.Collectors; 048 049public class ElasticsearchSvcImpl implements IElasticsearchSvc { 050 051 // Index Constants 052 public static final String OBSERVATION_INDEX = "observation_index"; 053 public static final String OBSERVATION_CODE_INDEX = "code_index"; 054 public static final String OBSERVATION_INDEX_SCHEMA_FILE = "ObservationIndexSchema.json"; 055 public static final String OBSERVATION_CODE_INDEX_SCHEMA_FILE = "ObservationCodeIndexSchema.json"; 056 057 // Aggregation Constants 058 059 // Observation index document element names 060 private static final String OBSERVATION_IDENTIFIER_FIELD_NAME = "identifier"; 061 062 // Code index document element names 063 private static final String CODE_HASH = "codingcode_system_hash"; 064 private static final String CODE_TEXT = "text"; 065 066 private static final String OBSERVATION_RESOURCE_NAME = "Observation"; 067 068 private final ElasticsearchClient myRestHighLevelClient; 069 070 @Autowired 071 private FhirContext myContext; 072 073 // This constructor used to inject a dummy partitionsettings in test. 074 public ElasticsearchSvcImpl( 075 PartitionSettings thePartitionSetings, 076 String theProtocol, 077 String theHostname, 078 @Nullable String theUsername, 079 @Nullable String thePassword) { 080 this(theProtocol, theHostname, theUsername, thePassword); 081 } 082 083 public ElasticsearchSvcImpl( 084 String theProtocol, String theHostname, @Nullable String theUsername, @Nullable String thePassword) { 085 086 myRestHighLevelClient = ElasticsearchRestClientFactory.createElasticsearchHighLevelRestClient( 087 theProtocol, theHostname, theUsername, thePassword); 088 089 try { 090 createObservationIndexIfMissing(); 091 createObservationCodeIndexIfMissing(); 092 } catch (IOException theE) { 093 throw new RuntimeException(Msg.code(1175) + "Failed to create document index", theE); 094 } 095 } 096 097 private String getIndexSchema(String theSchemaFileName) throws IOException { 098 InputStreamReader input = 099 new InputStreamReader(ElasticsearchSvcImpl.class.getResourceAsStream(theSchemaFileName)); 100 BufferedReader reader = new BufferedReader(input); 101 StringBuilder sb = new StringBuilder(); 102 String str; 103 while ((str = reader.readLine()) != null) { 104 sb.append(str); 105 } 106 107 return sb.toString(); 108 } 109 110 private void createObservationIndexIfMissing() throws IOException { 111 if (indexExists(OBSERVATION_INDEX)) { 112 return; 113 } 114 String observationMapping = getIndexSchema(OBSERVATION_INDEX_SCHEMA_FILE); 115 if (!createIndex(OBSERVATION_INDEX, observationMapping)) { 116 throw new RuntimeException(Msg.code(1176) + "Failed to create observation index"); 117 } 118 } 119 120 private void createObservationCodeIndexIfMissing() throws IOException { 121 if (indexExists(OBSERVATION_CODE_INDEX)) { 122 return; 123 } 124 String observationCodeMapping = getIndexSchema(OBSERVATION_CODE_INDEX_SCHEMA_FILE); 125 if (!createIndex(OBSERVATION_CODE_INDEX, observationCodeMapping)) { 126 throw new RuntimeException(Msg.code(1177) + "Failed to create observation code index"); 127 } 128 } 129 130 private boolean createIndex(String theIndexName, String theMapping) throws IOException { 131 return myRestHighLevelClient 132 .indices() 133 .create(cir -> cir.index(theIndexName).withJson(new StringReader(theMapping))) 134 .acknowledged(); 135 } 136 137 private boolean indexExists(String theIndexName) throws IOException { 138 ExistsRequest request = new ExistsRequest.Builder().index(theIndexName).build(); 139 return myRestHighLevelClient.indices().exists(request).value(); 140 } 141 142 @Override 143 public void close() throws IOException { 144 // nothing 145 } 146 147 @Override 148 public List<IBaseResource> getObservationResources(Collection<? extends IResourcePersistentId> thePids) { 149 SearchRequest searchRequest = buildObservationResourceSearchRequest(thePids); 150 try { 151 SearchResponse<ObservationJson> observationDocumentResponse = 152 myRestHighLevelClient.search(searchRequest, ObservationJson.class); 153 List<Hit<ObservationJson>> observationDocumentHits = 154 observationDocumentResponse.hits().hits(); 155 IParser parser = TolerantJsonParser.createWithLenientErrorHandling(myContext, null); 156 Class<? extends IBaseResource> resourceType = 157 myContext.getResourceDefinition(OBSERVATION_RESOURCE_NAME).getImplementingClass(); 158 /** 159 * @see ca.uhn.fhir.jpa.dao.BaseHapiFhirDao#toResource(Class, IBaseResourceEntity, Collection, boolean) for 160 * details about parsing raw json to BaseResource 161 */ 162 return observationDocumentHits.stream() 163 .map(Hit::source) 164 .map(observationJson -> parser.parseResource(resourceType, observationJson.getResource())) 165 .collect(Collectors.toList()); 166 } catch (IOException theE) { 167 throw new InvalidRequestException( 168 Msg.code(2003) + "Unable to execute observation document query for provided IDs " + thePids, theE); 169 } 170 } 171 172 private SearchRequest buildObservationResourceSearchRequest(Collection<? extends IResourcePersistentId> thePids) { 173 List<FieldValue> values = thePids.stream() 174 .map(Object::toString) 175 .map(v -> FieldValue.of(v)) 176 .collect(Collectors.toList()); 177 178 return SearchRequest.of(sr -> sr.index(OBSERVATION_INDEX) 179 .query(qb -> qb.bool(bb -> bb.must(bbm -> { 180 bbm.terms(terms -> 181 terms.field(OBSERVATION_IDENTIFIER_FIELD_NAME).terms(termsb -> termsb.value(values))); 182 return bbm; 183 }))) 184 .size(thePids.size())); 185 } 186 187 @VisibleForTesting 188 public void refreshIndex(String theIndexName) throws IOException { 189 myRestHighLevelClient.indices().refresh(fn -> fn.index(theIndexName)); 190 } 191}