Manage aws security group rules

Published on:

Python script to allow you to easily add and remove AWS security group rules.

#!/usr/bin/env python3

# http://boto3.readthedocs.org/en/latest/reference/services/ec2.html#EC2.Vpc.security_groups

import boto3
from botocore.client import ClientError
import sys
import argparse
import re

class MyAP(argparse.ArgumentParser):
  """
  This just makes the parser print out the full help message on error
  instead of just telling you what you did wrong.
  """
  def error(self, message):
    sys.stderr.write('error: %s\n' % message)
    self.print_help()
    sys.exit(2)

def build_parser():
  parser = MyAP(description=
    """
    Allow or revoke inbound access for AWS security groups.
    Groups are specified by REGEX_PATTERN.
    TO_PORT defaults to FROM_PORT.
    if FROM_PORT contains multiples TO_PORT is ignored.
    CIDR_RANGES must be CIDR even for singles.
    AWS_PROFILE=hip-prod AWS_DEFAULT_REGION=us-east-1 ./add-sg-rule.py (options)
    """)
  parser.add_argument(
    '-r',
    '--regex-pattern',
    required=True,
    type=str,
    help='regex pattern to match against security group GroupName')
  parser.add_argument(
    '-p',
    '--protocol',
    default='tcp',
    type=str,
    help='either tcp or udp')
  parser.add_argument(
    '-f',
    '--from-port',
    help='starting port number. or space delimited list of port numbers.')
  parser.add_argument(
    '-t',
    '--to-port',
    help='ending port number (defaults to FROM_PORT)')
  parser.add_argument(
    '-c',
    '--cidr-ranges',
    type=str,
    help='space delimited list of source CIDR ranges')
  parser.add_argument(
    '-d',
    '--dry-run',
    action='store_true',
    help='if specified we do nothing except see if change would have worked')
  parser.add_argument(
    '--revoke',
    action='store_true',
    help='if specified we revoke access instead of grant access')
  parser.add_argument(
    '--list-matching-groups',
    action='store_true',
    help='if specified we just list what security groups matched the REGEX_PATTERN')
  return parser

def get_sgs_for_name_pattern(pattern):
  sgs = []
  p = re.compile(pattern)
  for g in ec2.describe_security_groups()['SecurityGroups']:
    if p.match(g['GroupName']):
      sgs.append(resource.SecurityGroup(g['GroupId']))
  return sgs

def authorize_ingress(args):
  sgs = get_sgs_for_name_pattern(args.regex_pattern)
  for sg in sgs:
    if args.list_matching_groups:
      print(sg.group_name)
    else:
      # we loop here rathern than specifying multiple IpRanges
      # less effecient, but duplicate errors or rule doesn't exist
      # errors won't keep is from trying the next in the list.
      # much more useful for the user.
      for r in args.cidr_ranges.split():
        perms = {
          'IpProtocol': args.protocol,
          'FromPort': int(args.from_port),
          'ToPort': int(args.to_port),
          'IpRanges': [{'CidrIp': r}]
        }
        print(str(perms))
        try:
          if args.revoke:
            sg.revoke_ingress(
              DryRun=args.dry_run,
              IpPermissions=[perms]
            )
          else:
            sg.authorize_ingress(
              DryRun=args.dry_run,
              IpPermissions=[perms]
            )
        except ClientError as e:
          print((sg.group_name + ': ').ljust(20) + str(e))

ec2 = boto3.client('ec2')
resource = boto3.resource('ec2')

if __name__ == "__main__":
    parser = build_parser()
    args = parser.parse_args(sys.argv[1:])
    if not args.list_matching_groups:
      if not args.from_port:
        parser.error('FROM_PORT required unless LIST_MATCHING_GROUPS specified\n')
      if not args.cidr_ranges:
        parser.error('CIDR_RANGES required unless LIST_MATCHING_GROUPS specified\n')
    args.from_port = '0' if not args.from_port else args.from_port
    args.to_port = args.from_port
    ports = args.from_port.split()
    if len(ports) > 1:
      for port in ports:
        args.from_port = port
        args.to_port = port
        authorize_ingress(args)
    else:
      authorize_ingress(args)