Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_writing_unicode(self):
    """Round-trip a record containing a non-ASCII subfield through writer and reader."""
    path = "test/foo"
    rec = Record()
    rec.add_field(Field(245, ["1", "0"], ["a", chr(0x1234)]))
    rec.leader = " a "
    marc_writer = MARCWriter(open(path, "wb"))
    marc_writer.write(rec)
    marc_writer.close()
    # Read the file back with unicode decoding enabled and verify the
    # non-ASCII character survived the round trip.
    marc_reader = MARCReader(open(path, "rb"), to_unicode=True)
    rec = next(marc_reader)
    self.assertEqual(rec["245"]["a"], chr(0x1234))
    marc_reader.close()
    os.remove(path)
def test_copy_utf8(self):
# Copy all fields of a UTF-8 MARCXML record into a new force_utf8 Record
# and write it out as binary MARC.
# NOTE(review): this block appears truncated -- the finally clause below
# contains only a comment and no cleanup statements; confirm against the
# full file before relying on it.
writer = pymarc.MARCWriter(open("test/write-utf8-test.dat", "wb"))
new_record = pymarc.Record(to_unicode=True, force_utf8=True)
def process_xml(record):
# map_xml callback: copy leader and every field of the parsed record.
new_record.leader = record.leader
for field in record.get_fields():
new_record.add_field(field)
pymarc.map_xml(process_xml, "test/utf8.xml")
try:
writer.write(new_record)
writer.close()
finally:
# remove it
def run(self):
    """Filter MARC records from the snapshot input by subject category code.

    A record is written to the output when it has no uniform title
    (240.a), its 007 field marks it as text ("t..."), its 072.2 source
    is "bicssc", and one of its 072.a codes appears in the 'codes'
    input list.
    """
    with self.input().get('codes').open() as handle:
        # Build a set of stripped codes. The original
        # map(string.strip, handle.readlines()) is Python-2-only
        # (string.strip no longer exists) and under Python 3 a map
        # object is a one-shot iterator, so the repeated membership
        # test in the loop below would silently go empty after the
        # first record. A set fixes both and gives O(1) lookups.
        filter_codes = {line.strip() for line in handle}
    # NOTE(review): 'snaphost' looks like a typo for 'snapshot', but the
    # key must match the task's requires() mapping -- confirm before renaming.
    with self.input().get('snaphost').open() as handle:
        with self.output().open('w') as output:
            reader = pymarc.MARCReader(handle, to_unicode=True)
            writer = pymarc.MARCWriter(output)
            for record in reader:
                record = marcx.FatRecord.from_record(record)
                # 240.a Uniform title: skip records that have one.
                if record.has('240.a'):
                    continue
                # 007 Physical Description Fixed Field, Text: keep text only.
                if not record.test('007', lambda s: s.startswith('t')):
                    continue
                # 072.2 Subject Category Code, Source
                if record.test('072.2', lambda s: s.startswith('bicssc')):
                    for code in record.itervalues('072.a'):
                        if code in filter_codes:
                            writer.write(record.to_record())
def run(self):
    """Convert a MARCXML dump to binary MARC, skipping known-bad ids and duplicates."""
    tmp = shellout("""yaz-marcdump -l 9=97 -f UTF8 -t UTF8
-i marcxml -o marc {input} > {output}""",
                   input=self.input().path, ignoremap={5: 'INVESTIGATE'})
    # filter dups, bad ids and unreadable records ...
    # TODO: make this possible with gomarckit as well
    seen = set()
    with open(tmp) as handle:
        with self.output().open('w') as output:
            reader = pymarc.MARCReader(handle, to_unicode=True)
            writer = pymarc.MARCWriter(output)
            while True:
                try:
                    # Py2-only reader.next() replaced with builtin next();
                    # StopIteration is still caught below to end the loop.
                    record = next(reader)
                    field = record['001']
                    if field is None:
                        # Guard against records without an identifier --
                        # record['001'].value() would otherwise raise an
                        # uncaught AttributeError (same pattern as the
                        # other tasks in this module).
                        self.logger.debug("missing identifier")
                        continue
                    record_id = field.value()
                    # Hard-coded list of known-broken records to drop.
                    if record_id in ('15270', '15298', '15318', '15335'):
                        self.logger.debug("Skipping {0}".format(record_id))
                        continue
                    if record_id not in seen:
                        writer.write(record)
                        seen.add(record_id)
                    else:
                        self.logger.debug("Skipping duplicate: {0}".format(record_id))
                except pymarc.exceptions.RecordDirectoryInvalid as err:
                    # Unreadable record: log and keep going.
                    self.logger.warn(err)
                except StopIteration:
                    break
