Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class FakeModelContext(ModelContext):
    """
    Stand-in model context: training does nothing, accuracy is a fixed
    number and every prediction is random.
    """

    async def train(self, sources: Sources):
        """Do nothing; ``sources`` is never read."""

    async def accuracy(self, sources: Sources) -> AccuracyType:
        """Report a constant accuracy of 0.42 without reading ``sources``."""
        fixed_accuracy = 0.42
        return AccuracyType(fixed_accuracy)

    async def predict(self, repos: AsyncIterator[Repo]) -> AsyncIterator[Repo]:
        """
        Record a random prediction on every incoming repo and yield it back.

        The second value handed to ``repo.predicted`` is the repo's
        ``src_url`` converted to ``float``.
        """
        async for record in repos:
            value = random.random()
            record.predicted(value, float(record.src_url))
            yield record
@entry_point("fake")
class FakeModel(Model):
    # Model wired to the fake context and fake config classes.
    CONTEXT = FakeModelContext
    CONFIG = FakeConfig
def feature_load(loading=None):
    """
    Return a fresh ``FakeFeature`` when ``loading`` is ``"fake"``; for any
    other value (including the default ``None``) return a one-element list
    holding a fresh ``FakeFeature``.
    """
    return FakeFeature() if loading == "fake" else [FakeFeature()]
def model_load(loading):
    """
    Return the ``FakeModel`` class when ``loading`` is ``"fake"``; for any
    other value return a one-element list holding the class.
    """
    return FakeModel if loading == "fake" else [FakeModel]
self.mc_atomic = atomic
try:
# If we are testing then RUN_YIELD will be an asyncio.Event
if self.RUN_YIELD_START is not False:
await self.RUN_YIELD_START.put(self)
await self.RUN_YIELD_FINISH.wait()
else: # pragma: no cov
# Wait for ctrl-c
while True:
await asyncio.sleep(60)
finally:
await self.app.cleanup()
await self.site.stop()
@entry_point("http")
class HTTPService(CMD):
    """
    HTTP interface to access DFFML API.
    """

    # NOTE(review): presumably the sub-commands exposed by this command —
    # confirm against the CMD base class.
    server = Server
    createtls = CreateTLS
async def predict(self, repos: AsyncIterator[Repo]) -> AsyncIterator[Repo]:
    """
    Attach a prediction to each incoming repo and yield the repo back.

    For every repo, the value of the first configured feature
    (``self.features[0]``) is passed to ``self.predict_input``; the awaited
    result is stored on the repo through ``repo.predicted`` together with
    ``self.regression_line[2]``.

    Yields:
        The same repo objects that were received, one at a time (not
        tuples).

    Raises:
        ModelNotTrained: if ``self.regression_line`` is ``None``. As this is
            an async generator, the error surfaces on first iteration.
    """
    if self.regression_line is None:
        raise ModelNotTrained("Train model before prediction.")
    async for repo in repos:
        feature_data = repo.features(self.features)
        prediction = await self.predict_input(feature_data[self.features[0]])
        repo.predicted(prediction, self.regression_line[2])
        yield repo
