def normalizeLink(link):
    """Return *link* with its ``/api/v0/`` or ``/api/v1/`` segment removed.

    The first match of ``/api/v[01]/`` is located and every occurrence of
    that exact matched text is stripped from the link.  A link that holds no
    such segment is returned unchanged (previously ``None`` was returned
    implicitly in that case).
    """
    pattern = re.search(r'/api/v[01]/', link)
    if pattern:
        return link.replace(pattern.group(), '')
    return link


class ActivityUtils(object):
    """Helpers that locate an activity of a community and edit its agent."""

    @staticmethod
    def changeAgentOptions(connection, sessionUrl, communityName, activityName, optionsDict):
        """PATCH *optionsDict* onto the ``agent`` resource of the named activity.

        Raises:
            Exception: propagated from getActivityByName when the community
                or the activity cannot be found.
        """
        activityListUrl, activity = ActivityUtils.getActivityByName(connection, sessionUrl, communityName, activityName)
        agentUrl = '%s/%s/agent' % (activityListUrl, activity.objectID)
        performGenericPatch(connection, agentUrl, optionsDict)

    @staticmethod
    def getActivityByName(connection, sessionUrl, communityName, activityName):
        """Look up an activity by community name and activity name.

        Returns:
            A ``(activityListUrl, activity)`` tuple: the URL of the
            community's activity list and the matching activity item.

        Raises:
            Exception: if the community or the activity is not found.
        """
        communityListUrl = "%s/ixload/test/activeTest/communityList" % sessionUrl
        communityList = connection.httpGet(url=communityListUrl)

        community = getItemByName(communityList, communityName)
        if community is None:
            raise Exception('Community %s cannot be found.' % communityName)

        activityListUrl = "%s/%s/activityList" % (communityListUrl, community.objectID)
        activityList = connection.httpGet(url=activityListUrl)

        activity = getItemByName(activityList, activityName)
        if activity is None:
            raise Exception('Community %s does not have an activity named %s.' % (communityName, activityName))

        return (activityListUrl, activity)


