Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import pytest
import time
import logging
import logging.handlers
import responses
import http.client
import json
from cobra.internal.codec.jsoncodec import fromJSONStr, toJSONStr
from cobra.internal.codec.xmlcodec import toXMLStr
from string import Template
# import cobra.services
import cobra.mit.access
import cobra.mit.request
import cobra.mit.session
cobra.model.fv = pytest.importorskip("cobra.model.fv")
import cobra.model.pol
import cobra.mit.request
import cobra.mit.session
import cobra.mit.access
import cobra.model.fv
import cobra.model.pol
import cobra.model.infra
# Debug/logging setup executed at import time.
#
# `os` is needed for the log-file name below but is not among the imports
# visible at the top of this file; importing it here avoids a NameError.
# (A repeated import is harmless if the module is already loaded.)
import os

# Turn on http.client's built-in debug output for every HTTPConnection.
http.client.HTTPConnection.debuglevel = 1

# Configure the root logger with the default handler and lower its
# threshold to DEBUG so that all records are emitted.
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# File handler writing to "<this file without extension>.log".
# NOTE(review): backupCount is left at its default of 0; per the logging
# documentation rollover never occurs in that case, so maxBytes alone does
# not cap the file size -- confirm whether a backupCount was intended.
# NOTE(review): no logger.addHandler(fh) is visible in this section;
# presumably the handler is attached further down -- confirm.
fh = logging.handlers.RotatingFileHandler(
    '{0}.log'.format(os.path.splitext(__file__)[0]), maxBytes=1000000)
# Fragment: builds node/interface profile objects under an existing L3Out and
# commits the tenant.
# NOTE(review): l3extLNodeP, l3extOut, fvTenant and md are not defined in this
# fragment; they must already exist when it runs -- confirm against the full
# script.

