Coverage for src/sankey_cashflow/sankey_cash.py: 10%
1021 statements
import pygsheets
import pandas as pd
import plotly.graph_objects as go
import datetime
import logging
from os import path
from numpy import isnan, float64
from csv import DictReader
import re
import networkx as nx
from uuid import uuid4
from pathlib import Path
# Types
from pandas._libs.tslibs import timestamps, nattype
from typing import List, Dict, Tuple, Union, Optional
logger = logging.getLogger(__name__)
ConsoleOutputHandler = logging.StreamHandler()
# TODO: override this with params (note: root logger level will also need to be changed)
ConsoleOutputHandler.setLevel(logging.WARNING)
ConsoleOutputHandler.name = "console"
logger.addHandler(ConsoleOutputHandler)
24"""
25See README.md for usage instructions and examples
27References:
28- See: https://lifewithdata.com/2022/08/29/how-to-create-a-sankey-diagram-in-plotly-python/
29- https://erikrood.com/Posts/py_gsheets.html
30- https://github.com/nithinmurali/pygsheets
31- https://pygsheets.readthedocs.io/en/stable/
32- More complex example: https://plotly.com/python/sankey-diagram/
35Notes:
36 - About mutability: Pandas DataFrames are mutable except when they're not... I am treating them as immutable here
37 and when necessary to make changes using return values and re-assignment. From the Pandas docs:
38 ## Mutability and copying of data
39 All pandas data structures are value-mutable (the values they contain can be altered) but not always size-mutable.
40 The length of a Series cannot be changed, but, for example, columns can be inserted into a DataFrame.
41 However, the vast majority of methods produce new objects and leave the input data untouched. In general we like
42 to favor immutability where sensible.
43- Permissions: Create a service account and download credentials in json format,
44 (detailed instructions here: https://pygsheets.readthedocs.io/en/stable/authorization.html)
45 then share spreadsheet to service account user
46- Tag usage scenarios:
47 - Override sources/targets on individual tag level using --tags arg. [done]
48 - Explode all tags...
49TODO: make tag/store overrides as source with previous category assignments preserved [done, except as targets]
50TODO: prevent multiple s-tags
51"""
# Define some classes =================================================================================================


class AppSettings:
    """
    Application settings and defaults + validation, getters/setters, etc
    """
    def __init__(self, args):
        self.DEFAULT_START_DATE = pd.to_datetime("10/1/2022")
        # A csv file or a Google Sheets document containing transactions data.
        # (In the case of the latter, a sheet name must be provided as well)
        self.data_source = args.source
        self.audit_mode = args.audit
        # The name (or prefix plus wildcard) of the sheet containing the transactions data
        # (if data_source is a Google Sheets document)
        self.data_sheet = args.sheet or "Transactions_*"
        # A csv file or sheet name containing sources and targets
        self._labels_source = args.srcmap or "Sources-Targets"
        self.filter_dates = args.range
        self.separate_taxes = args.separate_tax
        self.verbose = args.verbose
        self._g_creds = args.creds or './google_service_account_key.json'
        self.distribute_amounts = args.distributions
        self.all_time = args.all_time
        self.recurring = args.recurring
        self.base_title = "Cashflow"
        self._date_filter_start = None
        self._date_filter_end = None
        self.tags = None
        self.feed_in = None
        self.exclude_tags = None
        self.stores = None
        self.tag_override = False
        self.hover = "Category"
        self.chart_resolution = None
        self.sales_tax_classification = "Taxes"
        self.tip_classification = "xTips"
        self.diagram_type = "sankey"
        if args.verbose:
            logger.setLevel(logging.DEBUG)
            logger.handlers[0].setLevel(logging.DEBUG)  # Assumes only one handler
        if args.hover:
            if args.hover.lower() in ["desc", "stores", "description"]:
                self.hover = "Description"
            if args.hover == "tags":
                logger.warning("Tags in hovertext not yet implemented!")
            if args.hover.lower() in ["none", "no", "false"]:
                self.hover = None
        if args.dtype:
            if args.dtype.lower() in ["sankey", "line"]:
                self.diagram_type = args.dtype.lower()
            else:
                logger.warning(f"Unknown diagram type: {args.dtype}")
        if args.tags:
            self.tags = [i.strip() for i in args.tags.split(',')]
        if args.tag_override:
            self.tag_override = True
        if args.feed_in:
            self.feed_in = True
        if args.exclude:
            self.exclude_tags = [i.strip() for i in args.exclude.split(',')]
        if args.stores:
            self.stores = [i.strip() for i in args.stores.split(',')]
        self.colors = {}  # label: [link, node]
        if self.tags and self.stores:
            raise Exception("Stores and tags visualizations should not be combined!")
        self.validate_sources()
    @property
    def date_filter_start(self):
        return self._date_filter_start

    @date_filter_start.setter
    def date_filter_start(self, val):
        if not val or len(val) == 0:
            self._date_filter_start = None
        else:
            self._date_filter_start = pd.to_datetime(val)

    @property
    def date_filter_end(self):
        return self._date_filter_end

    @date_filter_end.setter
    def date_filter_end(self, val):
        if not val or len(val) == 0:
            self._date_filter_end = None
        else:
            self._date_filter_end = pd.to_datetime(val)

    @property
    def g_creds(self):
        return self._g_creds

    @g_creds.setter
    def g_creds(self, val):
        if not val or len(val) == 0 or not path.isfile(val):
            raise Exception(f"Credentials file not found: {val}")
        self._g_creds = val

    @property
    def labels_source(self):
        return self._labels_source

    @labels_source.setter
    def labels_source(self, val):
        if not val or len(val) == 0 or (val.endswith('.csv') and not path.isfile(val)):
            raise Exception(f"Sources-targets file not found: {val}")
        self._labels_source = val

    def source_data_location(self):
        if self.data_source.endswith('.csv'):
            return self.data_source
        else:
            return f"{self.data_source}: {self.data_sheet}"
    def validate_sources(self):
        # check sources etc
        if not self.data_source or len(self.data_source) == 0:
            logger.warning("Please enter a valid data source!")
            raise Exception("Missing data source.")

        if self.data_source.endswith(".csv"):
            logger.debug(f"Using csv data source: {self.data_source}")
            # Using csv data source
            # Note: additional data validation happens when loading this data
            if not self.labels_source or not self.labels_source.endswith(".csv"):
                raise Exception("A csv sources-targets sheet must be used when using csv source data.")
            if not path.isfile(self.data_source):
                raise Exception(f"Could not find provided data source: {self.data_source}")
            if not path.isfile(self.labels_source):
                raise Exception(f"Could not find provided sources-targets source: {self.labels_source}")
        else:
            # Using Google Sheets data source
            # Note: additional access/permissions/data validation happens when fetching and loading this data
            logger.debug(f"Using Google Sheets data source: {self.data_source}")
            if not self.data_sheet or len(self.data_sheet) == 0:
                raise Exception("Missing Google worksheet name.")
            if not self.labels_source or len(self.labels_source) == 0:
                raise Exception("A sources-targets sheet name must be supplied.")
            if not self.g_creds or len(self.g_creds) == 0:
                raise Exception("Google service account credentials must be provided.")
            if not path.isfile(self.g_creds):
                raise Exception(f"Invalid service account credential file provided: {self.g_creds}")
class RowLabels:
    """
    Contains row label data and methods, used to map data categories to source, target, and color information.
    Accepts an array of dicts on init which can come from a Google sheet or csv.
    Data looks like: [{'Category Name': 'House', 'Type': 'computed', 'Source': 'Income', 'Target': 'House',
    'Link color': 'rgba(153, 187, 255, 0.8)', 'Node color': 'rgba(102, 153, 255, 1)', 'Comments': '', '': ''}]
    Also creates a DAG with source-target edges based on the label definitions
    """
    def __init__(self, labeldata):
        self._labeldata = labeldata
        self._digraph = nx.DiGraph()
        self._digraph.add_node("Income", ntype="income")
        self.process_report = f"RowLabel report\n{'=' * 60}\n\n\n"
        required_columns = ['Category Name', 'Type', 'Source', 'Target', 'Classification', 'Link color', 'Node color']
        if not all(k in self._labeldata[0].keys() for k in required_columns):
            raise Exception(f"Sources-Targets sheet does not have all required columns! Needed: {required_columns}. \
Found: {list(self._labeldata[0].keys())}")
        self._available_attributes = ['source', 'target', 'classification', 'link_color', 'node_color', 'type']
        # TODO: validation
        self._lookup = {}
        # Map out data to lookup dict
        for i in self._labeldata:
            if len(i['Category Name']) == 0:  # Skip empties
                self.process_report += f"SKIPPING: {i}\n"
                continue
            if i['Category Name'] in self._lookup:
                raise Exception(f"Duplicate label! {i['Category Name']}")
            # Add to internal lookup dict
            self.process_report += f"ADDING LOOKUP: {i}\n"
            self._lookup[i['Category Name']] = {
                'source': i.get('Source'),
                'target': i.get('Target'),
                'classification': i.get('Classification'),
                'link_color': i.get('Link color'),
                'node_color': i.get('Node color'),
                'type': i.get('Type')
            }
            if i.get("Source") == "DEDUCTIONS":
                # These have to be dynamically generated - skip adding to DAG for now (or maybe entirely)
                # TODO (maybe): switch to using type and/or DAG attributes
                self.process_report += f"SKIPPING DEDUCTION: {i}\n"
                continue
            if i["Type"] in ['tag', 's-tag']:
                # NOT adding tag labels to graph - if needed they will be added on the fly
                # TODO: investigate using DAG attributes instead. Test with distant/complex tag targets.
                if i["Type"] == 's-tag':
                    self.process_report += f"Adding NODE for s-tag: {i} to DAG\n"
                    self._digraph.add_node(i['Category Name'], type='s-tag')
                else:
                    self.process_report += f"NOT Adding tag: {i} to DAG\n"
                continue
            if i.get('Source') and i.get('Target'):  # Add all source/target pairs as edges to DAG
                # Add to DAG
                if i.get('Type'):
                    self.process_report += \
                        f"ADDING EDGE: {i.get('Source')} -> {i.get('Target')} (ntype={i.get('Type')})\n"
                    self._digraph.add_edge(i.get('Source'), i.get('Target'), ntype=i.get('Type'))
                else:
                    self.process_report += f"ADDING EDGE: {i.get('Source')} -> {i.get('Target')}\n"
                    self._digraph.add_edge(i.get('Source'), i.get('Target'))
            else:
                self.process_report += f"ERROR (SKIPPING): {i}\n"
                logger.warning(f"Category: {i['Category Name']} yielded an empty source and/or target! \
{i.get('Source')}:{i.get('Target')}")
    @property
    def data(self):
        return self._labeldata

    def get_longest_path(self):
        return nx.dag_longest_path(self._digraph)

    def get_path(self, source, target):
        paths = list(nx.all_simple_paths(self._digraph, source, target))
        # Note: may contain 0, 1, or multiple paths
        if not paths:
            logger.warning(f"No path found for {source}:{target}")
        elif len(paths) > 1:
            logger.warning(f"Multiple paths found for {source}:{target}. ({paths})")
        else:
            return paths[0]
    def get_label(self, labelname, labeltype=None):
        """
        Return a label name & tag pair or None
        """
        item = self._lookup.get(labelname)  # TODO: test duplicate category names
        if labeltype == "any":
            return item
        if labeltype == "tag":
            if item and item.get('type') == 'tag':
                return item
            return None
        elif labeltype == "s-tag":
            if item and item.get('type') == 's-tag':
                return item
            return None
        if item and item.get('type') not in ['tag', 's-tag']:
            return item
        return None
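    # Lookup semantics, illustratively: for a lookup entry with type '' under
    # 'Groceries', get_label('Groceries') returns it, get_label('Groceries',
    # 'tag') returns None (type mismatch), and get_label('Groceries', 'any')
    # returns it regardless of type.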
    def get_attribute(self, labelname, labelattribute, use_default=True, labeltype=None, original_category=None):
        """
        Get named attribute by label. Cases:
        Unknown attribute: raise exception
        label & labeltype exist:
            attribute exists: return attribute
            attribute doesn't exist: return default attribute
        label & labeltype don't exist:
            return default attributes
        """
        if labelattribute not in self._available_attributes:
            raise Exception(f"Unknown attribute: {labelattribute}")
        if labelattribute == "type":
            item = self.get_label(labelname, "any")
        else:
            item = self.get_label(labelname, labeltype)
        default_item = self.get_label('default')
        if item:
            if labelattribute == "type":
                return item.get("type", "")
            if not is_null(item[labelattribute]) and len(item[labelattribute]) != 0:
                return item[labelattribute]
        elif labelattribute == "type":
            return None
        if use_default:
            # Did not find the attribute - use defaults
            if labelattribute == "source":
                return "Uncategorized"
            if labelattribute == "target":
                return labelname
            if labelattribute == "classification":
                return 'Uncategorized'
            if labelattribute in ["node_color", "link_color"]:
                return default_item[labelattribute]
        return None
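    # Fallback chain, illustratively: for a category with no sources-targets row,
    # get_attribute(name, "source") -> 'Uncategorized', get_attribute(name,
    # "target") -> the category name itself, and colors fall back to the
    # 'default' row of the sheet.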
    def print_graph(self, filename):
        from matplotlib import pyplot as plt
        plt.tight_layout()
        nx.draw_networkx(self._digraph, arrows=True)
        # plt.figure(figsize=(20,20))
        plt.savefig(filename, dpi=200, format="PNG")
        plt.clf()
class Transactions:
    """
    Contains transaction data and all helper methods for transforming and outputting.
    Init with a Pandas dataframe from Google Sheets or a csv. NOTE: depending on the activity, dataframes are mutable.
    TODO:
        empty values from CSV are 'NaN'
        Make sure various synthetic entries don't cause problems for other computations
        Make sure that various methods can be run in any order, or enforce precedence/locking
    """
    def __init__(self, dataframe, labels_obj, app_settings_obj):
        self._df = dataframe
        self._grouped_df = None
        self.length = len(dataframe)
        self._app_settings = app_settings_obj
        self._labels_obj = labels_obj
        is_valid = self._validate_df()  # Will throw an exception if invalid
        # Convert all dates to datetimes and sort earliest to latest
        if self._app_settings.verbose:
            logger.info(f"Converting data in {self.length} fetched rows to datetimes...")
        self._df["Date"] = pd.to_datetime(self._df["Date"])  # Does not mutate dataframe
        if nattype.NaTType in [type(i) for i in self._df["Date"]]:
            logger.critical(repr(self._df["Date"]))
            raise Exception("Empty date found!")  # There is probably a better way to do this.
        self.earliest_date = self._df["Date"].sort_values().iloc[0]  # Returns pandas._libs.tslibs.timestamps.Timestamp
        self.latest_date = self._df["Date"].sort_values().iloc[len(self._df) - 1]
        self.default_date = self.latest_date - datetime.timedelta(days=1)  # Day before latest date in dataset.
        self.max_depth = 1
        self.tips_processed = False
        self.sales_tax_processed = False
        self.surplus_deficit_processed = False
        self.amount_distributions = False
        self.process_report = f"Transactions report\n{'=' * 60}\n\n\n"
    def _validate_df(self) -> bool:
        # Validate header row
        # WIP: port over new sources-targets column
        header_is_valid = DataRow.validate(self._df.columns.to_list(), True)
        if not header_is_valid[0]:
            raise Exception(f"Source columns failed validation! Error was: {header_is_valid[1]}")
        # Validate data rows
        amt_types = [isinstance(i, float) for i in self._df["Amount"]]
        if False in amt_types:
            invalid_loc = amt_types.index(False)  # Note: only returns the first invalid location
            raise Exception(f"Invalid data found at row {invalid_loc}!\n {self._df.iloc[invalid_loc]}")
        return True
    def audit(self, audit_data, date_range=None):
        """
        Compare transaction data to audit data. Note this is specifically set up to use the column format for my
        bank export data. YMMV.
        Step 1: Apply date filtering (if applicable) (TODO: Test date handling)
        Step 2: Create a column with sums for amount, tax, tips to use for lookups
        Step 3: Loop through the bank export and search for matching entries based on the transaction amount
        Step 3a: If multiple transactions have the same amount, check whether any of them fall within +/- 5 days.
            Note: this does create an edge case where you could have a false negative if two transactions had the
            same value within the search window.
        Write out a report with any suspected missing transactions. Note: this will be a little noisy since I break
        transactions into multiple rows sometimes. (eg, Costco visits)
        """
        dt_today = datetime.datetime.today()
        audit_report = ""

        # Step 1
        if self._app_settings.filter_dates:
            if not date_range:
                raise Exception("Filter dates flag was True, but no dates were passed in!")
            start_date = date_range[0]
            end_date = date_range[1]
            if end_date is None and not self._app_settings.all_time:
                end_date = dt_today
            if start_date is None and not self._app_settings.all_time:
                start_date = self._app_settings.DEFAULT_START_DATE
            audit_data = df_date_filter(audit_data, start_date, end_date)
        elif not self._app_settings.all_time:
            audit_data = df_date_filter(audit_data, self._app_settings.DEFAULT_START_DATE, dt_today)

        def safe_sum(pd_series):
            """
            Safely sum a transaction frame
            """
            def vval(val):
                if not val:
                    return 0
                return round(float(val), 2)
            return round(pd_series["Amount"] + vval(pd_series.get("Sales Tax")) + vval(pd_series.get("Tips")), 2)

        # Step 2
        rowsums = self._df.apply(safe_sum, axis=1)

        # Step 3
        for idx, row in audit_data.iterrows():
            transaction_found = False
            hits = self._df[rowsums == row["Amount"]]
            for hit in hits["Date"]:
                if (row["Date"] < (hit + datetime.timedelta(days=5))) and \
                        (row["Date"] > (hit - datetime.timedelta(days=5))):
                    transaction_found = True
            if not transaction_found:
                audit_report += f"Transaction not found for {row['Date']} - {row['Description']} - {row['Amount']}\n"

        return audit_report
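    # The audit frame is assumed (per the bank-export note above) to carry at
    # least 'Date', 'Description', and 'Amount' columns; any row whose Amount
    # has no summed (Amount + Sales Tax + Tips) match dated within +/- 5 days
    # ends up in the report.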
    def process(self, date_range=None):
        """
        Process dataframe for sankey diagram
        Step 1: Drop rows based on tag exclusions
        Step 2: Split out any entries containing distributions (if feature flag is turned on)
        Step 3: Apply date filtering (if applicable)
        Step 4: Update transaction rows with sources and targets as defined by labels spreadsheet for all entries,
            modify sources/targets based on tags & store filters. Also handle recurring items.
        Step 5: Loop through each entry and:
            a: check for sales tax and/or tips column. Update total amount and create synthetic flows for tax/tips.
            b: crawl back through DAG, creating synthetic entries for predecessor nodes along the way,
               to ensure flows appear correctly.
        Step 6: Compute surplus or deficit flows
        Step 7: Aggregate amounts for all shared source:target pairs
        """
        dt_today = datetime.datetime.today()
        self.process_report += f"Processing {len(self._df)} transactions \
from {self._app_settings.source_data_location()}\n{'-' * 60}\n"
        if self._app_settings.verbose:
            logger.info(f"Processing {len(self._df)} transactions from {self._app_settings.source_data_location()}")

        # Step 1:
        if self._app_settings.exclude_tags:
            self.filter_tags(self._app_settings.exclude_tags)

        # Step 2:
        if self._app_settings.distribute_amounts:
            if self._app_settings.verbose:
                logger.info("Distributing amounts...")
            self.distribute_amounts()  # process report logging happens in called method

        # Step 3:
        if self._app_settings.filter_dates:
            if not date_range:
                raise Exception("Filter dates flag was True, but no dates were passed in!")
            start_date = date_range[0]
            end_date = date_range[1]
            if end_date is None and not self._app_settings.all_time:
                end_date = dt_today
            if start_date is None and not self._app_settings.all_time:
                start_date = self._app_settings.DEFAULT_START_DATE
            self.process_report += f"Filtering for dates from {start_date} to {end_date}\n{'-' * 60}\n"
            if self._app_settings.verbose:
                logger.info(f"Filtering for dates from {start_date} to {end_date}")
            # TODO: test and find edge cases!
            self.filter_dates(start_date, end_date)
        elif not self._app_settings.all_time:
            self.filter_dates(self._app_settings.DEFAULT_START_DATE, dt_today)

        self.update_title()
        # Step 4:
        self.apply_labels()
        # Step 5:
        self.process_rows()
        # Step 6:
        self.create_surplus_deficit_flows()
        # Step 7:
        self.collapse()
        # -- END Transactions.process() --
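    # Worked example of the pipeline above (illustrative): a $20 'Groceries' row
    # labeled Food -> Groceries gets a synthetic Income -> Food row for the same
    # $20 in Step 5b so the flow is continuous, and Step 7 then sums every
    # Food -> Groceries row into a single link for the diagram.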
    def process_line(self, date_range=None):
        """
        Process dataframe for line chart diagram
        Step 1: Drop rows based on tag exclusions
        Step 2: Split out any entries containing distributions (if feature flag is turned on)
        Step 3: Apply date filtering (if applicable)
        Step 4: Update transaction rows with sources and targets as defined by labels spreadsheet for all entries,
            modify sources/targets based on tags & store filters. Also handle recurring items.
        """
        dt_today = datetime.datetime.today()
        self.process_report += f"Processing {len(self._df)} transactions from \
{self._app_settings.source_data_location()}\n{'-' * 60}\n"
        if self._app_settings.verbose:
            logger.info(f"Processing {len(self._df)} transactions from {self._app_settings.source_data_location()}")

        # Step 1:
        if self._app_settings.exclude_tags:
            self.filter_tags(self._app_settings.exclude_tags)

        # Step 2:
        if self._app_settings.distribute_amounts:
            if self._app_settings.verbose:
                logger.info("Distributing amounts...")
            self.distribute_amounts()  # process report logging happens in called method

        # Step 3:
        if self._app_settings.filter_dates:
            if not date_range:
                raise Exception("Filter dates flag was True, but no dates were passed in!")
            start_date = date_range[0]
            end_date = date_range[1]
            if end_date is None and not self._app_settings.all_time:
                end_date = dt_today
            if start_date is None and not self._app_settings.all_time:
                start_date = self._app_settings.DEFAULT_START_DATE
            self.process_report += f"Filtering for dates from {start_date} to {end_date}\n{'-' * 60}\n"
            if self._app_settings.verbose:
                logger.info(f"Filtering for dates from {start_date} to {end_date}")
            # TODO: test and find edge cases!
            self.filter_dates(start_date, end_date)
        elif not self._app_settings.all_time:
            self.filter_dates(self._app_settings.DEFAULT_START_DATE, dt_today)

        # self.update_title()  # may need to use a different title block for line charts
        # Step 4:
        self.apply_labels()
        # -- END Transactions.process_line() --
    def filter_tags(self, tags_to_exclude):
        # tags_to_exclude: ['tag1', 'tag2', ...]
        self.process_report += f"Checking for tags to exclude: {tags_to_exclude}\n{'-' * 60}\n"
        df_changed = False
        rows_to_drop = []
        for k, v in enumerate(self._df["Tags"]):
            tag_matches = DataRow.tag_matches(v, tags_to_exclude)  # None if either arg is None or if no matches
            if tag_matches:
                df_changed = True
                rows_to_drop.append(self._df.index[k])
        if df_changed and rows_to_drop:  # Do as a separate loop to avoid changing the frame as we're iterating over it.
            for row_idx in rows_to_drop:
                self.process_report += f"DROPPING row due to exclude tags: {self._df.loc[row_idx]}\n"
                if self._app_settings.verbose:
                    logger.info(f"DROPPING row due to exclude tags: {self._df.loc[row_idx]}")
                self._df.drop(row_idx, inplace=True)
        if df_changed:
            self._df.reset_index(inplace=True, drop=True)
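    # e.g. filter_tags(['Reimbursed']) (an illustrative tag name) drops every row
    # whose comma-separated Tags cell contains 'Reimbursed', then reindexes the frame.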
    def add_row(self, row_data, already_validated=False):
        try:
            idx = len(self._df)  # Could use self.length...
            if already_validated:
                self._df.loc[idx] = row_data
            else:
                self._df.loc[idx] = DataRow.validate(row_data)
            self.length = idx + 1
        except Exception as e:
            logger.error(f"Error adding row: {row_data} - {e}")
            import pdb
            pdb.set_trace()
    def apply_labels(self):
        """
        Loop through each row in dataframe, looking up source-target nodes using category names, and overriding if
        indicated by tags or stores flags.
        Also add each source:target pair as an edge in a DAG, to be used in the sankey diagram generator to create
        intermediate transactions.
        NOTE: this process will not be adding intermediate transactions, but will ensure the DAG is correct so that
        intermediate transactions can be added later.
        """
        self.process_report += f"Running Transactions.apply_labels(). Tags has: {self._app_settings.tags}, \
tag_override is {self._app_settings.tag_override} and stores has: {self._app_settings.stores}\n\n"
        if self._app_settings.tags and self._app_settings.verbose:
            logger.info(f"Tag search enabled: Looking for tags: {self._app_settings.tags}")
            if self._app_settings.tag_override:
                logger.info("Overriding tags...")
        if self._app_settings.stores and self._app_settings.verbose:
            logger.info(f"Store search enabled: Looking for stores: {self._app_settings.stores}")
        if self._app_settings.verbose:
            logger.info(f"Applying labels for {len(self._df)} transactions")

        if self._app_settings.recurring:
            if self._app_settings.verbose:
                logger.info("Recurring transactions to be split out")  # TODO: precludes tag:recurring handling.
            # Add edge from Income to Recurring
            self._labels_obj._digraph.add_edge("Income", "Recurring")
        # Util functions ...................................................................................
        def get_source_target_labels(this_obj, this_category_key, this_category_val, step_id):
            # Get default labels defined for category from sources-targets sheet; override from data sheet if set there.
            src = this_obj._labels_obj.get_attribute(this_category_val, "source")
            tgt = this_obj._labels_obj.get_attribute(this_category_val, "target")
            classification = this_obj._labels_obj.get_attribute(this_category_val, "classification")
            this_obj.process_report += f"[{step_id}] Found default src:target for {this_category_val} -> {src}:{tgt} \
(classification: {classification})\n"
            # Allow individual transaction rows to override label lookups
            data_override_s_t = False
            transaction_source = this_obj._df.at[this_category_key, "Source"]
            transaction_target = this_obj._df.at[this_category_key, "Target"]
            if not is_empty(transaction_source) and not is_empty(transaction_target):
                # Both a source and a target were specified in the transaction data
                src = transaction_source
                tgt = transaction_target
                data_override_s_t = True
            elif not is_empty(transaction_source) and is_empty(transaction_target):
                # A source but no target was specified in the transaction data
                src = transaction_source
                data_override_s_t = True
            elif is_empty(transaction_source) and not is_empty(transaction_target):
                # A target but no source was specified in the transaction data;
                # we will append it to the default source-target
                if transaction_target != tgt:  # Skip if the override is the same as the default target
                    if not this_obj._labels_obj._digraph.has_edge(src, tgt):
                        this_obj._labels_obj._digraph.add_edge(src, tgt)
                    src = tgt
                    tgt = transaction_target
                    data_override_s_t = True

            if data_override_s_t:
                this_obj.process_report += f"[{step_id}] Override source/target for {this_category_val} from \
transaction data -> {src}: {tgt}\n"

            return src, tgt, classification
        # MAIN LOOP ........................................................................................

        for k, v in enumerate(self._df["Category"]):
            # Main labeling loop. Iterate over each transaction, look up source-target information in the labels
            # spreadsheet, applying/overriding as indicated.
            this_step_id = str(uuid4())[:8]
            self.process_report += f"[{this_step_id}] START Processing {self._df.at[k, 'Date']} | \
{v} | {self._df.at[k, 'Tags']} | ${self._df.at[k, 'Amount']}\n"
            is_deduction = False
            if is_empty(v):
                # Note: this should not happen... raise exception instead?
                self.process_report += f"[{this_step_id}] SKIPPING empty category {self._df.loc[k]}\n"
                logger.info(f"Skipping empty category {self._df.loc[k]}")
                continue
            # Tag logic
            # TODO: test source tags

            this_source, this_target, this_classification = get_source_target_labels(self, k, v, this_step_id)

            # Handle deduction types (these go directly from an income to an expense, skipping the 'Income' category,
            # and have a variable source based on their description).
            # Use case: a transaction with income would normally be something like "My Job" -> "Income", and then a
            # second transaction with income taxes would be "My Job" -> "Income Taxes" (skipping the income category)
            # TODO: Verify this works correctly with s-tags
            if this_source == "DEDUCTIONS":
                this_source = self._df.at[k, "Description"]
                self._df.at[k, "Type"] = "deduction"
                is_deduction = True
                self.process_report += f"[{this_step_id}] Deduction type found src:target -> \
{this_source}:{this_target}\n"
                if self._app_settings.verbose:
                    logger.info(f"Found DEDUCTION type transaction. Set to {this_source}:{this_target}")

            if self._app_settings.recurring and this_source == "Income":
                # Replace with recurring
                this_source = "Recurring"
                # Verify base edge is in DAG
                if not self._labels_obj._digraph.has_edge(this_source, this_target):
                    logger.info(f"Edge: {this_source}:{this_target} not found! Adding to graph.")
                    self._labels_obj._digraph.add_edge(this_source, this_target)

            # For now logic around tags/stores with deductions flow is undefined - skip processing.
            if not is_deduction:
                # Check for store match
                store_matches = False
                if self._app_settings.stores:
                    this_name = self._df.at[k, "Description"]
                    if this_name in self._app_settings.stores:
                        store_matches = True

                # Check for tag match(es)
                # Note: currently we will only ever use the first match.
                tag_matches = DataRow.tag_matches(self._df.at[k, "Tags"], self._app_settings.tags)
                # None if flag is not enabled or no matches
                tag_type = None
                if tag_matches:
                    if self._app_settings.recurring and tag_matches[0] == "Recurring":
                        raise Exception("Not double processing recurring tags!")  # TODO: handle more quietly
                    if self._app_settings.verbose:
                        logger.info(f"Got tag matches: {tag_matches}")
                    # Check for s-tags
                    # Get tag type
                    tag_type = self._labels_obj.get_attribute(tag_matches[0], "type")
                    if tag_type == "s-tag":
                        # If the flow is directly to/from "Income", replace "Income" with the Tag
                        if this_source == "Income":
                            this_source = tag_matches[0]
                        elif this_target == "Income":
                            this_target = tag_matches[0]
                        else:
                            self._labels_obj._digraph.add_edge(this_source, this_target, type="s-tag")
                            # NOTE: if tag is distant from "Income", we'll need to handle it while reconciling DAG
                    elif self._app_settings.tag_override:
                        # If overriding tags, we'll use the labels sheet to determine placement
                        this_source = self._labels_obj.get_attribute(tag_matches[0], "source", labeltype="tag",
                                                                     use_default=False)
                        this_target = self._labels_obj.get_attribute(tag_matches[0], "target", labeltype="tag",
                                                                     use_default=False)
                        if not this_source:
                            # If we don't have a matching tag defined in the sources-targets sheet,
                            # just create it to/from income.
                            # Look up the target we would have without tag matching
                            def_target = self._labels_obj.get_attribute(v, "target", use_default=False)
                            if def_target == "Income":  # Note: breaks if we have income flows more than one deep
                                this_source = tag_matches[0]
                                this_target = "Income"
                            else:
                                this_source = "Income"
                                this_target = tag_matches[0]
                    else:
                        # We are appending the tag as a new target at the end of the flow
                        this_source = this_target
                        this_target = tag_matches[0]
                        if self._app_settings.verbose:
                            logger.info(f"Adding edge to graph for tag ({tag_matches[0]}): \
{this_source} -> {this_target}")
                        # self._labels_obj._digraph.add_edge(this_source, this_target)

                if store_matches:
                    this_source = this_target
                    this_target = self._df.at[k, "Description"]
                    if self._app_settings.verbose:
                        logger.info(f"Adding edge to graph for store ({this_name}): {this_source} -> {this_target}")
                    # self._labels_obj._digraph.add_edge(this_source, this_target)

            self.process_report += f"[{this_step_id}] RESOLVED src:target for {v} -> {this_source}:{this_target}\n"
            if self._app_settings.verbose:
                logger.info(f"RESOLVED source/target > {this_source}:{this_target}")
            # Circuit breaker
            if is_empty(this_source) or is_empty(this_target):
                raise Exception(f"Got empty source or target for category {v}! ({this_source}:{this_target})")

            # Check for final edge in DAG and add if necessary
            if not self._labels_obj._digraph.has_edge(this_source, this_target):
                logger.info(f"Edge: {this_source}:{this_target} not found! Adding to graph.")
                self._labels_obj._digraph.add_edge(this_source, this_target)

            # Sanity check that we haven't created an orphan edge
            if not (is_deduction or tag_type == 's-tag') and \
                    "Income" not in nx.ancestors(self._labels_obj._digraph, this_target) and \
                    "Income" not in nx.descendants(self._labels_obj._digraph, this_source):
                logger.debug(f"{self._df.loc[k]}")
                raise Exception(f"No path to 'Income' from {this_source}:{this_target}")

            # Set source-target + classification on original transaction
            self._df.at[k, "Source"] = this_source
            self._df.at[k, "Target"] = this_target
            self._df.at[k, "Classification"] = this_classification

            self.process_report += f"[{this_step_id}] FINISHED processing labels\n"
    def process_rows(self):
        """
        Process individual transactions, creating synthetic transactions as needed to satisfy flows
        """
        msg = f"Processing row data on {len(self._df)} rows"
        self.process_report += f"\n{'-' * 60}\nRunning Transactions.process_rows()\n{'-' * 60}\n"
        self.process_report += msg + "\n"
        if self._app_settings.verbose:
            logger.info(msg)
        for k, v in enumerate(self._df["Source"]):
            this_row = self._df.loc[k]
            this_step_id = str(uuid4())[:8]
            is_recurring = False
            self.process_report += f"[{this_step_id}] START Processing {self._df.at[k, 'Date']} | \
{self._df.at[k, 'Description']} | ${self._df.at[k, 'Amount']}\n"
            if self._app_settings.verbose:
                logger.info(f"{'-' * 40}\nGot a transaction: {self._df.at[k, 'Date']} | \
{self._df.at[k, 'Description']} | {self._df.at[k, 'Source']}:{self._df.at[k, 'Target']} | \
${self._df.at[k, 'Amount']}\n")

            # Check for tag match(es)
            # Note: currently we will only ever use the first match.
            # TODO: discard tag if tag is "Recurring" AND _app_settings.recurring is True
            # None if flag is not enabled or no matches
            tag_matches = DataRow.tag_matches(self._df.at[k, "Tags"], self._app_settings.tags)
            tag_type = None
            if tag_matches:
                tag_type = self._labels_obj.get_attribute(tag_matches[0], "type")  # Get tag type

            # Check for recurring tag
            has_recurring = DataRow.tag_matches(self._df.at[k, "Tags"], ["Recurring"])
            if self._app_settings.recurring and has_recurring and "Recurring" in has_recurring:
                is_recurring = True
                if self._app_settings.verbose:
                    logger.info(">> Processing recurring transaction")

            # Handle taxes
            if not is_empty(this_row["Sales Tax"], True):
                if self._app_settings.separate_taxes:
                    # Add sales tax to its own root category as a new row
                    self.process_report += f"[{this_step_id}] ADDED: {this_row.Date} | {this_row.Description} | \
'Income' -> 'Sales Tax' | ${this_row['Sales Tax']}\n"
                    self.add_row(DataRow.create(
                        date=this_row.Date,
                        category_name="Sales Tax",
                        amount=this_row["Sales Tax"],
                        source="Income",
                        target="Sales Tax",
                        description=this_row.Description,
                        tags=this_row.Tags,
                        comment='Synthetic row for sales tax',
                        distribution=this_row.Distribution,
                        classification=self._app_settings.sales_tax_classification
                    ), True)
                else:
                    # Create new sales tax child target from this original target row &
                    # add sales tax back to original row amount
                    # Note: if store or tag processing is being done, this may already be one removed from
                    # the original category
                    self.process_report += f"[{this_step_id}] ADDED: {this_row.Date} | {this_row.Description} | \
{this_row.Target} -> 'Sales Tax' | ${this_row['Sales Tax']}\n"
                    self.add_row(DataRow.create(
                        date=this_row.Date,
                        category_name="Sales Tax",
                        amount=this_row["Sales Tax"],
                        source=this_row.Target,
                        target="Sales Tax",
                        description=this_row.Description,
                        tags=this_row.Tags,
                        comment='Synthetic row for sales tax',
                        distribution=this_row.Distribution,
                        classification=self._app_settings.sales_tax_classification
                    ), True)
                    # For this to behave as expected, it needs to add the sales tax amount back
                    # to the original Amount
                    self._df.at[k, "Amount"] = round(this_row.Amount + this_row["Sales Tax"], 2)
                    self.process_report += f"[{this_step_id}] UPDATED: {this_row.Date} | {this_row.Description} | \
{this_row.Source} -> {this_row.Target} | \
${this_row.Amount} -> ${self._df.at[k, 'Amount']}\n"

            # Handle tips by creating a new Tips child target from this original target row & add the tip back
            # to the original row amount
            # Note: if store or tag processing is being done, this may already be one removed from the original category
            if not is_empty(this_row["Tips"], True):
                self.process_report += f"[{this_step_id}] ADDED: {this_row.Date} | {this_row.Description} | \
{this_row.Target} -> 'Tips' | ${this_row['Tips']}\n"
                # Sales tax computation may have changed from this_row.Amount value
                orig_amount = self._df.at[k, "Amount"]
                self.add_row(DataRow.create(
                    date=this_row.Date,
                    category_name="Tips",
                    amount=this_row["Tips"],
                    source=this_row.Target,
                    target="Tips",
                    description=this_row.Description,
                    tags=this_row.Tags,
                    comment='Synthetic row for tips',
                    distribution=this_row.Distribution,
                    classification=self._app_settings.tip_classification
                ), True)
                # For this to behave as expected, it needs to add the tips amount back to the original Amount
                self._df.at[k, "Amount"] = round(orig_amount + this_row["Tips"], 2)
                self.process_report += f"[{this_step_id}] UPDATED: {this_row.Date} | {this_row.Description} | \
{this_row.Source} -> {this_row.Target} | ${orig_amount} -> ${self._df.at[k, 'Amount']}\n"
            # Traverse DAG from row source back to Income, adding a synthetic row for each edge it finds.
            # NOTE: if using s-tags, will go back to the tag instead of Income
            # TODO: explore cases where we are multiple synthetic rows deep, or a synthetic row has been added that
            # flows INTO income, or orphan flows (eg deductions) include synthetic nodes.
            s_tag_d1 = False
            if self._df.at[k, "Type"] == 'deduction':  # deduction types skip DAG processing for now
                self.process_report += f"[{this_step_id}] SKIPPING DAG traversal, since this is a deduction type.\n"
                if self._app_settings.verbose:
                    logger.info(f"Skipping DAG checks as this was a deductions type entry: {this_row.Date} | \
{this_row.Description} | {this_row.Source} -> {this_row.Target} | ${this_row.Amount}")
                continue

            # traverse graph
            if tag_type == 's-tag' and (this_row.Source == tag_matches[0] or this_row.Target == tag_matches[0]):
                # First order edge and is s-tag - skip processing
                s_tag_d1 = True
            elif "Income" in nx.ancestors(self._labels_obj._digraph, this_row.Target):
                # Must be an expense category:
                start_node = "Income"
                # This breaks if there are multiple paths to the end node, eg when using tags/stores flows
                end_node = this_row.Source
            else:
                # Must be an income category
                start_node = this_row.Source
                end_node = "Income"

            if not s_tag_d1:
                self.process_report += f"[{this_step_id}] Starting to traverse DAG for {start_node} -> {end_node}\n"
                if self._app_settings.verbose:
                    logger.info(f"Traversing graph for {start_node}:{end_node}...")

                pgroups = list(nx.all_simple_edge_paths(self._labels_obj._digraph, start_node, end_node))
                if is_recurring:
                    new_groups = [[]]
                    for g in pgroups[0]:
                        if g[0] == "Income":
                            if self._app_settings.verbose:
                                logger.info(f"--- Injecting Income:Recurring and Recurring:{g[1]} nodes ----")
                            new_groups[0].append(("Income", "Recurring"))
                            new_groups[0].append(("Recurring", g[1]))
                        else:
                            new_groups[0].append(g)
                    pgroups = new_groups

                self.process_report += f"[{this_step_id}] DAG search yielded groups: {pgroups}\n"
                if self._app_settings.verbose:
                    logger.info(f"Searched DAG for {start_node} -> {end_node} and got group: {pgroups}...")
                if len(pgroups) != 1:
                    # Potentially an error condition. Maybe raise an exception
                    logger.warning(f"Edge paths search did not yield the expected number of groups! {pgroups}")
                for pgroup in pgroups:
                    # Each edge path will be an array of tuples, like [(source1,target1), (source2,target2), ...]
                    # Iterate over the paths (ignoring the one that matches the original entry) and create synthetic
                    # entries for each one.
                    for pitem in pgroup:
                        # Don't need to process the pair we already have
                        if pitem == (this_row.Source, this_row.Target):
                            continue
                        syn_source, syn_target = pitem
                        if syn_source == "Income" and tag_type == 's-tag':
                            # Since this is an s-tag flow, the root of the flow should be the tag
                            syn_source = tag_matches[0]
                        if syn_target == "Income" and tag_type == 's-tag':
                            syn_target = tag_matches[0]  # TODO: verify that this case is handled as expected
                        self.process_report += f"[{this_step_id}] ADDED: {this_row.Date} | {this_row.Description} | \
{syn_source} -> {syn_target} | ${self._df.at[k, 'Amount']}\n"
                        if self._app_settings.verbose:
                            logger.info(f"Adding synthetic entry: {this_row.Date} | {this_row.Description} | \
{syn_source} -> {syn_target} | ${self._df.at[k, 'Amount']}")
                        self.add_row(DataRow.create(
                            date=this_row.Date,
                            category_name=this_row.Category,
                            amount=self._df.at[k, "Amount"],
                            source=syn_source,
                            target=syn_target,
                            description=this_row.Description,
                            tags=this_row.Tags,
                            comment='Synthetic row',
                            distribution=this_row.Distribution,
                            classification="Uncategorized"
                        ), True)

            self.process_report += f"[{this_step_id}] DONE processing.\n{'-' * 40}\n"
    def collapse(self):
        self.process_report += f"\n{'-' * 40}\nStepping into Transactions.collapse()\n{'-' * 40}\n"
        if self._app_settings.verbose:
            logger.info("Aggregating all source-target pairs")
        # Collapse all the pairs down for cleaner flows
        grouped_df = self._df.groupby(['Source', 'Target']).agg({'Amount': 'sum'})
        # Resetting an index appears to just create a new one unless the drop argument is passed in,
        # but that's fine in this case.
        grouped_df.reset_index(inplace=True)
        self._grouped_df = grouped_df  # TODO: Review grouped_df vs _df
        if self._app_settings.verbose:
            logger.info(f"Collapsed {len(self._df)} transactions down to {len(self._grouped_df)}")
    def create_surplus_deficit_flows(self):
        self.process_report += f"\n{'-' * 40}\nStepping into Transactions.create_surplus_deficit_flows()\n{'-' * 40}\n"
        if self.surplus_deficit_processed:
            logger.info("Surplus/deficit flows have already been processed!")
            return
        self.surplus_deficit_processed = True
        if self._app_settings.verbose:
            logger.info("Computing surplus/deficit flows")
        # Check for s-tag nodes
        # Returns a dict like: {'a': 's-tag', 'd': 's-tag', 'c': 'tag', ...}
        node_types = nx.get_node_attributes(self._labels_obj._digraph, 'type')
        s_nodes = [i for i in node_types if node_types[i] == 's-tag']  # A list of s-nodes
        s_nodes.append("Income")

        for s_node in s_nodes:
            # Create synthetic entries showing difference between flows into and out of Income as either a
            # surplus or deficit.
            # Date should always be within the current filter range, if used.
            # TODO: review for race conditions with feed_in arg and computing surpluses
            total_income = self._df.loc[self._df["Target"] == s_node].agg({'Amount': 'sum'})["Amount"]
            total_expenses = self._df.loc[self._df["Source"] == s_node].agg({'Amount': 'sum'})["Amount"]
            if total_income > total_expenses:
                surplus = total_income - total_expenses
                if s_node != "Income" and self._app_settings.feed_in:
                    # Feeding s-tag surplus back to Income
                    self.process_report += f"ADDED: {self.default_date} | '{s_node} Surplus' | {s_node} -> 'Income' | \
${surplus}\n"
                    self.add_row(DataRow.create(
                        date=self.default_date,
                        category_name=f"{s_node} Surplus",
                        amount=surplus,
                        source=s_node,
                        target="Income",
                        comment=f"Synthetic {s_node} surplus entry"
                    ), True)
                else:
                    # Keeping s-tag surplus(es) as distinct flow
                    self.process_report += f"ADDED: {self.default_date} | '{s_node} Surplus' | {s_node} -> \
'{s_node} Surplus' | ${surplus}\n"
                    self.add_row(DataRow.create(
                        date=self.default_date,
                        category_name=f"{s_node} Surplus",
                        amount=surplus,
                        source=s_node,
                        target=f"{s_node} Surplus",
                        comment=f"Synthetic {s_node} surplus entry"
                    ), True)

                # Copy 'Surplus' color information to new entry
                this_label = self._labels_obj._lookup.get("Surplus")
                if this_label:
                    this_label = dict(this_label)  # Copy so the shared 'Surplus' entry isn't mutated
                    this_label["source"] = s_node
                    this_label["target"] = f'{s_node} Surplus'
                    self._labels_obj._lookup[f'{s_node} Surplus'] = this_label

            elif total_expenses > total_income:
                # TODO: If using feed_in arg, copy Income surplus (if any) to s-tag??
                # (or, more accurately, s-tag deficit from income)
                deficit = total_expenses - total_income
                self.process_report += f"ADDED: {self.default_date} | '{s_node} Deficit' | '{s_node} Deficit' -> \
{s_node} | ${deficit}\n"
                self.add_row(DataRow.create(
                    date=self.default_date,
                    category_name=f"{s_node} Deficit",
                    amount=deficit,
                    source=f"{s_node} Deficit",
                    target=s_node,
                    comment=f"Synthetic {s_node} deficit entry"
                ), True)
                # Copy 'Deficit' color information to new entry
                this_label = self._labels_obj._lookup.get("Deficit")
                if this_label:
                    this_label = dict(this_label)  # Copy so the shared 'Deficit' entry isn't mutated
                    this_label["source"] = s_node
                    this_label["target"] = f'{s_node} Deficit'
                    self._labels_obj._lookup[f'{s_node} Deficit'] = this_label
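    # Arithmetic sketch: if rows flowing into 'Income' total $5,000 and rows
    # flowing out total $4,200, this adds a synthetic Income -> 'Income Surplus'
    # row of $800, dated self.default_date so it survives date filtering.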
    def filter_dates(self, start_date, end_date):
        self.process_report += f"\n{'-' * 40}\nStepping into Transactions.filter_dates({start_date}, \
{end_date})\n{'-' * 40}\n"
        # All times should be pandas._libs.tslibs.timestamps.Timestamp
        # Will discard data outside supplied daterange... TODO: preserve original df??

        if self._app_settings.verbose:
            logger.info(f"Filtering data from {start_date} .. {end_date}...")

        if start_date is None and end_date is None:
            return  # no op.

        # Coerce to timestamp
        if type(start_date) is not timestamps.Timestamp:
            start_date = pd.to_datetime(start_date)  # pd.to_datetime(None) returns None
        if type(end_date) is not timestamps.Timestamp:
            end_date = pd.to_datetime(end_date)

        if end_date:  # Set up a default date guaranteed to be within the filter range.
            self.default_date = end_date - datetime.timedelta(days=1)  # One day before our end date
        elif start_date:
            self.default_date = start_date + datetime.timedelta(days=1)  # One day after our start date

        # Start or end date is unbounded; set it to the earliest (or latest) date in the fetched data.
        if not start_date:
            start_date = self.earliest_date
        if not end_date:
            end_date = self.latest_date

        if start_date > end_date:
            raise Exception(f"Start date ({start_date.date()}) is after end date ({end_date.date()})!")

        self.process_report += f">> final dates to use for filtering: {start_date} - {end_date} <<\n{'-' * 60}\n"

        dt_mask = (self._df["Date"] >= start_date) & (self._df["Date"] <= end_date)  # Boolean AND of the two masks
        self._df = self._df[dt_mask]
        self._df = self._df.reset_index(drop=True)
        if len(self._df) == 0:
            raise Exception(f"Supplied date range ({start_date.date()} - {end_date.date()}) does not contain \
any transactions!")
        self.earliest_date = self._df["Date"].sort_values().iloc[0]
        self.latest_date = self._df["Date"].sort_values().iloc[len(self._df) - 1]
        self.default_date = self.latest_date - datetime.timedelta(days=1)
        self.process_report += f"DONE filtering dates. Earliest date is: {self.earliest_date}, latest date is: \
{self.latest_date}, default date is: {self.default_date}, \
and the dataset now contains {len(self._df)} transactions.\n{'-' * 60}\n"
    def explode_tags(self):
        # Split each tag out to its own column, with a true/false value for a given row
        # Note: currently unused but possible future functionality around tags.
        unique_df_tags = [val.strip() for sublist in self._df["Tags"].str.split(",").tolist() for val in sublist]
        unique_df_tags = list(set(unique_df_tags))
        if '' in unique_df_tags:
            unique_df_tags.remove('')
        for tag in unique_df_tags:
            # Match whole tags after splitting, so a tag 'foo' is not marked on rows tagged only 'foot'
            self._df[tag] = self._df["Tags"].apply(lambda s, t=tag: t in [i.strip() for i in str(s).split(",")])
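    # e.g. a Tags cell of 'grocery, vacation' yields boolean columns 'grocery'
    # and 'vacation' that are True for that row (illustrative tag names).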
    def distribute_amounts(self):
        # Distribute a payment over a time period
        # Note: this creates synthetic transactions in the future, which will affect latest date.
        # TODO: verify that this will fall within the current date filters, if being used.
        #   Current method is to just call this before filter_dates(); would need to refactor to be more robust
        # TODO: handle negative values to distribute backwards (as in, a charge that represents past costs)
        if self.amount_distributions:
            logger.info("Amounts have already been distributed!")
            return
        self.amount_distributions = True
        self.process_report += f"{'-' * 60}\nRunning Transactions.distribute_amounts()\n{'-' * 60}\n"
        df_idx = len(self._df)
        # Loop through dataset looking for distributed rows
        for k, v in enumerate(self._df["Distribution"]):
            if not is_empty(v, True):
                reverse_distribution = False
                v = int(v)
                if v < 0:
                    # Negative distribution
                    reverse_distribution = True
                    v = abs(v)
                # A tuple with (Amount, Sales Tax)
                original_amount = float(self._df.at[k, "Amount"]), self._df.at[k, "Sales Tax"]
                original_date = self._df.at[k, "Date"]
                dist_amount = original_amount[0] / int(v)  # Calculate total amount / distributions
                dist_sales_tax = 0
                dists = []
                if not is_empty(original_amount[1], True):
                    dist_sales_tax = float(original_amount[1]) / int(v)  # Calculate sales tax amount / distributions
                # Reset original transaction to distribution amount
                self.process_report += f"UPDATED: {self._df.at[k, 'Date']} | {self._df.at[k, 'Description']} | \
{self._df.at[k, 'Source']} -> {self._df.at[k, 'Target']} | ${dist_amount} (+ ${dist_sales_tax})\n"
                self._df.at[k, "Amount"] = dist_amount
                self._df.at[k, "Sales Tax"] = dist_sales_tax
                # Create synthetic entries for distributed transactions
                counter = v
                while counter > 1:  # Don't need to do the first one, as we changed it in place
                    if reverse_distribution:
                        # We assume that the distribution value is in months.
                        new_date = original_date - datetime.timedelta(weeks=(counter - 1) * 4.33)
                    else:
                        # We assume that the distribution value is in months.
                        new_date = original_date + datetime.timedelta(weeks=(counter - 1) * 4.33)
                    self.process_report += f"ADDED: {new_date} | {self._df.at[k, 'Description']} | \
{self._df.at[k, 'Source']} -> {self._df.at[k, 'Target']} | \
${dist_amount} (+ ${dist_sales_tax})\n"
                    # create(date, category_name, source, target, amount, description="", sales_tax=0, tips=0,
                    #        comment="", tags="", row_type="", distribution=0):
                    # Assuming no tips on distributed transactions for now
                    dists.append(DataRow.create(
                        new_date,
                        self._df.at[k, "Category"],
                        self._df.at[k, "Source"],
                        self._df.at[k, "Target"],
                        dist_amount,
                        self._df.at[k, "Description"],
                        dist_sales_tax,
                        0,
                        f"Synthetic transaction from original transaction on {original_date} of {original_amount[0]} \
(+{original_amount[1]})",
                        self._df.at[k, "Tags"],
                        self._df.at[k, "Type"],
                        0,
                        self._df.at[k, "Classification"]
                    ))
                    counter -= 1
                for row in dists:
                    self._df.loc[df_idx] = row  # Add check_data_row here?
                    df_idx += 1
        self.latest_date = self._df["Date"].sort_values().iloc[len(self._df) - 1]  # Reset latest date value
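    # Distribution arithmetic (illustrative): a $300 row with Distribution=3 is
    # cut to $100 in place, and two synthetic $100 copies are added ~4.33 and
    # ~8.66 weeks later (4.33 weeks approximating one average month).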
    def update_title(self):
        # TODO: add flag information to title
        # TODO: Move to sankeyutils class
        self.title = f"{self._app_settings.base_title} ({self.earliest_date.month}/{self.earliest_date.day}/\
{self.earliest_date.year} - {self.latest_date.month}/{self.latest_date.day}/{self.latest_date.year}) \
[{(self.latest_date - self.earliest_date).days} days]"
        if self._app_settings.distribute_amounts:
            self.title += "<br> Multi-month transactions are being distributed"
        if self._app_settings.exclude_tags:
            self.title += f"<br> Tags being excluded: {', '.join(self._app_settings.exclude_tags)}"
        if self._app_settings.tags:
            self.title += f"<br> Tags being used: {', '.join(self._app_settings.tags)}"
        if self._app_settings.recurring:
            self.title += "<br> Recurring transactions are being split out"
class TransactionRow:
    """
    WIP: not in use at this time.
    """
    def __init__(self, df, key):
        self.key = key
        self.data = {}
        for col in DataRow.fields:
            val = df.at[key, col]
            if is_null(val) and DataRow.fields[col]["required"]:
                raise Exception(f"Required column {col} was null for {repr(df[key])}")
            self.data[col] = val

    class TransactionDate:
        def __init__(self, value):
            self._required = True
            self._nullable = True
            self._datatype = timestamps.Timestamp
            self._coerce_type = True  # TODO: switch to a function that coerces. Or just use getters and setters.
            self._comment = ""
            self._value = pd.to_datetime(value)

        @property
        def value(self):
            return self._value

        @value.setter
        def value(self, val):
            self._value = pd.to_datetime(val)


class DataRow:
    # Static class - just a container for some related methods around single rows of expense data.
    # The columns we expect to see in the data
    fields = {
        "Date": {
            "required": True,
            "nullable": False,
            "type": timestamps.Timestamp,
            "force_type": False,
            "comment": ""
        },
        "Category": {
            "required": True,
            "nullable": False,
            "type": str,
            "force_type": False,
            "comment": ""
        },
        "Description": {
            "required": False,
            "nullable": True,
            "type": str,
            "force_type": False,
            "comment": ""
        },
        "Tags": {
            "required": False,
            "nullable": True,
            "type": str,
            "force_type": False,
            "comment": ""
        },
        "Comments": {
            "required": False,
            "nullable": True,
            "type": str,
            "force_type": False,
            "comment": ""
        },
        "Source": {
            "required": False,
            "nullable": True,
            "type": str,
            "force_type": False,
            "comment": ""
        },
        "Target": {
            "required": False,
            "nullable": True,
            "type": str,
            "force_type": False,
            "comment": ""
        },
        "Type": {
            "required": False,
            "nullable": True,
            "type": str,
            "allowed_values": ["computed", "tag", ""],
            "force_type": False,
            "comment": ""
        },
        "Distribution": {
            "required": False,
            "nullable": True,
            "type": int,
            "force_type": True,
            "comment": "Value in whole months to distribute the row amount over"
        },
        "Amount": {
            "required": True,
            "nullable": False,
            "type": float,
            "force_type": True,
            "comment": ""
        },
        "Sales Tax": {
            "required": False,
            "nullable": True,
            "type": float,
            "force_type": True,
            "comment": ""
        },
        "Tips": {
            "required": False,
            "nullable": True,
            "type": float,
            "force_type": True,
            "comment": ""
        }
    }
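    # Illustrative data row matching the schema above (hypothetical values), in field
    # order: Date, Category, Description, Tags, Comments, Source, Target, Type,
    # Distribution, Amount, Sales Tax, Tips:
    #   [pd.to_datetime("2023-01-15"), "Groceries", "Weekly shop", "", "", "", "",
    #    "", 0, 42.50, 3.80, 0.0]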

    @staticmethod
    def validate(drow, header_only=False, include_classifications=False):
        # Validate that data rows are correct
        this_fields = dict(DataRow.fields)  # Copy so the class-level definition is not mutated
        if include_classifications:
            # Add classification to the fields
            this_fields["Classification"] = {
                "required": True,
                "nullable": False,
                "type": str,
                "force_type": False,
                "comment": ""
            }
        if len(drow) != len(this_fields):
            raise Exception(f"Data rows should contain {len(this_fields)} elements")
        if header_only:
            if drow != list(this_fields.keys()):
                return False, f"Data rows need to be in the form: {list(this_fields.keys())}"
            return True, None
        vkeys = list(this_fields.keys())
        for idx, this_value in enumerate(drow):
            this_validator = this_fields[vkeys[idx]]
            if is_null(this_value):  # Also check for length = 0?
                if not this_validator["nullable"]:
                    raise Exception(f"Non-nullable field {vkeys[idx]} was nulled in {drow}")
            else:
                if type(this_value) is not this_validator["type"]:
                    if this_validator["force_type"]:
                        try:
                            this_value = this_validator["type"](this_value)
                            drow[idx] = this_value  # Write the coerced value back into the row
                        except ValueError:
                            raise Exception(f"Could not coerce '{this_value}' at idx {idx} to "
                                            f"{this_validator['type']} for this row: {drow}")
                    if type(this_value) is not this_validator["type"]:
                        raise Exception(f"Invalid type at idx {idx} for {this_value} in {drow}")
                if this_validator.get("allowed_values") and this_value not in this_validator["allowed_values"]:
                    raise Exception(f"Non-allowed value of {this_value} in {drow}")
        return drow
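    # Illustrative usage: header validation returns a (bool, message) tuple, e.g.:
    #   ok, msg = DataRow.validate(df.columns.to_list(), header_only=True)
    #   if not ok:
    #       raise Exception(msg)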

    @staticmethod
    def create(
            date,
            category_name,
            source, target,
            amount,
            description="",
            sales_tax=0,
            tips=0,
            comment="",
            tags="",
            row_type="",
            distribution=0,
            classification="Uncategorized"):
        def coerce_or_zero(value, to_type):
            # Treat nulls and uncoercible values as zero
            if is_null(value):
                return to_type(0)
            try:
                return to_type(value)
            except ValueError:
                return to_type(0)

        amount = coerce_or_zero(amount, float)
        tips = coerce_or_zero(tips, float)
        distribution = coerce_or_zero(distribution, int)
        sales_tax = coerce_or_zero(sales_tax, float)
        return DataRow.validate([
            date,
            category_name,
            description,
            tags,
            comment,
            source,
            target,
            row_type,
            distribution,
            amount,
            sales_tax,
            tips,
            classification], False, True)
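    # Illustrative usage (hypothetical values): build a validated row for a one-off expense.
    #   row = DataRow.create(
    #       pd.to_datetime("2023-01-15"), "Groceries", "Checking", "Food",
    #       42.50, description="Weekly shop", sales_tax=3.80, tags="groceries")
    #   # -> a list ordered per DataRow.fields, with "Classification" appended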

    @staticmethod
    def tag_matches(row_tags, search_tags):
        # Tag logic: return the search tags present on the row, or None when either side is empty
        if search_tags and not is_empty(search_tags) and not is_empty(row_tags):
            this_exploded_tags = [i.strip() for i in row_tags.split(',')]  # TODO: Do this case insensitively?
            if this_exploded_tags:
                return list(set(search_tags).intersection(this_exploded_tags))
        return None
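    # Illustrative behavior (hypothetical tags):
    #   DataRow.tag_matches("groceries, costco", ["costco"])  # -> ["costco"]
    #   DataRow.tag_matches("groceries", ["costco"])          # -> [] (no overlap)
    #   DataRow.tag_matches("", ["costco"])                   # -> None (empty row tags)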


# WIP: Move utils into separate classes


class AuditUtils:
    pass


class DiagramUtils:
    """
    Static class, holds utility functions used for all diagram types.
    """
    pass


class SankeyUtils(DiagramUtils):
    """
    Static class, holds utility functions used for generating Sankey diagrams.
    """
    @staticmethod
    def update_title(transaction_obj):
        # Mutate passed object or return a value?
        # TODO: add flag information to title
        transaction_obj.title = (
            f"{transaction_obj._app_settings.base_title} "
            f"({transaction_obj.earliest_date.month}/{transaction_obj.earliest_date.day}/"
            f"{transaction_obj.earliest_date.year} - {transaction_obj.latest_date.month}/"
            f"{transaction_obj.latest_date.day}/{transaction_obj.latest_date.year})")
        if transaction_obj._app_settings.distribute_amounts:
            transaction_obj.title += "<br> Multi-month transactions are being distributed"
        if transaction_obj._app_settings.exclude_tags:
            transaction_obj.title += ("<br> Tags being excluded: "
                                      f"{', '.join(transaction_obj._app_settings.exclude_tags)}")
        if transaction_obj._app_settings.tags:
            transaction_obj.title += f"<br> Tags being used: {', '.join(transaction_obj._app_settings.tags)}"
        if transaction_obj._app_settings.recurring:
            transaction_obj.title += "<br> Recurring transactions are being split out"

    @staticmethod
    def build_dag(transaction_obj: Transactions):
        """
        Create directed acyclic graph of all transactions
        """
        if transaction_obj._app_settings.recurring:
            if transaction_obj._app_settings.verbose:
                logger.info("Recurring transactions to be split out")  # TODO: precludes tag:recurring handling.
            # Add edge from Income to Recurring
            transaction_obj._labels_obj._digraph.add_edge("Income", "Recurring")
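        # Illustrative sketch (hypothetical graph state): the labels object's digraph
        # accumulates edges that later drive the Sankey node/link layout, e.g.:
        #   g = nx.DiGraph()
        #   g.add_edge("Income", "Recurring")
        #   list(g.edges())  # -> [('Income', 'Recurring')]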


class LineUtils(DiagramUtils):
    """
    Static class, holds utility functions used for generating line charts.
    """
    pass


# Define some helper functions =======================================================================================


def is_null(obj):
    # Use as a general-purpose null/None/NaN catch.
    # Just using numpy.isnan() will throw errors for types that cannot be coerced to float64.
    # Could also use a try...catch
    if obj is None:
        return True
    if type(obj) is float64 and isnan(obj):
        return True
    if type(obj) is nattype.NaTType:
        return True
    obj_as_str = None
    try:
        obj_as_str = str(obj)
    except Exception:
        pass
    if obj_as_str in ["None", "none", "NaN", "nan", "Null", "null"]:
        return True
    return False
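# Illustrative behavior:
#   is_null(None)            # -> True
#   is_null(float64("nan"))  # -> True
#   is_null(pd.NaT)          # -> True
#   is_null("nan")           # -> True (string forms are treated as null too)
#   is_null(0)               # -> False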


def is_empty(obj, nonzero=False):
    # Check for null, nan, none, etc as well as empty string. Optionally check for zero values.
    # Swallow errors casting to values
    if is_null(obj):
        return True
    obj_as_str = None
    try:
        obj_as_str = str(obj)
    except Exception:
        pass
    if obj_as_str == "":
        return True
    if nonzero:
        obj_as_float = None  # int() truncates values like 0.25 to 0
        try:
            obj_as_float = float(obj)
        except Exception:
            pass
        if obj_as_float == 0:
            return True
    return False
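# Illustrative behavior:
#   is_empty("")                  # -> True
#   is_empty("0")                 # -> False
#   is_empty("0", nonzero=True)   # -> True (zero counts as empty when nonzero is set)
#   is_empty(0.25, nonzero=True)  # -> False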


def df_date_filter(df, start_date, end_date):
    # Filter a dataframe by date
    if is_empty(start_date) and is_empty(end_date):
        return df
    if is_empty(start_date):
        df = df[df["Date"] <= end_date]
    elif is_empty(end_date):
        df = df[df["Date"] >= start_date]
    else:
        df = df[(df["Date"] >= start_date) & (df["Date"] <= end_date)]
    df = df.reset_index(drop=True)
    if len(df) == 0:
        # Use the raw values here so the message also works when one end of the range is empty
        raise Exception(f"Supplied date range ({start_date} - {end_date}) does not contain any transactions!")
    return df
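# Illustrative usage (hypothetical frame): keep only October 2022 transactions.
#   df = pd.DataFrame({"Date": pd.to_datetime(["2022-09-30", "2022-10-15"]), "Amount": [5.0, 7.5]})
#   df_date_filter(df, pd.to_datetime("2022-10-01"), pd.to_datetime("2022-10-31"))
#   # -> single-row frame containing the 2022-10-15 transaction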


def save_report(report_data, basename):
    dtnow = pd.Timestamp.today()
    fname = f"{basename}-{dtnow.strftime('%Y%m%d-%H%M%S')}.txt"  # Filesystem-safe timestamp (no colons or spaces)
    if path.isfile(fname):
        raise Exception(f"File named {fname} already exists!")
    with open(fname, 'wt') as f:
        f.write(report_data)


def validate_date_string(date_string, allow_empty=False):
    # Pandas will accept YYYY-MM-DD or MM/DD/YYYY
    if is_empty(date_string, True) and allow_empty:
        return True
    match_obj = re.match(r"^([\d]{4})-([\d]{1,2})-([\d]{1,2})$", date_string)
    if match_obj:
        match_year = int(match_obj.groups()[0])
        match_month = int(match_obj.groups()[1])
        match_day = int(match_obj.groups()[2])
    else:
        match_obj = re.match(r"^([\d]{1,2})/([\d]{1,2})/([\d]{4})$", date_string)
        if match_obj:
            match_year = int(match_obj.groups()[2])
            match_month = int(match_obj.groups()[0])
            match_day = int(match_obj.groups()[1])
    if match_obj:
        if not (1900 < match_year < 2100):  # Update if doing historical work!
            logger.warning(f"Supplied year doesn't look right: {match_year}")
            return False
        if not (1 <= match_month <= 12):
            logger.warning(f"Invalid month value: {match_month}")
            return False
        if not (1 <= match_day <= 31):
            logger.warning(f"Invalid day value: {match_day}")
            return False
        return True
    return False
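# Illustrative behavior:
#   validate_date_string("2023-01-15")  # -> True
#   validate_date_string("1/15/2023")   # -> True
#   validate_date_string("15/1/2023")   # -> False (month out of range)
#   validate_date_string("", True)      # -> True (empty allowed when allow_empty is set)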


def func_Convert_Gsheet_dates(g_timestamp, default_date):
    # Note: currently unused
    if g_timestamp:
        try:
            g_timestamp_as_int = int(g_timestamp)
            # Likely means we got an unformatted timestamp from gsheets.
            # See also: https://developers.google.com/sheets/api/guides/formats for information about
            # Google sheets timestamp format
            # See: http://www.cpearson.com/excel/datetime.htm for why Dec 30, 1899
            return pd.to_datetime(g_timestamp_as_int * 86400 * 1000, unit='ms', origin="1899-12-30")
        except ValueError:
            # We'll assume this means we got a formatted date string
            return pd.to_datetime(g_timestamp)
    else:
        if not default_date:
            return pd.Timestamp.today()
        return pd.to_datetime(default_date)
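# Illustrative conversion (hypothetical serial value): Sheets serial dates count days
# from the 1899-12-30 origin, so serial 44927 corresponds to 2023-01-01:
#   func_Convert_Gsheet_dates(44927, None)  # -> Timestamp("2023-01-01 00:00:00")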


def fetch_data(app_settings_obj):
    """
    Fetch the sources-targets mapping and the transactions data described by the
    supplied settings (data_source, data_sheet, labels_source, g_creds), from
    either csv files or a Google Sheets document. Returns a (src_target, df)
    tuple: the sources-targets records and a transactions DataFrame.
    """
    # WIP: Handle source data in multiple sheets, eg. 1 sheet per year...
    def data_source_router(
            filename: str,
            file_kind: str,
            gcreds,
            sheetname: Union[str, None] = None,
            gcreds_obj=None,
            gsheets_obj=None):
        """
        Look for data sources either locally or in Google Sheets. Handle wildcard expressions for multiple sheets.
        :param filename: A filename or wildcard expression, or None (Note: only csv files will use the wildcard
        expression for this value)
        :param file_kind: "sources-targets" or "transactions"
        :param gcreds: Path to the Google service account credentials file
        :param sheetname: A sheet name or wildcard expression, or None (only applicable to Google Sheets)
        :param gcreds_obj: Authorized Google client object (optional, created on first use)
        :param gsheets_obj: Google Sheets spreadsheet object (optional)

        Returns: {
            "filename" -> list of strings,
            "sheetname" -> list of strings,
            "filetype" -> "csv" or "gsheet",
            "file_kind" -> "sources-targets" or "transactions",
            "gcreds_obj" -> Google credentials object,
            "gsheets_obj" -> Google sheets object
        }
        """
        file_kind = file_kind.lower()
        if file_kind not in ["sources-targets", "transactions"]:
            raise Exception(f"Invalid file kind: {file_kind}")
        if filename is None:
            filename = input(f"Enter location for {file_kind} data: ")
            return data_source_router(filename, file_kind, gcreds, sheetname, gcreds_obj, gsheets_obj)
        if filename.endswith(".csv"):
            if path.isfile(filename):  # Switch to pathlib?
                return {
                    "filename": [filename],
                    "sheetname": [None],
                    "filetype": "csv",
                    "file_kind": file_kind,
                    "gcreds_obj": gcreds_obj,
                    "gsheets_obj": gsheets_obj}
            logger.warning(f"File not found: {filename}")
            return data_source_router(None, file_kind, gcreds, sheetname, gcreds_obj, gsheets_obj)
        if filename.endswith("*"):
            # Wildcards on filenames only supported for csvs; Gsheets would use sheetnames for wildcards.
            fileparts = filename.split("/")
            filepattern = f"{fileparts[-1]}.csv"
            if len(fileparts[0]) == 0:
                # This must be an absolute path
                this_dir = "/" + "/".join(fileparts[1:-1])
            else:
                # Possibly a relative path
                if len(fileparts) > 1:
                    this_dir = "./" + "/".join(fileparts[:-1])
                else:
                    this_dir = "."
            dir_obj = Path(this_dir)
            if dir_obj.is_dir():
                files = list(dir_obj.glob(filepattern))
                if len(files) > 0:
                    return {"filename": [str(f) for f in files],
                            "sheetname": [None],
                            "filetype": "csv",
                            "file_kind": file_kind,
                            "gcreds_obj": gcreds_obj,
                            "gsheets_obj": gsheets_obj}
            logger.warning(f"No files found at: {filename}")
            return data_source_router(None, file_kind, gcreds, sheetname, gcreds_obj, gsheets_obj)
        # If we've gotten this far, we must be looking for a Google Sheets file (late binding of the
        # gcreds_obj to reduce calls to authorize)
        if not gcreds_obj:
            gcreds_obj = pygsheets.authorize(service_file=gcreds)  # trying to avoid calling this multiple times
        if filename not in gcreds_obj.spreadsheet_titles():
            logger.warning(f"Spreadsheet not found: {filename}")
            return data_source_router(None, file_kind, gcreds, sheetname, gcreds_obj, gsheets_obj)
        gsheets_obj = gcreds_obj.open(filename)  # TODO: error handling.
        if not sheetname:
            sheetname = input(f"Enter worksheet title for spreadsheet {filename}: ")
            return data_source_router(filename, file_kind, gcreds, sheetname, gcreds_obj, gsheets_obj)
        gsheet_titles = [i.title for i in gsheets_obj.worksheets()]
        if sheetname.endswith("*"):
            # Wildcard handling for sheetnames
            results = []
            for sheet in gsheet_titles:
                if sheet.startswith(sheetname[:-1]):
                    results.append(sheet)
            if len(results) > 0:
                return {
                    "filename": [filename],
                    "sheetname": results,
                    "filetype": "gsheet",
                    "file_kind": file_kind,
                    "gcreds_obj": gcreds_obj,
                    "gsheets_obj": gsheets_obj}
            logger.warning(f"No worksheets found matching pattern: \"{sheetname}\" in Google Sheets: \"{filename}\"")
            return data_source_router(filename, file_kind, gcreds, None, gcreds_obj, gsheets_obj)
        if sheetname in gsheet_titles:
            return {"filename": [filename],
                    "sheetname": [sheetname],
                    "filetype": "gsheet",
                    "file_kind": file_kind,
                    "gcreds_obj": gcreds_obj,
                    "gsheets_obj": gsheets_obj}
        logger.warning(f"Worksheet \"{sheetname}\" not found in Google Sheets: \"{filename}\"")
        return data_source_router(filename, file_kind, gcreds, None, gcreds_obj, gsheets_obj)
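    # Illustrative call (hypothetical paths): routing a local csv returns single-item lists:
    #   data_source_router("sample_data/expenses.csv", "transactions", app_settings_obj.g_creds)
    #   # -> {"filename": ["sample_data/expenses.csv"], "sheetname": [None],
    #   #     "filetype": "csv", "file_kind": "transactions", ...}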

    try:
        # Get sources-targets file location data
        # See sample_data/expenses.csv and labels.csv for examples of data format.
        # TODO: validation & error handling
        # TODO: there should only ever be one sources-targets file, so data_source_router should handle that
        src_target = None
        if app_settings_obj.labels_source.endswith(".csv"):
            src_target_file = data_source_router(app_settings_obj.labels_source,
                                                 "sources-targets",
                                                 app_settings_obj.g_creds, None, None, None)
        else:
            src_target_file = data_source_router(app_settings_obj.data_source,
                                                 "sources-targets",
                                                 app_settings_obj.g_creds,
                                                 app_settings_obj.labels_source, None, None)
        if src_target_file["filetype"] == "csv":
            # Sources-targets data is returned differently than transactions data, so we need to handle it
            # differently.
            with open(src_target_file["filename"][0], 'r') as f:
                dr = DictReader(f)
                src_target = list(dr)
        else:
            # Fetch source-target info and colors.
            src_target = src_target_file["gsheets_obj"].\
                worksheet_by_title(src_target_file["sheetname"][0]).get_all_records()

        # Get transactions file(s) location data
        df = None
        transaction_data_file = data_source_router(app_settings_obj.data_source,
                                                   "transactions",
                                                   app_settings_obj.g_creds,
                                                   app_settings_obj.data_sheet, None, None)
        if transaction_data_file["filetype"] == "csv":
            # Open one or multiple csv files and return a pandas dataframe
            df = read_csv_as_df(transaction_data_file["filename"])
        else:
            # Open one or multiple gsheet worksheets and return a pandas dataframe
            df = read_gsheet_as_df(transaction_data_file["sheetname"], transaction_data_file["gsheets_obj"])
    except Exception as e:
        logger.error(f"Unable to open data source: {app_settings_obj.data_source}. "
                     f"Please check your names and try again. Error was {e}")
        raise SystemExit

    # Clean up value formatting
    df.reset_index(inplace=True, drop=True)
    df = df.transform(normalize_amounts, axis=1)

    is_valid = DataRow.validate(df.columns.to_list(), True)
    if not is_valid[0]:
        raise Exception(f"Source data is not in correct format! Message was: {is_valid[1]}")

    return src_target, df


def read_csv_as_df(csv_file):
    if type(csv_file) is list:
        main_csv = csv_file[0]
        addl_csvs = csv_file[1:]
    else:
        main_csv = csv_file
        addl_csvs = []
    if not main_csv.endswith('.csv') or not path.isfile(main_csv):
        raise Exception(f"Supplied CSV file ({main_csv}) was not found or is the wrong format.")
    main_df = pd.read_csv(main_csv)  # read_csv already returns a DataFrame
    for i in addl_csvs:
        if not i.endswith('.csv') or not path.isfile(i):
            raise Exception(f"Supplied CSV file ({i}) was not found or is the wrong format.")
        add_df = pd.read_csv(i)
        main_df = pd.concat([main_df, add_df], axis=0, ignore_index=True)  # Avoid duplicate index labels
    return main_df
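# Illustrative usage (hypothetical filenames): concatenate transactions split across files.
#   df = read_csv_as_df(["expenses_2022.csv", "expenses_2023.csv"])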


def read_gsheet_as_df(gsheet_file, gsheet_obj):
    # Note: .get_as_df(value_render=pygsheets.ValueRenderOption.UNFORMATTED_VALUE)
    # rounded decimal amounts - not sure why - so we use the formatted values here
    if type(gsheet_file) is list:
        main_sheet = gsheet_file[0]
        addl_sheets = gsheet_file[1:]
    else:
        main_sheet = gsheet_file
        addl_sheets = []
    main_df = gsheet_obj.worksheet_by_title(main_sheet).get_as_df()
    for i in addl_sheets:
        add_df = gsheet_obj.worksheet_by_title(i).get_as_df()
        main_df = pd.concat([main_df, add_df], axis=0, ignore_index=True)  # Avoid duplicate index labels
    return main_df


def normalize_amounts(df_row):
    for atype in ["Amount", "Sales Tax", "Tips"]:
        val = df_row[atype]
        if is_empty(val):
            continue
        try:
            val = float(val)
        except ValueError:
            # Strip currency formatting (e.g. "$1,234.56") and retry; anything else
            # is left as-is for DataRow.validate to catch later.
            if '$' in val or ',' in val:
                val = float(val.replace('$', '').replace(',', ''))
        df_row[atype] = val
    return df_row
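# Illustrative behavior (hypothetical row): currency-formatted strings become floats.
#   row = pd.Series({"Amount": "$1,234.56", "Sales Tax": "", "Tips": 2.0})
#   normalize_amounts(row)["Amount"]  # -> 1234.56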