import json
import sqlite3
import sys

import markovify

import config
import utils


class SimulatorText(markovify.Text):
    def sentence_split(self, text):
        # Tweets are stored joined by a '<>' delimiter rather than normal
        # punctuation, so split on that instead of sentence boundaries.
        return text.split('<>')


def get_recent_tweets():
    pass


def generate_random_tweet():
    db_connection = sqlite3.connect(config.SQLITE_DB)
    db_cursor = db_connection.cursor()
    db_cursor.execute('SELECT * FROM tweets ORDER BY id DESC LIMIT 150')
    dataset = ''
    for tweet in db_cursor.fetchall():
        dataset += tweet[1]
        dataset += '<>'  # delimiter consumed by sentence_split above
    # Assumed completion: build the model and return one tweet-sized sentence.
    model = SimulatorText(dataset)
    return model.make_short_sentence(280)
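
# The model above is rebuilt from the database on every call. markovify
# models can also be persisted and reloaded; a minimal sketch, where the
# 'model.json' path is an assumption (to_json/from_json are markovify API).
def save_model(model, path='model.json'):
    with open(path, 'w', encoding='utf-8') as f:
        f.write(model.to_json())


def load_model(path='model.json'):
    with open(path, 'r', encoding='utf-8') as f:
        return SimulatorText.from_json(f.read())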
class Chain:
    @staticmethod
    def load_messages(client_id, user_id):
        # Query.filter() takes SQLAlchemy expressions; keyword filtering
        # belongs to filter_by().
        return message.Message.query.filter_by(client_id=client_id, user_id=user_id)

    @staticmethod
    def generate_message(messages):
        # Minimal sketch of the missing body, assuming each message
        # exposes a .text attribute.
        model = markovify.Text('. '.join(m.text for m in messages))
        return model.make_sentence(tries=100)
with open('corpus/test.txt', 'r', encoding='utf-8') as f:
    text = f.read()

text_model = markovify.Text(text)
for i in range(3):
    print(text_model.make_short_sentence(140))
def get_footer():
    # The enclosing function name is an assumption; only the return line is
    # original. INFO_URL and SUB_URL are module-level constants.
    return '\n\n-----\n\n[^^Info](%s) ^^| [^^Subreddit](%s)' % (INFO_URL, SUB_URL)
import time


def log(*msg, file=None, additional=''):
    """
    Prepends a timestamp and prints a message to the console and LOGFILE.
    """
    # LOGFILE is a module-level path constant defined elsewhere.
    output = "%s:\t%s" % (time.strftime("%Y-%m-%d %X"), ' '.join(map(str, msg)))
    if file:
        print(output, file=file)
    else:
        print(output + additional)
    with open(LOGFILE, 'a') as f:
        f.write(output + '\n')
class QText(markovify.Text):
    """
    This subclass makes three changes: it modifies the sentence filter
    to allow emotes in comments, it uses the Natural Language Toolkit
    for slightly more coherent responses, and it guarantees a response
    every time with make_sentence. (A sketch of these overrides appears
    after the class.)
    """
    max_overlap_ratio = 0.7
    max_overlap_cap = 6

    def __init__(self, input_text, state_size=2, chain=None):
        """
        input_text: A list of strings representing individual comments.
        state_size: An integer indicating the number of words in the model's state.
        chain: A trained markovify.Chain instance for this text, if pre-processed.
        """
        if chain is None:
            # No pre-processed chain supplied; let the parent build one.
            super().__init__(input_text, state_size=state_size)
        else:
            super().__init__(input_text, state_size=state_size, chain=chain)
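
# A minimal sketch of the three overrides the QText docstring names. The
# emote pattern is an assumption; word_split/word_join follow the
# POSifiedText example from the markovify README (nltk.pos_tag needs the
# averaged_perceptron_tagger data), and test_sentence_input, tries, and
# test_output are part of the markovify API.
import re

import nltk


class QTextSketch(markovify.Text):
    emote_pattern = re.compile(r':\w+:')  # assumed emote syntax

    def test_sentence_input(self, sentence):
        # Let sentences containing emotes past the input filter.
        if self.emote_pattern.search(sentence):
            return True
        return super().test_sentence_input(sentence)

    def word_split(self, sentence):
        # Tag each word with its part of speech for more coherent output.
        words = re.split(self.word_split_pattern, sentence)
        return ['::'.join(tag) for tag in nltk.pos_tag(words)]

    def word_join(self, words):
        return ' '.join(word.split('::')[0] for word in words)

    def make_sentence(self, **kwargs):
        # Guarantee a response: retry, then skip the output quality test.
        return (super().make_sentence(tries=100, **kwargs)
                or super().make_sentence(tries=100, test_output=False, **kwargs))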
# Body of a Discord command handler; the `async def doki(cmd, pld)` signature
# is an assumption based on the `cmd` and `pld` names used below. `files`,
# `chars`, `char_glitches`, `titles`, `title_glitches`, `clean`, and
# `get_encryptor` are defined elsewhere in the module.
async def doki(cmd, pld):
    char = secrets.choice(list(files))
    char_file = files[char]
    with open(f'doki/{char_file}.lc', 'r') as quote_file:
        ciphered = quote_file.read()
    glitch = False  # assumption: originally set from the command arguments
    if not glitch:
        glitch = secrets.randbelow(6)
        glitch = not bool(glitch)  # roughly a 1-in-6 chance of glitching
    if glitch:
        line_count = 1
        thumbnail = char_glitches[char]
    else:
        line_count = 3
        thumbnail = secrets.choice(chars[char])
    lines = []
    for _ in range(line_count):
        output = markovify.Text(ciphered).make_short_sentence(500, tries=100)
        output = clean(output, pld.msg.author)
        if glitch:
            cipher = get_encryptor(cmd.bot.cfg)
            if cipher:
                output = cipher.encrypt(output.encode('utf-8')).decode('utf-8')
        lines.append(output)
    output_final = ' '.join(lines)
    if glitch:
        title = title_glitches[char]
    else:
        title = titles[char]
    response = discord.Embed(color=0xe75a70)
    response.add_field(name=f'💟 {title}', value=output_final)
    response.set_thumbnail(url=thumbnail)
    await pld.msg.channel.send(embed=response)
def make_padding(self):
    if self.dynamic:
        # markovify.Text expects a string, not an open file object.
        with open(self.corpus, 'r') as f:
            text = markovify.Text(f.read())
        self.logger.info("Spam Evader: Generating dynamic padding from corpus...")
        pad = '<p style="font-size: 0px">'
        for i in range(1, 50):
            temp = text.make_sentence()
            if temp is not None:
                pad += ' ' + temp
                if i % 5 == 0:
                    pad += ' <br>'
            else:
                pad += ' <br>'
        pad += ' </p>'
        self.logger.info("Spam Evader: Dynamic Padding Generated Successfully")
    else:
        self.logger.warning("Spam Evader: Message created using static padding!")
        pad = STATIC_PADDING
    return pad
# `choice` is random.choice and `dictionary` is an external thesaurus
# client; both are imported elsewhere in the module.
def generate_tweet_text(mood):
    filename = "emotions/{}.txt".format(mood)
    with open(filename, encoding='utf-8') as f:
        text = f.read()
    text = utils.strip_non_ascii(text)
    text_model = markovify.Text(text)
    sentence = text_model.make_short_sentence(120)  # generate a short tweet
    synonymset = dictionary.synonym(mood)
    synonym = choice(synonymset)
    sentence += " #{}".format(synonym)  # append a hashtag
    return sentence.encode('utf-8')
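
# make_short_sentence returns None when no candidate passes the length and
# overlap checks, which would make the "+=" above raise a TypeError. A
# minimal retry wrapper (the function name is hypothetical); `tries` and
# `test_output` are standard markovify keyword arguments.
def make_tweet_sentence(text_model, max_chars=120):
    sentence = text_model.make_short_sentence(max_chars, tries=100)
    if sentence is None:
        # Last resort: skip the output quality test entirely.
        sentence = text_model.make_short_sentence(max_chars, test_output=False)
    return sentence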
# Excerpt from an async message-collection handler; `ev`, `chain`, and the
# `cl_*` names come from the enclosing scope.
messages = []
pfx = await ev.db.get_guild_settings(cl_chn.guild.id, 'prefix') or ev.bot.cfg.pref.prefix
# noinspection PyBroadException
try:
    async for log in cl_chn.history(limit=100000):
        cnt = log.content
        if log.author.id == cl_usr.id and len(log.content) > 8:
            if not check_for_bot_prefixes(pfx, cnt) and not check_for_bad_content(cnt):
                cnt = cleanse_content(log, cnt)
                if cnt not in messages and cnt and len(cnt) > 1:
                    messages.append(cnt)
except Exception as e:
    print(e)
try:
    new_chain = markovify.Text(f'{". ".join(messages)}.')
    combined = markovify.combine([chain, new_chain]) if chain else new_chain
    insert_data = {'user_id': cl_usr.id, 'chain': serialize(combined.to_dict())}
    await ev.db[ev.db.db_nam].MarkovChains.delete_one({'user_id': cl_usr.id})
    await ev.db[ev.db.db_nam].MarkovChains.insert_one(insert_data)
    await notify_target(cl_ath, cl_usr, cl_chn, len(messages), combined.parsed_sentences)
    current_user_collecting = None
    ev.log.info(f'Collected a chain for {cl_usr.name}#{cl_usr.discriminator} [{cl_usr.id}]')
except Exception as e:
    await notify_failure(cl_ath, cl_usr, cl_chn)
    ev.log.error(f"Markov generation failure for {cl_usr.id}: {e}.")
    await asyncio.sleep(1)
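
# Reading a stored chain back is the inverse of the insert above. A minimal
# sketch: `deserialize` is assumed to mirror the bot's own `serialize`
# helper, and Text.from_dict is part of the markovify API.
async def load_chain(ev, user_id):
    doc = await ev.db[ev.db.db_nam].MarkovChains.find_one({'user_id': user_id})
    return markovify.Text.from_dict(deserialize(doc['chain'])) if doc else None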
def get_text_model(lines):
    # Each entry of `lines` is a list of words; join each into a
    # period-terminated sentence string before training.
    for i in range(len(lines)):
        lines[i] = " ".join(lines[i]) + "."
    return markovify.Text(lines, state_size=2)
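
# Example usage with hypothetical token lists; note that a corpus this tiny
# will often make make_sentence return None.
tokenized = [["the", "cat", "sat", "down"], ["the", "dog", "ran", "off"]]
model = get_text_model(tokenized)
print(model.make_sentence(tries=100))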