# (cgit page header and line-number gutter removed; blob 2f6d34f0d48987419f2f7b780139549e7f149ee5)
# R overlay -- roverlay package, argutil
# -*- coding: utf-8 -*-
# Copyright (C) 2012, 2013 André Erdmann <dywi@mailerd.de>
# Distributed under the terms of the GNU General Public License;
# either version 2 of the License, or (at your option) any later version.

import os
import argparse
import pwd
import grp
import sys

import roverlay.config.entrymap
from roverlay.config.entryutil import deref_entry_safe

# alias (reference, not a copy) of the log level container from
# roverlay.config.entrymap
LOG_LEVELS = roverlay.config.entrymap.LOG_LEVEL

def is_log_level ( s ):
   """argparse type: accepts the name of a log level (case-insensitive).

   arguments:
   * s -- log level name

   Returns the upper-cased name if it is contained in LOG_LEVELS.
   Raises argparse.ArgumentTypeError otherwise.
   """
   level_name = s.upper()
   if level_name not in LOG_LEVELS:
      raise argparse.ArgumentTypeError ( "not a log level: {}".format ( s ) )

   return level_name
# --- end of is_log_level (...) ---

def get_uid ( user ):
   """Returns the uid for a user given as numeric id or as user name.

   arguments:
   * user -- uid (anything int() converts) or user name

   Numeric values are returned as int without consulting the passwd
   database. Everything else is looked up with pwd.getpwnam(), whose
   exceptions (e.g. for an unknown name) are passed through.
   """
   try:
      numeric_id = int ( user )
   except ValueError:
      # not a number -- treat it as user name below
      pass
   else:
      return numeric_id

   passwd_entry = pwd.getpwnam ( user )
   return passwd_entry.pw_uid

def get_gid ( group ):
   """Returns the gid for a group given as numeric id or as group name.

   arguments:
   * group -- gid (anything int() converts) or group name

   Numeric values are returned as int without consulting the group
   database. Everything else is looked up with grp.getgrnam(), whose
   exceptions (e.g. for an unknown name) are passed through.
   """
   try:
      numeric_id = int ( group )
   except ValueError:
      # not a number -- treat it as group name below
      pass
   else:
      return numeric_id

   group_entry = grp.getgrnam ( group )
   return group_entry.gr_gid

def is_uid ( value ):
   """argparse type: converts a user name or numeric uid into a uid.

   arguments:
   * value -- user name or uid, passed to get_uid()

   Returns the uid reported by get_uid().
   Raises argparse.ArgumentTypeError if get_uid() fails.
   """
   try:
      return get_uid ( value )
   except Exception:
      # Deliberately broad: any conversion/lookup error means
      # "not a user". Unlike a bare "except:", this does not swallow
      # KeyboardInterrupt or SystemExit.
      pass
   raise argparse.ArgumentTypeError (
      "no such user/uid: {}".format ( value )
   )

def is_gid ( value ):
   """argparse type: converts a group name or numeric gid into a gid.

   arguments:
   * value -- group name or gid, passed to get_gid()

   Returns the gid reported by get_gid().
   Raises argparse.ArgumentTypeError if get_gid() fails.
   """
   try:
      return get_gid ( value )
   except Exception:
      # Deliberately broad: any conversion/lookup error means
      # "not a group". Unlike a bare "except:", this does not swallow
      # KeyboardInterrupt or SystemExit.
      pass
   raise argparse.ArgumentTypeError (
      "no such group/gid: {}".format ( value )
   )

def is_fs_file ( value ):
   """argparse type: path to an existing file.

   arguments:
   * value -- filesystem path

   Returns the absolute path.
   Raises argparse.ArgumentTypeError if the path is not a file.
   """
   filepath = os.path.abspath ( value )
   if os.path.isfile ( filepath ):
      return filepath

   raise argparse.ArgumentTypeError (
      "{!r} is not a file.".format ( value )
   )

def couldbe_fs_file ( value ):
   """argparse type: path that is a file or could become one.

   arguments:
   * value -- filesystem path

   Returns the absolute path if it does not exist or is a file.
   Raises argparse.ArgumentTypeError for empty values and for paths
   occupied by something that is not a file.
   """
   if value:
      filepath = os.path.abspath ( value )
      # "occupied": something other than a file exists at this path
      occupied = (
         os.path.exists ( filepath ) and not os.path.isfile ( filepath )
      )
      if not occupied:
         return filepath

   raise argparse.ArgumentTypeError (
      "{!r} is not a file.".format ( value )
   )