class NetworkUtils(object):
    """Helpers that navigate the network stack of a community."""

    @staticmethod
    def getStackUrlByCommunityName(connection, sessionUrl, communityName):
        """Return the ``network/stack`` URL of the named community.

        Raises:
            Exception: if the community is not found.
        """
        communityListUrl = "%s/ixload/test/activeTest/communityList" % sessionUrl
        communityList = connection.httpGet(url=communityListUrl)

        community = getItemByName(communityList, communityName)
        if community is None:
            raise Exception('Community %s cannot be found.' % communityName)

        stackUrl = "%s/%s/network/stack" % (communityListUrl, community.objectID)
        return stackUrl

    @staticmethod
    def _getLinkUrl(node, rel):
        """Return the normalized href of the first link of *node* whose
        ``rel`` equals *rel*, or ``None`` when the node has no such link."""
        for link in node.links:
            if link.rel == rel:
                return normalizeLink(link.href)
        return None

    @staticmethod
    def _findNode(connection, nodes, pluginName):
        """Depth-first search of *nodes* and their descendants for a node
        whose ``name`` equals *pluginName*; return it, or ``None``.

        Children are fetched with ``connection.httpGet`` from each node's
        ``childrenList`` link.  Every sibling is visited: a node without
        children, or a subtree without a match, no longer ends the search.
        """
        for node in nodes:
            if node.name == pluginName:
                return node

            childrenNodesUrl = NetworkUtils._getLinkUrl(node, 'childrenList')
            if childrenNodesUrl is None:
                # Leaf node: keep looking among the remaining siblings.
                continue

            childrenNodes = connection.httpGet(url=childrenNodesUrl)
            found = NetworkUtils._findNode(connection, childrenNodes, pluginName)
            if found is not None:
                return found
        return None

    @staticmethod
    def _getRangeListUrl(connection, nodes, pluginName, rangeListType):
        """Return the normalized URL of the *rangeListType* link of the
        plugin named *pluginName*, or ``None`` if no such plugin exists.

        Raises:
            Exception: if the plugin exists but has no link whose ``rel``
                equals *rangeListType*.
        """
        node = NetworkUtils._findNode(connection, nodes, pluginName)
        if node is None:
            return None

        rangeListUrl = NetworkUtils._getLinkUrl(node, rangeListType)
        if rangeListUrl is None:
            raise Exception('Plugin %s does not have a rangeList with rangeListType %s' % (pluginName, rangeListType))
        return rangeListUrl

    @staticmethod
    def _getPluginUrl(connection, nodes, pluginName):
        """Return the ``_url_`` attribute of the plugin named *pluginName*,
        or ``None`` if no such plugin exists under *nodes*."""
        node = NetworkUtils._findNode(connection, nodes, pluginName)
        if node is None:
            return None
        return node._url_

    @staticmethod
    def getParentPluginChildrenUrl(connection, nodes, parentPluginName):
        """Return the normalized ``childrenList`` URL of the plugin named
        *parentPluginName*, or ``None`` if no such plugin exists.

        Raises:
            Exception: if the plugin exists but has no ``childrenList`` link.
        """
        node = NetworkUtils._findNode(connection, nodes, parentPluginName)
        if node is None:
            return None

        childrenUrl = NetworkUtils._getLinkUrl(node, 'childrenList')
        if childrenUrl is None:
            raise Exception('Plugin %s does not have the childrenList option.' % parentPluginName)
        return childrenUrl

    @staticmethod
    def getRangeListUrl(connection, sessionUrl, communityName, pluginName, rangeListType):
        """Return the range-list URL of a plugin in the named community.

        Raises:
            Exception: if the community, the plugin or the range list of the
                requested type cannot be found.
        """
        stackUrl = NetworkUtils.getStackUrlByCommunityName(connection, sessionUrl, communityName)
        stack = connection.httpGet(url=stackUrl)
        nodes = [stack]

        rangeListUrl = NetworkUtils._getRangeListUrl(connection, nodes, pluginName, rangeListType)
        if rangeListUrl is None:
            raise Exception('Plugin %s under community %s does not have a rangeList of type %s.' % (pluginName, communityName, rangeListType))

        return rangeListUrl

    @staticmethod
    def getRangeUrl(connection, sessionUrl, communityName, pluginName, rangeListType, rangeName):
        """Return the URL of the range named *rangeName* in a plugin's range list.

        Raises:
            Exception: if the range list or the named range cannot be found.
        """
        rangeListUrl = NetworkUtils.getRangeListUrl(connection, sessionUrl, communityName, pluginName, rangeListType)
        rangeList = connection.httpGet(url=rangeListUrl)

        r = getItemByName(rangeList, rangeName)
        if r is None:
            raise Exception('Community %s, plugin %s does not have a range named %s.' % (communityName, pluginName, rangeName))

        return '%s/%s' % (rangeListUrl, r.objectID)

    @staticmethod
    def getIPRange(connection, sessionUrl, communityName, pluginName, rangeListType, rangeName):
        """Fetch the named range and return its ``ipAddress`` attribute."""
        rangeUrl = NetworkUtils.getRangeUrl(connection, sessionUrl, communityName, pluginName, rangeListType, rangeName)
        # Local renamed from ``range`` so the builtin is not shadowed.
        ipRange = connection.httpGet(url=rangeUrl)
        return ipRange.ipAddress

    @staticmethod
    def getPluginUrl(connection, sessionUrl, communityName, pluginName):
        """Return the URL of the plugin named *pluginName* in the community's stack.

        Raises:
            Exception: if the community or the plugin cannot be found.
        """
        stackUrl = NetworkUtils.getStackUrlByCommunityName(connection, sessionUrl, communityName)
        stack = connection.httpGet(url=stackUrl)
        nodes = [stack]

        pluginUrl = NetworkUtils._getPluginUrl(connection, nodes, pluginName)
        if pluginUrl is None:
            raise Exception('Community %s does not have a plugin with the name %s.' % (communityName, pluginName))

        return pluginUrl

    @staticmethod
    def addRange(connection, sessionUrl, communityName, pluginName, rangeListType, rangeOptions):
        """POST *rangeOptions* to the plugin's range list of type *rangeListType*."""
        rangeListUrl = NetworkUtils.getRangeListUrl(connection, sessionUrl, communityName, pluginName, rangeListType)
        performGenericPost(connection, rangeListUrl, rangeOptions)

    @staticmethod
    def changeRangeOptions(connection, sessionUrl, communityName, pluginName, rangeListType, rangeName, rangeOptions):
        """PATCH *rangeOptions* onto the range named *rangeName*."""
        rangeUrl = NetworkUtils.getRangeUrl(connection, sessionUrl, communityName, pluginName, rangeListType, rangeName)
        # getRangeUrl raises instead of returning None; the check is kept
        # only as a defensive guard.
        if rangeUrl is None:
            raise Exception('Invalid range name %s for community %s, plugin %s and rangeListType %s' % (rangeName, communityName, pluginName, rangeListType))

        performGenericPatch(connection, rangeUrl, rangeOptions)

    @staticmethod
    def addPlugin(connection, sessionUrl, communityName, parentPluginName, pluginOptions):
        """POST *pluginOptions* to the children list of the parent plugin.

        Raises:
            Exception: if the community or the parent plugin cannot be
                found, or the parent has no ``childrenList`` link.
        """
        stackUrl = NetworkUtils.getStackUrlByCommunityName(connection, sessionUrl, communityName)
        stack = connection.httpGet(url=stackUrl)
        nodes = [stack]

        parentPluginChildrenUrl = NetworkUtils.getParentPluginChildrenUrl(connection, nodes, parentPluginName)
        if parentPluginChildrenUrl is None:
            # The trailing placeholder used to be a bare '%', which made this
            # line fail with "ValueError: incomplete format".
            raise Exception('Invalid parent plugin name %s under community %s' % (parentPluginName, communityName))

        performGenericPost(connection, parentPluginChildrenUrl, pluginOptions)

    @staticmethod
    def changePluginOptions(connection, sessionUrl, communityName, pluginName, pluginOptions):
        """PATCH *pluginOptions* onto the plugin named *pluginName*."""
        pluginUrl = NetworkUtils.getPluginUrl(connection, sessionUrl, communityName, pluginName)
        performGenericPatch(connection, pluginUrl, pluginOptions)

    @staticmethod
    def deletePlugin(connection, sessionUrl, communityName, pluginName):
        """Issue a generic delete on the plugin named *pluginName*."""
        pluginUrl = NetworkUtils.getPluginUrl(connection, sessionUrl, communityName, pluginName)
        performGenericDelete(connection, pluginUrl, {})

    @staticmethod
    def addIpRange(connection, sessionUrl, communityName, pluginName, rangeOptions):
        """Add a range to the plugin's ``rangeList``."""
        NetworkUtils.addRange(connection, sessionUrl, communityName, pluginName, 'rangeList', rangeOptions)

    @staticmethod
    def addIpsecPlugin(connection, sessionUrl, communityName, parentPluginName):
        """Add a plugin of item type ``IPSecPlugin`` under the parent plugin."""
        NetworkUtils.addPlugin(connection, sessionUrl, communityName, parentPluginName, {'itemType': 'IPSecPlugin'})