@entry_point("slr")
class SLR(Model):
"""
Simple Linear Regression Model for 2 variables implemented from scratch.
Models are saved under the ``directory`` in subdirectories named after the
hash of their feature names.
.. code-block:: console
$ cat > dataset.csv << EOF
Years,Salary
1,40
2,50
3,60
4,70
5,80
EOF
# Lies
return 1.0
async def predict(
    self, repos: AsyncIterator[Repo]
) -> AsyncIterator[Tuple[Repo, Any, float]]:
    """
    Pair each incoming repo with a classification and a fixed 1.0.

    The repo's value for the first feature name in the parent's config is
    used as the key into the parent's configured ``classifications``.
    """
    async for candidate in repos:
        settings = self.parent.config
        first_feature = settings.features.names()[0]
        label = settings.classifications[candidate.feature(first_feature)]
        yield candidate, label, 1.0
@entry_point("misc")
class MiscModel(Model):
    # Model whose context class is MiscModelContext.
    CONTEXT = MiscModelContext
class OpenJSONFile:
    """
    Bookkeeping for one JSON file: its data plus a lock-guarded counter.
    """

    data: Dict[str, Dict]
    active: int
    lock: asyncio.Lock

    async def inc(self):
        """Raise ``active`` by one while holding ``lock``."""
        async with self.lock:
            self.active = self.active + 1

    async def dec(self):
        """
        Lower ``active`` by one while holding ``lock``.

        Returns ``True`` once the counter has dropped below one, ``False``
        otherwise.
        """
        async with self.lock:
            remaining = self.active - 1
            self.active = remaining
            return remaining < 1
@entry_point("json")
class JSONSource(FileSource, MemorySource):
"""
JSONSource reads and write from a JSON file on open / close. Otherwise
stored in memory.
"""
CONFIG = JSONSourceConfig
OPEN_JSON_FILES: Dict[str, OpenJSONFile] = {}
OPEN_JSON_FILES_LOCK: asyncio.Lock = asyncio.Lock()
@asynccontextmanager
async def _open_json(self, fd=None):
async with self.OPEN_JSON_FILES_LOCK:
if self.config.filename not in self.OPEN_JSON_FILES:
self.logger.debug(f"{self.config.filename} first open")
self.OPEN_JSON_FILES[self.config.filename] = OpenJSONFile(
# Default values for the ``key``, ``label`` and ``labelcol`` fields of
# CSVSourceConfig.
CSV_SOURCE_CONFIG_DEFAULT_KEY = "src_url"
CSV_SOURCE_CONFIG_DEFAULT_LABEL = "unlabeled"
CSV_SOURCE_CONFIG_DEFAULT_LABEL_COLUMN = "label"
@config
class CSVSourceConfig(FileSourceConfig):
    # Configuration used by CSVSource: FileSourceConfig plus three string
    # options, each defaulting to its module-level constant.
    # NOTE(review): presumably ``key`` and ``labelcol`` name CSV columns and
    # ``label`` is the fallback label — confirm against CSVSource.
    key: str = CSV_SOURCE_CONFIG_DEFAULT_KEY
    label: str = CSV_SOURCE_CONFIG_DEFAULT_LABEL
    labelcol: str = CSV_SOURCE_CONFIG_DEFAULT_LABEL_COLUMN
# CSVSource is a bit of a mess
@entry_point("csv")
class CSVSource(FileSource, MemorySource):
"""
Uses a CSV file as the source of repo feature data
"""
CONFIG = CSVSourceConfig
# Headers we've added to track data other than feature data for a repo
CSV_HEADERS = ["prediction", "confidence"]
OPEN_CSV_FILES: Dict[str, OpenCSVFile] = {}
OPEN_CSV_FILES_LOCK: asyncio.Lock = asyncio.Lock()
@asynccontextmanager
async def _open_csv(self, fd=None):
async with self.OPEN_CSV_FILES_LOCK:
self, config: BaseConfig, parent: "MemoryKeyValueStore"
) -> None:
super().__init__(config, parent)
self.memory: Dict[str, bytes] = {}
self.lock = asyncio.Lock()
async def get(self, key: str) -> Union[bytes, None]:
    """
    Look up ``key`` in the in-memory mapping while holding the lock.

    Returns the stored value, or ``None`` when ``key`` is absent.
    """
    async with self.lock:
        stored = self.memory.get(key)
    return stored
async def set(self, key: str, value: bytes):
    """
    Store ``value`` under ``key`` in the in-memory mapping while holding the
    lock, replacing any previous entry for that key.
    """
    async with self.lock:
        self.memory.update({key: value})
@entry_point("memory")
class MemoryKeyValueStore(BaseKeyValueStore, BaseMemoryDataFlowObject):
    """
    Key Value store backed by dict
    """

    # Context class paired with this store.
    CONTEXT = MemoryKeyValueStoreContext
class MemoryInputSetConfig(NamedTuple):
    """
    Immutable pair handed to ``MemoryInputSet``: an input-set context and the
    list of inputs that goes with it.
    """

    # Context the inputs belong to
    ctx: BaseInputSetContext
    # Inputs making up the set
    inputs: List[Input]
class MemoryInputSet(BaseInputSet):
def __init__(self, config: MemoryInputSetConfig) -> None:
super().__init__(config)
Uses trained data to make a prediction about the quality of a repo.
"""
if not os.path.isdir(self.model_dir_path):
raise ModelNotTrained("Train model before prediction.")
# Create the input function
input_fn, predict = await self.predict_input_fn(repos)
# Makes predictions on classifications
predictions = self.model.predict(input_fn=input_fn)
for repo, pred_dict in zip(predict, predictions):
class_id = pred_dict["class_ids"][0]
probability = pred_dict["probabilities"][class_id]
repo.predicted(self.cids[class_id], probability)
yield repo
@entry_point("tfdnnc")
class DNNClassifierModel(Model):
"""
Implemented using Tensorflow's DNNClassifier.
.. code-block:: console
$ wget http://download.tensorflow.org/data/iris_training.csv
$ wget http://download.tensorflow.org/data/iris_test.csv
$ head iris_training.csv
$ sed -i 's/.*setosa,versicolor,virginica/SepalLength,SepalWidth,PetalLength,PetalWidth,classification/g' *.csv
$ head iris_training.csv
$ dffml train \\
-model tfdnnc \\
-model-epochs 3000 \\
-model-steps 20000 \\
-model-classification classification \\
Evaluates the accuracy of our model after training using the input repos
as test data.
'''
# Lies
return 1.0
async def predict(
    self,
    repos: AsyncIterator[Repo],
    features: Features,
    classifications: List[Any],
) -> AsyncIterator[Tuple[Repo, Any, float]]:
    '''
    Pair every incoming repo with the first classification.

    Yields ``(repo, classifications[0], 1.0)`` for each repo; ``features`` is
    not consulted.
    '''
    async for candidate in repos:
        label = classifications[0]
        yield candidate, label, 1.0
@entry_point('misc')
class Misc(Model):
    # Model whose context class is MiscContext.
    CONTEXT = MiscContext
if not os.path.isdir(self.model_dir_path):
raise NotADirectoryError("Model not trained")
# Create the input function
input_fn, predict_repo = await self.predict_input_fn(repos)
# Makes predictions on
predictions = self.model.predict(input_fn=input_fn)
for repo, pred_dict in zip(predict_repo, predictions):
# TODO Instead of float("nan") save accuracy value and use that.
repo.predicted(float(pred_dict["predictions"]), float("nan"))
yield repo
@entry_point("tfdnnr")
class DNNRegressionModel(Model):
"""
Implemented using Tensorflow's DNNEstimator.
Usage:
* predict: Name of the feature we are trying to predict or using for training.
Generating train and test data
* This creates files `train.csv` and `test.csv`,
make sure to take a BACKUP of files with same name in the directory
from where this command is run as it overwrites any existing files.
.. code-block:: console