001/*
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2023 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.match.deliver.websocket;
021
022import ca.uhn.fhir.i18n.Msg;
023import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
024import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelWithHandlers;
025import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
026import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
027import org.hl7.fhir.instance.model.api.IIdType;
028import org.hl7.fhir.r4.model.IdType;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031import org.springframework.beans.factory.annotation.Autowired;
032import org.springframework.messaging.Message;
033import org.springframework.messaging.MessageHandler;
034import org.springframework.messaging.MessagingException;
035import org.springframework.web.socket.CloseStatus;
036import org.springframework.web.socket.TextMessage;
037import org.springframework.web.socket.WebSocketHandler;
038import org.springframework.web.socket.WebSocketSession;
039import org.springframework.web.socket.handler.TextWebSocketHandler;
040
041import java.io.IOException;
042import javax.annotation.PostConstruct;
043import javax.annotation.PreDestroy;
044
045public class SubscriptionWebsocketHandler extends TextWebSocketHandler implements WebSocketHandler {
046        private static Logger ourLog = LoggerFactory.getLogger(SubscriptionWebsocketHandler.class);
047
048        @Autowired
049        protected WebsocketConnectionValidator myWebsocketConnectionValidator;
050
051        @Autowired
052        SubscriptionChannelRegistry mySubscriptionChannelRegistry;
053
054        /**
055         * Constructor
056         */
057        public SubscriptionWebsocketHandler() {
058                super();
059        }
060
061        private IState myState = new InitialState();
062
063        @Override
064        public void afterConnectionClosed(WebSocketSession theSession, CloseStatus theStatus) throws Exception {
065                super.afterConnectionClosed(theSession, theStatus);
066                ourLog.info("Closing WebSocket connection from {}", theSession.getRemoteAddress());
067        }
068
069        @Override
070        public void afterConnectionEstablished(WebSocketSession theSession) throws Exception {
071                super.afterConnectionEstablished(theSession);
072                ourLog.info("Incoming WebSocket connection from {}", theSession.getRemoteAddress());
073        }
074
075        protected void handleFailure(Exception theE) {
076                ourLog.error("Failure during communication", theE);
077        }
078
079        @Override
080        protected void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) throws Exception {
081                ourLog.info("Textmessage: " + theMessage.getPayload());
082                myState.handleTextMessage(theSession, theMessage);
083        }
084
085        @Override
086        public void handleTransportError(WebSocketSession theSession, Throwable theException) throws Exception {
087                super.handleTransportError(theSession, theException);
088                ourLog.error("Transport error", theException);
089        }
090
091        @PostConstruct
092        public synchronized void postConstruct() {
093                ourLog.info("Websocket connection has been created");
094        }
095
096        @PreDestroy
097        public synchronized void preDescroy() {
098                ourLog.info("Websocket connection is closing");
099                IState state = myState;
100                if (state != null) {
101                        state.closing();
102                }
103        }
104
105        private interface IState {
106
107                void closing();
108
109                void handleTextMessage(WebSocketSession theSession, TextMessage theMessage);
110        }
111
112        private class BoundStaticSubscriptionState implements IState, MessageHandler {
113
114                private final WebSocketSession mySession;
115                private final ActiveSubscription myActiveSubscription;
116
117                public BoundStaticSubscriptionState(WebSocketSession theSession, ActiveSubscription theActiveSubscription) {
118                        mySession = theSession;
119                        myActiveSubscription = theActiveSubscription;
120
121                        SubscriptionChannelWithHandlers subscriptionChannelWithHandlers =
122                                        mySubscriptionChannelRegistry.getDeliveryReceiverChannel(theActiveSubscription.getChannelName());
123                        subscriptionChannelWithHandlers.addHandler(this);
124                }
125
126                @Override
127                public void closing() {
128                        SubscriptionChannelWithHandlers subscriptionChannelWithHandlers =
129                                        mySubscriptionChannelRegistry.getDeliveryReceiverChannel(myActiveSubscription.getChannelName());
130                        subscriptionChannelWithHandlers.removeHandler(this);
131                }
132
133                private void deliver() {
134                        try {
135                                String payload = "ping " + myActiveSubscription.getId();
136                                ourLog.info("Sending WebSocket message: {}", payload);
137                                mySession.sendMessage(new TextMessage(payload));
138                        } catch (IOException e) {
139                                handleFailure(e);
140                        }
141                }
142
143                @Override
144                public void handleMessage(Message<?> theMessage) {
145                        if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) {
146                                return;
147                        }
148                        try {
149                                ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
150                                if (myActiveSubscription.getSubscription().equals(msg.getSubscription())) {
151                                        deliver();
152                                }
153                        } catch (Exception e) {
154                                ourLog.error("Failure handling subscription payload", e);
155                                throw new MessagingException(theMessage, Msg.code(6) + "Failure handling subscription payload", e);
156                        }
157                }
158
159                @Override
160                public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) {
161                        try {
162                                theSession.sendMessage(new TextMessage("Unexpected client message: " + theMessage.getPayload()));
163                        } catch (IOException e) {
164                                handleFailure(e);
165                        }
166                }
167        }
168
169        private class InitialState implements IState {
170
171                private IIdType bindSimple(WebSocketSession theSession, String theBindString) {
172                        IdType id = new IdType(theBindString);
173
174                        WebsocketValidationResponse response = myWebsocketConnectionValidator.validate(id);
175                        if (!response.isValid()) {
176                                try {
177                                        ourLog.warn(response.getMessage());
178                                        theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), response.getMessage()));
179                                } catch (IOException e) {
180                                        handleFailure(e);
181                                }
182                                return null;
183                        }
184
185                        myState = new BoundStaticSubscriptionState(theSession, response.getActiveSubscription());
186
187                        return id;
188                }
189
190                @Override
191                public void closing() {
192                        // nothing
193                }
194
195                @Override
196                public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) {
197                        String message = theMessage.getPayload();
198                        if (message.startsWith("bind ")) {
199                                String remaining = message.substring("bind ".length());
200
201                                IIdType subscriptionId;
202                                subscriptionId = bindSimple(theSession, remaining);
203                                if (subscriptionId == null) {
204                                        return;
205                                }
206
207                                try {
208                                        theSession.sendMessage(new TextMessage("bound " + subscriptionId.getIdPart()));
209                                } catch (IOException e) {
210                                        handleFailure(e);
211                                }
212                        }
213                }
214        }
215}