class ActivityNetworkMixinUtils(ActivityUtils, NetworkUtils):
    """Combines the activity and the network helpers for protocol classes."""
    pass


class HttpUtils(ActivityNetworkMixinUtils):
    """HTTP activity helpers."""

    @staticmethod
    def enableSSLOnClient(connection, sessionUrl, communityName, activityName):
        """Set ``enableSsl`` to True on the activity's agent."""
        options = {"enableSsl": True}
        HttpUtils.changeAgentOptions(connection, sessionUrl, communityName, activityName, options)

    @staticmethod
    def enableSSLOnServer(connection, sessionUrl, communityName, activityName):
        """Set ``acceptSslConnections`` to True on the activity's agent."""
        options = {"acceptSslConnections": True}
        HttpUtils.changeAgentOptions(connection, sessionUrl, communityName, activityName, options)


class ImapUtils(ActivityNetworkMixinUtils):
    """IMAP activity helpers."""

    @staticmethod
    def addImapCommand(connection, sessionUrl, communityName, activityName, optionsDict):
        """POST *optionsDict* to the activity's ``agent/pm/imapCommands`` list."""
        activityListUrl, activity = ImapUtils.getActivityByName(connection, sessionUrl, communityName, activityName)
        imapCommandsUrl = '%s/%s/agent/pm/imapCommands' % (activityListUrl, activity.objectID)
        performGenericPost(connection, imapCommandsUrl, optionsDict)

    @staticmethod
    def addImapServerConfigMail(connection, sessionUrl, communityName, activityName, optionsDict):
        """POST *optionsDict* to the activity's ``agent/pm/imapServerConfig/mails`` list."""
        activityListUrl, activity = ImapUtils.getActivityByName(connection, sessionUrl, communityName, activityName)
        imapMailsUrl = '%s/%s/agent/pm/imapServerConfig/mails' % (activityListUrl, activity.objectID)
        performGenericPost(connection, imapMailsUrl, optionsDict)


