User:Dani/Catglue

From LifeWiki
< User:Dani
Revision as of 23:48, 4 February 2023 by DroneBetter (talk | contribs) (Add (with brief explanation and program itself in <pre>'s because I couldn't find a plain text template like Wikipedia has), for purposes of citation (I don't like MediaWiki rendering tabs as octuple-spaces like Github, albeit))
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Jump to navigation Jump to search

Catglue is a program written by dani that reports discoveries in Catagolue censuses (for Life in particular) to the Conwaylife Lounge. The most recent version is included here, for lack of a GitHub repository elsewhere.

# catglue notifier v1.2
# programmed by dani, 2021-09-02
# this version by dani, 2022-08-08

import os
import time
import hashlib
from requests import get, post

# URL that notify() hands to rpost() as the webhook endpoint.
# NOTE(review): the value below is not a URL -- presumably a redacted placeholder; set a real webhook before running.
webhook_url = "boom tetris for jeff"
# Role ID that notify() wraps in "<@&...>" to ping when an object had zero previous occurrences.
notifier_role = "876266912663892030"

# Length of one polling cycle; also the look-back window used by is_active().
total_wait = 3600 # seconds
# Pause (seconds) between downloading the censuses and comparing them.
compare_wait = 60
# Offset within each cycle at which the main loop starts downloading.
get_wait = total_wait - compare_wait

# Initial delay, and per-retry increment, of rget()'s back-off (seconds).
fail_time = 30
# Same as fail_time, but for rpost().
post_fail_time = 30

# Census symmetries polled by the main loop, in this order.
symmetries = ["D8_4", "D8_1", "D4_+4", "D4_+2", "D4_+1", "D4_x4", "D4_x1", "D2_+2", "D2_+1", "D2_x", "C4_4", "C4_1", "C2_4", "C2_2", "C2_1", "H4_+4", "H4_+2", "H4_+1", "G2_4", "G2_2", "G2_1", "C1", "G1"]

def _load_names(path, start=None, stop=None):
	"""Read an apgcode -> name table from *path*.

	Every kept line is split on double quotes; the text inside the first pair
	of quotes becomes the key and the text inside the second pair the value.
	``start``/``stop`` slice the list of lines before parsing. The file is
	closed as soon as it has been read.
	"""
	with open(path, "r") as handle:
		lines = handle.readlines()[start:stop]
	return {line.split("\"")[1]: line.split("\"")[3] for line in lines}

# get common names: the first and last line of all-common-names.txt are
# skipped, and entries from wiki-names.txt override duplicates.
names = {**_load_names("all-common-names.txt", 1, -1), **_load_names("wiki-names.txt")}

# Embed colour (0xRRGGBB) keyed by the first two characters of an apgcode;
# notify() looks it up with colours[apgcode[:2]].
colours = {
	"xs": 0xFFFFCE,
	"xp": 0xCECEFF,
	"xq": 0xCEFFCE,
	"yl": 0xFFCECE,
	"zz": 0xFFCEFF,
	"ov": 0xCECECE,
	"PA": 0xFFFFFF,
	"me": 0xCEFFFF
}

def rget(url, timeout=60): # get but it retries if fail
	"""GET *url*, retrying until the server answers with status 200.

	url     -- address to fetch.
	timeout -- seconds ``requests`` may wait on the connection or on a read
	           before giving up on one attempt. Without it a stalled
	           connection would block the notifier forever.

	Returns the successful ``requests`` response. Never raises: any other
	status code or any exception (including a timeout) is printed and the
	request is retried after a delay that starts at ``fail_time`` seconds and
	grows by ``fail_time`` after every failed attempt.
	"""
	backoff = fail_time
	while True:
		try:
			data = get(url, timeout=timeout)
			if data.status_code == 200:
				return data
			print(data.status_code)
		except Exception as e:
			# Deliberately broad: this is a best-effort retry loop, so every
			# failure is reported and retried rather than propagated.
			print(e)
		print("Server Error, sleeping for " + str(backoff) + " seconds...")
		time.sleep(backoff)
		backoff += fail_time

def rpost(url, js, timeout=60): # post but it retries if fail
	"""POST *js* as JSON to *url*, retrying until the server answers 204.

	url     -- address to post to.
	js      -- JSON-serialisable payload, sent via ``requests.post(json=...)``.
	timeout -- seconds ``requests`` may wait on the connection or on a read
	           before giving up on one attempt. Without it a stalled
	           connection would block the notifier forever.

	Returns the successful ``requests`` response. Never raises: any other
	status code or any exception (including a timeout) is printed and the
	request is retried after a delay that starts at ``post_fail_time`` seconds
	and grows by ``post_fail_time`` after every failed attempt.
	"""
	backoff = post_fail_time
	while True:
		try:
			data = post(url, json=js, timeout=timeout)
			if data.status_code == 204:
				return data
			print(data.status_code)
		except Exception as e:
			# Deliberately broad: this is a best-effort retry loop, so every
			# failure is reported and retried rather than propagated.
			print(e)
		print("Server Error, sleeping for " + str(backoff) + " seconds...")
		time.sleep(backoff)
		backoff += post_fail_time

def _find_discovery(apgcode, symmetry, soup):
	"""Return ``(found, owner)`` strings describing who found *soup* and when.

	The object's attribute page is tried first. If the soup does not appear
	there, the haul page is looked up from the MD5 of the first 14, 48 or 12
	characters of the soup. ``("???", "???")`` is returned when none of those
	haul pages contains "submitted".
	"""
	soups = rget("https://catagolue.hatsya.com/attribute/" + apgcode + "/b3s23").text

	if soups.rfind(soup) != -1:
		found, owner = soups.split(" on ")[-1].split(" and is owned by ")
		owner = owner.split("\n")[0]
		# NOTE(review): the original also called found.replace(" at ", "") here
		# but discarded the result (str.replace returns a new string), so the
		# timestamp has always been posted unmodified. That output is kept.
		if owner.find("@") > 2:
			owner = owner.split("@")[0]
		return found, owner

	# FALLBACK: Get haul manually and find discoverer.
	for r in [14, 48, 12]:
		root = soup[:r]
		haul = rget("https://catagolue.hatsya.com/haul/b3s23/"+symmetry+"/"+hashlib.md5(root.encode("ascii")).hexdigest()).text
		if haul.find("submitted") != -1:
			found = haul.split("UTC")[0].split(" on ")[-1] + "UTC"
			owner = haul.split("submitted by ")[1].split(">")[1].split("<")[0]
			return found, owner
	return "???", "???"

def notify(apgcode, occurrences, occurrences_old, symmetry, pcount): # notify about object
	"""Post a webhook message announcing *apgcode*.

	apgcode         -- code of the object being reported.
	occurrences     -- current occurrence count.
	occurrences_old -- count from the previous census; 0 triggers a role ping.
	symmetry        -- census symmetry the object was seen in.
	pcount          -- value returned by is_notable() inside diff(); when it is
	                   not a bool and the apgcode starts with "x" or "y" it is
	                   listed as "Unique Similar Objects".

	Downloads the object's sample soups, works out discoverer and time, builds
	the embed and sends it with rpost(). Returns None.
	"""
	print(apgcode, occurrences, occurrences_old, symmetry, pcount)

	samples = rget("https://catagolue.hatsya.com/textsamples/" + apgcode + "/b3s23")
	# Text after the last "<symmetry>/" up to the end of that line.
	soup = samples.text.split(symmetry+"/")[-1].split("\n")[0]

	# Get additional information
	found, owner = _find_discovery(apgcode, symmetry, soup)

	prefix = apgcode.split("_")[0]
	object_url = "https://catagolue.hatsya.com/object/" + apgcode + "/b3s23"

	name = names.get(apgcode)
	if name:
		apgcodetext = "[" + name.split("!")[0] + " (" + apgcode + ")](" + object_url + ")"
	else:
		apgcodetext = "[" + apgcode + "](" + object_url + ")"

	if occurrences_old != 0:
		description = "A(n) " + prefix + " with a low number of occurrences has been located."
	else:
		description = "A(n) " + prefix + " with zero previous occurrences has been located!"

	if found != "???":
		soup_text = "[" + soup + "](https://catagolue.hatsya.com/hashsoup/" + symmetry + "/" + soup + "/b3s23)"
	else:
		soup_text = "???"

	fields = [
		{"name": "apgcode", "value": apgcodetext, "inline": True},
		{"name": "Occurrences", "value": str(occurrences) + " (" + str(occurrences_old) + ")", "inline": True},
		{"name": "Symmetry", "value": symmetry, "inline": True},
		{"name": "Soup", "value": soup_text, "inline": True},
		{"name": "Discoverer", "value": owner, "inline": True},
		{"name": "Time", "value": found, "inline": True},
	]

	if not isinstance(pcount, bool) and apgcode[0] in "xy":
		# List USO. Sent as text, like every other field value.
		fields.append({"name": "Unique Similar Objects", "value": str(pcount), "inline": True})

	embed = {
		"embeds": [{
			"title": "Object found!",
			"description": description,
			"color": colours[apgcode[:2]],
			"footer": {
				"icon_url": "https://cdn.discordapp.com/avatars/876266072125358090/82e14c7e2b9e9bcc84b08c477007ec60.png",
				"text": "catgIue.py"
			},
			"fields": fields
		}]
	}

	if occurrences_old == 0:
		# ping for new object
		embed["content"] = "<@&" + notifier_role + "> " + prefix + " get!"
	else:
		embed["content"] = prefix + " get!"

	rpost(webhook_url, embed)

def is_active(symmetry): # checks whether a given symmetry has been updated in the last total_wait seconds
	"""Return True when the symmetry's texthaul timestamp is newer than ``total_wait`` seconds ago.

	The timestamp is the third space-separated token of the texthaul page and
	is parsed as ``YYYY-MM-DDTHH:MM:SS``; it is compared against the current
	time expressed with ``time.gmtime``.
	"""
	page = rget("https://catagolue.hatsya.com/texthaul/b3s23/" + symmetry).text
	stamp = page.split(" ")[2]
	newest_haul = time.strptime(stamp, "%Y-%m-%dT%H:%M:%S")
	cutoff = time.gmtime(time.time() - total_wait)
	return newest_haul > cutoff

def get_pop(apgcode): # gets population of apgcode. this code isn't used but i wrote anyway
	"""Return the population encoded in *apgcode*.

	"xs" codes carry the population between the prefix and the first
	underscore. Codes starting with "me", "ov", "PA", "yl" or "zz" yield 0.
	For anything else the part after the first underscore is scanned: each
	character contributes the number of set bits of its index in the base-32
	alphabet, "w", "x" and "z" contribute nothing, and "y" contributes
	nothing and also causes the character after it to be skipped.
	"""
	alphabet = "0123456789abcdefghijklmnopqrstuv"
	kind = apgcode[:2]
	if kind == "xs":
		return int(apgcode[2:apgcode.find("_")])
	if kind in ("me", "ov", "PA", "yl", "zz"):
		return 0

	body = apgcode.split("_")[1]
	cells = 0
	skip_next = False
	for symbol in body:
		if skip_next:
			# This character is the argument of the preceding "y".
			skip_next = False
			continue
		if symbol == "y":
			skip_next = True
			continue
		if symbol in "wxz":
			continue
		cells += bin(alphabet.find(symbol)).count("1")
	return cells

def _parse_census(text):
	"""Turn the text of a textcensus file into ``{apgcode: count}``.

	The first line is skipped. Of the remaining lines only those beginning
	with a double quote are used: the first two comma-separated cells are
	taken with their surrounding quotes stripped, the second as an int.
	Blank lines are ignored instead of raising IndexError.
	"""
	counts = {}
	for row in text.splitlines()[1:]:
		if not row.startswith("\""):
			continue
		cells = row.split(",")
		counts[cells[0][1:-1]] = int(cells[1][1:-1])
	return counts

def diff(symmetry): # compares textcensus files
	"""Compare ``<symmetry>_old`` with ``<symmetry>_new`` and report notable changes.

	Both files are read from the current directory and closed immediately.
	For every object in the new census whose count differs from the old one
	(objects missing from the old census count as 0), is_notable() decides
	whether it is worth reporting; if so notify() is called. Returns None.
	"""
	with open(symmetry + "_old", "r") as handle:
		old_file = handle.read()
	with open(symmetry + "_new", "r") as handle:
		new_file = handle.read()
	old = _parse_census(old_file)
	new = _parse_census(new_file)

	amounts = {} # Only check one object per prefix

	# Records already present in the old census; only consulted for symmetries
	# other than C1/G1, so they are only computed for those.
	largest_still_life = 0
	largest_messless = 0
	largest_methuselah = 0
	largest_megasized = 0

	if symmetry not in ["C1", "G1"]:
		# Find largest still life in symmetry
		for apgcode in old:
			if apgcode[:2] == "xs":
				pop = int(apgcode[2:].split("_")[0])
				if pop > largest_still_life:
					largest_still_life = pop
			elif apgcode[:8] == "messless":
				# [..:-1] drops the final character before the number is parsed.
				pop = int(apgcode[8:-1].split("_")[1])
				if pop > largest_messless:
					largest_messless = pop
			elif apgcode[:10] == "methuselah":
				pop = int(apgcode[10:-1].split("_")[1])
				if pop > largest_methuselah:
					largest_methuselah = pop
			elif apgcode[:9] == "megasized":
				pop = int(apgcode[9:-1].split("_")[1])
				if pop > largest_megasized:
					largest_megasized = pop

	def is_notable(apgcode, old_num, symmetry): # This has been nested inside diff so it can access the full file text.
		"""Return False when the object is not worth reporting.

		Otherwise returns True (C1/G1) or, for other symmetries, the number of
		times the object's prefix occurs in the old census text. That number
		may be 0, which is why the caller tests ``is False``.
		"""
		name = names.get(apgcode)
		if old_num >= 10 or (symmetry not in ["C1", "G1"] and (old_num >= 1 or apgcode[:4] == "xp2_") and not name):
			return False

		label = apgcode[:2]
		prefix = apgcode.split("_")[0]
		if apgcode != prefix:
			prefix += "_"

		if symmetry in ["C1", "G1"]:
			#if symmetry == "G1":
			#	samples = rget("https://catagolue.hatsya.com/textsamples/" + apgcode + "/b3s23").text
			#	if samples[:3] == "C1/" or samples.find("\nC1/") != -1:
			#		return False
			if name:
				return True
			if label == "xs":
				pop = int(prefix[2:-1])
				if pop >= 33 and old_num < 10:
					return True
				elif pop <= 16:
					return True
			elif label == "xp":
				per = int(prefix[2:-1])
				if per > 2 and old_num < 10:
					return True
				elif get_pop(apgcode) < 18 and old_num < 10:
					return True
			else:
				if old_num < 10:
					return True
			return False

		# Count each prefix once, including prefixes whose count is 0.
		if prefix not in amounts:
			amounts[prefix] = old_file.count(prefix)
		pcount = amounts[prefix]

		if name:
			return pcount
		if prefix[:2] == "xs" and int(prefix[2:-1]) < largest_still_life:
			return False
		elif prefix == "messless_" and int(apgcode[9:-1]) < largest_messless:
			return False
		elif prefix == "methuselah_" and int(apgcode[11:-1]) < largest_methuselah:
			return False
		elif prefix == "megasized_" and int(apgcode[10:-1]) < largest_megasized:
			return False
		return pcount

	for k in new:
		old_num = old.get(k, 0)
		if new[k] == old_num:
			# Unchanged objects are never reported, so skip the notability test.
			continue
		pcount = is_notable(k, old_num, symmetry)
		if pcount is False: # pcount only returned if notable
			continue
		notify(k, new[k], old_num, symmetry, pcount)

def get_new(symmetry): # gets new textcensus
	"""Rotate the census files for *symmetry* and download a fresh one.

	Any existing ``<symmetry>_old`` is deleted, ``<symmetry>_new`` (if
	present) is renamed to ``<symmetry>_old``, and the current textcensus is
	written to ``<symmetry>_new``. Symmetries other than C1 and G1 use the
	``/summary`` variant of the census URL.

	The download finishes before the new file is opened, so the file is not
	left empty while rget() is retrying, and the handle is closed explicitly.
	"""
	old_path = symmetry + "_old"
	new_path = symmetry + "_new"

	if os.path.exists(old_path):
		os.remove(old_path)
	if os.path.exists(new_path):
		os.rename(new_path, old_path)

	url = "https://catagolue.hatsya.com/textcensus/b3s23/" + symmetry
	if symmetry not in ["C1", "G1"]:
		url += "/summary"
	census = rget(url).text

	with open(new_path, "w+") as handle:
		handle.write(census)

print("wlecome to catglue")
first_time = True

# Main polling loop: once per total_wait seconds, download the census of every
# active symmetry, wait compare_wait seconds, then compare old against new.
while True:
	# Sleep until (time - compare_wait) % total_wait reaches get_wait.
	# When the script starts less than compare_wait seconds after that point,
	# the raw value is negative and time.sleep() would raise ValueError, so it
	# is clamped to zero and the cycle starts immediately.
	delay = get_wait - (time.time()-(compare_wait))%total_wait
	time.sleep(max(0, delay))
	actives = []
	for symmetry in symmetries:
		# On the first pass every symmetry is downloaded regardless of activity.
		if first_time or is_active(symmetry):
			actives.append(symmetry)
			get_new(symmetry)
			print("Got " + symmetry)
	print("Done " + str(time.time()))
	time.sleep(compare_wait)
	for symmetry in actives:
		# Nothing to compare against until a previous census exists.
		if os.path.exists(symmetry + "_old"):
			diff(symmetry)
		print("Compared " + symmetry)
	print("Done " + str(time.time()))
	first_time = False

# Unreachable: the loop above never exits. Kept as the author's disabled
# one-shot comparison snippet.
"""for symmetry in symmetries:
	if os.path.exists(symmetry + "_old"):
		diff(symmetry)
	print("Compared " + symmetry)"""