# Node attachment for topology/pod-2/node-202 with its infra node profile and
# loopback address.
l3extRsNodeL3OutAtt = cobra.model.l3ext.RsNodeL3OutAtt(l3extLNodeP, tDn='topology/pod-2/node-202', rtrId='222.222.222.222', rtrIdLoopBack='yes')
l3extInfraNodeP = cobra.model.l3ext.InfraNodeP(l3extRsNodeL3OutAtt, fabricExtCtrlPeering='yes')
l3extLoopBackIfP = cobra.model.l3ext.LoopBackIfP(l3extRsNodeL3OutAtt, addr='222.222.222.222')
# Same three objects for topology/pod-1/node-201.
l3extRsNodeL3OutAtt2 = cobra.model.l3ext.RsNodeL3OutAtt(l3extLNodeP, tDn='topology/pod-1/node-201', rtrId='111.111.111.111', rtrIdLoopBack='yes')
l3extInfraNodeP2 = cobra.model.l3ext.InfraNodeP(l3extRsNodeL3OutAtt2, fabricExtCtrlPeering='yes')
l3extLoopBackIfP2 = cobra.model.l3ext.LoopBackIfP(l3extRsNodeL3OutAtt2, addr='111.111.111.111')
# Interface profile 'MPOD_OSPF_INTS' under the node profile; its OSPF child
# references the policy name 'MPOD_OSPF_P2P'.
l3extLIfP = cobra.model.l3ext.LIfP(l3extLNodeP, tag='yellow-green', name='MPOD_OSPF_INTS')
ospfIfP = cobra.model.ospf.IfP(l3extLIfP, authKeyId='1', authType='none')
ospfRsIfPol = cobra.model.ospf.RsIfPol(ospfIfP, tnOspfIfPolName='MPOD_OSPF_P2P')
# Relation objects created with no attributes beyond their parent.
l3extRsNdIfPol = cobra.model.l3ext.RsNdIfPol(l3extLIfP)
l3extRsIngressQosDppPol = cobra.model.l3ext.RsIngressQosDppPol(l3extLIfP)
l3extRsEgressQosDppPol = cobra.model.l3ext.RsEgressQosDppPol(l3extLIfP)
# One path attachment per pod (eth1/20 on node 202 and on node 201), both as
# sub-interfaces on vlan-4; only tDn and addr differ between the two.
l3extRsPathL3OutAtt = cobra.model.l3ext.RsPathL3OutAtt(l3extLIfP, tDn='topology/pod-2/paths-202/pathep-[eth1/20]', targetDscp='unspecified', encapScope='local', llAddr='::', mac='00:22:BD:F8:19:FF', mode='regular', encap='vlan-4', ifInstT='sub-interface', mtu='inherit', addr='203.1.1.1/30')
l3extRsPathL3OutAtt2 = cobra.model.l3ext.RsPathL3OutAtt(l3extLIfP, tDn='topology/pod-1/paths-201/pathep-[eth1/20]', targetDscp='unspecified', encapScope='local', llAddr='::', mac='00:22:BD:F8:19:FF', mode='regular', encap='vlan-4', ifInstT='sub-interface', mtu='inherit', addr='202.1.1.1/30')
# Remaining children created directly under the L3Out object.
l3extInstP = cobra.model.l3ext.InstP(l3extOut, matchT='AtleastOne', name='l3extInstPNamee75ff0551285bf11', prio='unspecified', targetDscp='unspecified')
fvRsCustQosPol = cobra.model.fv.RsCustQosPol(l3extInstP)
bgpExtP = cobra.model.bgp.ExtP(l3extOut)
ospfExtP = cobra.model.ospf.ExtP(l3extOut, areaCtrl='redistribute,summary', areaType='regular', areaCost='1', areaId='backbone')
# Wrap the tenant object in a ConfigRequest and commit it through md.
c = cobra.mit.request.ConfigRequest()
c.addMo(fvTenant)
md.commit(c)
# Fragment: builds a fabric setup policy under topMo and an ongoing-atomic-counter
# mode object under fabricInst, then commits fabricInst.
# NOTE(review): topMo, fabricInst and md are not defined in this fragment --
# confirm against the full script.
ctrlrInst = cobra.model.ctrlr.Inst(topMo)
fabricSetupPol = cobra.model.fabric.SetupPol(ctrlrInst, name='default')
# Setup entry for podId '2' with TEP pool 10.1.0.0/16.
fabricSetupP = cobra.model.fabric.SetupP(fabricSetupPol, tepPool='10.1.0.0/16', podId='2')
# commit the generated code to APIC
#print toXMLStr(topMo)
# NOTE(review): this ConfigRequest is rebound two lines below without any
# addMo()/commit() in between, so the ctrlrInst subtree built above is never
# submitted in the code visible here -- confirm whether lines are missing.
c = cobra.mit.request.ConfigRequest()
dbgOngoingAcMode = cobra.model.dbg.OngoingAcMode(fabricInst, mode='path', name='default')
# Fresh request carrying fabricInst, committed through md.
c = cobra.mit.request.ConfigRequest()
c.addMo(fabricInst)
md.commit(c)
# Fragment: builds the 'infra' tenant object tree (fabric external connection
# profile, OSPF interface policy, L3Out with node and interface profiles).
# NOTE(review): topMo is not defined in this fragment, and no commit of this
# tree is visible before the fragment ends -- confirm against the full script.
fvTenant = cobra.model.fv.Tenant(topMo, name='infra')
# Fabric external connection profile with one PodConnP + Ip child per pod id
# ('2' and '1').
fvFabricExtConnP = cobra.model.fv.FabricExtConnP(fvTenant, rt='extended:as2-nn4:11:22', id='1', name='fabricExtConnPolName_9a92763eadbe259d')
fvPodConnP = cobra.model.fv.PodConnP(fvFabricExtConnP, id='2')
fvIp = cobra.model.fv.Ip(fvPodConnP, addr='200.1.1.1/32')
fvPodConnP2 = cobra.model.fv.PodConnP(fvFabricExtConnP, id='1')
fvIp2 = cobra.model.fv.Ip(fvPodConnP2, addr='100.1.1.1/32')
# Routing profile 'MPOD_EXT_ROUTE_PROFILE' with two /16 subnets.
l3extFabricExtRoutingP = cobra.model.l3ext.FabricExtRoutingP(fvFabricExtConnP, name='MPOD_EXT_ROUTE_PROFILE')
l3extSubnet = cobra.model.l3ext.Subnet(l3extFabricExtRoutingP, ip='202.1.0.0/16')
l3extSubnet2 = cobra.model.l3ext.Subnet(l3extFabricExtRoutingP, ip='203.1.0.0/16')
fvPeeringP = cobra.model.fv.PeeringP(fvFabricExtConnP, type='automatic_with_full_mesh')
# OSPF interface policy 'MPOD_OSPF_P2P'; the same name is referenced by
# ospfRsIfPol further down.
ospfIfPol = cobra.model.ospf.IfPol(fvTenant, nwT='p2p', pfxSuppress='inherit', name='MPOD_OSPF_P2P', prio='1', ctrl='advert-subnet,mtu-ignore', helloIntvl='10', rexmitIntvl='5', xmitDelay='1', cost='unspecified', deadIntvl='40')
# L3Out 'MPOD_OSPF_OUT' under the tenant, tied to context name 'overlay-1'.
l3extOut = cobra.model.l3ext.Out(fvTenant, name='MPOD_OSPF_OUT', enforceRtctrl='export', targetDscp='unspecified')
l3extRsEctx = cobra.model.l3ext.RsEctx(l3extOut, tnFvCtxName='overlay-1')
# Node profile with one attachment (plus infra node profile and loopback) for
# node-202 in pod-2 and one for node-201 in pod-1.
l3extLNodeP = cobra.model.l3ext.LNodeP(l3extOut, tag='yellow-green', name='MPOD_OSPF_NODES', targetDscp='unspecified')
l3extRsNodeL3OutAtt = cobra.model.l3ext.RsNodeL3OutAtt(l3extLNodeP, tDn='topology/pod-2/node-202', rtrId='222.222.222.222', rtrIdLoopBack='yes')
l3extInfraNodeP = cobra.model.l3ext.InfraNodeP(l3extRsNodeL3OutAtt, fabricExtCtrlPeering='yes')
l3extLoopBackIfP = cobra.model.l3ext.LoopBackIfP(l3extRsNodeL3OutAtt, addr='222.222.222.222')
l3extRsNodeL3OutAtt2 = cobra.model.l3ext.RsNodeL3OutAtt(l3extLNodeP, tDn='topology/pod-1/node-201', rtrId='111.111.111.111', rtrIdLoopBack='yes')
l3extInfraNodeP2 = cobra.model.l3ext.InfraNodeP(l3extRsNodeL3OutAtt2, fabricExtCtrlPeering='yes')
l3extLoopBackIfP2 = cobra.model.l3ext.LoopBackIfP(l3extRsNodeL3OutAtt2, addr='111.111.111.111')
# Interface profile 'MPOD_OSPF_INTS' with its OSPF child and three relation
# objects that carry no attributes beyond their parent.
l3extLIfP = cobra.model.l3ext.LIfP(l3extLNodeP, tag='yellow-green', name='MPOD_OSPF_INTS')
ospfIfP = cobra.model.ospf.IfP(l3extLIfP, authKeyId='1', authType='none')
ospfRsIfPol = cobra.model.ospf.RsIfPol(ospfIfP, tnOspfIfPolName='MPOD_OSPF_P2P')
l3extRsNdIfPol = cobra.model.l3ext.RsNdIfPol(l3extLIfP)
l3extRsIngressQosDppPol = cobra.model.l3ext.RsIngressQosDppPol(l3extLIfP)
l3extRsEgressQosDppPol = cobra.model.l3ext.RsEgressQosDppPol(l3extLIfP)
"""
# Fragment: logs in, models a tenant with one VRF, bridge domain and subnet,
# commits it and prints the submitted payload.
# NOTE(review): requests, test_tenant and the upper-case constants (URL, LOGIN,
# PASSWORD, TENANT, VRF, BRIDGEDOMAIN, GATEWAY, SCOPE, SUBNETNAME) are not
# defined or imported in the code visible here -- confirm against the full
# script.
# create a session and define the root
# Silences urllib3 warnings before the session is created.
requests.packages.urllib3.disable_warnings()
auth = cobra.mit.session.LoginSession(URL, LOGIN, PASSWORD)
session = cobra.mit.access.MoDirectory(auth)
session.login()
root = cobra.model.pol.Uni('')
# test if tenant name is already in use
test_tenant(TENANT, session)
# model new tenant configuration
tenant = cobra.model.fv.Tenant(root, name=TENANT)
vrf = cobra.model.fv.Ctx(tenant, name=VRF)
bridge_domain = cobra.model.fv.BD(tenant, name=BRIDGEDOMAIN)
# Ties the bridge domain to the VRF by name; the subnet is a child of the
# bridge domain.
attached_vrf = cobra.model.fv.RsCtx(bridge_domain, tnFvCtxName=VRF)
subnet = cobra.model.fv.Subnet(bridge_domain, ip=GATEWAY, scope=SCOPE, name=SUBNETNAME)
#submit the configuration to the apic and print a success message
config_request = cobra.mit.request.ConfigRequest()
config_request.addMo(tenant)
session.commit(config_request)
print("\nNew Tenant, {}, has been created:\n\n{}\n".format(TENANT, config_request.data))
# Fragment: logs in, models a tenant with one VRF, bridge domain and subnet,
# commits it and prints the submitted payload.
# NOTE(review): these statements repeat the tenant-creation sequence that
# immediately precedes them; running both would submit the same tenant twice
# -- confirm whether the duplication is intentional.
# NOTE(review): requests, test_tenant and the upper-case constants are not
# defined or imported in the code visible here -- confirm against the full
# script.
requests.packages.urllib3.disable_warnings()
auth = cobra.mit.session.LoginSession(URL, LOGIN, PASSWORD)
session = cobra.mit.access.MoDirectory(auth)
session.login()
root = cobra.model.pol.Uni('')
# test if tenant name is already in use
test_tenant(TENANT, session)
# model new tenant configuration
tenant = cobra.model.fv.Tenant(root, name=TENANT)
vrf = cobra.model.fv.Ctx(tenant, name=VRF)
bridge_domain = cobra.model.fv.BD(tenant, name=BRIDGEDOMAIN)
# Ties the bridge domain to the VRF by name; the subnet is a child of the
# bridge domain.
attached_vrf = cobra.model.fv.RsCtx(bridge_domain, tnFvCtxName=VRF)
subnet = cobra.model.fv.Subnet(bridge_domain, ip=GATEWAY, scope=SCOPE, name=SUBNETNAME)
#submit the configuration to the apic and print a success message
config_request = cobra.mit.request.ConfigRequest()
config_request.addMo(tenant)
session.commit(config_request)
print("\nNew Tenant, {}, has been created:\n\n{}\n".format(TENANT, config_request.data))
# Fragment: finishes the 'app' EPG, builds the 'db' and 'web' EPGs under the
# application profile, prints the tenant as XML and commits it.
# NOTE(review): fvAEPg, fvAp, fvTenant, md, vlan2, vlan3 and bridge_domain are
# used here before any definition visible above this point -- confirm the
# statement order against the full script.
fvRsProv = cobra.model.fv.RsProv(fvAEPg, tnVzBrCPName=u'power_up', matchT=u'AtleastOne', prio=u'unspecified')
# EPG 'db': two path attachments (Heroes_FI-2B / Heroes_FI-2A) using vlan2,
# a domain attachment, and it provides contract 'sql'.
fvAEPg2 = cobra.model.fv.AEPg(fvAp, isAttrBasedEPg=u'no', matchT=u'AtleastOne', prio=u'unspecified', name=u'db', descr=u'')
fvRsPathAtt3 = cobra.model.fv.RsPathAtt(fvAEPg2, tDn=u'topology/pod-1/protpaths-101-102/pathep-[Heroes_FI-2B]', instrImedcy=u'lazy', encap=vlan2, descr=u'', mode=u'regular')
fvRsPathAtt4 = cobra.model.fv.RsPathAtt(fvAEPg2, tDn=u'topology/pod-1/protpaths-101-102/pathep-[Heroes_FI-2A]', instrImedcy=u'lazy', encap=vlan2, descr=u'', mode=u'regular')
fvRsDomAtt2 = cobra.model.fv.RsDomAtt(fvAEPg2, instrImedcy=u'lazy', resImedcy=u'lazy', encap=u'unknown', tDn=u'uni/phys-Heroes_phys')
fvRsCustQosPol2 = cobra.model.fv.RsCustQosPol(fvAEPg2, tnQosCustomPolName=u'')
# presumably bridge_domain holds the bridge-domain name string here; verify
# against the caller
fvRsBd2 = cobra.model.fv.RsBd(fvAEPg2, tnFvBDName=bridge_domain)
fvRsProv2 = cobra.model.fv.RsProv(fvAEPg2, tnVzBrCPName=u'sql', matchT=u'AtleastOne', prio=u'unspecified')
# EPG 'web': consumes contract 'sql', provides contract 'web', same two paths
# using vlan3.
fvAEPg3 = cobra.model.fv.AEPg(fvAp, isAttrBasedEPg=u'no', matchT=u'AtleastOne', prio=u'unspecified', name=u'web', descr=u'')
fvRsCons2 = cobra.model.fv.RsCons(fvAEPg3, tnVzBrCPName=u'sql', prio=u'unspecified')
fvRsPathAtt5 = cobra.model.fv.RsPathAtt(fvAEPg3, tDn=u'topology/pod-1/protpaths-101-102/pathep-[Heroes_FI-2B]', instrImedcy=u'lazy', encap=vlan3, descr=u'', mode=u'regular')
fvRsPathAtt6 = cobra.model.fv.RsPathAtt(fvAEPg3, tDn=u'topology/pod-1/protpaths-101-102/pathep-[Heroes_FI-2A]', instrImedcy=u'lazy', encap=vlan3, descr=u'', mode=u'regular')
fvRsDomAtt3 = cobra.model.fv.RsDomAtt(fvAEPg3, instrImedcy=u'lazy', resImedcy=u'lazy', encap=u'unknown', tDn=u'uni/phys-Heroes_phys')
fvRsCustQosPol3 = cobra.model.fv.RsCustQosPol(fvAEPg3, tnQosCustomPolName=u'')
fvRsBd3 = cobra.model.fv.RsBd(fvAEPg3, tnFvBDName=bridge_domain)
fvRsProv3 = cobra.model.fv.RsProv(fvAEPg3, tnVzBrCPName=u'web', matchT=u'AtleastOne', prio=u'unspecified')
# commit the generated code to APIC
# Prints the XML rendering of the tenant before submitting it.
print(toXMLStr(fvTenant))
c = cobra.mit.request.ConfigRequest()
c.addMo(fvTenant)
md.commit(c)
# Fragment: logs in and builds an application profile with three EPGs
# ('app', 'db', 'web') under a tenant.
# NOTE(review): URL, LOGIN, PASSWORD, tenant, application, vlan1, vlan2, vlan3
# and bridge_domain are not defined in this fragment, and no commit is visible
# before it ends -- confirm against the full script.
# log into an APIC and create a directory object
ls = cobra.mit.session.LoginSession(URL, LOGIN, PASSWORD)
md = cobra.mit.access.MoDirectory(ls)
md.login()
# the top level object on which operations will be made
polUni = cobra.model.pol.Uni('')
# `tenant` is passed positionally here rather than as name=...
fvTenant = cobra.model.fv.Tenant(polUni, tenant)
# build the request using cobra syntax
fvAp = cobra.model.fv.Ap(fvTenant, ownerKey=u'', name=application, prio=u'unspecified', ownerTag=u'', descr=u'')
# EPG 'app': consumes contract 'sql', provides contract 'power_up', two path
# attachments (Heroes_FI-2B / Heroes_FI-2A) using vlan1.
fvAEPg = cobra.model.fv.AEPg(fvAp, isAttrBasedEPg=u'no', matchT=u'AtleastOne', prio=u'unspecified', name=u'app', descr=u'')
fvRsCons = cobra.model.fv.RsCons(fvAEPg, tnVzBrCPName=u'sql', prio=u'unspecified')
fvRsPathAtt = cobra.model.fv.RsPathAtt(fvAEPg, tDn=u'topology/pod-1/protpaths-101-102/pathep-[Heroes_FI-2B]', instrImedcy=u'lazy', encap=vlan1, descr=u'', mode=u'regular')
fvRsPathAtt2 = cobra.model.fv.RsPathAtt(fvAEPg, tDn=u'topology/pod-1/protpaths-101-102/pathep-[Heroes_FI-2A]', instrImedcy=u'lazy', encap=vlan1, descr=u'', mode=u'regular')
fvRsDomAtt = cobra.model.fv.RsDomAtt(fvAEPg, instrImedcy=u'lazy', resImedcy=u'lazy', encap=u'unknown', tDn=u'uni/phys-Heroes_phys')
fvRsCustQosPol = cobra.model.fv.RsCustQosPol(fvAEPg, tnQosCustomPolName=u'')
# presumably bridge_domain holds the bridge-domain name string here; verify
# against the caller
fvRsBd = cobra.model.fv.RsBd(fvAEPg, tnFvBDName=bridge_domain)
fvRsProv = cobra.model.fv.RsProv(fvAEPg, tnVzBrCPName=u'power_up', matchT=u'AtleastOne', prio=u'unspecified')
# EPG 'db': provides contract 'sql', same two paths using vlan2.
fvAEPg2 = cobra.model.fv.AEPg(fvAp, isAttrBasedEPg=u'no', matchT=u'AtleastOne', prio=u'unspecified', name=u'db', descr=u'')
fvRsPathAtt3 = cobra.model.fv.RsPathAtt(fvAEPg2, tDn=u'topology/pod-1/protpaths-101-102/pathep-[Heroes_FI-2B]', instrImedcy=u'lazy', encap=vlan2, descr=u'', mode=u'regular')
fvRsPathAtt4 = cobra.model.fv.RsPathAtt(fvAEPg2, tDn=u'topology/pod-1/protpaths-101-102/pathep-[Heroes_FI-2A]', instrImedcy=u'lazy', encap=vlan2, descr=u'', mode=u'regular')
fvRsDomAtt2 = cobra.model.fv.RsDomAtt(fvAEPg2, instrImedcy=u'lazy', resImedcy=u'lazy', encap=u'unknown', tDn=u'uni/phys-Heroes_phys')
fvRsCustQosPol2 = cobra.model.fv.RsCustQosPol(fvAEPg2, tnQosCustomPolName=u'')
fvRsBd2 = cobra.model.fv.RsBd(fvAEPg2, tnFvBDName=bridge_domain)
fvRsProv2 = cobra.model.fv.RsProv(fvAEPg2, tnVzBrCPName=u'sql', matchT=u'AtleastOne', prio=u'unspecified')
# EPG 'web': consumes contract 'sql', same two paths using vlan3.
fvAEPg3 = cobra.model.fv.AEPg(fvAp, isAttrBasedEPg=u'no', matchT=u'AtleastOne', prio=u'unspecified', name=u'web', descr=u'')
fvRsCons2 = cobra.model.fv.RsCons(fvAEPg3, tnVzBrCPName=u'sql', prio=u'unspecified')
fvRsPathAtt5 = cobra.model.fv.RsPathAtt(fvAEPg3, tDn=u'topology/pod-1/protpaths-101-102/pathep-[Heroes_FI-2B]', instrImedcy=u'lazy', encap=vlan3, descr=u'', mode=u'regular')
fvRsPathAtt6 = cobra.model.fv.RsPathAtt(fvAEPg3, tDn=u'topology/pod-1/protpaths-101-102/pathep-[Heroes_FI-2A]', instrImedcy=u'lazy', encap=vlan3, descr=u'', mode=u'regular')
fvRsDomAtt3 = cobra.model.fv.RsDomAtt(fvAEPg3, instrImedcy=u'lazy', resImedcy=u'lazy', encap=u'unknown', tDn=u'uni/phys-Heroes_phys')
fvRsCustQosPol3 = cobra.model.fv.RsCustQosPol(fvAEPg3, tnQosCustomPolName=u'')
import cobra.model.pol
import cobra.model.vz
from cobra.internal.codec.xmlcodec import toXMLStr
# Fragment: logs in and builds tenant 'SnV' with one VRF, one bridge domain,
# six filters (one entry each) and a contract object.
# NOTE(review): no ConfigRequest/commit is visible before this fragment ends
# -- confirm against the full script.
# log into an APIC and create a directory object
# NOTE(review): the APIC URL, user name and password are hard-coded string
# literals here; consider whether they should come from configuration.
ls = cobra.mit.session.LoginSession('https://sandboxapicdc.cisco.com', 'admin', 'ciscopsdt')
md = cobra.mit.access.MoDirectory(ls)
md.login()
# the top level object on which operations will be made
topMo = cobra.model.pol.Uni('')
# build the request using cobra syntax
fvTenant = cobra.model.fv.Tenant(topMo, name=u'SnV')
fvCtx = cobra.model.fv.Ctx(fvTenant, name=u'Superverse')
# Bridge domain 'antigravity' references the VRF by name and carries one subnet.
fvBD = cobra.model.fv.BD(fvTenant, name=u'antigravity')
fvRsCtx = cobra.model.fv.RsCtx(fvBD, tnFvCtxName=u'Superverse')
fvSubnet = cobra.model.fv.Subnet(fvBD, ip=u'10.2.10.1/23', name=u'Subnet')
# Each filter below gets a single entry whose dFromPort and dToPort are equal,
# i.e. one destination port per filter.
vzFilter = cobra.model.vz.Filter(fvTenant, name=u'http')
vzEntry = cobra.model.vz.Entry(vzFilter, etherT=u'ip', prot=u'tcp', dFromPort=u'http', name=u'tcp-80', dToPort=u'http')
vzFilter2 = cobra.model.vz.Filter(fvTenant, name=u'https')
vzEntry2 = cobra.model.vz.Entry(vzFilter2, etherT=u'ip', prot=u'tcp', dFromPort=u'https', name=u'tcp-443', dToPort=u'https')
vzFilter3 = cobra.model.vz.Filter(fvTenant, name=u'syslog')
vzEntry3 = cobra.model.vz.Entry(vzFilter3, etherT=u'ip', prot=u'udp', dFromPort=u'514', name=u'udp-514', dToPort=u'514')
vzFilter4 = cobra.model.vz.Filter(fvTenant, name=u'sql-server')
vzEntry4 = cobra.model.vz.Entry(vzFilter4, etherT=u'ip', prot=u'tcp', dFromPort=u'1433', name=u'tcp-1433', dToPort=u'1433')
vzFilter5 = cobra.model.vz.Filter(fvTenant, name=u'mqtt')
vzEntry5 = cobra.model.vz.Entry(vzFilter5, etherT=u'ip', prot=u'tcp', dFromPort=u'1883', name=u'tcp-1883', dToPort=u'1883')
vzFilter6 = cobra.model.vz.Filter(fvTenant, name=u'sql-browser')
vzEntry6 = cobra.model.vz.Entry(vzFilter6, etherT=u'ip', prot=u'udp', dFromPort=u'1434', name=u'udp-1434', dToPort=u'1434')
# Contract object 'web-ui'; no children are attached to it in the code
# visible here.
vzBrCP = cobra.model.vz.BrCP(fvTenant, name=u'web-ui')
successfulnodes = []
leafDn = {}
physDomP = self.md.lookupByClass(
'physDomP', propFilter='eq(physDomP.name,"phys")')[0]
topMo = cobra.model.pol.Uni('')
fvTenant = cobra.model.fv.Tenant(topMo, name=tenant)
fvCtx = cobra.model.fv.Ctx(
fvTenant, name=tenant, pcEnfPref='unenforced')
fvAp = cobra.model.fv.Ap(fvTenant, name='EPG-as-VLAN')
for v in vlans:
vlan, ip = v
kwargs = {'descr': urllib.quote('{0}'.format(ip))}
vlanlist.append(vlan)
fvAEPg = cobra.model.fv.AEPg(
fvAp, name='vlan{0}'.format(vlan), **kwargs)
fvRsDomAtt = cobra.model.fv.RsDomAtt(fvAEPg, tDn=physDomP.dn)
fvBD = cobra.model.fv.BD(
fvTenant, name='vlan{0}'.format(vlan), **kwargs)
fvSubnet = cobra.model.fv.Subnet(fvBD, ip=ip)
fvRsCtx = cobra.model.fv.RsCtx(fvBD, tnFvCtxName=fvCtx.name)
fvRsBd = cobra.model.fv.RsBd(fvAEPg, tnFvBDName=fvBD.name)
for leaf in map(str, nodes):
if self.verifyleaf:
if leafDn.get(leaf):
leafMo = leafDn.get(leaf)
else:
leafMo = self.md.lookupByClass(
'fabricNode',
propFilter='eq(fabricNode.id,"{0}")'.format(leaf))
if len(leafMo) > 0 and leafMo[0].role != 'leaf':
leafDn[leaf] = None
elif len(leafMo) > 0:
leafMo = leafMo[0]
leafMoDn = leafMo.dn
leafDn[leaf] = leafMo
else:
print 'Unable to find leaf {0}. Skipping it'.format(leaf)
leafMoDn = None
leafDn[leaf] = None
else:
leafMoDn = 'topology/pod-1/node-{0}'.format(leaf)
if leafMoDn:
if leafMoDn not in successfulnodes:
successfulnodes.append(leafMoDn)
fvRsNodeAtt = cobra.model.fv.RsNodeAtt(
fvAEPg, tDn=leafMoDn, encap='vlan-{0}'.format(vlan))
successfulvlans = self.createVlanInstances(self.collapseRange(vlanlist), nodes)
return fvTenant, successfulnodes, successfulvlans