001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.interceptor; 021 022import ca.uhn.fhir.interceptor.api.Hook; 023import ca.uhn.fhir.interceptor.api.Interceptor; 024import ca.uhn.fhir.interceptor.api.Pointcut; 025import ca.uhn.fhir.interceptor.model.TransactionWriteOperationsDetails; 026import ca.uhn.fhir.jpa.util.MemoryCacheService; 027import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; 028import ca.uhn.fhir.sl.cache.Cache; 029import ca.uhn.fhir.sl.cache.CacheFactory; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033import java.util.ArrayList; 034import java.util.HashMap; 035import java.util.List; 036import java.util.Map; 037import java.util.concurrent.Semaphore; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicBoolean; 040import java.util.stream.Collectors; 041 042/** 043 * This interceptor uses semaphores to avoid multiple concurrent FHIR transaction 044 * bundles from processing the same records at the same time, avoiding concurrency 045 * issues. 046 */ 047@Interceptor 048public class TransactionConcurrencySemaphoreInterceptor { 049 050 private static final Logger ourLog = LoggerFactory.getLogger(TransactionConcurrencySemaphoreInterceptor.class); 051 private static final String HELD_SEMAPHORES = 052 TransactionConcurrencySemaphoreInterceptor.class.getName() + "_HELD_SEMAPHORES"; 053 private final Cache<String, Semaphore> mySemaphoreCache; 054 private final MemoryCacheService myMemoryCacheService; 055 private boolean myLogWaits; 056 private final Semaphore myLockingSemaphore = new Semaphore(1); 057 058 /** 059 * Constructor 060 */ 061 public TransactionConcurrencySemaphoreInterceptor(MemoryCacheService theMemoryCacheService) { 062 myMemoryCacheService = theMemoryCacheService; 063 mySemaphoreCache = CacheFactory.build(TimeUnit.MINUTES.toMillis(1)); 064 } 065 066 /** 067 * Should the interceptor log if a wait for a semaphore is required 068 */ 069 public boolean isLogWaits() { 070 return myLogWaits; 071 } 072 073 /** 074 * Should the interceptor log if a wait for a semaphore is required 075 */ 076 public void setLogWaits(boolean theLogWaits) { 077 myLogWaits = theLogWaits; 078 } 079 080 @Hook(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE) 081 public void pre( 082 TransactionDetails theTransactionDetails, TransactionWriteOperationsDetails theWriteOperationsDetails) { 083 List<Semaphore> heldSemaphores = new ArrayList<>(); 084 Map<String, Semaphore> pendingAndHeldSemaphores = new HashMap<>(); 085 086 AtomicBoolean locked = new AtomicBoolean(false); 087 try { 088 acquireSemaphoresForUrlList( 089 locked, 090 heldSemaphores, 091 pendingAndHeldSemaphores, 092 theWriteOperationsDetails.getUpdateRequestUrls(), 093 false); 094 acquireSemaphoresForUrlList( 095 locked, 096 heldSemaphores, 097 pendingAndHeldSemaphores, 098 theWriteOperationsDetails.getConditionalCreateRequestUrls(), 099 true); 100 101 pendingAndHeldSemaphores.keySet().removeIf(k -> pendingAndHeldSemaphores.get(k) == null); 102 if (!pendingAndHeldSemaphores.isEmpty()) { 103 if (isLogWaits()) { 104 ourLog.info( 105 "Waiting to acquire write semaphore for URLs:{}{}", 106 (pendingAndHeldSemaphores.size() > 1 ? "\n * " : ""), 107 (pendingAndHeldSemaphores.keySet().stream().sorted().collect(Collectors.joining("\n * ")))); 108 } 109 for (Map.Entry<String, Semaphore> nextEntry : pendingAndHeldSemaphores.entrySet()) { 110 Semaphore nextSemaphore = nextEntry.getValue(); 111 try { 112 if (nextSemaphore.tryAcquire(10, TimeUnit.SECONDS)) { 113 ourLog.trace("Acquired semaphore {} on request URL: {}", nextSemaphore, nextEntry.getKey()); 114 heldSemaphores.add(nextSemaphore); 115 } else { 116 ourLog.warn( 117 "Timed out waiting for semaphore {} on request URL: {}", 118 nextSemaphore, 119 nextEntry.getKey()); 120 break; 121 } 122 } catch (InterruptedException e) { 123 Thread.currentThread().interrupt(); 124 break; 125 } 126 } 127 } 128 129 theTransactionDetails.putUserData(HELD_SEMAPHORES, heldSemaphores); 130 131 } finally { 132 if (locked.get()) { 133 myLockingSemaphore.release(); 134 } 135 } 136 } 137 138 private void acquireSemaphoresForUrlList( 139 AtomicBoolean theLocked, 140 List<Semaphore> theHeldSemaphores, 141 Map<String, Semaphore> thePendingAndHeldSemaphores, 142 List<String> urls, 143 boolean isConditionalCreates) { 144 for (String nextUrl : urls) { 145 146 if (isConditionalCreates) { 147 if (myMemoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.MATCH_URL, nextUrl) != null) { 148 continue; 149 } 150 } 151 152 Semaphore semaphore = mySemaphoreCache.get(nextUrl, t -> new Semaphore(1)); 153 if (thePendingAndHeldSemaphores.containsKey(nextUrl)) { 154 continue; 155 } 156 157 if (!theLocked.get()) { 158 myLockingSemaphore.acquireUninterruptibly(); 159 theLocked.set(true); 160 } 161 162 assert semaphore != null; 163 if (semaphore.tryAcquire()) { 164 ourLog.trace("Acquired semaphore {} on request URL: {}", semaphore, nextUrl); 165 theHeldSemaphores.add(semaphore); 166 thePendingAndHeldSemaphores.put(nextUrl, null); 167 } else { 168 thePendingAndHeldSemaphores.put(nextUrl, semaphore); 169 } 170 } 171 } 172 173 @Hook(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_POST) 174 public void post(TransactionDetails theTransactionDetails) { 175 List<Semaphore> heldSemaphores = theTransactionDetails.getUserData(HELD_SEMAPHORES); 176 for (Semaphore next : heldSemaphores) { 177 ourLog.trace("Releasing semaphore {}", next); 178 next.release(); 179 } 180 } 181 182 /** 183 * Clear all semaphors from the list. This is really mostly intended for testing scenarios. 184 */ 185 public void clearSemaphores() { 186 mySemaphoreCache.invalidateAll(); 187 } 188 189 /** 190 * Returns a count of all semaphores currently in the cache (incuding held and unheld semaphores) 191 */ 192 public long countSemaphores() { 193 return mySemaphoreCache.estimatedSize(); 194 } 195}