001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.dao.search;
021
022import com.google.gson.Gson;
023import com.google.gson.JsonArray;
024import com.google.gson.JsonObject;
025import jakarta.annotation.Nonnull;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029import java.util.List;
030import java.util.stream.Collectors;
031import java.util.stream.Stream;
032import java.util.stream.StreamSupport;
033
034import static ca.uhn.fhir.jpa.model.search.HSearchIndexWriter.SEARCH_PARAM_ROOT;
035
036/**
037 * Builds lastN aggregation, and parse the results
038 */
039public class LastNAggregation {
040        private static final Logger ourLog = LoggerFactory.getLogger(LastNAggregation.class);
041        static final String SP_SUBJECT = SEARCH_PARAM_ROOT + ".subject.reference.value";
042        private static final String SP_CODE_TOKEN_CODE_AND_SYSTEM = SEARCH_PARAM_ROOT + ".code.token.code-system";
043        private static final String SP_DATE_DT_UPPER = SEARCH_PARAM_ROOT + ".date.dt.upper";
044        private static final String GROUP_BY_CODE_SYSTEM_SUB_AGGREGATION = "group_by_code_system";
045        private static final String MOST_RECENT_EFFECTIVE_SUB_AGGREGATION = "most_recent_effective";
046
047        private final int myLastNMax;
048        private final boolean myAggregateOnSubject;
049        private final Gson myJsonParser = new Gson();
050
051        public LastNAggregation(int theLastNMax, boolean theAggregateOnSubject) {
052                myLastNMax = theLastNMax;
053                myAggregateOnSubject = theAggregateOnSubject;
054        }
055
056        /**
057         * Aggregation template json.
058         * <p>
059         * https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html
060         */
061        public JsonObject toAggregation() {
062                JsonObject lastNAggregation = myJsonParser.fromJson(
063                                "{" + "   \"terms\":{"
064                                                + "      \"field\":\""
065                                                + SP_CODE_TOKEN_CODE_AND_SYSTEM + "\"," + "      \"size\":10000,"
066                                                + "      \"min_doc_count\":1"
067                                                + "   },"
068                                                + "   \"aggs\":{"
069                                                + "      \""
070                                                + MOST_RECENT_EFFECTIVE_SUB_AGGREGATION + "\":{" + "         \"top_hits\":{"
071                                                + "            \"size\":"
072                                                + myLastNMax + "," + "            \"sort\":["
073                                                + "               {"
074                                                + "                  \""
075                                                + SP_DATE_DT_UPPER + "\":{" + "                     \"order\":\"desc\""
076                                                + "                  }"
077                                                + "               }"
078                                                + "            ],"
079                                                + "            \"_source\":["
080                                                + "               \"myId\""
081                                                + "            ]"
082                                                + "         }"
083                                                + "      }"
084                                                + "   }"
085                                                + "}",
086                                JsonObject.class);
087                if (myAggregateOnSubject) {
088                        lastNAggregation = myJsonParser.fromJson(
089                                        "{" + "  \"terms\": {"
090                                                        + "    \"field\": \""
091                                                        + SP_SUBJECT + "\"," + "    \"size\": 10000,"
092                                                        + "    \"min_doc_count\": 1"
093                                                        + "  },"
094                                                        + "  \"aggs\": {"
095                                                        + "    \""
096                                                        + GROUP_BY_CODE_SYSTEM_SUB_AGGREGATION + "\": " + myJsonParser.toJson(lastNAggregation) + ""
097                                                        + "  }"
098                                                        + "}",
099                                        JsonObject.class);
100                }
101                return lastNAggregation;
102        }
103
104        /**
105         * Parses the JSONObject aggregation result from ES to extract observation resource ids
106         * E.g aggregation result payload
107         * <pre>
108         * {@code
109         * {
110         *   "doc_count_error_upper_bound": 0,
111         *   "sum_other_doc_count": 0,
112         *   "buckets": [
113         *     {
114         *       "key": "http://mycode.com|code0",
115         *       "doc_count": 45,
116         *       "most_recent_effective": {
117         *         "hits": {
118         *           "total": {
119         *             "value": 45,
120         *             "relation": "eq"
121         *           },
122         *           "max_score": null,
123         *           "hits": [
124         *             {
125         *               "_index": "resourcetable-000001",
126         *               "_type": "_doc",
127         *               "_id": "48",
128         *               "_score": null,
129         *               "_source": {
130         *                 "myId": 48
131         *               },
132         *               "sort": [
133         *                 1643673125112
134         *               ]
135         *             }
136         *           ]
137         *         }
138         *       }
139         *     },
140         *     {
141         *       "key": "http://mycode.com|code1",
142         *       "doc_count": 30,
143         *       "most_recent_effective": {
144         *         "hits": {
145         *           "total": {
146         *             "value": 30,
147         *             "relation": "eq"
148         *           },
149         *           "max_score": null,
150         *           "hits": [
151         *             {
152         *               "_index": "resourcetable-000001",
153         *               "_type": "_doc",
154         *               "_id": "58",
155         *               "_score": null,
156         *               "_source": {
157         *                 "myId": 58
158         *               },
159         *               "sort": [
160         *                 1643673125112
161         *               ]
162         *             }
163         *           ]
164         *         }
165         *       }
166         *     }
167         *   ]
168         * }
169         * }
170         * </pre>
171         */
172        public List<Long> extractResourceIds(@Nonnull JsonObject theAggregationResult) {
173                ourLog.trace("extractResourceIds - hasSubject {} aggregation {}", myAggregateOnSubject, theAggregationResult);
174                Stream<JsonObject> resultBuckets = Stream.of(theAggregationResult);
175
176                // was it grouped by subject?
177                if (myAggregateOnSubject) {
178                        resultBuckets = StreamSupport.stream(
179                                                        theAggregationResult.getAsJsonArray("buckets").spliterator(), false)
180                                        .map(bucket -> bucket.getAsJsonObject().getAsJsonObject(GROUP_BY_CODE_SYSTEM_SUB_AGGREGATION));
181                }
182
183                return resultBuckets
184                                .flatMap(grouping ->
185                                                StreamSupport.stream(grouping.getAsJsonArray("buckets").spliterator(), false))
186                                .flatMap(bucket -> {
187                                        JsonArray hits = bucket.getAsJsonObject()
188                                                        .getAsJsonObject(MOST_RECENT_EFFECTIVE_SUB_AGGREGATION)
189                                                        .getAsJsonObject("hits")
190                                                        .getAsJsonArray("hits");
191                                        return StreamSupport.stream(hits.spliterator(), false);
192                                })
193                                .map(hit -> hit.getAsJsonObject()
194                                                .getAsJsonObject("_source")
195                                                .get("myId")
196                                                .getAsLong())
197                                .collect(Collectors.toList());
198        }
199}