Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_response_class(self):
    """Check that Read, multi-tag Read and Write return Response objects.

    Uses the ``comm`` object from the enclosing scope to read a single
    tag, read a list of tags, and write a single tag. Each result (the
    first element, for the list read) must be a ``Response`` instance.

    Note: this writes the value 1 to the 'BaseBool' tag.
    """
    # Single-tag read returns one Response.
    one_bool = comm.Read('BaseBool')
    self.assertIsInstance(
        one_bool, Response, "Response class not found in Read")

    # List read returns a sequence; only the first element is checked.
    bool_tags = ['BaseBool', 'BaseBits.0', 'BaseBits.31']
    booleans = comm.Read(bool_tags)
    self.assertIsInstance(
        booleans[0], Response, "Response class not found in Multi Read")

    # Single-tag write also reports its outcome as a Response.
    bool_write = comm.Write('BaseBool', 1)
    self.assertIsInstance(
        bool_write, Response, "Response class not found in Write")
def _readTag(self, tag_name, elements, data_type):
"""
processes the read request
"""
self.Offset = 0
if not self._connect():
return None
tag, base_tag, index = _parseTagName(tag_name, 0)
resp = self._initial_read(tag, base_tag, data_type)
if resp[2] != 0 and resp[2] != 6:
return Response(tag_name, None, resp[2])
data_type = self.KnownTags[base_tag][0]
bit_count = self.CIPTypes[data_type][0] * 8
ioi = self._buildTagIOI(tag_name, data_type)
if data_type == 211:
# bool array
words = _getWordCount(index, elements, bit_count)
request = self._add_read_service(ioi, words)
elif BitofWord(tag):
# bits of word
split_tag = tag_name.split('.')
bit_pos = split_tag[len(split_tag)-1]
bit_pos = int(bit_pos)
words = _getWordCount(bit_pos, elements, bit_count)
tags += self._extractTagPacket(ret_data, programName=None)
else:
return Response(None, None, status)
if allTags:
for program_name in self.ProgramNames:
self.Offset = 0
request = self._buildTagListRequest(program_name)
eip_header = self._buildEIPHeader(request)
status, ret_data = self._getBytes(eip_header)
if status == 0 or status == 6:
tags += self._extractTagPacket(ret_data, program_name)
else:
return Response(None, None, status)
while status == 6:
self.Offset += 1
request = self._buildTagListRequest(program_name)
eip_header = self._buildEIPHeader(request)
status, ret_data = self._getBytes(eip_header)
if status == 0 or status == 6:
tags += self._extractTagPacket(ret_data, program_name)
else:
return Response(None, None, status)
return Response(None, tags, status)
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0.5)
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
s.sendto(request, ('255.255.255.255', 44818))
try:
while(1):
ret = s.recv(4096)
context = unpack_from('
def _multiWriteParser(self, write_data, data):
# remove the beginning of the packet because we just don't care about it
stripped = data[50:]
tag_count = unpack_from('
if status == 0 or status == 6:
tags += self._extractTagPacket(ret_data, program_name)
else:
return Response(None, None, status)
while status == 6:
self.Offset += 1
request = self._buildTagListRequest(program_name)
eip_header = self._buildEIPHeader(request)
status, ret_data = self._getBytes(eip_header)
if status == 0 or status == 6:
tags += self._extractTagPacket(ret_data, program_name)
else:
return Response(None, None, status)
return Response(None, tags, status)
"""
# If ProgramNames is empty then _getTagList hasn't been called
if not self.ProgramNames:
self._getTagList(False)
# Get single program tags if programName exists
if programName in self.ProgramNames:
program_tags = self._getProgramTagList(programName)
# Getting status from program_tags Response object
# _getUDT returns a list of LGXTags might need rework in the future
status = program_tags.Status
program_tags = self._getUDT(program_tags.Value)
return Response(None, program_tags, status)
else:
return Response(None, None, 'Program not found, please check name!')
words = _getWordCount(bit_pos, elements, bit_count)
request = self._add_read_service(ioi, words)
else:
# everything else
request = self._add_read_service(ioi, elements)
eip_header = self._buildEIPHeader(request)
status, ret_data = self._getBytes(eip_header)
if status == 0 or status == 6:
return_value = self._parseReply(tag_name, elements, ret_data)
return Response(tag_name, return_value, status)
else:
return Response(tag_name, None, status)
elif dataTypeValue == 211:
dataTypeFormat = self.CIPTypes[dataTypeValue][2]
val = unpack_from(dataTypeFormat, stripped, offset+6)[0]
bitState = _getBitOfWord(tag, val)
response = Response(tag, bitState, replyStatus)
elif dataTypeValue == 160:
strlen = unpack_from('
"""
Requests the controller tag list and returns a list of LgxTag type
"""
if not self._connect():
return None
self.Offset = 0
tags = []
request = self._buildTagListRequest(programName=None)
eip_header = self._buildEIPHeader(request)
status, ret_data = self._getBytes(eip_header)
if status == 0 or status == 6:
tags += self._extractTagPacket(ret_data, programName=None)
else:
return Response(None, None, status)
while status == 6:
self.Offset += 1
request = self._buildTagListRequest(programName=None)
eip_header = self._buildEIPHeader(request)
status, ret_data = self._getBytes(eip_header)
if status == 0 or status == 6:
tags += self._extractTagPacket(ret_data, programName=None)
else:
return Response(None, None, status)
if allTags:
for program_name in self.ProgramNames:
self.Offset = 0