001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.interceptor;
021
022import ca.uhn.fhir.interceptor.api.Hook;
023import ca.uhn.fhir.interceptor.api.Interceptor;
024import ca.uhn.fhir.interceptor.api.Pointcut;
025import ca.uhn.fhir.interceptor.model.TransactionWriteOperationsDetails;
026import ca.uhn.fhir.jpa.util.MemoryCacheService;
027import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
028import ca.uhn.fhir.sl.cache.Cache;
029import ca.uhn.fhir.sl.cache.CacheFactory;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033import java.util.ArrayList;
034import java.util.HashMap;
035import java.util.List;
036import java.util.Map;
037import java.util.concurrent.Semaphore;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicBoolean;
040import java.util.stream.Collectors;
041
042/**
043 * This interceptor uses semaphores to avoid multiple concurrent FHIR transaction
044 * bundles from processing the same records at the same time, avoiding concurrency
045 * issues.
046 */
047@Interceptor
048public class TransactionConcurrencySemaphoreInterceptor {
049
050        private static final Logger ourLog = LoggerFactory.getLogger(TransactionConcurrencySemaphoreInterceptor.class);
051        private static final String HELD_SEMAPHORES =
052                        TransactionConcurrencySemaphoreInterceptor.class.getName() + "_HELD_SEMAPHORES";
053        private final Cache<String, Semaphore> mySemaphoreCache;
054        private final MemoryCacheService myMemoryCacheService;
055        private boolean myLogWaits;
056        private final Semaphore myLockingSemaphore = new Semaphore(1);
057
058        /**
059         * Constructor
060         */
061        public TransactionConcurrencySemaphoreInterceptor(MemoryCacheService theMemoryCacheService) {
062                myMemoryCacheService = theMemoryCacheService;
063                mySemaphoreCache = CacheFactory.build(TimeUnit.MINUTES.toMillis(1));
064        }
065
066        /**
067         * Should the interceptor log if a wait for a semaphore is required
068         */
069        public boolean isLogWaits() {
070                return myLogWaits;
071        }
072
073        /**
074         * Should the interceptor log if a wait for a semaphore is required
075         */
076        public void setLogWaits(boolean theLogWaits) {
077                myLogWaits = theLogWaits;
078        }
079
080        @Hook(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE)
081        public void pre(
082                        TransactionDetails theTransactionDetails, TransactionWriteOperationsDetails theWriteOperationsDetails) {
083                List<Semaphore> heldSemaphores = new ArrayList<>();
084                Map<String, Semaphore> pendingAndHeldSemaphores = new HashMap<>();
085
086                AtomicBoolean locked = new AtomicBoolean(false);
087                try {
088                        acquireSemaphoresForUrlList(
089                                        locked,
090                                        heldSemaphores,
091                                        pendingAndHeldSemaphores,
092                                        theWriteOperationsDetails.getUpdateRequestUrls(),
093                                        false);
094                        acquireSemaphoresForUrlList(
095                                        locked,
096                                        heldSemaphores,
097                                        pendingAndHeldSemaphores,
098                                        theWriteOperationsDetails.getConditionalCreateRequestUrls(),
099                                        true);
100
101                        pendingAndHeldSemaphores.keySet().removeIf(k -> pendingAndHeldSemaphores.get(k) == null);
102                        if (!pendingAndHeldSemaphores.isEmpty()) {
103                                if (isLogWaits()) {
104                                        ourLog.info(
105                                                        "Waiting to acquire write semaphore for URLs:{}{}",
106                                                        (pendingAndHeldSemaphores.size() > 1 ? "\n * " : ""),
107                                                        (pendingAndHeldSemaphores.keySet().stream().sorted().collect(Collectors.joining("\n * "))));
108                                }
109                                for (Map.Entry<String, Semaphore> nextEntry : pendingAndHeldSemaphores.entrySet()) {
110                                        Semaphore nextSemaphore = nextEntry.getValue();
111                                        try {
112                                                if (nextSemaphore.tryAcquire(10, TimeUnit.SECONDS)) {
113                                                        ourLog.trace("Acquired semaphore {} on request URL: {}", nextSemaphore, nextEntry.getKey());
114                                                        heldSemaphores.add(nextSemaphore);
115                                                } else {
116                                                        ourLog.warn(
117                                                                        "Timed out waiting for semaphore {} on request URL: {}",
118                                                                        nextSemaphore,
119                                                                        nextEntry.getKey());
120                                                        break;
121                                                }
122                                        } catch (InterruptedException e) {
123                                                Thread.currentThread().interrupt();
124                                                break;
125                                        }
126                                }
127                        }
128
129                        theTransactionDetails.putUserData(HELD_SEMAPHORES, heldSemaphores);
130
131                } finally {
132                        if (locked.get()) {
133                                myLockingSemaphore.release();
134                        }
135                }
136        }
137
138        private void acquireSemaphoresForUrlList(
139                        AtomicBoolean theLocked,
140                        List<Semaphore> theHeldSemaphores,
141                        Map<String, Semaphore> thePendingAndHeldSemaphores,
142                        List<String> urls,
143                        boolean isConditionalCreates) {
144                for (String nextUrl : urls) {
145
146                        if (isConditionalCreates) {
147                                if (myMemoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.MATCH_URL, nextUrl) != null) {
148                                        continue;
149                                }
150                        }
151
152                        Semaphore semaphore = mySemaphoreCache.get(nextUrl, t -> new Semaphore(1));
153                        if (thePendingAndHeldSemaphores.containsKey(nextUrl)) {
154                                continue;
155                        }
156
157                        if (!theLocked.get()) {
158                                myLockingSemaphore.acquireUninterruptibly();
159                                theLocked.set(true);
160                        }
161
162                        assert semaphore != null;
163                        if (semaphore.tryAcquire()) {
164                                ourLog.trace("Acquired semaphore {} on request URL: {}", semaphore, nextUrl);
165                                theHeldSemaphores.add(semaphore);
166                                thePendingAndHeldSemaphores.put(nextUrl, null);
167                        } else {
168                                thePendingAndHeldSemaphores.put(nextUrl, semaphore);
169                        }
170                }
171        }
172
173        @Hook(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_POST)
174        public void post(TransactionDetails theTransactionDetails) {
175                List<Semaphore> heldSemaphores = theTransactionDetails.getUserData(HELD_SEMAPHORES);
176                for (Semaphore next : heldSemaphores) {
177                        ourLog.trace("Releasing semaphore {}", next);
178                        next.release();
179                }
180        }
181
182        /**
183         * Clear all semaphors from the list. This is really mostly intended for testing scenarios.
184         */
185        public void clearSemaphores() {
186                mySemaphoreCache.invalidateAll();
187        }
188
189        /**
190         * Returns a count of all semaphores currently in the cache (incuding held and unheld semaphores)
191         */
192        public long countSemaphores() {
193                return mySemaphoreCache.estimatedSize();
194        }
195}