class POP3Utils(ActivityNetworkMixinUtils):
    """POP3 activity helpers."""

    @staticmethod
    def addPOP3Command(connection, sessionUrl, communityName, activityName, optionsDict):
        """POST *optionsDict* to the activity's ``agent/commandList``."""
        activityListUrl, activity = POP3Utils.getActivityByName(connection, sessionUrl, communityName, activityName)
        POP3CommandsUrl = '%s/%s/agent/commandList' % (activityListUrl, activity.objectID)
        performGenericPost(connection, POP3CommandsUrl, optionsDict)

    @staticmethod
    def addMailMessage(connection, sessionUrl, communityName, activityName, optionsDict):
        """POST *optionsDict* to the activity's ``agent/mailBox``."""
        activityListUrl, activity = POP3Utils.getActivityByName(connection, sessionUrl, communityName, activityName)
        POP3MailsUrl = '%s/%s/agent/mailBox' % (activityListUrl, activity.objectID)
        performGenericPost(connection, POP3MailsUrl, optionsDict)

    @staticmethod
    def changeMailMessageType(connection, sessionUrl, communityName, activityName, commandNumber, optionsDict):
        """PATCH *optionsDict* onto the ``agent/mailBox`` entry whose
        ``objectID`` equals *commandNumber*.

        Raises:
            Exception: if no mailbox entry has that objectID.
        """
        activityListUrl, activity = POP3Utils.getActivityByName(connection, sessionUrl, communityName, activityName)
        POP3MailBoxUrl = '%s/%s/agent/mailBox' % (activityListUrl, activity.objectID)
        POP3Commands = connection.httpGet(url=POP3MailBoxUrl)

        for POP3Command in POP3Commands:
            if POP3Command.objectID == commandNumber:
                POP3CommandUrl = '%s/%s' % (POP3MailBoxUrl, POP3Command.objectID)
                performGenericPatch(connection, POP3CommandUrl, optionsDict)
                break
        else:
            # Loop finished without a break: nothing matched.
            raise Exception('Community %s, activity %s does not have a command with the number %s' % (communityName, activityName, commandNumber))


class IpsecUtils(ActivityNetworkMixinUtils):
    """IPsec plugin helpers."""

    @staticmethod
    def changePortGroupDataOptions(connection, sessionUrl, communityName, pluginName, portGroupDataOptions):
        """PATCH *portGroupDataOptions* onto the plugin's ``PortGroupData``."""
        pluginUrl = IpsecUtils.getPluginUrl(connection, sessionUrl, communityName, pluginName)
        portGroupDataUrl = '%s/%s' % (pluginUrl, 'PortGroupData')
        performGenericPatch(connection, portGroupDataUrl, portGroupDataOptions)

    @staticmethod
    def changeIpsecTunnelSetupOptions(connection, sessionUrl, communityName, pluginName, ipsecTunnelSetupOptions):
        """PATCH *ipsecTunnelSetupOptions* onto ``SessionData/ipsecTunnelSetup``."""
        pluginUrl = IpsecUtils.getPluginUrl(connection, sessionUrl, communityName, pluginName)
        ipsecTunnelSetupUrl = '%s/%s' % (pluginUrl, 'SessionData/ipsecTunnelSetup')
        performGenericPatch(connection, ipsecTunnelSetupUrl, ipsecTunnelSetupOptions)


