source: ogServer-Git/src/core.c @ ac3ce22

Last change on this file since ac3ce22 was ac3ce22, checked in by Jose M. Guisado <jguisado@…>, 3 years ago

#1064 revisit error handling from ogClient

200 => successful command, run next pending command
202 => successful command in progress, do not run next pending command
403 => server sent a malformed HTTP header, should not ever happen,

close connection (server is buggy?).

500 => client fails to run command, report error and run next pending command
503 => client is busy, report error and do not run next pending command

Anything else, should not ever happen (client is buggy?), close connection with
client.

On error, when processing response from ogClient, do not close the connection,
instead annotate in the database that command was not successful and run next
pending command.

*Only* if client replies status code 500 set last_cmd to UNSPEC so its state is
not BSY as reported by og_client_status function and pending cmds can be
sent.

  • Property mode set to 100644
File size: 9.3 KB
RevLine 
[48de515]1/*
[a7cce8d]2 * Copyright (C) 2020-2021 Soleta Networks <info@soleta.eu>
[48de515]3 *
4 * This program is free software: you can redistribute it and/or modify it under
5 * the terms of the GNU Affero General Public License as published by the
[a7cce8d]6 * Free Software Foundation; either version 3 of the License, or
7 * (at your option) any later version.
[48de515]8 */
9
10#include "ogAdmServer.h"
11#include "dbi.h"
12#include "utils.h"
13#include "list.h"
14#include "rest.h"
[1f13855]15#include "wol.h"
[48de515]16#include "client.h"
17#include "json.h"
18#include "schedule.h"
19#include <syslog.h>
20#include <sys/ioctl.h>
21#include <ifaddrs.h>
22#include <sys/types.h>
23#include <sys/stat.h>
[3b1f2c2]24#include <netinet/tcp.h>
[48de515]25#include <fcntl.h>
26#include <jansson.h>
27#include <time.h>
28
29static void og_client_release(struct ev_loop *loop, struct og_client *cli)
30{
31        list_del(&cli->list);
32        ev_io_stop(loop, &cli->io);
33        close(cli->io.fd);
34        free(cli);
35}
36
37static int og_client_payload_too_large(struct og_client *cli)
38{
39        char buf[] = "HTTP/1.1 413 Payload Too Large\r\n"
40                     "Content-Length: 0\r\n\r\n";
41
42        send(og_client_socket(cli), buf, strlen(buf), 0);
43
44        return -1;
45}
46
47static int og_client_state_recv_hdr_rest(struct og_client *cli)
48{
49        char *ptr;
50
51        ptr = strstr(cli->buf, "\r\n\r\n");
52        if (!ptr)
53                return 0;
54
55        cli->msg_len = ptr - cli->buf + 4;
56
57        ptr = strstr(cli->buf, "Content-Length: ");
58        if (ptr) {
59                sscanf(ptr, "Content-Length: %i[^\r\n]", &cli->content_length);
60                if (cli->content_length < 0)
61                        return -1;
62                cli->msg_len += cli->content_length;
63        }
64
65        ptr = strstr(cli->buf, "Authorization: ");
66        if (ptr)
67                sscanf(ptr, "Authorization: %63[^\r\n]", cli->auth_token);
68
69        return 1;
70}
71
72static int og_client_recv(struct og_client *cli, int events)
73{
74        struct ev_io *io = &cli->io;
75        int ret;
76
77        if (events & EV_ERROR) {
78                syslog(LOG_ERR, "unexpected error event from client %s:%hu\n",
79                               inet_ntoa(cli->addr.sin_addr),
80                               ntohs(cli->addr.sin_port));
81                return 0;
82        }
83
84        ret = recv(io->fd, cli->buf + cli->buf_len,
85                   sizeof(cli->buf) - cli->buf_len, 0);
86        if (ret <= 0) {
87                if (ret < 0) {
88                        syslog(LOG_ERR, "error reading from client %s:%hu (%s)\n",
89                               inet_ntoa(cli->addr.sin_addr), ntohs(cli->addr.sin_port),
90                               strerror(errno));
91                }
92                return ret;
93        }
94
95        return ret;
96}
97
98static void og_client_read_cb(struct ev_loop *loop, struct ev_io *io, int events)
99{
100        struct og_client *cli;
101        int ret;
102
103        cli = container_of(io, struct og_client, io);
104
105        ret = og_client_recv(cli, events);
106        if (ret <= 0)
107                goto close;
108
109        ev_timer_again(loop, &cli->timer);
110
111        cli->buf_len += ret;
112        if (cli->buf_len >= sizeof(cli->buf)) {
113                syslog(LOG_ERR, "client request from %s:%hu is too long\n",
114                       inet_ntoa(cli->addr.sin_addr), ntohs(cli->addr.sin_port));
115                og_client_payload_too_large(cli);
116                goto close;
117        }
118
119        switch (cli->state) {
120        case OG_CLIENT_RECEIVING_HEADER:
121                ret = og_client_state_recv_hdr_rest(cli);
122                if (ret < 0)
123                        goto close;
124                if (!ret)
125                        return;
126
127                cli->state = OG_CLIENT_RECEIVING_PAYLOAD;
128                /* Fall through. */
129        case OG_CLIENT_RECEIVING_PAYLOAD:
130                /* Still not enough data to process request. */
131                if (cli->buf_len < cli->msg_len)
132                        return;
133
134                cli->state = OG_CLIENT_PROCESSING_REQUEST;
135                /* fall through. */
136        case OG_CLIENT_PROCESSING_REQUEST:
137                ret = og_client_state_process_payload_rest(cli);
138                if (ret < 0) {
139                        syslog(LOG_ERR, "Failed to process HTTP request from %s:%hu\n",
140                               inet_ntoa(cli->addr.sin_addr),
141                               ntohs(cli->addr.sin_port));
142                }
[8a0a32c]143                goto close;
[48de515]144        default:
145                syslog(LOG_ERR, "unknown state, critical internal error\n");
146                goto close;
147        }
148        return;
149close:
150        ev_timer_stop(loop, &cli->timer);
151        og_client_release(loop, cli);
152}
153
154enum og_agent_state {
155        OG_AGENT_RECEIVING_HEADER       = 0,
156        OG_AGENT_RECEIVING_PAYLOAD,
157        OG_AGENT_PROCESSING_RESPONSE,
158};
159
160static int og_agent_state_recv_hdr_rest(struct og_client *cli)
161{
162        char *ptr;
163
164        ptr = strstr(cli->buf, "\r\n\r\n");
165        if (!ptr)
166                return 0;
167
168        cli->msg_len = ptr - cli->buf + 4;
169
170        ptr = strstr(cli->buf, "Content-Length: ");
171        if (ptr) {
172                sscanf(ptr, "Content-Length: %i[^\r\n]", &cli->content_length);
173                if (cli->content_length < 0)
174                        return -1;
175                cli->msg_len += cli->content_length;
176        }
177
178        return 1;
179}
180
181static void og_agent_reset_state(struct og_client *cli)
182{
183        cli->state = OG_AGENT_RECEIVING_HEADER;
184        cli->buf_len = 0;
185        cli->content_length = 0;
186        memset(cli->buf, 0, sizeof(cli->buf));
187}
188
[2d68f8a]189#define OG_AGENT_CMD_TIMEOUT 900
190
[48de515]191static void og_agent_deliver_pending_cmd(struct og_client *cli)
192{
[2d68f8a]193        struct timeval now, elapsed;
[48de515]194        const struct og_cmd *cmd;
195
196        cmd = og_cmd_find(inet_ntoa(cli->addr.sin_addr));
197        if (!cmd)
198                return;
199
[2d68f8a]200        gettimeofday(&now, NULL);
201        timersub(&now, &cmd->tv, &elapsed);
202        if (elapsed.tv_sec >= OG_AGENT_CMD_TIMEOUT) {
203                og_dbi_update_action(cmd->id, false);
204                og_cmd_free(cmd);
205                return;
206        }
207
[48de515]208        og_send_request(cmd->method, cmd->type, &cmd->params, cmd->json);
209        cli->last_cmd_id = cmd->id;
210
211        og_cmd_free(cmd);
212}
213
214static void og_agent_read_cb(struct ev_loop *loop, struct ev_io *io, int events)
215{
216        struct og_client *cli;
217        int ret;
218
219        cli = container_of(io, struct og_client, io);
220
221        ret = og_client_recv(cli, events);
222        if (ret <= 0)
223                goto close;
224
225        ev_timer_again(loop, &cli->timer);
226
227        cli->buf_len += ret;
228        if (cli->buf_len >= sizeof(cli->buf)) {
229                syslog(LOG_ERR, "client request from %s:%hu is too long\n",
230                       inet_ntoa(cli->addr.sin_addr), ntohs(cli->addr.sin_port));
231                goto close;
232        }
233
234        switch (cli->state) {
235        case OG_AGENT_RECEIVING_HEADER:
236                ret = og_agent_state_recv_hdr_rest(cli);
237                if (ret < 0)
238                        goto close;
239                if (!ret)
240                        return;
241
242                cli->state = OG_AGENT_RECEIVING_PAYLOAD;
243                /* Fall through. */
244        case OG_AGENT_RECEIVING_PAYLOAD:
245                /* Still not enough data to process request. */
246                if (cli->buf_len < cli->msg_len)
247                        return;
248
249                cli->state = OG_AGENT_PROCESSING_RESPONSE;
250                /* fall through. */
251        case OG_AGENT_PROCESSING_RESPONSE:
252                ret = og_agent_state_process_response(cli);
253                if (ret < 0) {
254                        goto close;
255                } else if (ret == 0) {
256                        og_agent_deliver_pending_cmd(cli);
257                }
258
259                og_agent_reset_state(cli);
260                break;
261        default:
262                syslog(LOG_ERR, "unknown state, critical internal error\n");
263                goto close;
264        }
265        return;
266close:
267        ev_timer_stop(loop, &cli->timer);
268        og_client_release(loop, cli);
269}
270
271static void og_client_timer_cb(struct ev_loop *loop, ev_timer *timer, int events)
272{
273        struct og_client *cli;
274
275        cli = container_of(timer, struct og_client, timer);
[8a0a32c]276        if (cli->agent) {
[48de515]277                ev_timer_again(loop, &cli->timer);
278                return;
279        }
280        syslog(LOG_ERR, "timeout request for client %s:%hu\n",
281               inet_ntoa(cli->addr.sin_addr), ntohs(cli->addr.sin_port));
282
283        og_client_release(loop, cli);
284}
285
286static void og_agent_send_refresh(struct og_client *cli)
287{
288        struct og_msg_params params;
289        int err;
290
291        params.ips_array[0] = inet_ntoa(cli->addr.sin_addr);
292        params.ips_array_len = 1;
293
294        err = og_send_request(OG_METHOD_GET, OG_CMD_REFRESH, &params, NULL);
295        if (err < 0) {
296                syslog(LOG_ERR, "Can't send refresh to: %s\n",
297                       params.ips_array[0]);
298        } else {
299                syslog(LOG_INFO, "Sent refresh to: %s\n",
300                       params.ips_array[0]);
301        }
302}
303
304/* Shut down connection if there is no complete message after 10 seconds. */
305#define OG_CLIENT_TIMEOUT       10
306
307/* Agent client operation might take longer, shut down after 30 seconds. */
308#define OG_AGENT_CLIENT_TIMEOUT 30
309
[3b1f2c2]310#define OG_TCP_KEEPALIVE_IDLE   60
311#define OG_TCP_KEEPALIVE_INTL   30
312#define OG_TCP_KEEPALIVE_CNT    4
313
[48de515]314int socket_rest, socket_agent_rest;
315
316void og_server_accept_cb(struct ev_loop *loop, struct ev_io *io, int events)
317{
[3b1f2c2]318        int intl = OG_TCP_KEEPALIVE_INTL, cnt = OG_TCP_KEEPALIVE_CNT;
319        int on = 1, idle = OG_TCP_KEEPALIVE_IDLE;
[48de515]320        struct sockaddr_in client_addr;
321        socklen_t addrlen = sizeof(client_addr);
[1f13855]322        struct og_client_wol *cli_wol;
[48de515]323        struct og_client *cli;
324        int client_sd;
325
326        if (events & EV_ERROR)
327                return;
328
329        client_sd = accept(io->fd, (struct sockaddr *)&client_addr, &addrlen);
330        if (client_sd < 0) {
331                syslog(LOG_ERR, "cannot accept client connection\n");
332                return;
333        }
334
[3b1f2c2]335        setsockopt(client_sd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(int));
336        setsockopt(client_sd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(int));
337        setsockopt(client_sd, IPPROTO_TCP, TCP_KEEPINTVL, &intl, sizeof(int));
338        setsockopt(client_sd, IPPROTO_TCP, TCP_KEEPCNT, &cnt, sizeof(int));
339
[1f13855]340        cli_wol = og_client_wol_find(&client_addr.sin_addr);
341        if (cli_wol)
342                og_client_wol_destroy(cli_wol);
343
[48de515]344        cli = (struct og_client *)calloc(1, sizeof(struct og_client));
345        if (!cli) {
346                close(client_sd);
347                return;
348        }
349        memcpy(&cli->addr, &client_addr, sizeof(client_addr));
350
[79e7e2b]351        if (io->fd == socket_agent_rest) {
[48de515]352                cli->agent = true;
353                ev_io_init(&cli->io, og_agent_read_cb, client_sd, EV_READ);
[79e7e2b]354        } else {
[48de515]355                ev_io_init(&cli->io, og_client_read_cb, client_sd, EV_READ);
[79e7e2b]356        }
[48de515]357
358        ev_io_start(loop, &cli->io);
359        if (io->fd == socket_agent_rest) {
360                ev_timer_init(&cli->timer, og_client_timer_cb,
361                              OG_AGENT_CLIENT_TIMEOUT, 0.);
362        } else {
363                ev_timer_init(&cli->timer, og_client_timer_cb,
364                              OG_CLIENT_TIMEOUT, 0.);
365        }
366        ev_timer_start(loop, &cli->timer);
367        og_client_add(cli);
368
369        if (io->fd == socket_agent_rest) {
370                og_agent_send_refresh(cli);
371        }
372}
373
374int og_socket_server_init(const char *port)
375{
376        struct sockaddr_in local;
377        int sd, on = 1;
378
379        sd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
380        if (sd < 0) {
381                syslog(LOG_ERR, "cannot create main socket\n");
382                return -1;
383        }
384        setsockopt(sd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(int));
385
386        local.sin_addr.s_addr = htonl(INADDR_ANY);
387        local.sin_family = AF_INET;
388        local.sin_port = htons(atoi(port));
389
390        if (bind(sd, (struct sockaddr *) &local, sizeof(local)) < 0) {
391                close(sd);
392                syslog(LOG_ERR, "cannot bind socket\n");
393                return -1;
394        }
395
396        listen(sd, 250);
397
398        return sd;
399}
Note: See TracBrowser for help on using the repository browser.