
001/* 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2023 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.match.deliver.websocket; 021 022import ca.uhn.fhir.i18n.Msg; 023import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry; 024import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelWithHandlers; 025import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; 026import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; 027import org.hl7.fhir.instance.model.api.IIdType; 028import org.hl7.fhir.r4.model.IdType; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031import org.springframework.beans.factory.annotation.Autowired; 032import org.springframework.messaging.Message; 033import org.springframework.messaging.MessageHandler; 034import org.springframework.messaging.MessagingException; 035import org.springframework.web.socket.CloseStatus; 036import org.springframework.web.socket.TextMessage; 037import org.springframework.web.socket.WebSocketHandler; 038import org.springframework.web.socket.WebSocketSession; 039import org.springframework.web.socket.handler.TextWebSocketHandler; 040 041import java.io.IOException; 042import javax.annotation.PostConstruct; 043import javax.annotation.PreDestroy; 044 045public class SubscriptionWebsocketHandler extends TextWebSocketHandler implements WebSocketHandler { 046 private static Logger ourLog = LoggerFactory.getLogger(SubscriptionWebsocketHandler.class); 047 048 @Autowired 049 protected WebsocketConnectionValidator myWebsocketConnectionValidator; 050 051 @Autowired 052 SubscriptionChannelRegistry mySubscriptionChannelRegistry; 053 054 /** 055 * Constructor 056 */ 057 public SubscriptionWebsocketHandler() { 058 super(); 059 } 060 061 private IState myState = new InitialState(); 062 063 @Override 064 public void afterConnectionClosed(WebSocketSession theSession, CloseStatus theStatus) throws Exception { 065 super.afterConnectionClosed(theSession, theStatus); 066 ourLog.info("Closing WebSocket connection from {}", theSession.getRemoteAddress()); 067 } 068 069 @Override 070 public void afterConnectionEstablished(WebSocketSession theSession) throws Exception { 071 super.afterConnectionEstablished(theSession); 072 ourLog.info("Incoming WebSocket connection from {}", theSession.getRemoteAddress()); 073 } 074 075 protected void handleFailure(Exception theE) { 076 ourLog.error("Failure during communication", theE); 077 } 078 079 @Override 080 protected void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) throws Exception { 081 ourLog.info("Textmessage: " + theMessage.getPayload()); 082 myState.handleTextMessage(theSession, theMessage); 083 } 084 085 @Override 086 public void handleTransportError(WebSocketSession theSession, Throwable theException) throws Exception { 087 super.handleTransportError(theSession, theException); 088 ourLog.error("Transport error", theException); 089 } 090 091 @PostConstruct 092 public synchronized void postConstruct() { 093 ourLog.info("Websocket connection has been created"); 094 } 095 096 @PreDestroy 097 public synchronized void preDescroy() { 098 ourLog.info("Websocket connection is closing"); 099 IState state = myState; 100 if (state != null) { 101 state.closing(); 102 } 103 } 104 105 private interface IState { 106 107 void closing(); 108 109 void handleTextMessage(WebSocketSession theSession, TextMessage theMessage); 110 } 111 112 private class BoundStaticSubscriptionState implements IState, MessageHandler { 113 114 private final WebSocketSession mySession; 115 private final ActiveSubscription myActiveSubscription; 116 117 public BoundStaticSubscriptionState(WebSocketSession theSession, ActiveSubscription theActiveSubscription) { 118 mySession = theSession; 119 myActiveSubscription = theActiveSubscription; 120 121 SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = 122 mySubscriptionChannelRegistry.getDeliveryReceiverChannel(theActiveSubscription.getChannelName()); 123 subscriptionChannelWithHandlers.addHandler(this); 124 } 125 126 @Override 127 public void closing() { 128 SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = 129 mySubscriptionChannelRegistry.getDeliveryReceiverChannel(myActiveSubscription.getChannelName()); 130 subscriptionChannelWithHandlers.removeHandler(this); 131 } 132 133 private void deliver() { 134 try { 135 String payload = "ping " + myActiveSubscription.getId(); 136 ourLog.info("Sending WebSocket message: {}", payload); 137 mySession.sendMessage(new TextMessage(payload)); 138 } catch (IOException e) { 139 handleFailure(e); 140 } 141 } 142 143 @Override 144 public void handleMessage(Message<?> theMessage) { 145 if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) { 146 return; 147 } 148 try { 149 ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload(); 150 if (myActiveSubscription.getSubscription().equals(msg.getSubscription())) { 151 deliver(); 152 } 153 } catch (Exception e) { 154 ourLog.error("Failure handling subscription payload", e); 155 throw new MessagingException(theMessage, Msg.code(6) + "Failure handling subscription payload", e); 156 } 157 } 158 159 @Override 160 public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) { 161 try { 162 theSession.sendMessage(new TextMessage("Unexpected client message: " + theMessage.getPayload())); 163 } catch (IOException e) { 164 handleFailure(e); 165 } 166 } 167 } 168 169 private class InitialState implements IState { 170 171 private IIdType bindSimple(WebSocketSession theSession, String theBindString) { 172 IdType id = new IdType(theBindString); 173 174 WebsocketValidationResponse response = myWebsocketConnectionValidator.validate(id); 175 if (!response.isValid()) { 176 try { 177 ourLog.warn(response.getMessage()); 178 theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), response.getMessage())); 179 } catch (IOException e) { 180 handleFailure(e); 181 } 182 return null; 183 } 184 185 myState = new BoundStaticSubscriptionState(theSession, response.getActiveSubscription()); 186 187 return id; 188 } 189 190 @Override 191 public void closing() { 192 // nothing 193 } 194 195 @Override 196 public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) { 197 String message = theMessage.getPayload(); 198 if (message.startsWith("bind ")) { 199 String remaining = message.substring("bind ".length()); 200 201 IIdType subscriptionId; 202 subscriptionId = bindSimple(theSession, remaining); 203 if (subscriptionId == null) { 204 return; 205 } 206 207 try { 208 theSession.sendMessage(new TextMessage("bound " + subscriptionId.getIdPart())); 209 } catch (IOException e) { 210 handleFailure(e); 211 } 212 } 213 } 214 } 215}