View Javadoc
1   package ca.uhn.fhir.jpa.subscription.websocket;
2   
3   /*
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2018 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.context.FhirContext;
24  import ca.uhn.fhir.jpa.subscription.CanonicalSubscription;
25  import ca.uhn.fhir.jpa.subscription.ResourceDeliveryMessage;
26  import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
27  import org.hl7.fhir.instance.model.api.IIdType;
28  import org.hl7.fhir.r4.model.IdType;
29  import org.springframework.beans.factory.annotation.Autowired;
30  import org.springframework.messaging.Message;
31  import org.springframework.messaging.MessageHandler;
32  import org.springframework.messaging.MessagingException;
33  import org.springframework.web.socket.CloseStatus;
34  import org.springframework.web.socket.TextMessage;
35  import org.springframework.web.socket.WebSocketSession;
36  import org.springframework.web.socket.handler.TextWebSocketHandler;
37  
38  import javax.annotation.PostConstruct;
39  import javax.annotation.PreDestroy;
40  import java.io.IOException;
41  import java.util.Map;
42  
43  public class SubscriptionWebsocketHandler extends TextWebSocketHandler implements ISubscriptionWebsocketHandler {
44  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionWebsocketHandler.class);
45  	@Autowired
46  	private SubscriptionWebsocketInterceptor mySubscriptionWebsocketInterceptor;
47  	@Autowired
48  	private FhirContext myCtx;
49  
50  	private IState myState = new InitialState();
51  
52  	@Override
53  	public void afterConnectionClosed(WebSocketSession theSession, CloseStatus theStatus) throws Exception {
54  		super.afterConnectionClosed(theSession, theStatus);
55  		ourLog.info("Closing WebSocket connection from {}", theSession.getRemoteAddress());
56  	}
57  
58  	@Override
59  	public void afterConnectionEstablished(WebSocketSession theSession) throws Exception {
60  		super.afterConnectionEstablished(theSession);
61  		ourLog.info("Incoming WebSocket connection from {}", theSession.getRemoteAddress());
62  	}
63  
64  	protected void handleFailure(Exception theE) {
65  		ourLog.error("Failure during communication", theE);
66  	}
67  
68  	@Override
69  	protected void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) throws Exception {
70  		ourLog.info("Textmessage: " + theMessage.getPayload());
71  		myState.handleTextMessage(theSession, theMessage);
72  	}
73  
74  	@Override
75  	public void handleTransportError(WebSocketSession theSession, Throwable theException) throws Exception {
76  		super.handleTransportError(theSession, theException);
77  		ourLog.error("Transport error", theException);
78  	}
79  
80  	@PostConstruct
81  	public synchronized void postConstruct() {
82  		ourLog.info("Websocket connection has been created");
83  	}
84  
85  	@PreDestroy
86  	public synchronized void preDescroy() {
87  		ourLog.info("Websocket connection is closing");
88  		IState state = myState;
89  		if (state != null) {
90  			state.closing();
91  		}
92  	}
93  
94  
95  	private interface IState {
96  
97  		void closing();
98  
99  		void handleTextMessage(WebSocketSession theSession, TextMessage theMessage);
100 
101 	}
102 
103 	private class BoundStaticSubscipriptionState implements IState, MessageHandler {
104 
105 		private WebSocketSession mySession;
106 		private CanonicalSubscription mySubscription;
107 
108 		public BoundStaticSubscipriptionState(WebSocketSession theSession, CanonicalSubscription theSubscription) {
109 			mySession = theSession;
110 			mySubscription = theSubscription;
111 
112 			String subscriptionId = mySubscription.getIdElement(myCtx).getIdPart();
113 			mySubscriptionWebsocketInterceptor.registerHandler(subscriptionId, this);
114 		}
115 
116 		@Override
117 		public void closing() {
118 			String subscriptionId = mySubscription.getIdElement(myCtx).getIdPart();
119 			mySubscriptionWebsocketInterceptor.unregisterHandler(subscriptionId, this);
120 		}
121 
122 		private void deliver() {
123 			try {
124 				String payload = "ping " + mySubscription.getIdElement(myCtx).getIdPart();
125 				ourLog.info("Sending WebSocket message: {}", payload);
126 				mySession.sendMessage(new TextMessage(payload));
127 			} catch (IOException e) {
128 				handleFailure(e);
129 			}
130 		}
131 
132 		@Override
133 		public void handleMessage(Message<?> theMessage) {
134 			if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) {
135 				return;
136 			}
137 			try {
138 				ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
139 				if (mySubscription.equals(msg.getSubscription())) {
140 					deliver();
141 				}
142 			} catch (Exception e) {
143 				ourLog.error("Failure handling subscription payload", e);
144 				throw new MessagingException(theMessage, "Failure handling subscription payload", e);
145 			}
146 		}
147 
148 		@Override
149 		public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) {
150 			try {
151 				theSession.sendMessage(new TextMessage("Unexpected client message: " + theMessage.getPayload()));
152 			} catch (IOException e) {
153 				handleFailure(e);
154 			}
155 		}
156 
157 	}
158 
159 	private class InitialState implements IState {
160 
161 		private IIdType bindSimple(WebSocketSession theSession, String theBindString) {
162 			IdType id = new IdType(theBindString);
163 
164 			if (!id.hasIdPart() || !id.isIdPartValid()) {
165 				try {
166 					String message = "Invalid bind request - No ID included";
167 					ourLog.warn(message);
168 					theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), message));
169 				} catch (IOException e) {
170 					handleFailure(e);
171 				}
172 				return null;
173 			}
174 
175 			if (id.hasResourceType() == false) {
176 				id = id.withResourceType("Subscription");
177 			}
178 
179 			try {
180 				Map<String, CanonicalSubscription> idToSubscription = mySubscriptionWebsocketInterceptor.getIdToSubscription();
181 				CanonicalSubscription subscription = idToSubscription.get(id.getIdPart());
182 				myState = new BoundStaticSubscipriptionState( theSession, subscription);
183 			} catch (ResourceNotFoundException e) {
184 				try {
185 					String message = "Invalid bind request - Unknown subscription: " + id.getValue();
186 					ourLog.warn(message);
187 					theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), message));
188 				} catch (IOException e1) {
189 					handleFailure(e);
190 				}
191 				return null;
192 			}
193 
194 			return id;
195 		}
196 
197 		@Override
198 		public void closing() {
199 			// nothing
200 		}
201 
202 		@Override
203 		public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) {
204 			String message = theMessage.getPayload();
205 			if (message.startsWith("bind ")) {
206 				String remaining = message.substring("bind ".length());
207 
208 				IIdType subscriptionId;
209 				subscriptionId = bindSimple(theSession, remaining);
210 				if (subscriptionId == null) {
211 					return;
212 				}
213 
214 				try {
215 					theSession.sendMessage(new TextMessage("bound " + subscriptionId.getIdPart()));
216 				} catch (IOException e) {
217 					handleFailure(e);
218 				}
219 
220 			}
221 		}
222 
223 	}
224 
225 }
226 
227 
228 //	private IIdType bingSearch(WebSocketSession theSession, String theRemaining) {
229 //		Subscription subscription = new Subscription();
230 //		subscription.getChannel().setType(SubscriptionChannelType.WEBSOCKET);
231 //		subscription.setStatus(SubscriptionStatus.ACTIVE);
232 //		subscription.setCriteria(theRemaining);
233 //
234 //		try {
235 //			String params = theRemaining.substring(theRemaining.indexOf('?')+1);
236 //			List<NameValuePair> paramValues = URLEncodedUtils.parse(params, Constants.CHARSET_UTF8, '&');
237 //			EncodingEnum encoding = EncodingEnum.JSON;
238 //			for (NameValuePair nameValuePair : paramValues) {
239 //				if (Constants.PARAM_FORMAT.equals(nameValuePair.getName())) {
240 //					EncodingEnum nextEncoding = EncodingEnum.forContentType(nameValuePair.getValue());
241 //					if (nextEncoding != null) {
242 //						encoding = nextEncoding;
243 //					}
244 //				}
245 //			}
246 //
247 //			IIdType id = ourSubscriptionDao.create(subscription).getId();
248 //
249 //			mySubscriptionPid = ourSubscriptionDao.getSubscriptionTablePidForSubscriptionResource(id);
250 //			mySubscriptionId = subscription.getIdElement();
251 //			myState = new BoundDynamicSubscriptionState(theSession, encoding);
252 //
253 //			return id;
254 //		} catch (UnprocessableEntityException e) {
255 //			ourLog.warn("Failed to bind subscription: " + e.getMessage());
256 //			try {
257 //				theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - " + e.getMessage()));
258 //			} catch (IOException e2) {
259 //				handleFailure(e2);
260 //			}
261 //		} catch (Exception e) {
262 //			handleFailure(e);
263 //			try {
264 //				theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - No ID included"));
265 //			} catch (IOException e2) {
266 //				handleFailure(e2);
267 //			}
268 //		}
269 //		return null;
270 //	}
271 
272 
273 //private class BoundDynamicSubscriptionState implements SubscriptionWebsocketHandler.IState {
274 //
275 //	private EncodingEnum myEncoding;
276 //	private WebSocketSession mySession;
277 //
278 //	public BoundDynamicSubscriptionState(WebSocketSession theSession, EncodingEnum theEncoding) {
279 //		mySession = theSession;
280 //		myEncoding = theEncoding;
281 //	}
282 //
283 //	@Override
284 //	public void closing() {
285 //		ourLog.info("Deleting subscription {}", mySubscriptionId);
286 //		try {
287 //			ourSubscriptionDao.delete(mySubscriptionId, null);
288 //		} catch (Exception e) {
289 //			handleFailure(e);
290 //		}
291 //	}
292 //
293 //	@Override
294 //	public void deliver(List<IBaseResource> theResults) {
295 //		try {
296 //			for (IBaseResource nextResource : theResults) {
297 //				ourLog.info("Sending WebSocket message for resource: {}", nextResource.getIdElement());
298 //				String encoded = myEncoding.newParser(ourCtx).encodeResourceToString(nextResource);
299 //				String payload = "add " + mySubscriptionId.getIdPart() + '\n' + encoded;
300 //				mySession.sendMessage(new TextMessage(payload));
301 //			}
302 //		} catch (IOException e) {
303 //			handleFailure(e);
304 //		}
305 //	}
306 //
307 //	@Override
308 //	public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) {
309 //		try {
310 //			theSession.sendMessage(new TextMessage("Unexpected client message: " + theMessage.getPayload()));
311 //		} catch (IOException e) {
312 //			handleFailure(e);
313 //		}
314 //	}
315 //
316 //}