How to use the followthemoney.model.map_entities function in followthemoney

To help you get started, we’ve selected a few followthemoney examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github alephdata / followthemoney / tests / test_mapping.py View on Github external
"properties": {"name": {"column": "name"}},
                },
                "ownership": {
                    "schema": "Ownership",
                    "keys": ["person_id", "le_id"],
                    "properties": {
                        "owner": {"entity": "owner"},
                        "asset": {"entity": "person"},
                        "percentage": {"column": "percentage"},
                    },
                },
            },
        }

        with self.assertRaises(InvalidMapping):
            list(model.map_entities(mapping_slavery))
github alephdata / followthemoney / tests / test_mapping.py View on Github external
def test_kek_sqlite(self):
        # Map ``self.kek_mapping`` and verify both the total number of
        # entities produced and the number of distinct entity IDs.
        mapped = list(model.map_entities(self.kek_mapping))
        total = len(mapped)
        assert total == 8712, total
        # Several mapped entities share an ID, so the distinct count is lower.
        distinct_ids = {entity.id for entity in mapped}
        id_count = len(distinct_ids)
        assert id_count == 5607, id_count
github alephdata / followthemoney / tests / test_keys.py View on Github external
"expert": {
                    "schema": "Person",
                    "key": "id",
                    "id_column": "id",
                    "properties": {
                        "name": {"column": "name"},
                        "nationality": {"column": "nationality"},
                        "gender": {"column": "gender"},
                    },
                }
            },
        }

        # a mapping may use key/keys or id_column, but not both
        with self.assertRaises(InvalidMapping):
            list(model.map_entities(mapping))

        del mapping["entities"]["expert"]["key"]

        entities = list(model.map_entities(mapping))
        self.assertEqual(len(entities), 14)
        self.assertEqual(entities[0].id, "1")
        self.assertEqual(entities[-1].id, "42")
github alephdata / followthemoney / tests / test_mapping.py View on Github external
"director": {
                    "schema": "Person",
                    "key": "id",
                    "key_literal": "person",
                    "properties": {
                        "name": {"column": "name"},
                        "address": {
                            "join": ", ",
                            "columns": ["house_number", "town", "zip"],
                        },
                    },
                }
            },
        }

        entities = list(model.map_entities(mapping))
        assert len(entities) == 1, len(entities)
        assert entities[0].get("address") == ["64, The Desert, 01234"], entities  # noqa
github alephdata / followthemoney / tests / test_mapping.py View on Github external
mapping = {
            "csv_url": url,
            "entities": {
                "director": {
                    "schema": "Person",
                    "key": "id",
                    "key_literal": "person",
                    "properties": {
                        "name": {"column": "name"},
                        "notes": {"split": "; ", "column": "fave_colours"},
                    },
                }
            },
        }

        entities = list(model.map_entities(mapping))
        assert len(entities) == 1, len(entities)
        self.assertCountEqual(
            entities[0].get("notes"), ["brown", "black", "blue"]
        )  # noqa
github alephdata / followthemoney / followthemoney / cli / mapping.py View on Github external
def run_mapping(outfile, mapping_yaml, sign=True):
    """Map every query in a mapping file and write the entities to ``outfile``.

    The configuration returned by ``load_mapping_file(mapping_yaml)`` is
    iterated as ``dataset -> meta`` pairs. For each mapping yielded by
    ``keys_values(meta, "queries", "query")``, entities are generated with
    ``model.map_entities(mapping, key_prefix=dataset)`` and passed to
    ``write_object`` one at a time.

    Args:
        outfile: Destination handed to ``write_object`` for each entity.
        mapping_yaml: Mapping file reference handed to ``load_mapping_file``.
        sign: When true, each entity is first passed through
            ``Namespace(dataset).apply`` (presumably namespace signing of
            IDs -- confirm against ``Namespace``).

    Raises:
        click.Abort: If a ``BrokenPipeError`` is raised while mapping or
            writing.
        click.ClickException: For any other exception raised while mapping
            or writing; its message is ``str(exc)``.

    Errors raised by ``load_mapping_file`` itself propagate unchanged, since
    that call happens before the ``try`` block.
    """
    config = load_mapping_file(mapping_yaml)
    try:
        for dataset, meta in config.items():
            # Built once per dataset, even when ``sign`` is false.
            ns = Namespace(dataset)
            for mapping in keys_values(meta, "queries", "query"):
                for entity in model.map_entities(mapping, key_prefix=dataset):
                    if sign:
                        entity = ns.apply(entity)
                    write_object(outfile, entity)
    except BrokenPipeError as exc:
        # Chain explicitly so the original error is kept as ``__cause__``.
        raise click.Abort() from exc
    except Exception as exc:
        # CLI boundary: surface any failure as a click error, keeping the
        # original exception attached as the direct cause for debugging.
        raise click.ClickException(str(exc)) from exc
github alephdata / followthemoney / experiment.py View on Github external
def execute_mapping(query):
    """Print a statement and its inverse for each prefixed property value.

    Every entity produced by ``model.map_entities(query)`` is treated as a
    dict-like object: its ``'schema'`` entry is resolved via ``model.get`` and
    its ``'properties'`` entry is popped off before the entity reference is
    computed. For each property value whose property type has a truthy
    ``prefix``, a ``Statement`` and its ``invert()`` result are printed.
    """
    for mapped in model.map_entities(query):
        entity_schema = model.get(mapped['schema'])
        # Pop first: the reference is built from the entity without properties.
        prop_map = mapped.pop('properties')
        ref = registry.entity.ref(mapped)
        for prop_name, prop_values in prop_map.items():
            prop = entity_schema.get(prop_name)
            for item in prop_values:
                # Values of property types without a prefix are skipped.
                if not prop.type.prefix:
                    continue
                statement = Statement(ref, prop, item)
                print(statement)
                print(statement.invert())