class FtpUtils(ActivityNetworkMixinUtils):
    """FTP activity helpers."""

    @staticmethod
    def addFtpCommand(connection, sessionUrl, communityName, activityName, optionsDict):
        """POST *optionsDict* to the activity's ``agent/actionList``."""
        activityListUrl, activity = FtpUtils.getActivityByName(connection, sessionUrl, communityName, activityName)
        ftpCommandsUrl = '%s/%s/agent/actionList' % (activityListUrl, activity.objectID)
        performGenericPost(connection, ftpCommandsUrl, optionsDict)


class DnsUtils(ActivityNetworkMixinUtils):
    """DNS activity helpers."""

    @staticmethod
    def addDnsCommand(connection, sessionUrl, communityName, activityName, optionsDict):
        """POST *optionsDict* to the activity's ``agent/pm/dnsConfig/dnsQueries``."""
        activityListUrl, activity = DnsUtils.getActivityByName(connection, sessionUrl, communityName, activityName)
        dnsCommandsUrl = '%s/%s/agent/pm/dnsConfig/dnsQueries' % (activityListUrl, activity.objectID)
        performGenericPost(connection, dnsCommandsUrl, optionsDict)


class TftpUtils(ActivityNetworkMixinUtils):
    """TFTP activity helpers."""

    @staticmethod
    def addTftpCommand(connection, sessionUrl, communityName, activityName, optionsDict):
        """POST *optionsDict* to the activity's ``agent/pm/cmdList``."""
        activityListUrl, activity = TftpUtils.getActivityByName(connection, sessionUrl, communityName, activityName)
        tftpCommandsUrl = '%s/%s/agent/pm/cmdList' % (activityListUrl, activity.objectID)
        performGenericPost(connection, tftpCommandsUrl, optionsDict)


class cifsUtils(ActivityNetworkMixinUtils):
    """CIFS activity helpers."""

    @staticmethod
    def addCifsCommand(connection, sessionUrl, communityName, activityName, optionsDict):
        """POST *optionsDict* to the activity's ``agent/pm/commands``."""
        activityListUrl, activity = cifsUtils.getActivityByName(connection, sessionUrl, communityName, activityName)
        cifsCommandsUrl = '%s/%s/agent/pm/commands' % (activityListUrl, activity.objectID)
        performGenericPost(connection, cifsCommandsUrl, optionsDict)

    @staticmethod
    def addFileToCifsServer(connection, sessionUrl, communityName, activityName, optionsDict):
        """POST *optionsDict* to the activity's ``agent/pm/sharedPool``."""
        activityListUrl, activity = cifsUtils.getActivityByName(connection, sessionUrl, communityName, activityName)
        cifsCommandsUrl = '%s/%s/agent/pm/sharedPool' % (activityListUrl, activity.objectID)
        performGenericPost(connection, cifsCommandsUrl, optionsDict)


class StatelessPeerUtils(ActivityNetworkMixinUtils):
    """Stateless peer activity helpers."""

    @staticmethod
    def addStatelessPeerCommand(connection, sessionUrl, communityName, activityName, optionsDict):
        """POST *optionsDict* to the activity's ``agent/pm/protocolFlows``."""
        activityListUrl, activity = StatelessPeerUtils.getActivityByName(connection, sessionUrl, communityName, activityName)
        StatelessPeerCommandsUrl = '%s/%s/agent/pm/protocolFlows' % (activityListUrl, activity.objectID)
        performGenericPost(connection, StatelessPeerCommandsUrl, optionsDict)