def couldbe_stdout_or_file ( value ):
   """argparse type: "-" (returned as-is) or whatever couldbe_fs_file()
   accepts.
   """
   if value == "-":
      return value

   return couldbe_fs_file ( value )

def is_fs_dir ( value ):
   """argparse type: path to an existing directory.

   arguments:
   * value -- filesystem path

   Returns the absolute path.
   Raises argparse.ArgumentTypeError if the path is not a directory.
   """
   dirpath = os.path.abspath ( value )
   if os.path.isdir ( dirpath ):
      return dirpath

   raise argparse.ArgumentTypeError (
      "{!r} is not a directory.".format ( value )
   )

def is_fs_file_or_dir ( value ):
   """argparse type: path to an existing file or directory.

   arguments:
   * value -- filesystem path

   Returns the absolute path.
   Raises argparse.ArgumentTypeError if the path is neither a file nor a
   directory.
   """
   fspath = os.path.abspath ( value )
   if not ( os.path.isdir ( fspath ) or os.path.isfile ( fspath ) ):
      raise argparse.ArgumentTypeError (
         "{!r} is neither a file nor a directory.".format ( value )
      )

   return fspath

def couldbe_fs_dir ( value ):
   """argparse type: path that is a directory or could become one.

   arguments:
   * value -- filesystem path

   Returns the absolute path if it does not exist or is a directory.
   Raises argparse.ArgumentTypeError if something else exists there.
   """
   dirpath = os.path.abspath ( value )
   if os.path.isdir ( dirpath ) or not os.path.exists ( dirpath ):
      return dirpath

   raise argparse.ArgumentTypeError (
      "{!r} cannot be a directory.".format ( value )
   )

def is_fs_dir_or_void ( value ):
   """argparse type: like is_fs_dir(), but maps empty values to ''."""
   return is_fs_dir ( value ) if value else ''

def is_fs_file_or_void ( value ):
   """argparse type: like is_fs_file(), but maps empty values to ''."""
   return is_fs_file ( value ) if value else ''

def is_config_opt ( value ):
   """argparse type: "<option>[=<value>]" with a known config option name.

   arguments:
   * value -- string; the part before the first '=' is the option name

   Returns value unmodified if deref_entry_safe() accepts the option name.
   Raises argparse.ArgumentTypeError if that lookup raises KeyError.
   """
   option_name, _, _ = value.partition ( '=' )
   try:
      # only the lookup's success matters, its result is not used
      deref_entry_safe ( option_name )
   except KeyError:
      raise argparse.ArgumentTypeError (
         "no such config option: {!r}".format ( option_name )
      )

   return value

def dirstr ( value ):
   """argparse type: normalizes a directory string.

   arguments:
   * value -- directory path, possibly starting with '~'

   Paths starting with '~' are kept unexpanded, only trailing path
   separators get removed. Other paths are converted to absolute paths.
   Raises argparse.ArgumentTypeError for empty values.
   """
   if not value:
      raise argparse.ArgumentTypeError (
         "cannot create dir-string for {!r}".format ( value )
      )

   if value[0] == '~':
      return value.rstrip ( os.path.sep )

   return os.path.abspath ( value )

def dirstr_existing ( value ):
   """argparse type: dirstr() whose directory has to exist.

   arguments:
   * value -- directory path, possibly starting with '~'

   Returns the dir-string created by dirstr() (not the expanded path).
   Raises argparse.ArgumentTypeError if the expanded path is not a
   directory.
   """
   dstr     = dirstr ( value )
   expanded = os.path.expanduser ( dstr )

   if not os.path.isdir ( os.path.abspath ( expanded ) ):
      raise argparse.ArgumentTypeError (
         "directory {!r} does not exist!".format ( dstr )
      )

   return dstr

def couldbe_dirstr_existing ( value ):
   """argparse type: dirstr() whose directory exists or could be created.

   arguments:
   * value -- directory path, possibly starting with '~'

   Returns the dir-string created by dirstr() (not the expanded path).
   Raises argparse.ArgumentTypeError if the expanded path exists (broken
   symlinks included) and is not a directory.
   """
   dstr     = dirstr ( value )
   expanded = os.path.abspath ( os.path.expanduser ( dstr ) )

   if os.path.lexists ( expanded ) and not os.path.isdir ( expanded ):
      raise argparse.ArgumentTypeError (
         "{!r} cannot be a directory.".format ( dstr )
      )

   return dstr

