Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from subprocess import call,PIPE,check_output, Popen
from tasklib import TaskWarrior,Task
import unittest
import shutil
import dbus
import time
import re
import os
# Resolve this script's real path (symlinks followed) and split it into the
# containing directory and the bare file name.
_script_path = os.path.realpath(__file__)
dirname = os.path.dirname(_script_path)
filename = os.path.basename(_script_path)
# change to the project dir, so relative paths resolve next to this file
os.chdir(dirname)
# unittest test case `TestPomodoro`.
# NOTE(review): indentation was lost in this excerpt; the four statements
# below presumably form the class body, i.e. fixtures that run once when the
# class is defined and are shared by every test method -- confirm.
class TestPomodoro(unittest.TestCase):
# TaskWarrior handle on the relative '.task/' data directory
# (presumably created on demand because of create=True -- confirm in tasklib).
tw = TaskWarrior(data_location='.task/', create=True)
# Seed task, saved into that database.
new_task = Task(tw, description="task for the test")
new_task.save()
# UUID of the seed task, kept for later use.
uuid = new_task['uuid']
def get_active_task(self):
    """Return the first pending task that is currently active.

    Walks ``self.tw.tasks.pending()`` in order and stops at the first task
    whose ``active`` attribute is truthy. That task is refreshed before it
    is returned.

    Returns:
        The refreshed active task, or an empty dict when no pending task
        is active.
    """
    current = next(
        (candidate for candidate in self.tw.tasks.pending() if candidate.active),
        None,
    )
    if current is None:
        return {}
    # get all fields in task
    current.refresh()
    return current
# Per-test cleanup hook: sends "stop" through interface.do_fsm
# (presumably to end any session a test left running -- confirm).
# NOTE(review): the `try:` below has no except/finally clause in this
# excerpt, so the method appears truncated; restore the handler from the
# original file before using it.
def tearDown(self):
try:
interface.do_fsm("stop")[0]
def test_done_current(self):
    # Starts a freshly created task through the service, marks the current
    # task done, and checks that a second "done" reports no active task.
    #
    # Uses assertEqual: the assertEquals alias is deprecated and was removed
    # from unittest in Python 3.12.
    other_task = Task(self.tw, description="task to be done")
    other_task.save()
    uuid2 = other_task['uuid']
    # Status line expected while a session is running.
    started = re.compile('started .* left.*')

    reply = interface.do_start(dbus.Dictionary({'uuid': uuid2, 'resume': 'No'}))
    self.assertEqual(reply[0], "started:" + uuid2)
    self.assertTrue(started.match(interface.do_fsm("status")[0]))

    # The first "done" succeeds and the status still reads as started.
    self.assertEqual(interface.done_current()[0], "ok")
    self.assertTrue(started.match(interface.do_fsm("status")[0]))

    # Nothing is active any more, so a second "done" is refused.
    self.assertEqual(interface.done_current()[0], "no active task")
def test_resume(self):
    # Walks the service through stop/start/pause transitions, then checks
    # that "start" still succeeds after the task that was running has been
    # completed.
    #
    # Uses assertEqual: the assertEquals alias is deprecated and was removed
    # from unittest in Python 3.12.
    started = re.compile('started .* left.*')
    paused = re.compile('paused .* left.*')

    # self.start() is a helper defined outside this excerpt.
    self.start()
    self.assertEqual(interface.do_fsm("stop")[0], "ok")
    self.assertEqual(interface.do_fsm("start")[0], "ok")
    self.assertTrue(started.match(interface.do_fsm("status")[0]))
    self.assertEqual(interface.do_fsm("pause")[0], "ok")
    self.assertTrue(paused.match(interface.do_fsm("status")[0]))
    self.assertEqual(interface.do_fsm("start")[0], "ok")
    # A second "start" while already running is reported, not an error.
    self.assertEqual(interface.do_fsm("start")[0], "Already started")

    # test done and resume
    other_task = Task(self.tw, description="task 2 for the test")
    other_task.save()
    uuid2 = other_task['uuid']
    reply = interface.do_start(dbus.Dictionary({'uuid': uuid2, 'resume': 'No'}))
    self.assertEqual(reply[0], "started:" + uuid2)
    self.assertTrue(started.match(interface.do_fsm("status")[0]))
    self.assertEqual(interface.do_fsm("stop")[0], "ok")
    other_task.done()
    self.assertEqual(interface.do_fsm("start")[0], "ok")
    self.assertTrue(started.match(interface.do_fsm("status")[0]))

    # test delete and resume
    # NOTE(review): in the visible source this section ends after the status
    # check; the stop/delete/start steps its heading implies are not present
    # here -- confirm against the original file.
    other_task = Task(self.tw, description="task 3 for the test")
    other_task.save()
    uuid2 = other_task['uuid']
    reply = interface.do_start(dbus.Dictionary({'uuid': uuid2, 'resume': 'No'}))
    self.assertEqual(reply[0], "started:" + uuid2)
    self.assertTrue(started.match(interface.do_fsm("status")[0]))
def generate_data(self):
    """Build the base fixtures, then a second TaskWarrior store with extra tasks.

    After delegating to the parent ``generate_data``, a temporary directory
    is created under ``/tmp/`` and used as the data location of
    ``self.extra_tw``. Each keyword-argument mapping in ``self.extra_tasks``
    is turned into a ``Task`` bound to that store; the attribute is then
    rebound to the resulting list and every task is saved.
    """
    super(MultipleSourceTest, self).generate_data()

    self.extra_dir = tempfile.mkdtemp(dir='/tmp/')
    self.extra_tw = TaskWarrior(data_location=self.extra_dir, taskrc_location='/')

    # Construct every task first; saving only starts once all of them exist.
    built = []
    for spec in self.extra_tasks:
        built.append(Task(self.extra_tw, **spec))
    self.extra_tasks = built

    for pending in self.extra_tasks:
        pending.save()