# NOTE(review): fragment of a larger method -- `handle`, `combined` and
# `deleted` are defined before this excerpt; confirm in the full file.
with tempfile.NamedTemporaryFile(delete=False) as dst:
shutil.copyfileobj(handle, dst)
# Convert the copied MARCXML to binary MARC and append it to the combined file.
shellout("yaz-marcdump -i marcxml -o marc {input} >> {output}",
input=dst.name,
output=combined,
ignoremap={5: 'expected error from yaz'})
os.remove(dst.name)
# Finally, concatenate initial dump.
shellout("cat {input} >> {output}", input=self.input().get('dump').path, output=combined)
# Already seen identifier.
seen = set()
with self.output().open('wb') as output:
writer = pymarc.MARCWriter(output)
# Iterate over MARC records (which are newest to oldest, keep track of seen identifiers).
with open(combined) as handle:
reader = pymarc.MARCReader(handle, force_utf8=True, to_unicode=True)
for record in reader:
field = record["001"]
if not field:
# No identifier: skip instead of crashing on field.value().
self.logger.debug("missing identifier")
continue
id = field.value()
if id in seen:
self.logger.debug("skipping duplicate: %s", id)
continue
if id in deleted:
# NOTE(review): truncated here -- the body of this branch is not visible
# in this excerpt.
import pandas
import marcx
import pymarc
# Silence numpy binary-compatibility warnings emitted on pandas import.
warnings.filterwarnings("ignore", message="numpy.dtype size changed")
warnings.filterwarnings("ignore", message="numpy.ufunc size changed")
# Default filenames; overridden when exactly two CLI arguments are given.
inputfilename = "160_input.csv"
outputfilename = "160_output.mrc"
if len(sys.argv) == 3:
inputfilename, outputfilename = sys.argv[1:]
outputfile = io.open(outputfilename, "wb")
writer = pymarc.MARCWriter(outputfile)
# Source data is a semicolon-separated, Latin-1 encoded CSV.
csv_records = pandas.read_csv(inputfilename, encoding="latin-1", sep=";")
for csv_record in csv_records.iterrows():
# iterrows() yields (index, row) pairs; keep only the row.
csv_record = csv_record[1]
marc_record = marcx.Record(force_utf8=True)
marc_record.leader = " nam 22 4500"
# Build 001 from the source id, prefixed with the collection id.
f001 = "finc-160-" + str(csv_record["001"])
marc_record.add("001", data=f001)
# Access format ("Zugangsformat")
marc_record.add("007", data="tu")
# Language ("Sprache")
# NOTE(review): the loop body continues past this excerpt.
"""
Convert an IMSLP tarball to MARC binary output file without extracting it.
If outputfile is not given, write to a temporary location.
Returns the location of the resulting MARC file.
A maximum number of failed conversions can be specified with `max_failures`,
as of 2018-04-25, there were 30 records w/o title.
"""
if outputfile is None:
_, outputfile = tempfile.mkstemp(prefix="siskin-")
stats = collections.Counter()
with open(outputfile, "wb") as output:
writer = pymarc.MARCWriter(output)
with tarfile.open(tarball) as tar:
for member in tar.getmembers():
fobj = tar.extractfile(member)
try:
record = imslp_xml_to_marc(fobj.read(), legacy_mapping=legacy_mapping)
writer.write(record)
except ValueError as exc:
logger.warn("conversion failed: %s", exc)
stats["failed"] += 1
finally:
fobj.close()
stats["processed"] += 1
writer.close()
if stats["failed"] > max_failures:
Taken from 012_filter.sh
unzip -p $f | tr -d '\t' | sed -e 's/\(\)/\t\1/g' | tr -d
'\n' | tr '\t' '\n' | grep '9,2ssgn' | grep 'digit' >$t
"""
# NOTE(review): the lines above are the tail of a docstring whose opening
# triple-quote and enclosing def line lie before this excerpt.
counter = collections.Counter()
self.logger.debug("filtering out records from %s", self.input().path)
with open(self.input().path, 'rb') as handle:
with tempfile.NamedTemporaryFile('wb', delete=False) as output:
reader = pymarc.MARCReader(handle)
writer = pymarc.MARCWriter(output)
for i, record in enumerate(reader):
# Log progress every 100k records.
if i % 100000 == 0:
self.logger.debug('filtered %d/%d records, %s', counter['written'], i, counter)
record = marcx.Record.from_record(record)
# 084.2: keep only records whose notation source is 'ssgn'.
if not 'ssgn' in record.values('084.2'):
counter['not-ssgn'] += 1
continue
# 084.a: keep only subject group '9,2'.
if not '9,2' in record.values('084.a'):
counter['not-9,2'] += 1
continue
# 912.a: keep only records marked 'digit'.
if not 'digit' in record.values('912.a'):
counter['not-digit'] += 1
continue
writer.write(record)
counter['written'] += 1
# Move the filtered tempfile into the task's output location.
luigi.LocalTarget(output.name).move(self.output().path)