class SMTPUtils(ActivityNetworkMixinUtils):
    """SMTP activity helpers."""

    @staticmethod
    def addSMTPCommand(connection, sessionUrl, communityName, activityName, optionsDict):
        """POST *optionsDict* to the activity's ``agent/commandList``."""
        activityListUrl, activity = SMTPUtils.getActivityByName(connection, sessionUrl, communityName, activityName)
        SMTPCommandsUrl = '%s/%s/agent/commandList' % (activityListUrl, activity.objectID)
        performGenericPost(connection, SMTPCommandsUrl, optionsDict)

    @staticmethod
    def changeMailMesage(connection, sessionUrl, communityName, activityName, commandName, optionsDict):
        """PATCH *optionsDict* onto the command whose ``cmdName`` equals *commandName*.

        Raises:
            Exception: if no command in the list has that name.
        """
        activityListUrl, activity = SMTPUtils.getActivityByName(connection, sessionUrl, communityName, activityName)
        # NOTE(review): the list URL ends with '/', so the per-command URL
        # built below contains '//'. Left as is - confirm the server's
        # handling before changing it.
        smtpCommandsListUrl = '%s/%s/agent/commandList/' % (activityListUrl, activity.objectID)
        smtpCommands = connection.httpGet(url=smtpCommandsListUrl)

        for smtpCommand in smtpCommands:
            if smtpCommand.cmdName == commandName:
                smtpCommandUrl = '%s/%s' % (smtpCommandsListUrl, smtpCommand.objectID)
                performGenericPatch(connection, smtpCommandUrl, optionsDict)
                break
        else:
            # Loop finished without a break: nothing matched.
            raise Exception('Community %s, activity %s does not have a command named %s' % (communityName, activityName, commandName))


class RtspUtils(ActivityNetworkMixinUtils):
    """RTSP activity helpers."""

    @staticmethod
    def changeRtspCommand(connection, sessionUrl, communityName, activityName, commandName, optionsDict):
        """PATCH *optionsDict* onto the command whose ``cmdName`` equals *commandName*.

        Raises:
            Exception: if no command in the list has that name.
        """
        activityListUrl, activity = RtspUtils.getActivityByName(connection, sessionUrl, communityName, activityName)
        rtspCommandsUrl = '%s/%s/agent/commandList' % (activityListUrl, activity.objectID)
        # Keyword form, consistent with every other httpGet call in this file.
        rtspCommands = connection.httpGet(url=rtspCommandsUrl)

        for rtspCommand in rtspCommands:
            if rtspCommand.cmdName == commandName:
                rtspCommandUrl = '%s/%s' % (rtspCommandsUrl, rtspCommand.objectID)
                performGenericPatch(connection, rtspCommandUrl, optionsDict)
                break
        else:
            # Loop finished without a break: nothing matched.
            raise Exception('Community %s, activity %s does not have a command named %s' % (communityName, activityName, commandName))


class VoipPeerUtils(ActivityNetworkMixinUtils):
    """VoIP peer activity helpers."""

    @staticmethod
    def changeScenarioSettings(connection, sessionUrl, communityName, activityName, optionsDict):
        """PATCH *optionsDict* onto the activity's ``agent/pm/scenarioSettings``."""
        activityListUrl, activity = VoipPeerUtils.getActivityByName(connection, sessionUrl, communityName, activityName)
        scenarioSettingsUrl = '%s/%s/agent/pm/scenarioSettings' % (activityListUrl, activity.objectID)
        performGenericPatch(connection, scenarioSettingsUrl, optionsDict)


class IPTVUtils(ActivityNetworkMixinUtils):
    """IPTV activity helpers."""

    @staticmethod
    def addIPTVCommand(connection, sessionUrl, communityName, activityName, optionsDict):
        """POST *optionsDict* to the activity's ``agent/pm/commands``."""
        activityListUrl, activity = IPTVUtils.getActivityByName(connection, sessionUrl, communityName, activityName)
        iptvCommandsUrl = '%s/%s/agent/pm/commands' % (activityListUrl, activity.objectID)
        performGenericPost(connection, iptvCommandsUrl, optionsDict)