Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import pyexcel.constants as constants
from pyexcel import Book, Sheet, get_book
from _compact import StringIO, OrderedDict
from pyexcel.source import AbstractSource, MemorySourceMixin
from pyexcel.plugins import SourceInfo
from nose.tools import eq_, raises
FIXTURE = "dummy"
@SourceInfo(
"source",
fields=[FIXTURE],
targets=(constants.BOOK, constants.SHEET),
actions=(constants.WRITE_ACTION,),
attributes=[FIXTURE],
key=FIXTURE,
)
class DummySource(AbstractSource, MemorySourceMixin):
"""
For a dynamically loaded plugin, you will need to create the following
fields. and implement to class level functions: keywords and
is_my_business.
"""
def __init__(self, file_type=None, file_stream=None, **keywords):
if file_stream:
self._content = file_stream
else:
self._content = StringIO()
else:
list_of_file_types = RENDERER.get_all_file_types()
file_types = []
lowercase_file_name = file_name.lower()
for a_supported_type in list_of_file_types:
if lowercase_file_name.endswith(a_supported_type):
file_types.append(a_supported_type)
if len(file_types) > 1:
file_types = sorted(file_types, key=lambda x: len(x))
file_type = file_types[-1]
elif len(file_types) == 1:
file_type = file_types[0]
else:
file_type = lowercase_file_name.split('.')[-1]
raise FileTypeNotSupported(
constants.FILE_TYPE_NOT_SUPPORTED_FMT % (file_type, action))
return file_type
"application/vnd.oasis.opendocument.spreadsheet": "ods",
"application/vnd.ms-excel": "xls",
XLSX: "xlsx",
"application/vnd.ms-excel.sheet.macroenabled.12": "xlsm",
"text/html": "html",
}
# pylint: disable=W0223
class HttpSource(AbstractSource):
"""
Multiple sheet data source via http protocol
"""
fields = [params.URL]
targets = (constants.SHEET, constants.BOOK)
actions = (constants.READ_ACTION,)
attributes = [params.URL]
key = params.URL
def __init__(self, url=None, **keywords):
self.__url = url
AbstractSource.__init__(self, **keywords)
def get_data(self):
connection = request.urlopen(self.__url)
info = connection.info()
if PY2:
mime_type = info.type
else:
mime_type = info.get_content_type()
file_type = FILE_TYPE_MIME_TABLE.get(mime_type, None)
def _cleanse_a_row(row):
for item in row:
if item == constants.DEFAULT_NA:
yield " "
else:
yield to_format(str, item)
def register_an_attribute(meta_cls, target, action, attr):
"""Register a file type as an attribute"""
if attr in ATTRIBUTE_REGISTRY[target][constants.RW_ACTION]:
# No registration required
return
ATTRIBUTE_REGISTRY[target][action].add(attr)
if action == constants.READ_ACTION:
meta_cls.register_input(attr)
else:
meta_cls.register_presentation(attr)
intersection = (
attr in ATTRIBUTE_REGISTRY[target][constants.READ_ACTION]
and attr in ATTRIBUTE_REGISTRY[target][constants.WRITE_ACTION]
)
if intersection:
ATTRIBUTE_REGISTRY[target][constants.RW_ACTION].add(attr)
def parse_file_stream(self, file_stream, struct=AUTO_DETECT,
sheet_name=constants.DEFAULT_NAME,
**keywords):
content = json_loads(file_stream)
if struct == AUTO_DETECT:
struct, content = detect_format(content)
if struct == AUTO_DETECT:
raise Exception(
"No auto detection is supported in this version")
if struct in READERS:
reader = READERS[struct](content, **keywords)
else:
raise Exception("Unknown data structure")
return {sheet_name: reader.to_array()}
| | | | | | | 42 | 43 | 44 | 22 | 32 | 42 |
+----+----+----+----+----+----+----+----+----+----+----+----+
| | | | | | | | | | 23 | 33 | 43 |
+----+----+----+----+----+----+----+----+----+----+----+----+
| | | | | | | | | | 24 | 34 | 44 |
+----+----+----+----+----+----+----+----+----+----+----+----+
| | | | | | | | | | 25 | 35 | 45 |
+----+----+----+----+----+----+----+----+----+----+----+----+
"""
if rows:
self._paste_rows(topleft_corner, rows)
elif columns:
self._paste_columns(topleft_corner, columns)
else:
raise ValueError(constants.MESSAGE_DATA_ERROR_EMPTY_CONTENT)
def empty_to_format(_, target_format):
"""Convert empty value to specified format"""
if target_format == float:
ret = 0.0
elif target_format == int:
ret = 0
else:
ret = constants.DEFAULT_NA
return ret
def get_sheet(**keywords):
"""
Get an instance of :class:`Sheet` from an excel source
"""
sheet_params = {}
for field in constants.VALID_SHEET_PARAMETERS:
if field in keywords:
sheet_params[field] = keywords.pop(field)
named_content = sources.get_sheet_stream(**keywords)
sheet = Sheet(named_content.payload, named_content.name, **sheet_params)
return sheet
def extend_columns(self, columns):
"""Inserts two dimensional data after the rightmost column
This is how it works:
Given::
s s s t t
Get::
s s s + t t
"""
if not isinstance(columns, list):
raise TypeError(constants.MESSAGE_DATA_ERROR_DATA_TYPE_MISMATCH)
incoming_data = columns
if not compact.is_array_type(columns, list):
incoming_data = [columns]
incoming_data = transpose(incoming_data)
self._extend_columns_with_rows(incoming_data)