source: ogClient-Git/src/ogRest.py

Last change on this file was 30fdcce, checked in by Jose M. Guisado <jguisado@…>, 2 years ago

src: improve logging

Adds new logging handler redirecting messages to the log file
located in the Samba shared directory (applies to live mode
clients, i.e: ogLive)

Parses log level configuration from ogclient.json. See:

{

"opengnsys": {

...
"log": "INFO",
...

}
...

}

Adds --debug option to set root logger level to DEBUG when starting
ogClient. Overrides log level from config file.

In addition:

  • Replaces any occurence of print with a corresponding logging function.
  • Unsets log level for handlers, use root logger level instead.
  • Default level for root logger is INFO.
  • Replaces level from response log messages to debug (ogRest)
  • Property mode set to 100644
File size: 12.4 KB
Line 
1#
2# Copyright (C) 2020-2021 Soleta Networks <info@soleta.eu>
3#
4# This program is free software: you can redistribute it and/or modify it under
5# the terms of the GNU Affero General Public License as published by the
6# Free Software Foundation; either version 3 of the License, or
7# (at your option) any later version.
8
9import threading
10import platform
11import time
12from enum import Enum
13import json
14import queue
15import sys
16import os
17import signal
18import logging
19from logging.handlers import SysLogHandler
20
21from src.restRequest import *
22
23LOGGER = logging.getLogger()
24
25class ThreadState(Enum):
26        IDLE = 0
27        BUSY = 1
28
29class jsonBody():
30        def __init__(self, dictionary=None):
31                if dictionary:
32                        self.jsontree = dictionary
33                else:
34                        self.jsontree = {}
35
36        def add_element(self, key, value):
37                self.jsontree[key] = value
38
39        def dump(self):
40                return json.dumps(self.jsontree)
41
42class restResponse():
43        def __init__(self, response, json_body=None):
44                self.msg = ''
45                if response == ogResponses.BAD_REQUEST:
46                        self.msg = 'HTTP/1.0 400 Bad Request'
47                elif response == ogResponses.IN_PROGRESS:
48                        self.msg = 'HTTP/1.0 202 Accepted'
49                elif response == ogResponses.OK:
50                        self.msg = 'HTTP/1.0 200 OK'
51                elif response == ogResponses.INTERNAL_ERR:
52                        self.msg = 'HTTP/1.0 500 Internal Server Error'
53                elif response == ogResponses.UNAUTHORIZED:
54                        self.msg = 'HTTP/1.0 401 Unauthorized'
55                elif response == ogResponses.SERVICE_UNAVAILABLE:
56                        self.msg = 'HTTP/1.0 503 Service Unavailable'
57                elif response == ogResponses.EARLY_HINTS:
58                        self.msg = 'HTTP/1.0 103 Early Hints'
59                else:
60                        return self.msg
61
62                if response in {ogResponses.OK, ogResponses.IN_PROGRESS}:
63                        LOGGER.debug(self.msg[:ogRest.LOG_LENGTH])
64                else:
65                        LOGGER.warn(self.msg[:ogRest.LOG_LENGTH])
66
67                self.msg += '\r\n'
68
69                if json_body:
70                        self.msg += 'Content-Length: ' + str(len(json_body.dump()))
71                        self.msg += '\r\nContent-Type: application/json'
72                        self.msg += '\r\n\r\n' + json_body.dump()
73                else:
74                        self.msg += 'Content-Length: 0\r\n' \
75                                    'Content-Type: application/json\r\n\r\n'
76
77
78        def get(self):
79                return self.msg
80
81class ogThread():
82        def shellrun(client, request, ogRest):
83                if not request.getrun():
84                        response = restResponse(ogResponses.BAD_REQUEST)
85                        client.send(response.get())
86                        ogRest.state = ThreadState.IDLE
87                        return
88
89                try:
90                        shellout = ogRest.operations.shellrun(request, ogRest)
91                except ValueError as err:
92                        response = restResponse(ogResponses.INTERNAL_ERR)
93                        client.send(response.get())
94                        ogRest.state = ThreadState.IDLE
95                        return
96
97                if request.getEcho():
98                        json_body = jsonBody()
99                        json_body.add_element('out', shellout)
100                        response = restResponse(ogResponses.OK, json_body)
101                        client.send(response.get())
102                else:
103                        response = restResponse(ogResponses.OK)
104                        client.send(response.get())
105
106                ogRest.state = ThreadState.IDLE
107
108        def poweroff(ogRest):
109                time.sleep(2)
110                ogRest.operations.poweroff()
111
112        def reboot(ogRest):
113                ogRest.operations.reboot()
114
115        def session(client, request, ogRest):
116                try:
117                        ogRest.operations.session(request, ogRest)
118                except ValueError as err:
119                        response = restResponse(ogResponses.INTERNAL_ERR)
120                        client.send(response.get())
121                        ogRest.state = ThreadState.IDLE
122                        return
123
124                response = restResponse(ogResponses.OK)
125                client.send(response.get())
126                client.disconnect()
127
128        def software(client, request, path, ogRest):
129                try:
130                        software = ogRest.operations.software(request, path, ogRest)
131                except ValueError as err:
132                        response = restResponse(ogResponses.INTERNAL_ERR)
133                        client.send(response.get())
134                        ogRest.state = ThreadState.IDLE
135                        return
136
137                json_body = jsonBody()
138                json_body.add_element('partition', request.getPartition())
139                json_body.add_element('software', software)
140
141                response = restResponse(ogResponses.OK, json_body)
142                client.send(response.get())
143                ogRest.state = ThreadState.IDLE
144
145        def hardware(client, path, ogRest):
146                try:
147                        ogRest.operations.hardware(path, ogRest)
148                except ValueError as err:
149                        response = restResponse(ogResponses.INTERNAL_ERR)
150                        client.send(response.get())
151                        ogRest.state = ThreadState.IDLE
152                        return
153
154                json_body = jsonBody()
155                with open(path, 'r') as f:
156                        json_body.add_element('hardware', f.read())
157
158                response = restResponse(ogResponses.OK, json_body)
159                client.send(response.get())
160                ogRest.state = ThreadState.IDLE
161
162        def setup(client, request, ogRest):
163                try:
164                        out = ogRest.operations.setup(request, ogRest)
165                except ValueError as err:
166                        response = restResponse(ogResponses.INTERNAL_ERR)
167                        client.send(response.get())
168                        ogRest.state = ThreadState.IDLE
169                        return
170
171                json_body = jsonBody(out)
172
173                response = restResponse(ogResponses.OK, json_body)
174                client.send(response.get())
175                ogRest.state = ThreadState.IDLE
176
177        def image_restore(client, request, ogRest):
178                try:
179                        ogRest.operations.image_restore(request, ogRest)
180                except ValueError as err:
181                        response = restResponse(ogResponses.INTERNAL_ERR)
182                        client.send(response.get())
183                        ogRest.state = ThreadState.IDLE
184                        return
185
186                json_body = jsonBody()
187                json_body.add_element('disk', request.getDisk())
188                json_body.add_element('partition', request.getPartition())
189                json_body.add_element('image_id', request.getId())
190
191                response = restResponse(ogResponses.OK, json_body)
192                client.send(response.get())
193                ogRest.state = ThreadState.IDLE
194
195        def image_create(client, path, request, ogRest):
196                try:
197                        image_info = ogRest.operations.image_create(path,
198                                                                    request,
199                                                                    ogRest)
200                        software = ogRest.operations.software(request, path, ogRest)
201                except ValueError as err:
202                        response = restResponse(ogResponses.INTERNAL_ERR)
203                        client.send(response.get())
204                        ogRest.state = ThreadState.IDLE
205                        return
206
207                kibi = 1024
208                datasize = int(image_info['datasize']) * kibi
209
210                json_body = jsonBody()
211                json_body.add_element('disk', request.getDisk())
212                json_body.add_element('partition', request.getPartition())
213                json_body.add_element('code', request.getCode())
214                json_body.add_element('id', request.getId())
215                json_body.add_element('name', request.getName())
216                json_body.add_element('repository', request.getRepo())
217                json_body.add_element('software', software)
218                json_body.add_element('clonator', image_info['clonator'])
219                json_body.add_element('compressor', image_info['compressor'])
220                json_body.add_element('filesystem', image_info['filesystem'])
221                json_body.add_element('datasize', datasize)
222
223                response = restResponse(ogResponses.OK, json_body)
224                client.send(response.get())
225                ogRest.state = ThreadState.IDLE
226
227        def refresh(client, ogRest):
228                try:
229                        out = ogRest.operations.refresh(ogRest)
230                except ValueError as err:
231                        response = restResponse(ogResponses.INTERNAL_ERR)
232                        client.send(response.get())
233                        ogRest.state = ThreadState.IDLE
234                        return
235
236                json_body = jsonBody(out)
237
238                response = restResponse(ogResponses.OK, json_body)
239                client.send(response.get())
240                ogRest.state = ThreadState.IDLE
241
242class ogResponses(Enum):
243        BAD_REQUEST=0
244        IN_PROGRESS=1
245        OK=2
246        INTERNAL_ERR=3
247        UNAUTHORIZED=4
248        SERVICE_UNAVAILABLE=5
249        EARLY_HINTS=6
250
251class ogRest():
252        LOG_LENGTH = 32
253
254        def __init__(self, config):
255                self.proc = None
256                self.terminated = False
257                self.state = ThreadState.IDLE
258                self.CONFIG = config
259                self.mode = self.CONFIG['opengnsys']['mode']
260                self.samba_config = self.CONFIG['samba']
261
262                if self.mode == 'live':
263                        from src.live.ogOperations import OgLiveOperations
264                        self.operations = OgLiveOperations(self.CONFIG)
265                elif self.mode == 'virtual':
266                        from src.virtual.ogOperations import \
267                                OgVirtualOperations
268                        self.operations = OgVirtualOperations()
269                        threading.Thread(target=self.operations.check_vm_state_loop,
270                                         args=(self,)).start()
271                elif self.mode == 'linux':
272                        from src.linux.ogOperations import OgLinuxOperations
273                        self.operations = OgLinuxOperations()
274                elif self.mode == 'windows':
275                        from src.windows.ogOperations import OgWindowsOperations
276                        self.operations = OgWindowsOperations()
277                else:
278                        raise ValueError('Mode not supported.')
279
280        def process_request(self, request, client):
281                method = request.get_method()
282                URI = request.get_uri()
283
284                LOGGER.debug('Incoming request: %s%s', method, URI[:ogRest.LOG_LENGTH])
285
286                if (not "stop" in URI and
287                    not "reboot" in URI and
288                    not "poweroff" in URI and
289                    not "probe" in URI):
290                        if self.state == ThreadState.BUSY:
291                                LOGGER.warn('Request has been received '
292                                            'while ogClient is busy')
293                                response = restResponse(ogResponses.SERVICE_UNAVAILABLE)
294                                client.send(response.get())
295                                return
296                        else:
297                                self.state = ThreadState.BUSY
298
299                if ("GET" in method):
300                        if "hardware" in URI:
301                                self.process_hardware(client)
302                        elif ("software" in URI):
303                                self.process_software(client, request)
304                        elif ("run/schedule" in URI):
305                                self.process_schedule(client)
306                        elif "refresh" in URI:
307                                self.process_refresh(client)
308                        else:
309                                LOGGER.warn('Unsupported request: %s',
310                                            {URI[:ogRest.LOG_LENGTH]})
311                                response = restResponse(ogResponses.BAD_REQUEST)
312                                client.send(response.get())
313                                self.state = ThreadState.IDLE
314                elif ("POST" in method):
315                        if ("poweroff" in URI):
316                                self.process_poweroff(client)
317                        elif "probe" in URI:
318                                self.process_probe(client)
319                        elif ("reboot" in URI):
320                                self.process_reboot(client)
321                        elif ("shell/run" in URI):
322                                self.process_shellrun(client, request)
323                        elif ("session" in URI):
324                                self.process_session(client, request)
325                        elif ("setup" in URI):
326                                self.process_setup(client, request)
327                        elif ("image/restore" in URI):
328                                self.process_imagerestore(client, request)
329                        elif ("stop" in URI):
330                                self.process_stop(client)
331                        elif ("image/create" in URI):
332                                self.process_imagecreate(client, request)
333                        else:
334                                LOGGER.warn('Unsupported request: %s',
335                                            URI[:ogRest.LOG_LENGTH])
336                                response = restResponse(ogResponses.BAD_REQUEST)
337                                client.send(response.get())
338                                self.state = ThreadState.IDLE
339                else:
340                        response = restResponse(ogResponses.BAD_REQUEST)
341                        client.send(response.get())
342                        self.state = ThreadState.IDLE
343
344                return 0
345
346        def kill_process(self):
347                try:
348                        os.kill(self.proc.pid, signal.SIGTERM)
349                except:
350                        pass
351
352                time.sleep(2)
353                try:
354                        os.kill(self.proc.pid, signal.SIGKILL)
355                except:
356                        pass
357
358                self.state = ThreadState.IDLE
359
360        def process_reboot(self, client):
361                response = restResponse(ogResponses.IN_PROGRESS)
362                client.send(response.get())
363
364                if self.mode != 'virtual':
365                        client.disconnect()
366                        if self.state == ThreadState.BUSY:
367                                self.kill_process()
368
369                threading.Thread(target=ogThread.reboot, args=(self,)).start()
370
371        def process_poweroff(self, client):
372                response = restResponse(ogResponses.IN_PROGRESS)
373                client.send(response.get())
374
375                if self.mode != 'virtual':
376                        client.disconnect()
377                        if self.state == ThreadState.BUSY:
378                                self.kill_process()
379
380                threading.Thread(target=ogThread.poweroff, args=(self,)).start()
381
382        def process_probe(self, client):
383                try:
384                        status = self.operations.probe(self)
385                except:
386                        response = restResponse(ogResponses.INTERNAL_ERR)
387                        client.send(response.get())
388                        return
389
390                json_body = jsonBody()
391                for k, v in status.items():
392                        json_body.add_element(k, v)
393
394                if self.state != ThreadState.BUSY:
395                        response = restResponse(ogResponses.OK, json_body)
396                else:
397                        response = restResponse(ogResponses.IN_PROGRESS, json_body)
398
399                client.send(response.get())
400
401        def process_shellrun(self, client, request):
402                threading.Thread(target=ogThread.shellrun, args=(client, request, self,)).start()
403
404        def process_session(self, client, request):
405                threading.Thread(target=ogThread.session, args=(client, request, self,)).start()
406
407        def process_software(self, client, request):
408                path = '/tmp/CSft-' + client.ip + '-' + str(request.getPartition())
409                threading.Thread(target=ogThread.software, args=(client, request, path, self,)).start()
410
411        def process_hardware(self, client):
412                path = '/tmp/Chrd-' + client.ip
413                threading.Thread(target=ogThread.hardware, args=(client, path, self,)).start()
414
415        def process_schedule(self, client):
416                response = restResponse(ogResponses.OK)
417                client.send(response.get())
418                self.state = ThreadState.IDLE
419
420        def process_setup(self, client, request):
421                threading.Thread(target=ogThread.setup, args=(client, request, self,)).start()
422
423        def process_imagerestore(self, client, request):
424                threading.Thread(target=ogThread.image_restore, args=(client, request, self,)).start()
425
426        def process_stop(self, client):
427                client.disconnect()
428                if self.state == ThreadState.BUSY:
429                        self.kill_process()
430                        self.terminated = True
431
432                sys.exit(0)
433
434        def process_imagecreate(self, client, request):
435                path = '/tmp/CSft-' + client.ip + '-' + request.getPartition()
436                threading.Thread(target=ogThread.image_create, args=(client, path, request, self,)).start()
437
438        def process_refresh(self, client):
439                threading.Thread(target=ogThread.refresh, args=(client, self,)).start()
Note: See TracBrowser for help on using the repository browser.