001package ca.uhn.fhir.jpa.subscription.match.deliver.websocket;
002
003/*
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
024import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelWithHandlers;
025import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
026import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
027import org.hl7.fhir.instance.model.api.IIdType;
028import org.hl7.fhir.r4.model.IdType;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031import org.springframework.beans.factory.annotation.Autowired;
032import org.springframework.messaging.Message;
033import org.springframework.messaging.MessageHandler;
034import org.springframework.messaging.MessagingException;
035import org.springframework.web.socket.CloseStatus;
036import org.springframework.web.socket.TextMessage;
037import org.springframework.web.socket.WebSocketHandler;
038import org.springframework.web.socket.WebSocketSession;
039import org.springframework.web.socket.handler.TextWebSocketHandler;
040
041import javax.annotation.PostConstruct;
042import javax.annotation.PreDestroy;
043import java.io.IOException;
044
045public class SubscriptionWebsocketHandler extends TextWebSocketHandler implements WebSocketHandler {
046        private static Logger ourLog = LoggerFactory.getLogger(SubscriptionWebsocketHandler.class);
047        @Autowired
048        protected WebsocketConnectionValidator myWebsocketConnectionValidator;
049        @Autowired
050        SubscriptionChannelRegistry mySubscriptionChannelRegistry;
051
052        /**
053         * Constructor
054         */
055        public SubscriptionWebsocketHandler() {
056                super();
057        }
058
059        private IState myState = new InitialState();
060
061        @Override
062        public void afterConnectionClosed(WebSocketSession theSession, CloseStatus theStatus) throws Exception {
063                super.afterConnectionClosed(theSession, theStatus);
064                ourLog.info("Closing WebSocket connection from {}", theSession.getRemoteAddress());
065        }
066
067        @Override
068        public void afterConnectionEstablished(WebSocketSession theSession) throws Exception {
069                super.afterConnectionEstablished(theSession);
070                ourLog.info("Incoming WebSocket connection from {}", theSession.getRemoteAddress());
071        }
072
073        protected void handleFailure(Exception theE) {
074                ourLog.error("Failure during communication", theE);
075        }
076
077        @Override
078        protected void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) throws Exception {
079                ourLog.info("Textmessage: " + theMessage.getPayload());
080                myState.handleTextMessage(theSession, theMessage);
081        }
082
083        @Override
084        public void handleTransportError(WebSocketSession theSession, Throwable theException) throws Exception {
085                super.handleTransportError(theSession, theException);
086                ourLog.error("Transport error", theException);
087        }
088
089        @PostConstruct
090        public synchronized void postConstruct() {
091                ourLog.info("Websocket connection has been created");
092        }
093
094        @PreDestroy
095        public synchronized void preDescroy() {
096                ourLog.info("Websocket connection is closing");
097                IState state = myState;
098                if (state != null) {
099                        state.closing();
100                }
101        }
102
103
104        private interface IState {
105
106                void closing();
107
108                void handleTextMessage(WebSocketSession theSession, TextMessage theMessage);
109
110        }
111
112        private class BoundStaticSubscriptionState implements IState, MessageHandler {
113
114                private final WebSocketSession mySession;
115                private final ActiveSubscription myActiveSubscription;
116
117                public BoundStaticSubscriptionState(WebSocketSession theSession, ActiveSubscription theActiveSubscription) {
118                        mySession = theSession;
119                        myActiveSubscription = theActiveSubscription;
120
121                        SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.getDeliveryReceiverChannel(theActiveSubscription.getChannelName());
122                        subscriptionChannelWithHandlers.addHandler(this);
123                }
124
125                @Override
126                public void closing() {
127                        SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.getDeliveryReceiverChannel(myActiveSubscription.getChannelName());
128                        subscriptionChannelWithHandlers.removeHandler(this);
129                }
130
131                private void deliver() {
132                        try {
133                                String payload = "ping " + myActiveSubscription.getId();
134                                ourLog.info("Sending WebSocket message: {}", payload);
135                                mySession.sendMessage(new TextMessage(payload));
136                        } catch (IOException e) {
137                                handleFailure(e);
138                        }
139                }
140
141                @Override
142                public void handleMessage(Message<?> theMessage) {
143                        if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) {
144                                return;
145                        }
146                        try {
147                                ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
148                                if (myActiveSubscription.getSubscription().equals(msg.getSubscription())) {
149                                        deliver();
150                                }
151                        } catch (Exception e) {
152                                ourLog.error("Failure handling subscription payload", e);
153                                throw new MessagingException(theMessage, "Failure handling subscription payload", e);
154                        }
155                }
156
157                @Override
158                public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) {
159                        try {
160                                theSession.sendMessage(new TextMessage("Unexpected client message: " + theMessage.getPayload()));
161                        } catch (IOException e) {
162                                handleFailure(e);
163                        }
164                }
165        }
166
167        private class InitialState implements IState {
168
169                private IIdType bindSimple(WebSocketSession theSession, String theBindString) {
170                        IdType id = new IdType(theBindString);
171
172                        WebsocketValidationResponse response = myWebsocketConnectionValidator.validate(id);
173                        if (!response.isValid()) {
174                                try {
175                                        ourLog.warn(response.getMessage());
176                                        theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), response.getMessage()));
177                                } catch (IOException e) {
178                                        handleFailure(e);
179                                }
180                                return null;
181                        }
182
183                        myState = new BoundStaticSubscriptionState(theSession, response.getActiveSubscription());
184
185                        return id;
186                }
187
188                @Override
189                public void closing() {
190                        // nothing
191                }
192
193                @Override
194                public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) {
195                        String message = theMessage.getPayload();
196                        if (message.startsWith("bind ")) {
197                                String remaining = message.substring("bind ".length());
198
199                                IIdType subscriptionId;
200                                subscriptionId = bindSimple(theSession, remaining);
201                                if (subscriptionId == null) {
202                                        return;
203                                }
204
205                                try {
206                                        theSession.sendMessage(new TextMessage("bound " + subscriptionId.getIdPart()));
207                                } catch (IOException e) {
208                                        handleFailure(e);
209                                }
210
211                        }
212                }
213
214        }
215
216}