# NOTE(review): headerless fragment -- these statements use `self` and `prog`
# but no enclosing `def` is visible in this excerpt (`prog` is read before it
# is assigned here). They repeat the later steps of test_resume and add a
# final stop/delete/start check, so this looks like the tail of that test --
# confirm against the original file.
self.assertTrue(prog.match(interface.do_fsm("status")[0]))
self.assertEquals(interface.do_fsm("start")[0],"ok")
self.assertEquals(interface.do_fsm("start")[0],"Already started")
# test done and resume
other_task = Task(self.tw, description="task 2 for the test")
other_task.save()
uuid2 = other_task['uuid']
self.assertEquals(interface.do_start(dbus.Dictionary({'uuid': uuid2 , 'resume': 'No'}))[0],"started:"+uuid2)
prog = re.compile('started .* left.*')
self.assertTrue(prog.match(interface.do_fsm("status")[0]))
self.assertEquals(interface.do_fsm("stop")[0],"ok")
other_task.done()
self.assertEquals(interface.do_fsm("start")[0],"ok")
self.assertTrue(prog.match(interface.do_fsm("status")[0]))
# test delete and resume
other_task = Task(self.tw, description="task 3 for the test")
other_task.save()
uuid2 = other_task['uuid']
self.assertEquals(interface.do_start(dbus.Dictionary({'uuid': uuid2 , 'resume': 'No'}))[0],"started:"+uuid2)
prog = re.compile('started .* left.*')
self.assertTrue(prog.match(interface.do_fsm("status")[0]))
self.assertEquals(interface.do_fsm("stop")[0],"ok")
# The task is deleted while stopped; "start" is still expected to succeed.
other_task.delete()
self.assertEquals(interface.do_fsm("start")[0],"ok")
self.assertTrue(prog.match(interface.do_fsm("status")[0]))
# NOTE(review): headerless fragment -- no enclosing `def` is visible here.
# The lines are identical to the end of the `task` accessor that appears
# later in this file; the `else:` below presumably pairs with an
# `if self.uuid:` that is missing from this excerpt rather than with the
# `try:` -- confirm against the original file.
try:
return self.cache.task[self.uuid]
except Task.DoesNotExist:
# Task with stale uuid, recreate
self.__unsaved_task = Task(self.tw)
# If task cannot be loaded, we need to remove the UUID
vim.command(
'echom "UUID \'{0}\' not found, Task on line {1} will be '
're-created in TaskWarrior."'.format(
self.uuid,
self['line_number'] + 1
))
self.uuid = None
else:
# New task object accessed first time
self.__unsaved_task = Task(self.tw)
return self.__unsaved_task
def load_tasks(self, filename):
    """Parse a task file and rebuild the project and context lists.

    Every line of ``filename`` (with leading and trailing newline characters
    stripped) is turned into a ``tasklib.Task`` and appended to
    ``self.tasks``. ``self.projects`` and ``self.contexts`` are replaced by
    the de-duplicated values found on those tasks, in no particular order,
    and ``self.file_contents`` receives the raw text of the file.

    Args:
        filename: Path of the task file to read.
    """
    # Read under a context manager so the handle is always closed; the file
    # was previously opened and never closed.
    with open(filename, 'r') as f:
        lines = f.readlines()

    projects = set()
    contexts = set()
    for line in lines:
        task = tasklib.Task(line.strip('\n'))
        projects.update(task.projects)
        contexts.update(task.contexts)
        # NOTE(review): self.tasks is only appended to, never reset here;
        # presumably it is initialised elsewhere -- confirm, since loading
        # twice would duplicate entries.
        self.tasks.append(task)

    self.projects = list(projects)
    self.contexts = list(contexts)
    # Joining the lines reproduces the file text without a second read.
    self.file_contents = ''.join(lines)
def task(self):
    """Return the Task object behind this item, creating one when needed.

    Resolution order:
      1. a task created on an earlier access and kept in
         ``self.__unsaved_task``;
      2. the task stored in ``self.cache.task`` under ``self.uuid``;
      3. a brand-new ``Task(self.tw)``, which is remembered for later calls.

    When ``self.uuid`` is set but the lookup raises ``Task.DoesNotExist``, a
    new task is created, a notice is echoed through ``vim.command`` and
    ``self.uuid`` is cleared.
    """
    # New task object accessed second or later time
    remembered = self.__unsaved_task
    if remembered is not None:
        return remembered

    if not self.uuid:
        # New task object accessed first time
        self.__unsaved_task = Task(self.tw)
        return self.__unsaved_task

    # Return the corresponding task if already stored, else recreate it
    try:
        return self.cache.task[self.uuid]
    except Task.DoesNotExist:
        # Task with stale uuid, recreate
        self.__unsaved_task = Task(self.tw)
        # If task cannot be loaded, we need to remove the UUID
        notice = (
            'echom "UUID \'{0}\' not found, Task on line {1} will be '
            're-created in TaskWarrior."'
        ).format(self.uuid, self['line_number'] + 1)
        vim.command(notice)
        self.uuid = None
    return self.__unsaved_task