001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.dao.search; 021 022import com.google.gson.Gson; 023import com.google.gson.JsonArray; 024import com.google.gson.JsonObject; 025import jakarta.annotation.Nonnull; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029import java.util.List; 030import java.util.stream.Collectors; 031import java.util.stream.Stream; 032import java.util.stream.StreamSupport; 033 034import static ca.uhn.fhir.jpa.model.search.HSearchIndexWriter.SEARCH_PARAM_ROOT; 035 036/** 037 * Builds lastN aggregation, and parse the results 038 */ 039public class LastNAggregation { 040 private static final Logger ourLog = LoggerFactory.getLogger(LastNAggregation.class); 041 static final String SP_SUBJECT = SEARCH_PARAM_ROOT + ".subject.reference.value"; 042 private static final String SP_CODE_TOKEN_CODE_AND_SYSTEM = SEARCH_PARAM_ROOT + ".code.token.code-system"; 043 private static final String SP_DATE_DT_UPPER = SEARCH_PARAM_ROOT + ".date.dt.upper"; 044 private static final String GROUP_BY_CODE_SYSTEM_SUB_AGGREGATION = "group_by_code_system"; 045 private static final String MOST_RECENT_EFFECTIVE_SUB_AGGREGATION = "most_recent_effective"; 046 047 private final int myLastNMax; 048 private final boolean myAggregateOnSubject; 049 private final Gson myJsonParser = new Gson(); 050 051 public LastNAggregation(int theLastNMax, boolean theAggregateOnSubject) { 052 myLastNMax = theLastNMax; 053 myAggregateOnSubject = theAggregateOnSubject; 054 } 055 056 /** 057 * Aggregation template json. 058 * <p> 059 * https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html 060 */ 061 public JsonObject toAggregation() { 062 JsonObject lastNAggregation = myJsonParser.fromJson( 063 "{" + " \"terms\":{" 064 + " \"field\":\"" 065 + SP_CODE_TOKEN_CODE_AND_SYSTEM + "\"," + " \"size\":10000," 066 + " \"min_doc_count\":1" 067 + " }," 068 + " \"aggs\":{" 069 + " \"" 070 + MOST_RECENT_EFFECTIVE_SUB_AGGREGATION + "\":{" + " \"top_hits\":{" 071 + " \"size\":" 072 + myLastNMax + "," + " \"sort\":[" 073 + " {" 074 + " \"" 075 + SP_DATE_DT_UPPER + "\":{" + " \"order\":\"desc\"" 076 + " }" 077 + " }" 078 + " ]," 079 + " \"_source\":[" 080 + " \"myId\"" 081 + " ]" 082 + " }" 083 + " }" 084 + " }" 085 + "}", 086 JsonObject.class); 087 if (myAggregateOnSubject) { 088 lastNAggregation = myJsonParser.fromJson( 089 "{" + " \"terms\": {" 090 + " \"field\": \"" 091 + SP_SUBJECT + "\"," + " \"size\": 10000," 092 + " \"min_doc_count\": 1" 093 + " }," 094 + " \"aggs\": {" 095 + " \"" 096 + GROUP_BY_CODE_SYSTEM_SUB_AGGREGATION + "\": " + myJsonParser.toJson(lastNAggregation) + "" 097 + " }" 098 + "}", 099 JsonObject.class); 100 } 101 return lastNAggregation; 102 } 103 104 /** 105 * Parses the JSONObject aggregation result from ES to extract observation resource ids 106 * E.g aggregation result payload 107 * <pre> 108 * {@code 109 * { 110 * "doc_count_error_upper_bound": 0, 111 * "sum_other_doc_count": 0, 112 * "buckets": [ 113 * { 114 * "key": "http://mycode.com|code0", 115 * "doc_count": 45, 116 * "most_recent_effective": { 117 * "hits": { 118 * "total": { 119 * "value": 45, 120 * "relation": "eq" 121 * }, 122 * "max_score": null, 123 * "hits": [ 124 * { 125 * "_index": "resourcetable-000001", 126 * "_type": "_doc", 127 * "_id": "48", 128 * "_score": null, 129 * "_source": { 130 * "myId": 48 131 * }, 132 * "sort": [ 133 * 1643673125112 134 * ] 135 * } 136 * ] 137 * } 138 * } 139 * }, 140 * { 141 * "key": "http://mycode.com|code1", 142 * "doc_count": 30, 143 * "most_recent_effective": { 144 * "hits": { 145 * "total": { 146 * "value": 30, 147 * "relation": "eq" 148 * }, 149 * "max_score": null, 150 * "hits": [ 151 * { 152 * "_index": "resourcetable-000001", 153 * "_type": "_doc", 154 * "_id": "58", 155 * "_score": null, 156 * "_source": { 157 * "myId": 58 158 * }, 159 * "sort": [ 160 * 1643673125112 161 * ] 162 * } 163 * ] 164 * } 165 * } 166 * } 167 * ] 168 * } 169 * } 170 * </pre> 171 */ 172 public List<Long> extractResourceIds(@Nonnull JsonObject theAggregationResult) { 173 ourLog.trace("extractResourceIds - hasSubject {} aggregation {}", myAggregateOnSubject, theAggregationResult); 174 Stream<JsonObject> resultBuckets = Stream.of(theAggregationResult); 175 176 // was it grouped by subject? 177 if (myAggregateOnSubject) { 178 resultBuckets = StreamSupport.stream( 179 theAggregationResult.getAsJsonArray("buckets").spliterator(), false) 180 .map(bucket -> bucket.getAsJsonObject().getAsJsonObject(GROUP_BY_CODE_SYSTEM_SUB_AGGREGATION)); 181 } 182 183 return resultBuckets 184 .flatMap(grouping -> 185 StreamSupport.stream(grouping.getAsJsonArray("buckets").spliterator(), false)) 186 .flatMap(bucket -> { 187 JsonArray hits = bucket.getAsJsonObject() 188 .getAsJsonObject(MOST_RECENT_EFFECTIVE_SUB_AGGREGATION) 189 .getAsJsonObject("hits") 190 .getAsJsonArray("hits"); 191 return StreamSupport.stream(hits.spliterator(), false); 192 }) 193 .map(hit -> hit.getAsJsonObject() 194 .getAsJsonObject("_source") 195 .get("myId") 196 .getAsLong()) 197 .collect(Collectors.toList()); 198 } 199}