View Javadoc
1   package ca.uhn.fhir.jpa.subscription.websocket;
2   
3   /*
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2018 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.context.FhirContext;
24  import ca.uhn.fhir.jpa.subscription.CanonicalSubscription;
25  import ca.uhn.fhir.jpa.subscription.ResourceDeliveryMessage;
26  import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
27  import org.hl7.fhir.instance.model.api.IIdType;
28  import org.hl7.fhir.r4.model.IdType;
29  import org.springframework.beans.factory.annotation.Autowired;
30  import org.springframework.messaging.Message;
31  import org.springframework.messaging.MessageHandler;
32  import org.springframework.messaging.MessagingException;
33  import org.springframework.web.socket.CloseStatus;
34  import org.springframework.web.socket.TextMessage;
35  import org.springframework.web.socket.WebSocketSession;
36  import org.springframework.web.socket.handler.TextWebSocketHandler;
37  
38  import javax.annotation.PostConstruct;
39  import javax.annotation.PreDestroy;
40  import java.io.IOException;
41  import java.util.Map;
42  
43  public class SubscriptionWebsocketHandler extends TextWebSocketHandler implements ISubscriptionWebsocketHandler {
44  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionWebsocketHandler.class);
45  	@Autowired
46  	private SubscriptionWebsocketInterceptor mySubscriptionWebsocketInterceptor;
47  	@Autowired
48  	private FhirContext myCtx;
49  
50  	private IState myState = new InitialState();
51  
52  	@Override
53  	public void afterConnectionClosed(WebSocketSession theSession, CloseStatus theStatus) throws Exception {
54  		super.afterConnectionClosed(theSession, theStatus);
55  		ourLog.info("Closing WebSocket connection from {}", theSession.getRemoteAddress());
56  	}
57  
58  	@Override
59  	public void afterConnectionEstablished(WebSocketSession theSession) throws Exception {
60  		super.afterConnectionEstablished(theSession);
61  		ourLog.info("Incoming WebSocket connection from {}", theSession.getRemoteAddress());
62  	}
63  
64  	protected void handleFailure(Exception theE) {
65  		ourLog.error("Failure during communication", theE);
66  	}
67  
68  	@Override
69  	protected void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) throws Exception {
70  		ourLog.info("Textmessage: " + theMessage.getPayload());
71  		myState.handleTextMessage(theSession, theMessage);
72  	}
73  
74  	@Override
75  	public void handleTransportError(WebSocketSession theSession, Throwable theException) throws Exception {
76  		super.handleTransportError(theSession, theException);
77  		ourLog.error("Transport error", theException);
78  	}
79  
80  	@PostConstruct
81  	public synchronized void postConstruct() {
82  		ourLog.info("Websocket connection has been created");
83  	}
84  
85  	@PreDestroy
86  	public synchronized void preDescroy() {
87  		ourLog.info("Websocket connection is closing");
88  		IState state = myState;
89  		if (state != null) {
90  			state.closing();
91  		}
92  	}
93  
94  
95  	private interface IState {
96  
97  		void closing();
98  
99  		void handleTextMessage(WebSocketSession theSession, TextMessage theMessage);
100 
101 	}
102 
103 	private class BoundStaticSubscipriptionState implements IState, MessageHandler {
104 
105 		private WebSocketSession mySession;
106 		private CanonicalSubscription mySubscription;
107 
108 		public BoundStaticSubscipriptionState(WebSocketSession theSession, CanonicalSubscription theSubscription) {
109 			mySession = theSession;
110 			mySubscription = theSubscription;
111 
112 			mySubscriptionWebsocketInterceptor.getDeliveryChannel().subscribe(this);
113 		}
114 
115 		@Override
116 		public void closing() {
117 			mySubscriptionWebsocketInterceptor.getDeliveryChannel().unsubscribe(this);
118 		}
119 
120 		private void deliver() {
121 			try {
122 				String payload = "ping " + mySubscription.getIdElement(myCtx).getIdPart();
123 				ourLog.info("Sending WebSocket message: {}", payload);
124 				mySession.sendMessage(new TextMessage(payload));
125 			} catch (IOException e) {
126 				handleFailure(e);
127 			}
128 		}
129 
130 		@Override
131 		public void handleMessage(Message<?> theMessage) {
132 			if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) {
133 				return;
134 			}
135 			try {
136 				ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
137 				if (mySubscription.equals(msg.getSubscription())) {
138 					deliver();
139 				}
140 			} catch (Exception e) {
141 				ourLog.error("Failure handling subscription payload", e);
142 				throw new MessagingException(theMessage, "Failure handling subscription payload", e);
143 			}
144 		}
145 
146 		@Override
147 		public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) {
148 			try {
149 				theSession.sendMessage(new TextMessage("Unexpected client message: " + theMessage.getPayload()));
150 			} catch (IOException e) {
151 				handleFailure(e);
152 			}
153 		}
154 
155 	}
156 
157 	private class InitialState implements IState {
158 
159 		private IIdType bindSimple(WebSocketSession theSession, String theBindString) {
160 			IdType id = new IdType(theBindString);
161 
162 			if (!id.hasIdPart() || !id.isIdPartValid()) {
163 				try {
164 					String message = "Invalid bind request - No ID included";
165 					ourLog.warn(message);
166 					theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), message));
167 				} catch (IOException e) {
168 					handleFailure(e);
169 				}
170 				return null;
171 			}
172 
173 			if (id.hasResourceType() == false) {
174 				id = id.withResourceType("Subscription");
175 			}
176 
177 			try {
178 				Map<String, CanonicalSubscription> idToSubscription = mySubscriptionWebsocketInterceptor.getIdToSubscription();
179 				CanonicalSubscription subscription = idToSubscription.get(id.getIdPart());
180 				myState = new BoundStaticSubscipriptionState( theSession, subscription);
181 			} catch (ResourceNotFoundException e) {
182 				try {
183 					String message = "Invalid bind request - Unknown subscription: " + id.getValue();
184 					ourLog.warn(message);
185 					theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), message));
186 				} catch (IOException e1) {
187 					handleFailure(e);
188 				}
189 				return null;
190 			}
191 
192 			return id;
193 		}
194 
195 		@Override
196 		public void closing() {
197 			// nothing
198 		}
199 
200 		@Override
201 		public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) {
202 			String message = theMessage.getPayload();
203 			if (message.startsWith("bind ")) {
204 				String remaining = message.substring("bind ".length());
205 
206 				IIdType subscriptionId;
207 				subscriptionId = bindSimple(theSession, remaining);
208 				if (subscriptionId == null) {
209 					return;
210 				}
211 
212 				try {
213 					theSession.sendMessage(new TextMessage("bound " + subscriptionId.getIdPart()));
214 				} catch (IOException e) {
215 					handleFailure(e);
216 				}
217 
218 			}
219 		}
220 
221 	}
222 
223 }
224 
225 
226 //	private IIdType bingSearch(WebSocketSession theSession, String theRemaining) {
227 //		Subscription subscription = new Subscription();
228 //		subscription.getChannel().setType(SubscriptionChannelType.WEBSOCKET);
229 //		subscription.setStatus(SubscriptionStatus.ACTIVE);
230 //		subscription.setCriteria(theRemaining);
231 //
232 //		try {
233 //			String params = theRemaining.substring(theRemaining.indexOf('?')+1);
234 //			List<NameValuePair> paramValues = URLEncodedUtils.parse(params, Constants.CHARSET_UTF8, '&');
235 //			EncodingEnum encoding = EncodingEnum.JSON;
236 //			for (NameValuePair nameValuePair : paramValues) {
237 //				if (Constants.PARAM_FORMAT.equals(nameValuePair.getName())) {
238 //					EncodingEnum nextEncoding = EncodingEnum.forContentType(nameValuePair.getValue());
239 //					if (nextEncoding != null) {
240 //						encoding = nextEncoding;
241 //					}
242 //				}
243 //			}
244 //
245 //			IIdType id = ourSubscriptionDao.create(subscription).getId();
246 //
247 //			mySubscriptionPid = ourSubscriptionDao.getSubscriptionTablePidForSubscriptionResource(id);
248 //			mySubscriptionId = subscription.getIdElement();
249 //			myState = new BoundDynamicSubscriptionState(theSession, encoding);
250 //
251 //			return id;
252 //		} catch (UnprocessableEntityException e) {
253 //			ourLog.warn("Failed to bind subscription: " + e.getMessage());
254 //			try {
255 //				theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - " + e.getMessage()));
256 //			} catch (IOException e2) {
257 //				handleFailure(e2);
258 //			}
259 //		} catch (Exception e) {
260 //			handleFailure(e);
261 //			try {
262 //				theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - No ID included"));
263 //			} catch (IOException e2) {
264 //				handleFailure(e2);
265 //			}
266 //		}
267 //		return null;
268 //	}
269 
270 
271 //private class BoundDynamicSubscriptionState implements SubscriptionWebsocketHandler.IState {
272 //
273 //	private EncodingEnum myEncoding;
274 //	private WebSocketSession mySession;
275 //
276 //	public BoundDynamicSubscriptionState(WebSocketSession theSession, EncodingEnum theEncoding) {
277 //		mySession = theSession;
278 //		myEncoding = theEncoding;
279 //	}
280 //
281 //	@Override
282 //	public void closing() {
283 //		ourLog.info("Deleting subscription {}", mySubscriptionId);
284 //		try {
285 //			ourSubscriptionDao.delete(mySubscriptionId, null);
286 //		} catch (Exception e) {
287 //			handleFailure(e);
288 //		}
289 //	}
290 //
291 //	@Override
292 //	public void deliver(List<IBaseResource> theResults) {
293 //		try {
294 //			for (IBaseResource nextResource : theResults) {
295 //				ourLog.info("Sending WebSocket message for resource: {}", nextResource.getIdElement());
296 //				String encoded = myEncoding.newParser(ourCtx).encodeResourceToString(nextResource);
297 //				String payload = "add " + mySubscriptionId.getIdPart() + '\n' + encoded;
298 //				mySession.sendMessage(new TextMessage(payload));
299 //			}
300 //		} catch (IOException e) {
301 //			handleFailure(e);
302 //		}
303 //	}
304 //
305 //	@Override
306 //	public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) {
307 //		try {
308 //			theSession.sendMessage(new TextMessage("Unexpected client message: " + theMessage.getPayload()));
309 //		} catch (IOException e) {
310 //			handleFailure(e);
311 //		}
312 //	}
313 //
314 //}