source: ogServer-Git/src/core.c @ c05f134

Last change on this file since c05f134 was 3b1f2c2, checked in by OpenGnSys Support Team <soporte-og@…>, 3 years ago

#980 Broken TCP connection times out after 120 seconds through keepalive

Enable TCP keepalive to detect if the ogClient is gone (hard reset). If no reply
after 120 seconds, then release the connection to the client.

  • Property mode set to 100644
File size: 9.4 KB
Line 
1/*
2 * Copyright (C) 2020 Soleta Networks <info@soleta.eu>
3 *
4 * This program is free software: you can redistribute it and/or modify it under
5 * the terms of the GNU Affero General Public License as published by the
6 * Free Software Foundation, version 3.
7 */
8
9#include "ogAdmServer.h"
10#include "dbi.h"
11#include "utils.h"
12#include "list.h"
13#include "rest.h"
14#include "client.h"
15#include "json.h"
16#include "schedule.h"
17#include <syslog.h>
18#include <sys/ioctl.h>
19#include <ifaddrs.h>
20#include <sys/types.h>
21#include <sys/stat.h>
22#include <netinet/tcp.h>
23#include <fcntl.h>
24#include <jansson.h>
25#include <time.h>
26
27static void og_client_release(struct ev_loop *loop, struct og_client *cli)
28{
29        list_del(&cli->list);
30        ev_io_stop(loop, &cli->io);
31        close(cli->io.fd);
32        free(cli);
33}
34
35static void og_client_reset_state(struct og_client *cli)
36{
37        cli->state = OG_CLIENT_RECEIVING_HEADER;
38        cli->buf_len = 0;
39}
40
41static int og_client_payload_too_large(struct og_client *cli)
42{
43        char buf[] = "HTTP/1.1 413 Payload Too Large\r\n"
44                     "Content-Length: 0\r\n\r\n";
45
46        send(og_client_socket(cli), buf, strlen(buf), 0);
47
48        return -1;
49}
50
51static int og_client_state_recv_hdr_rest(struct og_client *cli)
52{
53        char *ptr;
54
55        ptr = strstr(cli->buf, "\r\n\r\n");
56        if (!ptr)
57                return 0;
58
59        cli->msg_len = ptr - cli->buf + 4;
60
61        ptr = strstr(cli->buf, "Content-Length: ");
62        if (ptr) {
63                sscanf(ptr, "Content-Length: %i[^\r\n]", &cli->content_length);
64                if (cli->content_length < 0)
65                        return -1;
66                cli->msg_len += cli->content_length;
67        }
68
69        ptr = strstr(cli->buf, "Authorization: ");
70        if (ptr)
71                sscanf(ptr, "Authorization: %63[^\r\n]", cli->auth_token);
72
73        return 1;
74}
75
76static int og_client_recv(struct og_client *cli, int events)
77{
78        struct ev_io *io = &cli->io;
79        int ret;
80
81        if (events & EV_ERROR) {
82                syslog(LOG_ERR, "unexpected error event from client %s:%hu\n",
83                               inet_ntoa(cli->addr.sin_addr),
84                               ntohs(cli->addr.sin_port));
85                return 0;
86        }
87
88        ret = recv(io->fd, cli->buf + cli->buf_len,
89                   sizeof(cli->buf) - cli->buf_len, 0);
90        if (ret <= 0) {
91                if (ret < 0) {
92                        syslog(LOG_ERR, "error reading from client %s:%hu (%s)\n",
93                               inet_ntoa(cli->addr.sin_addr), ntohs(cli->addr.sin_port),
94                               strerror(errno));
95                }
96                return ret;
97        }
98
99        return ret;
100}
101
102static void og_client_read_cb(struct ev_loop *loop, struct ev_io *io, int events)
103{
104        struct og_client *cli;
105        int ret;
106
107        cli = container_of(io, struct og_client, io);
108
109        ret = og_client_recv(cli, events);
110        if (ret <= 0)
111                goto close;
112
113        if (cli->keepalive_idx >= 0)
114                return;
115
116        ev_timer_again(loop, &cli->timer);
117
118        cli->buf_len += ret;
119        if (cli->buf_len >= sizeof(cli->buf)) {
120                syslog(LOG_ERR, "client request from %s:%hu is too long\n",
121                       inet_ntoa(cli->addr.sin_addr), ntohs(cli->addr.sin_port));
122                og_client_payload_too_large(cli);
123                goto close;
124        }
125
126        switch (cli->state) {
127        case OG_CLIENT_RECEIVING_HEADER:
128                ret = og_client_state_recv_hdr_rest(cli);
129                if (ret < 0)
130                        goto close;
131                if (!ret)
132                        return;
133
134                cli->state = OG_CLIENT_RECEIVING_PAYLOAD;
135                /* Fall through. */
136        case OG_CLIENT_RECEIVING_PAYLOAD:
137                /* Still not enough data to process request. */
138                if (cli->buf_len < cli->msg_len)
139                        return;
140
141                cli->state = OG_CLIENT_PROCESSING_REQUEST;
142                /* fall through. */
143        case OG_CLIENT_PROCESSING_REQUEST:
144                ret = og_client_state_process_payload_rest(cli);
145                if (ret < 0) {
146                        syslog(LOG_ERR, "Failed to process HTTP request from %s:%hu\n",
147                               inet_ntoa(cli->addr.sin_addr),
148                               ntohs(cli->addr.sin_port));
149                }
150                if (ret < 0)
151                        goto close;
152
153                if (cli->keepalive_idx < 0) {
154                        goto close;
155                } else {
156                        og_client_reset_state(cli);
157                }
158                break;
159        default:
160                syslog(LOG_ERR, "unknown state, critical internal error\n");
161                goto close;
162        }
163        return;
164close:
165        ev_timer_stop(loop, &cli->timer);
166        og_client_release(loop, cli);
167}
168
169enum og_agent_state {
170        OG_AGENT_RECEIVING_HEADER       = 0,
171        OG_AGENT_RECEIVING_PAYLOAD,
172        OG_AGENT_PROCESSING_RESPONSE,
173};
174
175static int og_agent_state_recv_hdr_rest(struct og_client *cli)
176{
177        char *ptr;
178
179        ptr = strstr(cli->buf, "\r\n\r\n");
180        if (!ptr)
181                return 0;
182
183        cli->msg_len = ptr - cli->buf + 4;
184
185        ptr = strstr(cli->buf, "Content-Length: ");
186        if (ptr) {
187                sscanf(ptr, "Content-Length: %i[^\r\n]", &cli->content_length);
188                if (cli->content_length < 0)
189                        return -1;
190                cli->msg_len += cli->content_length;
191        }
192
193        return 1;
194}
195
196static void og_agent_reset_state(struct og_client *cli)
197{
198        cli->state = OG_AGENT_RECEIVING_HEADER;
199        cli->buf_len = 0;
200        cli->content_length = 0;
201        memset(cli->buf, 0, sizeof(cli->buf));
202}
203
204static void og_agent_deliver_pending_cmd(struct og_client *cli)
205{
206        const struct og_cmd *cmd;
207
208        cmd = og_cmd_find(inet_ntoa(cli->addr.sin_addr));
209        if (!cmd)
210                return;
211
212        og_send_request(cmd->method, cmd->type, &cmd->params, cmd->json);
213        cli->last_cmd_id = cmd->id;
214
215        og_cmd_free(cmd);
216}
217
218static void og_agent_read_cb(struct ev_loop *loop, struct ev_io *io, int events)
219{
220        struct og_client *cli;
221        int ret;
222
223        cli = container_of(io, struct og_client, io);
224
225        ret = og_client_recv(cli, events);
226        if (ret <= 0)
227                goto close;
228
229        ev_timer_again(loop, &cli->timer);
230
231        cli->buf_len += ret;
232        if (cli->buf_len >= sizeof(cli->buf)) {
233                syslog(LOG_ERR, "client request from %s:%hu is too long\n",
234                       inet_ntoa(cli->addr.sin_addr), ntohs(cli->addr.sin_port));
235                goto close;
236        }
237
238        switch (cli->state) {
239        case OG_AGENT_RECEIVING_HEADER:
240                ret = og_agent_state_recv_hdr_rest(cli);
241                if (ret < 0)
242                        goto close;
243                if (!ret)
244                        return;
245
246                cli->state = OG_AGENT_RECEIVING_PAYLOAD;
247                /* Fall through. */
248        case OG_AGENT_RECEIVING_PAYLOAD:
249                /* Still not enough data to process request. */
250                if (cli->buf_len < cli->msg_len)
251                        return;
252
253                cli->state = OG_AGENT_PROCESSING_RESPONSE;
254                /* fall through. */
255        case OG_AGENT_PROCESSING_RESPONSE:
256                ret = og_agent_state_process_response(cli);
257                if (ret < 0) {
258                        syslog(LOG_ERR, "Failed to process HTTP request from %s:%hu\n",
259                               inet_ntoa(cli->addr.sin_addr),
260                               ntohs(cli->addr.sin_port));
261                        goto close;
262                } else if (ret == 0) {
263                        og_agent_deliver_pending_cmd(cli);
264                }
265
266                og_agent_reset_state(cli);
267                break;
268        default:
269                syslog(LOG_ERR, "unknown state, critical internal error\n");
270                goto close;
271        }
272        return;
273close:
274        ev_timer_stop(loop, &cli->timer);
275        og_client_release(loop, cli);
276}
277
278static void og_client_timer_cb(struct ev_loop *loop, ev_timer *timer, int events)
279{
280        struct og_client *cli;
281
282        cli = container_of(timer, struct og_client, timer);
283        if (cli->keepalive_idx >= 0) {
284                ev_timer_again(loop, &cli->timer);
285                return;
286        }
287        syslog(LOG_ERR, "timeout request for client %s:%hu\n",
288               inet_ntoa(cli->addr.sin_addr), ntohs(cli->addr.sin_port));
289
290        og_client_release(loop, cli);
291}
292
293static void og_agent_send_refresh(struct og_client *cli)
294{
295        struct og_msg_params params;
296        int err;
297
298        params.ips_array[0] = inet_ntoa(cli->addr.sin_addr);
299        params.ips_array_len = 1;
300
301        err = og_send_request(OG_METHOD_GET, OG_CMD_REFRESH, &params, NULL);
302        if (err < 0) {
303                syslog(LOG_ERR, "Can't send refresh to: %s\n",
304                       params.ips_array[0]);
305        } else {
306                syslog(LOG_INFO, "Sent refresh to: %s\n",
307                       params.ips_array[0]);
308        }
309}
310
311/* Shut down connection if there is no complete message after 10 seconds. */
312#define OG_CLIENT_TIMEOUT       10
313
314/* Agent client operation might take longer, shut down after 30 seconds. */
315#define OG_AGENT_CLIENT_TIMEOUT 30
316
317#define OG_TCP_KEEPALIVE_IDLE   60
318#define OG_TCP_KEEPALIVE_INTL   30
319#define OG_TCP_KEEPALIVE_CNT    4
320
321int socket_rest, socket_agent_rest;
322
323void og_server_accept_cb(struct ev_loop *loop, struct ev_io *io, int events)
324{
325        int intl = OG_TCP_KEEPALIVE_INTL, cnt = OG_TCP_KEEPALIVE_CNT;
326        int on = 1, idle = OG_TCP_KEEPALIVE_IDLE;
327        struct sockaddr_in client_addr;
328        socklen_t addrlen = sizeof(client_addr);
329        struct og_client *cli;
330        int client_sd;
331
332        if (events & EV_ERROR)
333                return;
334
335        client_sd = accept(io->fd, (struct sockaddr *)&client_addr, &addrlen);
336        if (client_sd < 0) {
337                syslog(LOG_ERR, "cannot accept client connection\n");
338                return;
339        }
340
341        setsockopt(client_sd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(int));
342        setsockopt(client_sd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(int));
343        setsockopt(client_sd, IPPROTO_TCP, TCP_KEEPINTVL, &intl, sizeof(int));
344        setsockopt(client_sd, IPPROTO_TCP, TCP_KEEPCNT, &cnt, sizeof(int));
345
346        cli = (struct og_client *)calloc(1, sizeof(struct og_client));
347        if (!cli) {
348                close(client_sd);
349                return;
350        }
351        memcpy(&cli->addr, &client_addr, sizeof(client_addr));
352        if (io->fd == socket_agent_rest)
353                cli->keepalive_idx = 0;
354        else
355                cli->keepalive_idx = -1;
356
357        if (io->fd == socket_rest)
358                cli->rest = true;
359        else if (io->fd == socket_agent_rest)
360                cli->agent = true;
361
362        if (io->fd == socket_agent_rest)
363                ev_io_init(&cli->io, og_agent_read_cb, client_sd, EV_READ);
364        else
365                ev_io_init(&cli->io, og_client_read_cb, client_sd, EV_READ);
366
367        ev_io_start(loop, &cli->io);
368        if (io->fd == socket_agent_rest) {
369                ev_timer_init(&cli->timer, og_client_timer_cb,
370                              OG_AGENT_CLIENT_TIMEOUT, 0.);
371        } else {
372                ev_timer_init(&cli->timer, og_client_timer_cb,
373                              OG_CLIENT_TIMEOUT, 0.);
374        }
375        ev_timer_start(loop, &cli->timer);
376        og_client_add(cli);
377
378        if (io->fd == socket_agent_rest) {
379                og_agent_send_refresh(cli);
380        }
381}
382
383int og_socket_server_init(const char *port)
384{
385        struct sockaddr_in local;
386        int sd, on = 1;
387
388        sd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
389        if (sd < 0) {
390                syslog(LOG_ERR, "cannot create main socket\n");
391                return -1;
392        }
393        setsockopt(sd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(int));
394
395        local.sin_addr.s_addr = htonl(INADDR_ANY);
396        local.sin_family = AF_INET;
397        local.sin_port = htons(atoi(port));
398
399        if (bind(sd, (struct sockaddr *) &local, sizeof(local)) < 0) {
400                close(sd);
401                syslog(LOG_ERR, "cannot bind socket\n");
402                return -1;
403        }
404
405        listen(sd, 250);
406
407        return sd;
408}
Note: See TracBrowser for help on using the repository browser.