def couldbe_dirstr_existing_or_empty ( value ):
   """argparse type: like couldbe_dirstr_existing(), but maps empty
   values to "".
   """
   if value:
      return couldbe_dirstr_existing ( value )

   return ""

class ArgumentParserError ( Exception ):
   """Base class for the argument parser exceptions of this module."""


class ArgumentGroupExists ( ArgumentParserError ):
   """An argument group with the requested key already exists."""


class SubparserExists ( ArgumentParserError ):
   """A subparser for the requested command already exists."""


class ArgumentFlagException ( ArgumentParserError ):
   """An invalid combination of argument flags has been requested."""


class ArgumentParserProxy ( object ):
   """Proxy for an argparse parser object.

   Keeps a dict of default values, translates the ARG_* flags passed via
   the "flags" keyword into regular add_argument() keyword arguments and
   tracks argument groups and subparser proxies by name.
   """

   # -- flags accepted by apply_arg_flags(), combine them with "|" --

   # take the default value from self.defaults
   ARG_ADD_DEFAULT  = 1 << 0
   # OPT_IN: store True
   ARG_OPT_IN       = 1 << 1
   # OPT_OUT: store False
   ARG_OPT_OUT      = 1 << 2
   # (1 << 3 is not assigned)
   # default to argparse.SUPPRESS unless a default has been given
   ARG_SHARED       = 1 << 4
   # append a description of the default value to the help text
   ARG_HELP_DEFAULT = 1 << 5
   # metavar: <file>, <dir> or (both flags) <file|dir>
   ARG_META_FILE    = 1 << 6
   ARG_META_DIR     = 1 << 7
   # swap enabled/disabled when describing a boolean default
   ARG_INVERSE      = 1 << 8

   # -- commonly used flag combinations --
   ARG_SHARED_INVERSE = ARG_SHARED | ARG_INVERSE
   ARG_META_FILEDIR   = ARG_META_FILE | ARG_META_DIR
   ARG_WITH_DEFAULT   = ARG_ADD_DEFAULT | ARG_HELP_DEFAULT

   # -- default value descriptions used by ARG_HELP_DEFAULT --
   STR_TRUE     = 'enabled'
   STR_FALSE    = 'disabled'
   STR_SUPPRESS = 'keep'


   @classmethod
   def create_new_parser ( cls, defaults=None, **kwargs ):
      """Creates a new argparse.ArgumentParser and returns a proxy for it.

      arguments:
      * defaults -- default values (passed to __init__())
      * **kwargs -- passed to argparse.ArgumentParser()
      """
      return cls ( argparse.ArgumentParser ( **kwargs ), defaults=defaults )
   # --- end of create_new_parser (...) ---

   @classmethod
   def wrap ( cls, parser, defaults=None ):
      """Returns a proxy for an already existing parser.

      arguments:
      * parser   -- parser object
      * defaults -- default values (passed to __init__())
      """
      proxy = cls ( parser, defaults=defaults )
      return proxy
   # --- end of wrap (...) ---

   def __init__ ( self, parser, defaults=None ):
      """Constructor.

      arguments:
      * parser   -- the parser object that gets wrapped
      * defaults -- default values. A dict is used as-is (not copied),
                    None results in a new empty dict and anything else
                    is converted with dict().
      """
      super ( ArgumentParserProxy, self ).__init__()
      self.parser = parser

      if isinstance ( defaults, dict ):
         # shared by reference, e.g. with subparser proxies
         self.defaults = defaults
      elif defaults is None:
         self.defaults = {}
      else:
         self.defaults = dict ( defaults )

      # name => argument group / subparser proxy
      self._argument_groups = {}
      self._subparsers      = {}
      # object returned by parser.add_subparsers(), created on demand
      self._subparser_ctrl  = None

      # result of the most recent parse_args() call
      self.parsed = None
   # --- end of __init__ (...) ---

   def add_subparsers ( self, ignore_exist=False, **kwargs ):
      """Calls parser.add_subparsers() once and returns its result.

      arguments:
      * ignore_exist -- if True: return the existing object when called
                        more than once (**kwargs are not used then),
                        else raise AssertionError
      * **kwargs     -- passed to parser.add_subparsers()
      """
      ctrl = self._subparser_ctrl

      if ctrl is not None:
         if ignore_exist:
            return ctrl
         raise AssertionError ( "add_subparsers() already called!" )

      ctrl = self.parser.add_subparsers ( **kwargs )
      self._subparser_ctrl = ctrl
      return ctrl
   # --- end of add_subparsers (...) ---

   def add_subparser ( self,
      command, defaults=True, proxy_cls=None, **parser_kwargs
   ):
      """Adds a subparser for a command and returns its proxy.

      arguments:
      * command         -- name of the sub command
      * defaults        -- defaults for the new proxy,
                           True means "share self.defaults"
      * proxy_cls       -- how to create the proxy:
                           None: ArgumentParserProxy.wrap(),
                           True: wrap() of this object's class,
                           object with a wrap attribute: its wrap(),
                           anything else is called directly
      * **parser_kwargs -- passed to add_parser()

      Raises SubparserExists if the command has already been added.
      """
      if command in self._subparsers:
         raise SubparserExists ( command )

      if proxy_cls is None:
         create_proxy = ArgumentParserProxy.wrap
      elif proxy_cls is True:
         create_proxy = self.__class__.wrap
      else:
         create_proxy = getattr ( proxy_cls, 'wrap', proxy_cls )

      subparser_ctrl = self.add_subparsers ( ignore_exist=True )
      subparser      = subparser_ctrl.add_parser ( command, **parser_kwargs )

      if defaults is True:
         defaults = self.defaults

      subparser_proxy = create_proxy ( subparser, defaults=defaults )
      self._subparsers [command] = subparser_proxy
      return subparser_proxy
   # --- end of add_subparser (...) ---

   def get_subparser ( self, name ):
      """Returns the proxy registered for a sub command (KeyError if
      there is none).
      """
      return self._subparsers [name]
   # --- end of get_subparser (...) ---

   def get_options ( self ):
      """Returns the result of the most recent parse_args() call, or None
      if nothing has been parsed so far.
      """
      return self.parsed
   # --- end of get_options (...) ---

   def get_commands ( self ):
      """Returns an empty tuple."""
      return ()
   # --- end of get_commands (...) ---

   def get_default ( self, key, *args ):
      """Returns self.defaults.get ( key, *args )."""
      return self.defaults.get ( key, *args )
   # --- end of get_default (...) ---

   def apply_arg_flags ( self, kwargs, flags ):
      """Translates ARG_* flags into add_argument() keyword arguments.

      arguments:
      * kwargs -- keyword arguments dict, gets modified in-place
      * flags  -- combination of ARG_* flags

      Returns kwargs.
      Raises ArgumentFlagException if both ARG_OPT_IN and ARG_OPT_OUT are
      set. ARG_ADD_DEFAULT needs a "defkey" or "dest" keyword (KeyError
      otherwise) and, if no "default" has been given, an entry for that
      key in self.defaults.
      """

      if flags & self.ARG_SHARED:
         kwargs.setdefault ( 'default', argparse.SUPPRESS )

      # -- opt-in/opt-out
      opt_in  = flags & self.ARG_OPT_IN
      opt_out = flags & self.ARG_OPT_OUT

      if opt_in and opt_out:
         raise ArgumentFlagException (
            "opt-in and opt-out are mutually exclusive."
         )
      elif opt_in:
         kwargs ['action'] = 'store_true'
         kwargs.setdefault ( 'default', False )
      elif opt_out:
         kwargs ['action'] = 'store_false'
         kwargs.setdefault ( 'default', True )

      # -- default value from self.defaults
      if flags & self.ARG_ADD_DEFAULT:
         # "defkey" is not an argparse keyword and has to be removed,
         # "dest" stays in kwargs
         defaults_key = (
            kwargs.pop ( 'defkey' ) if 'defkey' in kwargs else kwargs ['dest']
         )

         if 'default' in kwargs:
            # an already existing default serves as fallback
            kwargs ['default'] = self.defaults.get (
               defaults_key, kwargs ['default']
            )
         else:
            kwargs ['default'] = self.defaults [defaults_key]

      # -- append default value to help
      if flags & self.ARG_HELP_DEFAULT:
         default_value = kwargs.get ( 'default' )

         if default_value is argparse.SUPPRESS:
            default_desc = self.STR_SUPPRESS

         elif default_value is True or default_value is False:
            # ARG_INVERSE swaps the two descriptions
            show_enabled = (
               default_value != bool ( flags & self.ARG_INVERSE )
            )
            default_desc = self.STR_TRUE if show_enabled else self.STR_FALSE

         else:
            # let argparse insert the value
            default_desc = '%(default)s'

         if default_desc:
            default_hint = '[' + default_desc + ']'
            if 'help' in kwargs:
               kwargs ['help'] = kwargs ['help'] + ' ' + default_hint
            else:
               kwargs ['help'] = default_hint

      # -- metavar
      meta_dir  = flags & self.ARG_META_DIR
      meta_file = flags & self.ARG_META_FILE

      if meta_dir and meta_file:
         kwargs ['metavar'] = '<file|dir>'
      elif meta_dir:
         kwargs ['metavar'] = '<dir>'
      elif meta_file:
         kwargs ['metavar'] = '<file>'

      return kwargs
   # --- end of apply_arg_flags (...) ---

   def convert_kwargs ( self, kwargs, flags=0 ):
      """Converts keyword arguments that may contain a "flags" entry into
      add_argument() keyword arguments.

      arguments:
      * kwargs -- keyword arguments dict, does not get modified
      * flags  -- additional flags, combined with kwargs' "flags" (if any)

      Returns kwargs itself if there are no flags to apply, else a
      converted copy (see apply_arg_flags()).
      """
      has_own_flags = 'flags' in kwargs

      if not ( has_own_flags or flags ):
         return kwargs

      converted = dict ( kwargs )
      if has_own_flags:
         flags = converted.pop ( 'flags' ) | flags

      return self.apply_arg_flags ( converted, flags )
   # --- end of convert_kwargs (...) ---

   def arg ( self, *args, **kwargs ):
      """Adds an argument to the parser, accepts the "flags" keyword."""
      argparse_kwargs = self.convert_kwargs ( kwargs )
      return self.parser.add_argument ( *args, **argparse_kwargs )
   # --- end of arg (...) ---

   def get_group_arg_adder ( self, key ):
      """Returns a function that adds arguments to the group stored under
      the given key. The group is looked up on each call.

      arguments:
      * key -- name of the argument group
      """
      def wrapped_group_arg ( *args, **kwargs ):
         argparse_kwargs = self.convert_kwargs ( kwargs )
         return self.group ( key ).add_argument ( *args, **argparse_kwargs )
      # --- end of wrapped_group_arg (...) ---

      # make the function look like group_arg()
      template = self.group_arg
      for attr_name in ( '__doc__', '__name__' ):
         setattr (
            wrapped_group_arg, attr_name, getattr ( template, attr_name )
         )
      wrapped_group_arg.__dict__.update ( template.__dict__ )

      return wrapped_group_arg
   # --- end of get_group_arg_adder (...) ---

   def group_arg ( self, key, *args, **kwargs ):
      """Adds an argument to an argument group, accepts the "flags"
      keyword.
      """
      argparse_kwargs = self.convert_kwargs ( kwargs )
      return self.group ( key ).add_argument ( *args, **argparse_kwargs )
   # --- end of group_arg (...) ---

   def get_args_to_parse ( self ):
      """Returns the arguments parsed by default: sys.argv without its
      first element.
      """
      return sys.argv[1:]
   # --- end of get_args_to_parse (...) ---

   def parse_args ( self, args=None, namespace=None ):
      """Parses arguments, stores the result in self.parsed and returns it.

      arguments:
      * args      -- arguments to parse, defaults to get_args_to_parse()
      * namespace -- passed to parser.parse_args()
      """
      if args is None:
         args = self.get_args_to_parse()

      options = self.parser.parse_args ( args=args, namespace=namespace )
      self.parsed = options
      return options
   # --- end of parse_args (...) ---

   def parse ( self, *args, **kwargs ):
      """Parses arguments. Calls parse_args() here, but is likely
      overridden by derived classes.
      """
      return self.parse_args ( *args, **kwargs )
   # --- end of parse (...) ---

   def add_argument_group ( self, key, **kwargs ):
      """Creates a new argument group and returns its argument adder
      (see get_group_arg_adder()).

      arguments:
      * key      -- name under which the group gets stored
      * **kwargs -- passed to parser.add_argument_group()

      Raises ArgumentGroupExists if the key is already in use.
      """
      if key in self._argument_groups:
         raise ArgumentGroupExists ( key )

      new_group = self.parser.add_argument_group ( **kwargs )
      self._argument_groups [key] = new_group
      return self.get_group_arg_adder ( key )
   # --- end of add_argument_group (...) ---

   def group ( self, key ):
      """Returns the argument group stored under key (KeyError if there
      is none).
      """
      return self._argument_groups [key]
   # --- end of group (...) ---

# --- end of ArgumentParserProxy ---