001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.search.lastn;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.i18n.Msg;
024import ca.uhn.fhir.jpa.dao.TolerantJsonParser;
025import ca.uhn.fhir.jpa.model.config.PartitionSettings;
026import ca.uhn.fhir.jpa.search.lastn.json.ObservationJson;
027import ca.uhn.fhir.parser.IParser;
028import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
029import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
030import co.elastic.clients.elasticsearch.ElasticsearchClient;
031import co.elastic.clients.elasticsearch._types.FieldValue;
032import co.elastic.clients.elasticsearch.core.SearchRequest;
033import co.elastic.clients.elasticsearch.core.SearchResponse;
034import co.elastic.clients.elasticsearch.core.search.Hit;
035import co.elastic.clients.elasticsearch.indices.ExistsRequest;
036import com.google.common.annotations.VisibleForTesting;
037import jakarta.annotation.Nullable;
038import org.hl7.fhir.instance.model.api.IBaseResource;
039import org.springframework.beans.factory.annotation.Autowired;
040
041import java.io.BufferedReader;
042import java.io.IOException;
043import java.io.InputStreamReader;
044import java.io.StringReader;
045import java.util.Collection;
046import java.util.List;
047import java.util.stream.Collectors;
048
049public class ElasticsearchSvcImpl implements IElasticsearchSvc {
050
051        // Index Constants
052        public static final String OBSERVATION_INDEX = "observation_index";
053        public static final String OBSERVATION_CODE_INDEX = "code_index";
054        public static final String OBSERVATION_INDEX_SCHEMA_FILE = "ObservationIndexSchema.json";
055        public static final String OBSERVATION_CODE_INDEX_SCHEMA_FILE = "ObservationCodeIndexSchema.json";
056
057        // Aggregation Constants
058
059        // Observation index document element names
060        private static final String OBSERVATION_IDENTIFIER_FIELD_NAME = "identifier";
061
062        // Code index document element names
063        private static final String CODE_HASH = "codingcode_system_hash";
064        private static final String CODE_TEXT = "text";
065
066        private static final String OBSERVATION_RESOURCE_NAME = "Observation";
067
068        private final ElasticsearchClient myRestHighLevelClient;
069
070        @Autowired
071        private FhirContext myContext;
072
073        // This constructor used to inject a dummy partitionsettings in test.
074        public ElasticsearchSvcImpl(
075                        PartitionSettings thePartitionSetings,
076                        String theProtocol,
077                        String theHostname,
078                        @Nullable String theUsername,
079                        @Nullable String thePassword) {
080                this(theProtocol, theHostname, theUsername, thePassword);
081        }
082
083        public ElasticsearchSvcImpl(
084                        String theProtocol, String theHostname, @Nullable String theUsername, @Nullable String thePassword) {
085
086                myRestHighLevelClient = ElasticsearchRestClientFactory.createElasticsearchHighLevelRestClient(
087                                theProtocol, theHostname, theUsername, thePassword);
088
089                try {
090                        createObservationIndexIfMissing();
091                        createObservationCodeIndexIfMissing();
092                } catch (IOException theE) {
093                        throw new RuntimeException(Msg.code(1175) + "Failed to create document index", theE);
094                }
095        }
096
097        private String getIndexSchema(String theSchemaFileName) throws IOException {
098                InputStreamReader input =
099                                new InputStreamReader(ElasticsearchSvcImpl.class.getResourceAsStream(theSchemaFileName));
100                BufferedReader reader = new BufferedReader(input);
101                StringBuilder sb = new StringBuilder();
102                String str;
103                while ((str = reader.readLine()) != null) {
104                        sb.append(str);
105                }
106
107                return sb.toString();
108        }
109
110        private void createObservationIndexIfMissing() throws IOException {
111                if (indexExists(OBSERVATION_INDEX)) {
112                        return;
113                }
114                String observationMapping = getIndexSchema(OBSERVATION_INDEX_SCHEMA_FILE);
115                if (!createIndex(OBSERVATION_INDEX, observationMapping)) {
116                        throw new RuntimeException(Msg.code(1176) + "Failed to create observation index");
117                }
118        }
119
120        private void createObservationCodeIndexIfMissing() throws IOException {
121                if (indexExists(OBSERVATION_CODE_INDEX)) {
122                        return;
123                }
124                String observationCodeMapping = getIndexSchema(OBSERVATION_CODE_INDEX_SCHEMA_FILE);
125                if (!createIndex(OBSERVATION_CODE_INDEX, observationCodeMapping)) {
126                        throw new RuntimeException(Msg.code(1177) + "Failed to create observation code index");
127                }
128        }
129
130        private boolean createIndex(String theIndexName, String theMapping) throws IOException {
131                return myRestHighLevelClient
132                                .indices()
133                                .create(cir -> cir.index(theIndexName).withJson(new StringReader(theMapping)))
134                                .acknowledged();
135        }
136
137        private boolean indexExists(String theIndexName) throws IOException {
138                ExistsRequest request = new ExistsRequest.Builder().index(theIndexName).build();
139                return myRestHighLevelClient.indices().exists(request).value();
140        }
141
142        @Override
143        public void close() throws IOException {
144                // nothing
145        }
146
147        @Override
148        public List<IBaseResource> getObservationResources(Collection<? extends IResourcePersistentId> thePids) {
149                SearchRequest searchRequest = buildObservationResourceSearchRequest(thePids);
150                try {
151                        SearchResponse<ObservationJson> observationDocumentResponse =
152                                        myRestHighLevelClient.search(searchRequest, ObservationJson.class);
153                        List<Hit<ObservationJson>> observationDocumentHits =
154                                        observationDocumentResponse.hits().hits();
155                        IParser parser = TolerantJsonParser.createWithLenientErrorHandling(myContext, null);
156                        Class<? extends IBaseResource> resourceType =
157                                        myContext.getResourceDefinition(OBSERVATION_RESOURCE_NAME).getImplementingClass();
158                        /**
159                         * @see ca.uhn.fhir.jpa.dao.BaseHapiFhirDao#toResource(Class, IBaseResourceEntity, Collection, boolean) for
160                         * details about parsing raw json to BaseResource
161                         */
162                        return observationDocumentHits.stream()
163                                        .map(Hit::source)
164                                        .map(observationJson -> parser.parseResource(resourceType, observationJson.getResource()))
165                                        .collect(Collectors.toList());
166                } catch (IOException theE) {
167                        throw new InvalidRequestException(
168                                        Msg.code(2003) + "Unable to execute observation document query for provided IDs " + thePids, theE);
169                }
170        }
171
172        private SearchRequest buildObservationResourceSearchRequest(Collection<? extends IResourcePersistentId> thePids) {
173                List<FieldValue> values = thePids.stream()
174                                .map(Object::toString)
175                                .map(v -> FieldValue.of(v))
176                                .collect(Collectors.toList());
177
178                return SearchRequest.of(sr -> sr.index(OBSERVATION_INDEX)
179                                .query(qb -> qb.bool(bb -> bb.must(bbm -> {
180                                        bbm.terms(terms ->
181                                                        terms.field(OBSERVATION_IDENTIFIER_FIELD_NAME).terms(termsb -> termsb.value(values)));
182                                        return bbm;
183                                })))
184                                .size(thePids.size()));
185        }
186
187        @VisibleForTesting
188        public void refreshIndex(String theIndexName) throws IOException {
189                myRestHighLevelClient.indices().refresh(fn -> fn.index(theIndexName));
190        }
191}