Coverage for src/sankey_cashflow/sankey_cash.py: 10%

1021 statements  

coverage.py v7.6.4, created at 2024-11-04 06:21 +0000

1import pygsheets 

2import pandas as pd 

3import plotly.graph_objects as go 

4import datetime 

5import logging 

6from os import path 

7from numpy import isnan, float64 

8from csv import DictReader 

9import re 

10import networkx as nx 

11from uuid import uuid4 

12from pathlib import Path 

13# Types 

14from pandas._libs.tslibs import timestamps, nattype 

15from typing import List, Dict, Tuple, Union, Optional 

16 

17logger = logging.getLogger(__name__) 

18ConsoleOutputHandler = logging.StreamHandler() 

19# TODO: override this with params (note: root logger level will also need to be changed) 

20ConsoleOutputHandler.setLevel(logging.WARNING) 

21ConsoleOutputHandler.name = "console" 

22logger.addHandler(ConsoleOutputHandler) 

23 

24""" 

25See README.md for usage instructions and examples 

26 

27References: 

28- See: https://lifewithdata.com/2022/08/29/how-to-create-a-sankey-diagram-in-plotly-python/ 

29- https://erikrood.com/Posts/py_gsheets.html 

30- https://github.com/nithinmurali/pygsheets 

31- https://pygsheets.readthedocs.io/en/stable/ 

32- More complex example: https://plotly.com/python/sankey-diagram/ 

33 

34 

35Notes: 

36 - About mutability: Pandas DataFrames are mutable except when they're not... I am treating them as immutable here,

37   making changes when necessary via return values and re-assignment. From the Pandas docs:

38 ## Mutability and copying of data 

39 All pandas data structures are value-mutable (the values they contain can be altered) but not always size-mutable. 

40 The length of a Series cannot be changed, but, for example, columns can be inserted into a DataFrame. 

41 However, the vast majority of methods produce new objects and leave the input data untouched. In general we like 

42 to favor immutability where sensible. 

43- Permissions: Create a service account and download credentials in json format, 

44 (detailed instructions here: https://pygsheets.readthedocs.io/en/stable/authorization.html) 

45 then share spreadsheet to service account user 

46- Tag usage scenarios: 

47 - Override sources/targets on individual tag level using --tags arg. [done] 

48 - Explode all tags... 

49TODO: make tag/store overrides as source with previous category assignments preserved [done, except as targets] 

50TODO: prevent multiple s-tags 

51""" 
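# Minimal sketch of the service-account flow described in the Permissions note above; the key-file
# path and the spreadsheet/worksheet titles are hypothetical placeholders, and the spreadsheet must
# already be shared with the service account's email address.
def _example_fetch_sheet_df(creds_path="./google_service_account_key.json",
                            spreadsheet_title="Cashflow",
                            sheet_title="Transactions_2024"):
    gc = pygsheets.authorize(service_file=creds_path)  # authorize with the downloaded json key
    sh = gc.open(spreadsheet_title)  # open the shared spreadsheet by title
    wks = sh.worksheet_by_title(sheet_title)  # select the transactions worksheet
    return wks.get_as_df()  # worksheet contents as a pandas DataFrame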

52 

53# Define some classes ================================================================================================= 

54 

55 

56class AppSettings: 

57 """ 

58 Application settings and defaults + validation, getters/setters, etc 

59 """ 

60 def __init__(self, args): 

61 self.DEFAULT_START_DATE = pd.to_datetime("10/1/2022") 

62 # A csv file or a google Sheets document containing transactions data. 

63 # (In the case of the latter, a sheet name must be provided as well) 

64 self.data_source = args.source 

65 self.audit_mode = args.audit 

66 # The name (or prefix plus wildcard) of the sheet containing the transactions data 

67        # (if data_source is a google Sheets document)

68 self.data_sheet = args.sheet or "Transactions_*" 

69 # A csv file or sheet name containing sources and targets 

70 self._labels_source = args.srcmap or "Sources-Targets" 

71 self.filter_dates = args.range 

72 self.separate_taxes = args.separate_tax 

73 self.verbose = args.verbose 

74 self._g_creds = args.creds or './google_service_account_key.json' 

75 self.distribute_amounts = args.distributions 

76 self.all_time = args.all_time 

77 self.recurring = args.recurring 

78 self.base_title = "Cashflow" 

79 self._date_filter_start = None 

80 self._date_filter_end = None 

81 self.tags = None 

82 self.feed_in = None 

83 self.exclude_tags = None 

84 self.stores = None 

85 self.tag_override = False 

86 self.hover = "Category" 

87 self.chart_resolution = None 

88 self.sales_tax_classification = "Taxes" 

89 self.tip_classification = "xTips" 

90 self.diagram_type = "sankey" 

91 if args.verbose: 

92 logger.setLevel(logging.DEBUG) 

93 logger.handlers[0].setLevel(logging.DEBUG) # Assumes only one handler 

94 if args.hover: 

95 if args.hover.lower() in ["desc", "stores", "description"]: 

96 self.hover = "Description" 

97 if args.hover == "tags": 

98                logger.warning("Tags in hovertext not yet implemented!")

99 if args.hover.lower() in ["none", "no", "false"]: 

100 self.hover = None 

101 if args.dtype: 

102 if args.dtype.lower() in ["sankey", "line"]: 

103 self.diagram_type = args.dtype.lower() 

104 else: 

105                logger.warning(f"Unknown diagram type: {args.dtype}")

106 if args.tags: 

107 self.tags = [i.strip() for i in args.tags.split(',')] 

108 if args.tag_override: 

109 self.tag_override = True 

110 if args.feed_in: 

111 self.feed_in = True 

112 if args.exclude: 

113 self.exclude_tags = [i.strip() for i in args.exclude.split(',')] 

114 if args.stores: 

115 self.stores = [i.strip() for i in args.stores.split(',')] 

116 self.colors = {} # label: [link, node] 

117 if self.tags and self.stores: 

118 raise Exception("Stores and tags visualizations should not be combined!") 

119 self.validate_sources() 

120 

121 @property 

122 def date_filter_start(self): 

123 return self._date_filter_start 

124 

125 @date_filter_start.setter 

126 def date_filter_start(self, val): 

127 if not val or len(val) == 0: 

128 self._date_filter_start = None 

129 else: 

130 self._date_filter_start = pd.to_datetime(val) 

131 

132 @property 

133 def date_filter_end(self): 

134 return self._date_filter_end 

135 

136 @date_filter_end.setter 

137 def date_filter_end(self, val): 

138 if not val or len(val) == 0: 

139 self._date_filter_end = None 

140 else: 

141 self._date_filter_end = pd.to_datetime(val) 

142 

143 @property 

144 def g_creds(self): 

145 return self._g_creds 

146 

147 @g_creds.setter 

148 def g_creds(self, val): 

149 if not val or len(val) == 0 or not path.isfile(val): 

150 raise Exception(f"Credentials file not found: {val}") 

151 self._g_creds = val 

152 

153 @property 

154 def labels_source(self): 

155 return self._labels_source 

156 

157 @labels_source.setter 

158 def labels_source(self, val): 

159 if not val or len(val) == 0 or (val.endswith('.csv') and not path.isfile(val)): 

160 raise Exception(f"Sources-targets file not found: {val}") 

161 self._labels_source = val 

162 

163 def source_data_location(self): 

164 if self.data_source.endswith('.csv'): 

165 return self.data_source 

166 else: 

167 return f"{self.data_source}: {self.data_sheet}" 

168 

169 def validate_sources(self): 

170 # check sources etc 

171 if not self.data_source or len(self.data_source) == 0: 

172            logger.warning("Please enter a valid data source!")

173 raise Exception("Missing data source.") 

174 

175 if self.data_source.endswith(".csv"): 

176 logger.debug(f"Using csv data source: {self.data_source}") 

177 # Using csv data source 

178 # Note: additional data validation happens when loading this data 

179 if not self.labels_source or not self.labels_source.endswith(".csv"): 

180 raise Exception("A csv sources-targets sheet must be used when using csv source data.") 

181 if not path.isfile(self.data_source): 

182 raise Exception(f"Could not find provided data source: {self.data_source}") 

183 if not path.isfile(self.labels_source): 

184 raise Exception(f"Could not find provided sources-targets source: {self.labels_source}") 

185 else: 

186 # Using Google Sheets data source 

187 # Note: additional access/permissions/data validation happens when fetching and loading this data 

188 logger.debug(f"Using Google Sheets data source: {self.data_source}") 

189 if not self.data_sheet or len(self.data_sheet) == 0: 

190 raise Exception("Missing Google worksheet name.") 

191 if not self.labels_source or len(self.labels_source) == 0: 

192 raise Exception("A sources-targets sheet name must be supplied.") 

193 if not self.g_creds or len(self.g_creds) == 0: 

194 raise Exception("Google service account credentials must be provided.") 

195 if not path.isfile(self.g_creds): 

196 raise Exception(f"Invalid service account credential file provided: {self.g_creds}") 

197 

198 

199class RowLabels: 

200 """ 

201 Contains row label data and methods, used to map data categories to source, target, and color information. 

202 Accepts an array of dicts on init which can come from a Google sheet or csv. 

203 Data looks like: [{'Category Name': 'House', 'Type': 'computed', 'Source': 'Income', 'Target': 'House', 

204 'Link color': 'rgba(153, 187, 255, 0.8)', 'Node color': 'rgba(102, 153, 255, 1)', 'Comments': '', '': ''}] 

205 Also creates a DAG with source target edges based on labels definitions 

206 """ 
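    # Illustrative usage sketch (the dict mirrors the shape documented above; values are made up):
    #   labels = RowLabels([{'Category Name': 'House', 'Type': 'computed', 'Source': 'Income',
    #                        'Target': 'House', 'Classification': 'Housing',
    #                        'Link color': 'rgba(153, 187, 255, 0.8)',
    #                        'Node color': 'rgba(102, 153, 255, 1)'}])
    #   labels.get_attribute('House', 'target')  # -> 'House'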

207 def __init__(self, labeldata): 

208 self._labeldata = labeldata 

209 self._digraph = nx.DiGraph() 

210 self._digraph.add_node("Income", ntype="income") 

211 self.process_report = f"RowLabel report\n{'=' * 60}\n\n\n" 

212 required_columns = ['Category Name', 'Type', 'Source', 'Target', 'Classification', 'Link color', 'Node color'] 

213 if False in [k in self._labeldata[0].keys() for k in required_columns]: 

214 raise Exception(f"Sources-Targets sheet does not have all required columns! Needed: {required_columns}. \ 

215 Found: {list(self._labeldata[0].keys())}") 

216 self._available_attributes = ['source', 'target', 'classification', 'link_color', 'node_color', 'type'] 

217 # TODO: validation 

218 self._lookup = {} 

219 # Map out data to lookup dict 

220 for i in self._labeldata: 

221 if len(i['Category Name']) == 0: # Skip empties 

222 self.process_report += f"SKIPPING: {i}\n" 

223 continue 

224 if i['Category Name'] in self._lookup: 

225 raise Exception(f"Duplicate label! {i['Category Name']}") 

226 # Add to internal lookup dict 

227 self.process_report += f"ADDING LOOKUP: {i}\n" 

228 self._lookup[i['Category Name']] = { 

229 'source': i.get('Source'), 

230 'target': i.get('Target'), 

231 'classification': i.get('Classification'), 

232 'link_color': i.get('Link color'), 

233 'node_color': i.get('Node color'), 

234 'type': i.get('Type') 

235 } 

236 if i.get("Source") == "DEDUCTIONS": 

237 # These have to be dynamically generated - skip adding to DAG for now (or maybe entirely) 

238 # TODO (maybe): switch to using type and/or DAG attributes 

239 self.process_report += f"SKIPPING DEDUCTION: {i}\n" 

240 continue 

241 if i["Type"] in ['tag', 's-tag']: 

242 # NOT adding tag labels to graph - if needed they will be added on the fly 

243 # TODO: investigate using DAG attributes instead. Test with distant/complex tag targets. 

244 if i["Type"] == 's-tag': 

245 self.process_report += f"Adding NODE for s-tag: {i} to DAG\n" 

246 self._digraph.add_node(i['Category Name'], type='s-tag') 

247 else: 

248 self.process_report += f"NOT Adding tag: {i} to DAG\n" 

249 continue 

250 if i.get('Source') and i.get('Target'): # Add all source/target pairs as edges to DAG 

251 # Add to DAG 

252 if i.get('Type'): 

253 self.process_report += \ 

254 f"ADDING EDGE: {i.get('Source')} -> {i.get('Target')} (ntype={i.get('Type')})\n" 

255 self._digraph.add_edge(i.get('Source'), i.get('Target'), ntype=i.get('Type')) 

256 else: 

257 self.process_report += f"ADDING EDGE: {i.get('Source')} -> {i.get('Target')}\n" 

258 self._digraph.add_edge(i.get('Source'), i.get('Target')) 

259 else: 

260 self.process_report += f"ERROR (SKIPPING): {i}\n" 

261                logger.warning(f"Category: {i['Category Name']} yielded an empty source and/or target! \

262                    {i.get('Source')}:{i.get('Target')}")

263 

264 @property 

265 def data(self): 

266 return self._labeldata 

267 

268 def get_longest_path(self): 

269 return nx.dag_longest_path(self._digraph) 

270 

271 def get_path(self, source, target): 

272 path = [i for i in nx.all_simple_paths(self._digraph, source, target)] 

273 # Note path obj may contain 0 or multiple paths 

274 if not path or len(path) == 0: 

275            logger.warning(f"No path found for {source}:{target}")

276        elif len(path) > 1:

277            logger.warning(f"Multiple paths found for {source}:{target}. ({path})")

278 else: 

279 return path[0] 

280 

281 def get_label(self, labelname, labeltype=None): 

282 """ 

283        Return the label's attribute dict, or None

284 """ 

285 item = self._lookup.get(labelname) # TODO: test duplicate category names 

286 if labeltype == "any": 

287 return item 

288 if labeltype and labeltype == "tag": 

289 if item and item.get('type') == 'tag': 

290 return item 

291 return None 

292 elif labeltype and labeltype == "s-tag": 

293 if item and item.get('type') == 's-tag': 

294 return item 

295 return None 

296 if item and item.get('type') not in ['tag', 's-tag']: 

297 return item 

298 return None 

299 

300 def get_attribute(self, labelname, labelattribute, use_default=True, labeltype=None, original_category=None): 

301 """ 

302 Get named attribute by label. Cases: 

303 Unknown attribute: raise exception 

304        label & labeltype exist:

305 attribute exists: return attribute 

306 attribute doesn't exist: return default attribute 

307        label & labeltype don't exist:

308 return default attributes 

309 """ 
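        # For example (illustrative of the fallback rules above; 'Groceries' is a hypothetical label):
        #   get_attribute('Groceries', 'source')     -> the configured Source, else 'Uncategorized'
        #   get_attribute('Groceries', 'target')     -> the configured Target, else the label name itself
        #   get_attribute('Groceries', 'link_color') -> the configured color, else the 'default' row's color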

310 if labelattribute not in self._available_attributes: 

311 raise Exception(f"Unknown attribute: {labelattribute}") 

312 if labelattribute == "type": 

313 item = self.get_label(labelname, "any") 

314 else: 

315 item = self.get_label(labelname, labeltype) 

316 default_item = self.get_label('default') 

317 if item: 

318 if labelattribute == "type": 

319 return item.get("type", "") 

320 if not is_null(item[labelattribute]) and len(item[labelattribute]) != 0: 

321 return item[labelattribute] 

322 elif labelattribute == "type": 

323 return None 

324 if use_default: 

325 # Did not find the attribute - use defaults 

326 if labelattribute == "source": 

327 return "Uncategorized" 

328 if labelattribute == "target": 

329 return labelname 

330 if labelattribute == "classification": 

331 return 'Uncategorized' 

332 if labelattribute in ["node_color", "link_color"]: 

333 return default_item[labelattribute] 

334 return None 

335 

336 def print_graph(self, filename): 

337 from matplotlib import pyplot as plt 

338 plt.tight_layout() 

339 nx.draw_networkx(self._digraph, arrows=True) 

340 # plt.figure(figsize=(20,20)) 

341 plt.savefig(filename, dpi=200, format="PNG") 

342 plt.clf() 

343 

344 

345class Transactions: 

346 """ 

347    Contains transaction data and all helper methods for transforming and outputting.

348 init with a Pandas dataframe from Google Sheets or a csv. NOTE: depending on the activity, dataframes are mutable. 

349 TODO: 

350 empty values from CSV are 'NaN' 

351 Make sure various synthetic entries don't cause problems for other computations 

352 Make sure that various methods can be run in any order, or enforce precedence/locking 

353 """ 

354 def __init__(self, dataframe, labels_obj, app_settings_obj): 

355 self._df = dataframe 

356 self._grouped_df = None 

357 self.length = len(dataframe) 

358 self._app_settings = app_settings_obj 

359 self._labels_obj = labels_obj 

360 is_valid = self._validate_df() # Will throw an exception if invalid 

361 # Convert all dates to datetimes and sort earliest to latest 

362 if self._app_settings.verbose: 

363 logger.info(f"Converting data in {self.length} fetched rows to datetimes...") 

364 self._df["Date"] = pd.to_datetime(self._df["Date"]) # Does not mutate dataframe 

365 if nattype.NaTType in [type(i) for i in self._df["Date"]]: 

366 logger.critical(repr(self._df["Date"])) 

367 raise Exception("Empty date found!") # There is probably a better way to do this. 

368 self.earliest_date = self._df["Date"].sort_values().iloc[0] # Returns pandas._libs.tslibs.timestamps.Timestamp 

369 self.latest_date = self._df["Date"].sort_values().iloc[len(self._df) - 1] 

370 self.default_date = self.latest_date - datetime.timedelta(days=1) # Day before latest date in dataset. 

371 self.max_depth = 1 

372 self.tips_processed = False 

373 self.sales_tax_processed = False 

374 self.surplus_deficit_processed = False 

375 self.amount_distributions = False 

376 self.process_report = f"Transactions report\n{'=' * 60}\n\n\n" 

377 

378 def _validate_df(self) -> bool: 

379 # Validate header row 

380 # WIP: port over new sources-targets column 

381 header_is_valid = DataRow.validate(self._df.columns.to_list(), True) 

382 if not header_is_valid[0]: 

383 raise Exception(f"Source columns failed validation! Error was: {header_is_valid[1]}") 

384 # Validate data rows 

385 amt_types = [isinstance(i, float) for i in self._df["Amount"]] 

386 if False in amt_types: 

387 invalid_loc = amt_types.index(False) # Note, only return first invalid location 

388 raise Exception(f"Invalid data found at row {invalid_loc}!\n {self._df.iloc[invalid_loc]}") 

389 return True 

390 

391 def audit(self, audit_data, date_range=None): 

392 """ 

393 Compare transaction data to audit data. Note this is specifically set up to use the column format for my 

394 bank export data. YMMV. 

395 Step 1: Apply date filtering (if applicable) (TODO: Test date handling) 

396 Step 2: Create a column with sums for amount, tax, tips to use for lookups 

397 Step 3: Loop through the bank export and search for matching entries based on the transaction amount 

398        Step 3a: If multiple transactions have the same amount, check whether any of them fall within +/- 5 days.

399            Note: this does create an edge case where you could get a false negative if two transactions had

400            the same value within the search window.

401        Write out a report with any suspected missing transactions. Note: this will be a little noisy since I break

402 transactions into multiple rows sometimes. (eg, Costco visits) 

403 """ 
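        # Illustrative example of the matching rule above (all values made up):
        #   bank row:   2023-03-04  "GROCER"  42.17
        #   ledger row: 2023-03-02  Amount=38.50, Sales Tax=3.67  -> safe_sum == 42.17
        #   the amounts match and the dates fall within +/- 5 days, so the bank row counts as found.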

404 dt_today = datetime.datetime.today() 

405 audit_report = "" 

406 

407 # Step 1 

408 if self._app_settings.filter_dates: 

409 if not date_range: 

410 raise Exception("Filter dates flag was True, but no dates were passed in!") 

411 start_date = date_range[0] 

412 end_date = date_range[1] 

413 if end_date is None and not self._app_settings.all_time: 

414 end_date = dt_today 

415 if start_date is None and not self._app_settings.all_time: 

416 start_date = self._app_settings.DEFAULT_START_DATE 

417 audit_data = df_date_filter(audit_data, start_date, end_date) 

418 elif not self._app_settings.all_time: 

419 audit_data = df_date_filter(audit_data, self._app_settings.DEFAULT_START_DATE, dt_today) 

420 

421 def safe_sum(pd_series): 

422 """ 

423 Safely sum a transaction frame 

424 """ 

425 def vval(val): 

426 if not val: 

427 return 0 

428 return round(float(val), 2) 

429 return round(pd_series["Amount"] + vval(pd_series.get("Sales Tax")) + vval(pd_series.get("Tips")), 2) 

430 

431 # Step 2 

432 rowsums = self._df.apply(safe_sum, axis=1) 

433 

434 # Step 3 

435 for idx, row in audit_data.iterrows(): 

436 transaction_found = False 

437 hits = self._df[rowsums == row["Amount"]] 

438 for hit in hits["Date"]: 

439 if (row["Date"] < (hit + datetime.timedelta(days=5))) and \ 

440 (row["Date"] > (hit - datetime.timedelta(days=5))): 

441 transaction_found = True 

442 if not transaction_found: 

443 audit_report += f"Transaction not found for {row['Date']} - {row['Description']} - {row['Amount']}\n" 

444 

445 return audit_report 

446 

447 def process(self, date_range=None): 

448 """ 

449 Process dataframe for sankey diagram 

450 Step 1: Drop rows based on tag exclusions 

451 Step 2: Split out any entries containing distributions (if feature flag is turned on) 

452 Step 3: Apply date filtering (if applicable) 

453 Step 4: Update transaction rows with sources and targets as defined by labels spreadsheet for all entries, 

454 modify sources/targets based on tags & store filters. Also handle recurring items. 

455 Step 5: Loop through each entry and: 

456 a: check for sales tax and/or tips column. Update total amount and create synthetic flows for tax/tips. 

457 b: crawl back through DAG, creating synthetic entries for predecessor nodes along the way, 

458 to ensure flows appear correctly. 

459 Step 6: Compute surplus or deficit flows 

460 Step 7: aggregate amounts for all shared source:target pairs 

461 """ 
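        # Typical invocation (sketch; the date_range shape is inferred from the steps above):
        #   txns = Transactions(df, labels, settings)
        #   txns.process(date_range=(pd.to_datetime("1/1/2023"), pd.to_datetime("6/30/2023")))
        #   txns._grouped_df  # one aggregated Amount per (Source, Target) pair after Step 7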

462 

463 dt_today = datetime.datetime.today() 

464 self.process_report += f"Processing {len(self._df)} transactions \ 

465 from {self._app_settings.source_data_location()}\n{'-' * 60}\n" 

466 if self._app_settings.verbose: 

467 logger.info(f"Processing {len(self._df)} transactions from {self._app_settings.source_data_location()}") 

468 

469 # Step 1: 

470 if self._app_settings.exclude_tags: 

471 self.filter_tags(self._app_settings.exclude_tags) 

472 

473 # Step 2: 

474 if self._app_settings.distribute_amounts: 

475 if self._app_settings.verbose: 

476 logger.info("Distributing amounts...") 

477 self.distribute_amounts() # process report logging happens in called method 

478 

479 # Step 3: 

480 if self._app_settings.filter_dates: 

481 if not date_range: 

482 raise Exception("Filter dates flag was True, but no dates were passed in!") 

483 start_date = date_range[0] 

484 end_date = date_range[1] 

485 if end_date is None and not self._app_settings.all_time: 

486 end_date = dt_today 

487 if start_date is None and not self._app_settings.all_time: 

488 start_date = self._app_settings.DEFAULT_START_DATE 

489 self.process_report += f"Filtering for dates from {start_date} to {end_date}\n{'-' * 60}\n" 

490 if self._app_settings.verbose: 

491 logger.info(f"Filtering for dates from {start_date} to {end_date}") 

492 # TODO: test and find edge cases! 

493 self.filter_dates(start_date, end_date) 

494 elif not self._app_settings.all_time: 

495 self.filter_dates(self._app_settings.DEFAULT_START_DATE, dt_today) 

496 

497 self.update_title() 

498 # Step 4: 

499 self.apply_labels() 

500 # Step 5: 

501 self.process_rows() 

502 # Step 6: 

503 self.create_surplus_deficit_flows() 

504 # Step 7: 

505 self.collapse() 

506 # -- END Transactions.process() -- 

507 

508 def process_line(self, date_range=None): 

509 """ 

510 Process dataframe for line chart diagram 

511 Step 1: Drop rows based on tag exclusions 

512 Step 2: Split out any entries containing distributions (if feature flag is turned on) 

513 Step 3: Apply date filtering (if applicable) 

514 Step 4: Update transaction rows with sources and targets as defined by labels spreadsheet for all entries, 

515 modify sources/targets based on tags & store filters. Also handle recurring items. 

516 """ 

517 

518 dt_today = datetime.datetime.today() 

519 self.process_report += f"Processing {len(self._df)} transactions from \ 

520 {self._app_settings.source_data_location()}\n{'-' * 60}\n" 

521 if self._app_settings.verbose: 

522 logger.info(f"Processing {len(self._df)} transactions from {self._app_settings.source_data_location()}") 

523 

524 # Step 1: 

525 if self._app_settings.exclude_tags: 

526 self.filter_tags(self._app_settings.exclude_tags) 

527 

528 # Step 2: 

529 if self._app_settings.distribute_amounts: 

530 if self._app_settings.verbose: 

531 logger.info("Distributing amounts...") 

532 self.distribute_amounts() # process report logging happens in called method 

533 

534 # Step 3: 

535 if self._app_settings.filter_dates: 

536 if not date_range: 

537 raise Exception("Filter dates flag was True, but no dates were passed in!") 

538 start_date = date_range[0] 

539 end_date = date_range[1] 

540 if end_date is None and not self._app_settings.all_time: 

541 end_date = dt_today 

542 if start_date is None and not self._app_settings.all_time: 

543 start_date = self._app_settings.DEFAULT_START_DATE 

544 self.process_report += f"Filtering for dates from {start_date} to {end_date}\n{'-' * 60}\n" 

545 if self._app_settings.verbose: 

546 logger.info(f"Filtering for dates from {start_date} to {end_date}") 

547 # TODO: test and find edge cases! 

548 self.filter_dates(start_date, end_date) 

549 elif not self._app_settings.all_time: 

550 self.filter_dates(self._app_settings.DEFAULT_START_DATE, dt_today) 

551 

552 # self.update_title() # may need to use a different title block for line charts 

553 # Step 4: 

554 self.apply_labels() 

555 # -- END Transactions.process_line() -- 

556 

557 def filter_tags(self, tags_to_exclude): 

558 # tags_to_exclude: ['tag1','tag2', ...] 

559 self.process_report += f"Checking for tags to exclude: {tags_to_exclude}\n{'-' * 60}\n" 

560 df_changed = False 

561 rows_to_drop = [] 

562 for k, v in enumerate(self._df["Tags"]): 

563 tag_matches = DataRow.tag_matches(v, tags_to_exclude) # None if either arg is None or if no matches 

564 if tag_matches: 

565 df_changed = True 

566 rows_to_drop.append(self._df.index[k]) 

567 if df_changed and rows_to_drop: # Do as a separate loop to avoid changing the frame as we're iterating over it. 

568 for row_idx in rows_to_drop: 

569 self.process_report += f"DROPPING row due to exclude tags: {self._df.loc[row_idx]}\n" 

570 if self._app_settings.verbose: 

571 logger.info(f"DROPPING row due to exclude tags: {self._df.loc[row_idx]}") 

572 self._df.drop(row_idx, inplace=True) 

573 if df_changed: 

574 self._df.reset_index(inplace=True, drop=True) 

575 

576 def add_row(self, row_data, already_validated=False): 

577 try: 

578 idx = len(self._df) # Could use self.length... 

579 if already_validated: 

580 self._df.loc[idx] = row_data 

581 else: 

582 self._df.loc[idx] = DataRow.validate(row_data) 

583 self.length = idx + 1 

584 except Exception as e: 

585 logger.error(f"Error adding row: {row_data} - {e}") 

586 import pdb 

587 pdb.set_trace() 

588 

589 def apply_labels(self): 

590 """ 

591 Loop through each row in dataframe, looking up source-target nodes using category names, and overriding if 

592 indicated by tags or stores flags. 

593 Also add each source:target pair as an edge in a DAG, to be used in the sankey diagram generator to create 

594 intermediate transactions. 

595 NOTE: this process will not be adding intermediate transactions, but will ensure the DAG is correct so that 

596 intermediate transactions can be added later. 

597 """ 
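        # Illustrative example of the resolution below (category and tag names are hypothetical):
        #   a "Groceries" row with Source="Income", Target="Groceries" in the labels sheet, matched by
        #   --tags "travel" (a plain tag, no --tag_override), is re-pointed to source="Groceries",
        #   target="travel", and the edge ("Groceries", "travel") is added to the DAG so the full
        #   Income -> Groceries -> travel flow can be rebuilt later in process_rows().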

598 self.process_report += f"Running Transactions.apply_labels(). Tags has: {self._app_settings.tags}, \ 

599 tag_override is {self._app_settings.tag_override} and stores has: {self._app_settings.stores}\n\n" 

600 if self._app_settings.tags and self._app_settings.verbose: 

601 logger.info(f"Tag search enabled: Looking for tags: {self._app_settings.tags}") 

602 if self._app_settings.tag_override: 

603 logger.info("Overriding tags...") 

604 if self._app_settings.stores and self._app_settings.verbose: 

605 logger.info(f"Store search enabled: Looking for stores: {self._app_settings.stores}") 

606 if self._app_settings.verbose: 

607 logger.info(f"Applying labels for {len(self._df)} transactions") 

608 

609 if self._app_settings.recurring: 

610 if self._app_settings.verbose: 

611 logger.info("Recurring transactions to be split out") # TODO: precludes tag:recurring handling. 

612 # Add edge from Income to Recurring 

613 self._labels_obj._digraph.add_edge("Income", "Recurring") 

614 

615 # Util functions ................................................................................... 

616 def get_source_target_labels(this_obj, this_category_key, this_category_val, step_id): 

617 # Get default labels defined for category from sources-targets sheet, override from data sheet if set there. 

618 src = this_obj._labels_obj.get_attribute(this_category_val, "source") 

619 tgt = this_obj._labels_obj.get_attribute(this_category_val, "target") 

620 classification = this_obj._labels_obj.get_attribute(this_category_val, "classification") 

621 this_obj.process_report += f"[{step_id}] Found default src:target for {this_category_val} -> {src}:{tgt} \ 

622 (classification: {classification})\n" 

623 # Allow individual transaction rows to override label lookups 

624 data_override_s_t = False 

625 transaction_source = this_obj._df.at[this_category_key, "Source"] 

626 transaction_target = this_obj._df.at[this_category_key, "Target"] 

627 if not is_empty(transaction_source) and not is_empty(transaction_target): 

628                # Both a source and a target were specified in the transaction data

629 src = transaction_source 

630 tgt = transaction_target 

631 data_override_s_t = True 

632 elif not is_empty(transaction_source) and is_empty(transaction_target): 

633                # A source but not a target was specified in the transaction data

634 src = transaction_source 

635 data_override_s_t = True 

636 elif is_empty(transaction_source) and not is_empty(transaction_target): 

637                # A target but not a source was specified in the transaction data,

638 # we will append it to the default source-target 

639 if transaction_target != tgt: # Skip if the override is the same as the default target 

640 if not this_obj._labels_obj._digraph.has_edge(src, tgt): 

641 this_obj._labels_obj._digraph.add_edge(src, tgt) 

642 src = tgt 

643 tgt = transaction_target 

644 data_override_s_t = True 

645 

646 if data_override_s_t: 

647                this_obj.process_report += f"[{step_id}] Override source/target for {this_category_val} from \

648 transaction data -> {src}: {tgt}\n" 

649 

650 return src, tgt, classification 

651 

652 # MAIN LOOP ........................................................................................ 

653 

654 for k, v in enumerate(self._df["Category"]): 

655 # Main labeling loop. Iterate over each transaction, look up source-target information in labels 

656 # spreadsheet applying/overriding as indicated. 

657 this_step_id = str(uuid4())[:8] 

658 self.process_report += f"[{this_step_id}] START Processing {self._df.at[k, 'Date']} | \ 

659 {v} | {self._df.at[k, 'Tags']} | ${self._df.at[k, 'Amount']}\n" 

660 is_deduction = False 

661 if is_empty(v): 

662 # Note: this should not happen... raise exception instead? 

663 self.process_report += f"[{this_step_id}] SKIPPING empty category {self._df.loc[k]}\n" 

664 logger.info(f"Skipping empty category {self._df.loc[k]}") 

665 continue 

666 # Tag logic 

667 # TODO: test source tags 

668 

669 this_source, this_target, this_classification = get_source_target_labels(self, k, v, this_step_id) 

670 

671            # Handle deduction types (these go directly from an income source to an expense, skipping the 'Income'

672            # category, and have a variable source based on their description)

673            # Use case: a transaction with income would normally be something like "My Job" -> "Income" and then a

674            # second transaction with income taxes would be "My Job" -> "Income Taxes" (skipping the Income category)

675 # TODO: Verify this works correctly with s-tags 

676 if this_source == "DEDUCTIONS": 

677 this_source = self._df.at[k, "Description"] 

678 self._df.at[k, "Type"] = "deduction" 

679 is_deduction = True 

680 self.process_report += f"[{this_step_id}] Deduction type found src:target -> \ 

681 {this_source}:{this_target}\n" 

682 if self._app_settings.verbose: 

683 logger.info(f"Found DEDUCTION type transaction. Set to {this_source}:{this_target}") 

684 

685 if self._app_settings.recurring and this_source == "Income": 

686 # Replace with recurring 

687 this_source = "Recurring" 

688 # Verify base edge is in DAG 

689 if not self._labels_obj._digraph.has_edge(this_source, this_target): 

690 logger.info(f"Edge: {this_source}:{this_target} not found! Adding to graph.") 

691 self._labels_obj._digraph.add_edge(this_source, this_target) 

692 

693 # For now logic around tags/stores with deductions flow is undefined - skip processing. 

694 if not is_deduction: 

695 # Check for store match 

696 store_matches = False 

697 if self._app_settings.stores: 

698 this_name = self._df.at[k, "Description"] 

699 if self._app_settings.stores and this_name in self._app_settings.stores: 

700 store_matches = True 

701 

702 # Check for tag match(es) 

703 # Note currently we will only ever use the first match. 

704 tag_matches = DataRow.tag_matches(self._df.at[k, "Tags"], self._app_settings.tags) 

705 # None if flag is not enabled or no matches 

706 tag_type = None 

707 if tag_matches: 

708 if self._app_settings.recurring and tag_matches and tag_matches[0] == "Recurring": 

709 raise Exception("Not double processing recurring tags!") # TODO: handle more quietly 

710 if self._app_settings.verbose: 

711 logger.info(f"Got tag matches: {tag_matches}") 

712 # Check for s-tags 

713 # Get tag type 

714 tag_type = self._labels_obj.get_attribute(tag_matches[0], "type") 

715 if tag_type == "s-tag": 

716 # If the flow is directly to/from "Income", replace "Income" with the Tag 

717 if this_source == "Income": 

718 this_source = tag_matches[0] 

719 elif this_target == "Income": 

720 this_target = tag_matches[0] 

721 else: 

722 self._labels_obj._digraph.add_edge(this_source, this_target, type="s-tag") 

723 # NOTE: if tag is distant from "Income", we'll need to handle it while reconciling DAG 

724 elif self._app_settings.tag_override: 

725 # If overriding tags, we'll use the labels sheet to determine placement 

726 this_source = self._labels_obj.get_attribute(tag_matches[0], "source", labeltype="tag", 

727 use_default=False) 

728 this_target = self._labels_obj.get_attribute(tag_matches[0], "target", labeltype="tag", 

729 use_default=False) 

730 if not this_source: 

731 # If we don't have matching tag defined in sources-targets sheet, 

732 # just create it to/from income 

733 # lookup the target we would have without tag matching 

734 def_target = self._labels_obj.get_attribute(v, "target", use_default=False) 

735 if def_target == "Income": # Note: Breaks if we have income flows more than one deep 

736 this_source = tag_matches[0] 

737 this_target = "Income" 

738 else: 

739 this_source = "Income" 

740 this_target = tag_matches[0] 

741 else: 

742 # We are appending the tags as new target to the end of the flow 

743 this_source = this_target 

744 this_target = tag_matches[0] 

745 if self._app_settings.verbose: 

746                        logger.info(f"Adding edge to graph for tag ({tag_matches[0]}): \

747 {this_source} -> {this_target}") 

748 # self._labels_obj._digraph.add_edge(this_source, this_target) 

749 

750 if store_matches: 

751 this_source = this_target 

752 this_target = self._df.at[k, "Description"] 

753 if self._app_settings.verbose: 

754 logger.info(f"Adding edge to graph for store ({this_name}): {this_source} -> {this_target}") 

755 # self._labels_obj._digraph.add_edge(this_source, this_target) 

756 

757 self.process_report += f"[{this_step_id}] RESOLVED src:target for {v} -> {this_source}:{this_target}\n" 

758 if self._app_settings.verbose: 

759 logger.info(f"RESOLVED source/target > {this_source}:{this_target}") 

760 # Circuit breaker 

761 if is_empty(this_source) or is_empty(this_target): 

762 raise Exception(f"Got empty source or target for category {v}! ({this_source}:{this_target})") 

763 

764 # Check for final edge in DAG and add if necessary 

765 if not self._labels_obj._digraph.has_edge(this_source, this_target): 

766 logger.info(f"Edge: {this_source}:{this_target} not found! Adding to graph.") 

767 self._labels_obj._digraph.add_edge(this_source, this_target) 

768 

769 # Sanity check that we haven't created an orphan edge 

770 if not (is_deduction or tag_type == 's-tag') and \ 

771 "Income" not in nx.ancestors(self._labels_obj._digraph, this_target) and \ 

772 "Income" not in nx.descendants(self._labels_obj._digraph, this_source): 

773 logger.debug(f"{self._df.loc[k]}") 

774 raise Exception(f"No path to \'Income\' from {this_source}:{this_target}") 

775 

776 # Set source-target + classification on original transaction 

777 self._df.at[k, "Source"] = this_source 

778 self._df.at[k, "Target"] = this_target 

779 self._df.at[k, "Classification"] = this_classification 

780 

781 self.process_report += f"[{this_step_id}] FINISHED processing labels\n" 

782 

783 def process_rows(self): 

784 """ 

785 Process individual transactions, creating synthetic transactions as needed to satisfy flows 

786 """ 
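        # Illustrative example (hypothetical categories): a row with Source="House", Target="Utilities",
        # Amount=100 whose DAG path is Income -> House -> Utilities yields one synthetic row
        # Income -> House for $100, so the flow into "House" balances the flow out to "Utilities".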

787 msg = f"Processing row data on {len(self._df)} rows" 

788 self.process_report += f"\n{'-' * 60}\nRunning Transactions.process_rows()\n{'-' * 60}\n" 

789 self.process_report += msg + "\n" 

790 if self._app_settings.verbose: 

791 logger.info(msg) 

792 for k, v in enumerate(self._df["Source"]): 

793 this_row = self._df.loc[k] 

794 this_step_id = str(uuid4())[:8] 

795 is_recurring = False 

796 self.process_report += f"[{this_step_id}] START Processing {self._df.at[k, 'Date']} | \ 

797 {self._df.at[k, 'Description']} | ${self._df.at[k, 'Amount']}\n" 

798 if self._app_settings.verbose: 

799 logger.info(f"{'-' * 40}\nGot a transaction: {self._df.at[k, 'Date']} | \ 

800 {self._df.at[k, 'Description']} | {self._df.at[k, 'Source']}:{self._df.at[k, 'Target']} | \ 

801 ${self._df.at[k, 'Amount']}\n") 

802 

803 # Check for tag match(es) 

804 # Note currently we will only ever use the first match. 

805 # TODO: discard tag if tag is "Recurring" AND _app_settings.recurring is True 

806 # None if flag is not enabled or no matches 

807 tag_matches = DataRow.tag_matches(self._df.at[k, "Tags"], self._app_settings.tags) 

808 tag_type = None 

809 if tag_matches: 

810 tag_type = self._labels_obj.get_attribute(tag_matches[0], "type") # Get tag type 

811 

812 # Check for recurring tag 

813 has_recurring = DataRow.tag_matches(self._df.at[k, "Tags"], ["Recurring"]) 

814 if self._app_settings.recurring and has_recurring and "Recurring" in has_recurring: 

815 is_recurring = True 

816 if self._app_settings.verbose: 

817 logger.info(">> Processing recurring transaction") 

818 

819 # Handle taxes 

820 if not is_empty(this_row["Sales Tax"], True): 

821 if self._app_settings.separate_taxes: 

822 # Add sales tax to it's own root category as a new row 

823 self.process_report += f"[{this_step_id}] ADDED: {this_row.Date} | {this_row.Description} | \ 

824 'Income' -> 'Sales Tax' | ${this_row['Sales Tax']}\n" 

825 self.add_row(DataRow.create( 

826 date=this_row.Date, 

827 category_name="Sales Tax", 

828 amount=this_row["Sales Tax"], 

829 source="Income", 

830 target="Sales Tax", 

831 description=this_row.Description, 

832 tags=this_row.Tags, 

833 comment='Synthetic row for sales tax', 

834 distribution=this_row.Distribution, 

835 classification=self._app_settings.sales_tax_classification 

836 ), True) 

837 else: 

838 # Create new sales tax child target from this original target row & 

839 # add sales tax back to original row amount 

840 # Note: if store or tag processing is being done, this may already be one removed from 

841 # the original category 

842 self.process_report += f"[{this_step_id}] ADDED: {this_row.Date} | {this_row.Description} | \ 

843 {this_row.Target} -> 'Sales Tax' | ${this_row['Sales Tax']}\n" 

844 if not is_empty(this_row["Sales Tax"], True): 

845 self.add_row(DataRow.create( 

846 date=this_row.Date, 

847 category_name="Sales Tax", 

848 amount=this_row["Sales Tax"], 

849 source=this_row.Target, 

850 target="Sales Tax", 

851 description=this_row.Description, 

852 tags=this_row.Tags, 

853 comment='Synthetic row for sales tax', 

854 distribution=this_row.Distribution, 

855 classification=self._app_settings.sales_tax_classification 

856 ), True) 

857 # For this to behave as expected, it needs to add the sales tax amount back 

858 # to the original Amount 

859 self._df.at[k, "Amount"] = round(this_row.Amount + this_row["Sales Tax"], 2) 

860 self.process_report += f"[{this_step_id}] UPDATED: {this_row.Date} | {this_row.Description} | \ 

861 {this_row.Source} -> {this_row.Target} | \ 

862 ${this_row.Amount} -> ${self._df.at[k, 'Amount']}\n" 

863 

864 # Handle tips by creating new Tips child target from this original target row & add tip back 

865 # to original row amount 

866 # Note: if store or tag processing is being done, this may already be one removed from the original category 

867 if not is_empty(this_row["Tips"], True): 

868 self.process_report += f"[{this_step_id}] ADDED: {this_row.Date} | {this_row.Description} | \ 

869 {this_row.Target} -> 'Tips' | ${this_row['Tips']}\n" 

870 # Sales tax computation may have changed from this_row.Amount value 

871 orig_amount = self._df.at[k, "Amount"] 

872 self.add_row(DataRow.create( 

873 date=this_row.Date, 

874 category_name="Tips", 

875 amount=this_row["Tips"], 

876 source=this_row.Target, 

877 target="Tips", 

878 description=this_row.Description, 

879 tags=this_row.Tags, 

880 comment='Synthetic row for tips', 

881 distribution=this_row.Distribution, 

882 classification=self._app_settings.tip_classification 

883 ), True) 

884 # For this to behave as expected, it needs to add the tips amount back to the original Amount 

885 self._df.at[k, "Amount"] = round(orig_amount + this_row["Tips"], 2) 

886 self.process_report += f"[{this_step_id}] UPDATED: {this_row.Date} | {this_row.Description} | \ 

887 {this_row.Source} -> {this_row.Target} | ${orig_amount} -> ${self._df.at[k, 'Amount']}\n" 

888 

889 # Traverse DAG from row source back to Income, adding a synthetic row for each edge it finds. 

890 # NOTE: if using s-tags, will go back to the tag instead of Income 

891 # TODO: explore cases where we are multiple synthetic rows deep, or a synthetic row has been added that 

892 # flows INTO income, or orphan flows (eg deductions) include synthetic nodes. 

893 s_tag_d1 = False 

894 if self._df.at[k, "Type"] == 'deduction': # deduction types skip DAG processing for now 

895 self.process_report += f"[{this_step_id}] SKIPPING DAG traversal, since this is a deduction type.\n" 

896 if self._app_settings.verbose: 

897 logger.info(f"Skipping DAG checks as this was a deductions type entry: {this_row.Date} | \ 

898 {this_row.Description} | {this_row.Source} -> {this_row.Target} | ${this_row.Amount}") 

899 continue 

900 

901 # traverse graph 

902 if tag_type == 's-tag' and (this_row.Source == tag_matches[0] or this_row.Target == tag_matches[0]): 

903 # First order edge and is s-tag - skip processing 

904 s_tag_d1 = True 

905 elif "Income" in nx.ancestors(self._labels_obj._digraph, this_row.Target): 

906 # Must be an expense category: 

907 start_node = "Income" 

908 # This breaks if there are multiple paths to the end node, eg when using tags/stores flows 

909 end_node = this_row.Source 

910 else: 

911 # Must be an income category 

912 start_node = this_row.Source 

913 end_node = "Income" 

914 

915 if not s_tag_d1: 

916 self.process_report += f"[{this_step_id}] Starting to traverse DAG for {start_node} -> {end_node}\n" 

917 if self._app_settings.verbose: 

918 logger.info(f"Traversing graph for {start_node}:{end_node}...") 

919 

920 pgroups = [i for i in nx.all_simple_edge_paths(self._labels_obj._digraph, start_node, end_node)] 

921 if is_recurring: 

922 new_groups = [[]] 

923 for g in pgroups[0]: 

924 if g[0] == "Income": 

925 if self._app_settings.verbose: 

926 logger.info(f"--- Injecting Income:Recurring and Recurring:{g[1]} nodes ----") 

927 new_groups[0].append(("Income", "Recurring")) 

928 new_groups[0].append(("Recurring", g[1])) 

929 else: 

930 new_groups[0].append(g) 

931 pgroups = new_groups 

932 

933 self.process_report += f"[{this_step_id}] DAG search yielded groups: {pgroups}\n" 

934 if self._app_settings.verbose: 

935 logger.info(f"Searched DAG for {start_node} -> {end_node} and got group: {pgroups}...") 

936 if len(pgroups) != 1: 

937 # Potentially an error condition. Maybe raise an exception 

938 logger.info(f"Edge paths search did not yield the expected number of groups! {pgroups}") 

939 for pgroup in pgroups: 

940 # Each edge path will be an array of tuples, like [(source1,target1), (source2,target2), ...] 

941 # Iterate over the paths (ignoring the one that matches the original entry) and create synthetic 

942 # entries for each one. 

943 for pitem in pgroup: 

944 # Don't need to process the pair we already have 

945 if pitem == (this_row.Source, this_row.Target): 

946 continue 

947 syn_source, syn_target = pitem 

948 if syn_source == "Income" and tag_type == 's-tag': 

949 # Since this is an s-tag flow, the root of the flow should be the tag 

950 syn_source = tag_matches[0] 

951 if syn_target == "Income" and tag_type == 's-tag': 

952 syn_target = tag_matches[0] # TODO: verify that this case is handled as expected 

953 self.process_report += f"[{this_step_id}] ADDED: {this_row.Date} | {this_row.Description} | \ 

954 {syn_source} -> {syn_target} | ${self._df.at[k, 'Amount']}\n" 

955 if self._app_settings.verbose: 

956 logger.info(f"Adding synthetic entry: {this_row.Date} | {this_row.Description} | \ 

957 {syn_source} -> {syn_target} | ${self._df.at[k, 'Amount']}") 

958 self.add_row(DataRow.create( 

959 date=this_row.Date, 

960 category_name=this_row.Category, 

961 amount=self._df.at[k, "Amount"], 

962 source=syn_source, 

963 target=syn_target, 

964 description=this_row.Description, 

965 tags=this_row.Tags, 

966 comment='Synthetic row', 

967 distribution=this_row.Distribution, 

968 classification="Uncategorized" 

969 ), True) 

970 

971 self.process_report += f"[{this_step_id}] DONE processing.\n{'-' * 40}\n" 

972 

973 def collapse(self): 

974 self.process_report += f"\n{'-' * 40}\nStepping into Transactions.collapse()\n{'-' * 40}\n" 

975 if self._app_settings.verbose: 

976 logger.info("Aggregating all source-target pairs") 

977 # Collapse all the pairs down for cleaner flows 
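        # Illustrative result (amounts made up): three rows
        #   Income -> House 100, Income -> House 50, Income -> Food 20
        # aggregate down to two rows: Income -> House 150 and Income -> Food 20.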

978 grouped_df = self._df.groupby(['Source', 'Target']).agg({'Amount': 'sum'}) 

979 # Resetting an index appears to just create a new one unless the drop argument is passed in, 

980 # but that's fine in this case. 

981 grouped_df.reset_index(inplace=True) 

982 self._grouped_df = grouped_df # TODO: Review grouped_df vs _df 

983 if self._app_settings.verbose: 

984 logger.info(f"Collapsed {len(self._df)} transactions down to {len(self._grouped_df)}") 

985 

986 def create_surplus_deficit_flows(self): 

987 self.process_report += f"\n{'-' * 40}\nStepping into Transactions.create_surplus_deficit_flows()\n{'-' * 40}\n" 

988 if self.surplus_deficit_processed: 

989 logger.info("Surplus/deficit flows have already been processed!") 

990 return 

991 self.surplus_deficit_processed = True 

992 if self._app_settings.verbose: 

993            logger.info("Computing surplus/deficit flows")

994 # Check for s-tag nodes 

995        # Returns a dict like: {'a': 's-tag', 'd': 's-tag', 'c': 'tag', ...}

996 node_types = nx.get_node_attributes(self._labels_obj._digraph, 'type') 

997 s_nodes = [i for i in node_types if node_types[i] == 's-tag'] # A list of s-nodes 

998 s_nodes.append("Income") 

999 

1000 for s_node in s_nodes: 

1001 # Create synthetic entries showing difference between flows into and out of Income as either a 

1002 # surplus or deficit. 

1003 # Date should always be within the current filter range, if used. 

1004 # TODO: review for race conditions with feed_in arg and computing surpluses 
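            # Illustrative example (amounts made up): if $5,000 flows into "Income" and $4,200 flows out,
            # a synthetic "Income Surplus" row of $800 is added; if outflows exceed inflows, an
            # "Income Deficit" row is added instead.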

1005 total_income = self._df.loc[self._df["Target"] == s_node].agg({'Amount': 'sum'})["Amount"] 

1006 total_expenses = self._df.loc[self._df["Source"] == s_node].agg({'Amount': 'sum'})["Amount"] 

1007 if total_income > total_expenses: 

1008 surplus = total_income - total_expenses 

1009 if s_node != "Income" and self._app_settings.feed_in: 

1010 # Feeding s-tag surplus back to Income 

1011 self.process_report += f"ADDED: {self.default_date} | '{s_node} Surplus' | {s_node} -> 'Income' | \ 

1012 ${surplus}\n" 

1013 self.add_row(DataRow.create( 

1014 date=self.default_date, 

1015 category_name=f"{s_node} Surplus", 

1016 amount=surplus, 

1017 source=s_node, 

1018                        target="Income",

1019 comment=f"Synthetic {s_node} surplus entry" 

1020 ), True) 

1021 else: 

1022 # Keeping s-tag surplus(es) as distinct flow 

1023 self.process_report += f"ADDED: {self.default_date} | '{s_node} Surplus' | {s_node} -> \ 

1024 '{s_node} Surplus' | ${surplus}\n" 

1025 self.add_row(DataRow.create( 

1026 date=self.default_date, 

1027 category_name=f"{s_node} Surplus", 

1028 amount=surplus, 

1029 source=s_node, 

1030 target=f"{s_node} Surplus", 

1031 comment=f"Synthetic {s_node} surplus entry" 

1032 ), True) 

1033 

1034 # Copy 'Surplus' color information to new entry 

1035 this_label = self._labels_obj._lookup.get("Surplus") 

1036 if this_label: 

1037                    this_label["source"] = s_node

1038 this_label["target"] = f'{s_node} Surplus' 

1039 self._labels_obj._lookup[f'{s_node} Surplus'] = this_label 

1040 

1041 elif total_expenses > total_income: 

1042 # TODO: If using feed_in arg, copy Income surplus (if any) to s-tag?? 

1043 # (or, more accurately, s-tag deficit from income) 

1044 deficit = total_expenses - total_income 

1045 self.process_report += f"ADDED: {self.default_date} | '{s_node} Deficit' | '{s_node} Deficit' -> \ 

1046 {s_node} | ${deficit}\n" 

1047 self.add_row(DataRow.create( 

1048 date=self.default_date, 

1049 category_name=f"{s_node} Deficit", 

1050 amount=deficit, 

1051 source=f"{s_node} Deficit", 

1052 target=s_node, 

1053 comment=f"Synthetic {s_node} deficit entry" 

1054 ), True) 

1055 # Copy 'Deficit' color information to new entry 

1056 this_label = self._labels_obj._lookup.get("Deficit") 

1057 if this_label: 

1058                    this_label["source"] = s_node

1059 this_label["target"] = f'{s_node} Deficit' 

1060 self._labels_obj._lookup[f'{s_node} Deficit'] = this_label 

1061 

1062 def filter_dates(self, start_date, end_date): 

1063 self.process_report += f"\n{'-' * 40}\nStepping into Transactions.filter_dates({start_date}, \ 

1064 {end_date})\n{'-' * 40}\n" 

1065 # All times should be pandas._libs.tslibs.timestamps.Timestamp 

1066 # Will discard data outside supplied daterange... TODO: preserve original df?? 

1067 

1068 if self._app_settings.verbose: 

1069 logger.info(f"Filtering data from {start_date} .. {end_date}...") 

1070 

1071 if start_date is None and end_date is None: 

1072 return # no op. 

1073 

1074 # Coerce to timestamp 

1075 if type(start_date) is not timestamps.Timestamp: 

1076 start_date = pd.to_datetime(start_date) # pd.to_datetime(None) returns None 

1077 if type(end_date) is not timestamps.Timestamp: 

1078 end_date = pd.to_datetime(end_date) 

1079 

1080 if end_date: # Set up a default date guaranteed to be within the filter range. 

1081 self.default_date = end_date - datetime.timedelta(days=1) # One day before our end date 

1082 elif start_date: 

1083            self.default_date = start_date + datetime.timedelta(days=1)  # One day after our start date

1084 

1085 # Start or end date is unbounded, set it to the earliest (or latest) date in the fetched data. 

1086 if not start_date: 

1087 start_date = self.earliest_date 

1088 if not end_date: 

1089 end_date = self.latest_date 

1090 

1091 if start_date > end_date: 

1092 raise Exception(f"Start date ({start_date.date()}) is after end date ({end_date.date()})!") 

1093 

1094 self.process_report += f">> final dates to use for filtering: {start_date} - {end_date} <<\n{'-' * 60}\n" 

1095 

1096        dt_mask = (self._df["Date"] >= start_date) & (self._df["Date"] <= end_date)  # Boolean AND of the two masks

1097 self._df = self._df[dt_mask] 

1098 self._df = self._df.reset_index(drop=True) 

1099 if len(self._df) == 0: 

1100 raise Exception(f"Supplied date range ({start_date.date()} - {end_date.date()}) does not contain \ 

1101 any transactions!") 

1102 self.earliest_date = self._df["Date"].sort_values().iloc[0] 

1103 self.latest_date = self._df["Date"].sort_values().iloc[len(self._df) - 1] 

1104 self.default_date = self.latest_date - datetime.timedelta(days=1) 

1105 self.process_report += f"DONE filtering dates. Earliest date is: {self.earliest_date}, latest date is: \ 

1106 {self.latest_date}, default date is: {self.default_date}, \ 

1107 and the dataset now contains {len(self._df)} transactions.\n{'-' * 60}\n" 

1108 

1109 def explode_tags(self): 

1110 # Split each tag out to its own column, with true/false value for a given row 

1111 # Note: currently unused but possible future functionality around tags. 
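        # Illustrative example (tag names made up): rows tagged "food, travel" and "travel" produce boolean
        # columns "food" and "travel": row 1 -> food=True, travel=True; row 2 -> food=False, travel=True.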

1112 result = {} 

1113 unique_df_tags = [val.strip() for sublist in self._df["Tags"].str.split(",").tolist() for val in sublist] 

1114 unique_df_tags = list(set(unique_df_tags)) 

1115 if '' in unique_df_tags: 

1116 unique_df_tags.remove('') 

1117 for tag in unique_df_tags: 

1118 # TODO: fix edge case if you had a tag 'foo' and another tag 'foot' where 'foot' is marked as having 'foo' 

1119 self._df[tag] = self._df["Tags"].str.contains(tag).to_list() 

1120 

1121 def distribute_amounts(self): 

1122 # Distribute a payment over a time period 

1123 # Note: this creates synthetic transactions in the future, which will affect latest date. 

1124 # TODO: verify that this will fall within the current date filters, if being used. 

1125 # current method is to just call this before filter_dates() would need to refactor to be more robust 

1126 # TODO: handle negative values to distribute backwards (as in, a charge that represents past costs) 
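        # Illustrative example (values made up): a $300.00 row dated 2023-01-15 with Distribution=3 is reduced
        # in place to $100.00, and two synthetic $100.00 rows are added roughly one and two months later
        # (each offset is a multiple of 4.33 weeks).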

1127 if self.amount_distributions: 

1128 logger.info("Amounts have already been distributed!") 

1129 return 

1130 self.amount_distributions = True 

1131 self.process_report += f"{'-' * 60}\nRunning Transactions.distribute_amounts()\n{'-' * 60}\n" 

1132 df_idx = len(self._df) 

1133 # Loop through dataset looking for distributed rows 

1134 for k, v in enumerate(self._df["Distribution"]): 

1135 if not is_empty(v, True): 

1136 reverse_distribution = False 

1137 v = int(v) 

1138 if v < 0: 

1139 # Negative distribution 

1140 reverse_distribution = True 

1141 v = abs(v) 

1142 # A tuple with (Amount, Sales Tax) 

1143 original_amount = float(self._df.at[k, "Amount"]), self._df.at[k, "Sales Tax"] 

1144 original_date = self._df.at[k, "Date"] 

1145 dist_amount = original_amount[0] / int(v) # Calculate total amount / distributions 

1146 dist_sales_tax = 0 

1147 dists = [] 

1148 if not is_empty(original_amount[1], True): 

1149 dist_sales_tax = float(original_amount[1]) / int(v) # Calculate sales tax amount / distributions 

1150                # Reset original transaction to distribution amount

1151 self.process_report += f"UPDATED: {self._df.at[k, 'Date']} | {self._df.at[k, 'Description']} | \ 

1152 {self._df.at[k, 'Source']} -> {self._df.at[k, 'Target']} | ${dist_amount} (+ ${dist_sales_tax})\n" 

1153 self._df.at[k, "Amount"] = dist_amount 

1154 self._df.at[k, "Sales Tax"] = dist_sales_tax 

1155 # Create Synthetic entries for distributed transactions 

1156 counter = v 

1157 while counter > 1: # Don't need to do the first one, as we changed it in place 

1158 if reverse_distribution: 

1159                        # We assume that the distribution value is in months.

1160 new_date = original_date - datetime.timedelta(weeks=(counter - 1) * 4.33) 

1161 else: 

1162                        # We assume that the distribution value is in months.

1163 new_date = original_date + datetime.timedelta(weeks=(counter - 1) * 4.33) 

1164 self.process_report += f"ADDED: {new_date} | {self._df.at[k, 'Description']} | \ 

1165 {self._df.at[k, 'Source']} -> {self._df.at[k, 'Target']} | \ 

1166 ${dist_amount} (+ ${dist_sales_tax})\n" 

1167 # create(date, category_name, source, target, amount, description="", sales_tax=0, tips=0, 

1168 # comment="", tags="", row_type="", distribution=0): 

1169 # Assuming no tips on distributed transactions for now 

1170 dists.append(DataRow.create( 

1171 new_date, 

1172 self._df.at[k, "Category"], 

1173 self._df.at[k, "Source"], 

1174 self._df.at[k, "Target"], 

1175 dist_amount, 

1176 self._df.at[k, "Description"], 

1177 dist_sales_tax, 

1178 0, 

1179 f"Synthetic transaction from original transaction on {original_date} of {original_amount[0]} \ 

1180 (+{original_amount[1]})", 

1181 self._df.at[k, "Tags"], 

1182 self._df.at[k, "Type"], 

1183 0, 

1184 self._df.at[k, "Classification"] 

1185 )) 

1186 counter -= 1 

1187 for row in dists: 

1188 self._df.loc[df_idx] = row # Add check_data_row here? 

1189 df_idx += 1 

1190        self.latest_date = self._df["Date"].sort_values().iloc[len(self._df) - 1]  # Reset latest date value

1191 

1192 def update_title(self): 

1193 # TODO: add flag information to title 

1194 # TODO: Move to sankeyutils class 

1195 self.title = f"{self._app_settings.base_title} ({self.earliest_date.month}/{self.earliest_date.day}/\ 

1196 {self.earliest_date.year} - {self.latest_date.month}/{self.latest_date.day}/{self.latest_date.year}) \ 

1197 [{(self.latest_date - self.earliest_date).days} days]" 

1198 if self._app_settings.distribute_amounts: 

1199 self.title += "<br> Multi-month transactions are being distributed" 

1200 if self._app_settings.exclude_tags: 

1201 self.title += f"<br> Tags being excluded: {', '.join(self._app_settings.exclude_tags)}" 

1202 if self._app_settings.tags: 

1203 self.title += f"<br> Tags being used: {', '.join(self._app_settings.tags)}" 

1204 if self._app_settings.recurring: 

1205 self.title += f"<br> Recurring transactions are being split out" 

1206 
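    # For example, with a hypothetical base_title of "Cash Flow" and a date range of 10/1/2022 - 9/30/2023,
    # update_title() produces a title along the lines of "Cash Flow (10/1/2022 - 9/30/2023) [364 days]",
    # with one "<br> ..." line appended per active flag.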

1207 

1208class TransactionRow: 

1209 """ 

1210 WIP: not in use at this time. 

1211 """ 

1212 def __init__(self, df, key): 

1213 self.key = key 

1214 self.data = {} 

1215 for col in DataRow.fields: 

1216 val = df.at[key, col] 

1217 if is_null(val) and DataRow.fields[col]["required"]: 

1218                raise Exception(f"Required column {col} was null for {repr(df.loc[key])}")

1219 self.data[col] = val 

1220 

1221 class TransactionDate: 

1222 def __init__(self, value): 

1223 self._required = True 

1224 self._nullable = True 

1225 self._datatype = timestamps.Timestamp 

1226 self._coerce_type = True # TODO: switch to a function that coerces. Or just use getters and setters. 

1227 self._comment = "" 

1228 self._value = pd.to_datetime(value) 

1229 

1230 @property 

1231 def value(self): 

1232 return self._value 

1233 

1234 @value.setter 

1235 def value(self, val): 

1236 self._value = pd.to_datetime(val) 

1237 

1238 

1239class DataRow: 

1240 # static class - just a container for some related methods around single rows of expense data. 

1241 # The columns we expect to see in the data 

1242 fields = { 

1243 "Date": { 

1244 "required": True, 

1245 "nullable": False, 

1246 "type": timestamps.Timestamp, 

1247 "force_type": False, 

1248 "comment": "" 

1249 }, 

1250 "Category": { 

1251 "required": True, 

1252 "nullable": False, 

1253 "type": str, 

1254 "force_type": False, 

1255 "comment": "" 

1256 }, 

1257 "Description": { 

1258 "required": False, 

1259 "nullable": True, 

1260 "type": str, 

1261 "force_type": False, 

1262 "comment": "" 

1263 }, 

1264 "Tags": { 

1265 "required": False, 

1266 "nullable": True, 

1267 "type": str, 

1268 "force_type": False, 

1269 "comment": "" 

1270 }, 

1271 "Comments": { 

1272 "required": False, 

1273 "nullable": True, 

1274 "type": str, 

1275 "force_type": False, 

1276 "comment": "" 

1277 }, 

1278 "Source": { 

1279 "required": False, 

1280 "nullable": True, 

1281 "type": str, 

1282 "force_type": False, 

1283 "comment": "" 

1284 }, 

1285 "Target": { 

1286 "required": False, 

1287 "nullable": True, 

1288 "type": str, 

1289 "force_type": False, 

1290 "comment": "" 

1291 }, 

1292 "Type": { 

1293 "required": False, 

1294 "nullable": True, 

1295 "type": str, 

1296 "allowed_values": ["computed", "tag", ""], 

1297 "force_type": False, 

1298 "comment": "" 

1299 }, 

1300 "Distribution": { 

1301 "required": False, 

1302 "nullable": True, 

1303 "type": int, 

1304 "force_type": True, 

1305 "comment": "Value in whole months to distribute the row amount over" 

1306 }, 

1307 "Amount": { 

1308 "required": True, 

1309 "nullable": False, 

1310 "type": float, 

1311 "force_type": True, 

1312 "comment": "" 

1313 }, 

1314 "Sales Tax": { 

1315 "required": False, 

1316 "nullable": True, 

1317 "type": float, 

1318 "force_type": True, 

1319 "comment": "" 

1320 }, 

1321 "Tips": { 

1322 "required": False, 

1323 "nullable": True, 

1324 "type": float, 

1325 "force_type": True, 

1326 "comment": "" 

1327 } 

1328 } 

1329 
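    # A row that satisfies the schema above, in column order (values are purely illustrative):
    #   [Timestamp("2023-01-15"), "Groceries", "Weekly shop", "", "", "Checking", "Groceries",
    #    "", 0, 84.12, 7.57, 0.0]
    # i.e. Date, Category, Description, Tags, Comments, Source, Target, Type, Distribution,
    # Amount, Sales Tax, Tips.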

1330 @staticmethod 

1331 def validate(drow, header_only=False, include_classifications=False): 

1332 # Validate that data rows are correct 

1333 this_fields = DataRow.fields 

1334 if include_classifications: 

1335 # Add classification to the fields 

1336 this_fields["Classification"] = { 

1337 "required": True, 

1338 "nullable": False, 

1339 "type": str, 

1340 "force_type": False, 

1341 "comment": "" 

1342 } 

1343 if len(drow) != len(DataRow.fields): 

1344 raise Exception(f"Data rows should contain {len(DataRow.fields)} elements") 

1345 if header_only: 

1346 if drow != list(DataRow.fields.keys()): 

1347 return False, f"Data rows need to be in the form: {list(DataRow.fields.keys())}" 

1348 return True, None 

1349 vkeys = list(DataRow.fields.keys()) 

1350 counter = 0 

1351 while counter < len(drow): 

1352 this_validator = DataRow.fields[vkeys[counter]] 

1353 this_value = drow[counter] 

1354 counter += 1 

1355 if is_null(this_value): # Also check for length = 0? 

1356 if this_validator["nullable"]: 

1357 pass 

1358 else: 

1359 raise Exception(f"Non-nullable field {vkeys[counter - 1]} was nulled in {drow}") 

1360 else: 

1361 if type(this_value) is not this_validator["type"]: 

1362 if this_validator["force_type"]: 

1363 try: 

1364 this_value = this_validator["type"](this_value) 

1365 except ValueError: 

1366 raise Exception(f"Could not coerce \'{this_value}\' at idx {counter - 1} to \ 

1367 {this_validator['type']} for this row: {drow}") 

1368 if type(this_value) is not this_validator["type"]: 

1369 raise Exception(f"Invalid type at idx {counter - 1} for {this_value} in {drow}") 

1370 if this_validator.get("allowed_values") and this_value not in this_validator["allowed_values"]: 

1371 raise Exception(f"Non-allowed value of {this_value} in {drow}") 

1372 return drow 

1373 
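    # Minimal usage sketch (assuming Classification has not yet been added to fields):
    #   DataRow.validate(list(DataRow.fields.keys()), header_only=True)   # -> (True, None)
    #   DataRow.validate(["Date", "Amount"], header_only=True)            # raises: wrong element count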

1374 @staticmethod 

1375 def create( 

1376 date, 

1377 category_name, 

1378 source, target, 

1379 amount, 

1380 description="", 

1381 sales_tax=0, 

1382 tips=0, 

1383 comment="", 

1384 tags="", 

1385 row_type="", 

1386 distribution=0, 

1387 classification="Uncategorized"): 

1388 if is_null(amount): 

1389 amount = 0 

1390 else: 

1391 try: 

1392 amount = float(amount) 

1393 except ValueError: 

1394 amount = 0 

1395 if is_null(tips): 

1396 tips = 0 

1397 else: 

1398 try: 

1399 tips = float(tips) 

1400 except ValueError: 

1401 tips = 0 

1402 if is_null(distribution): 

1403 distribution = 0 

1404 else: 

1405 try: 

1406 distribution = int(distribution) 

1407 except ValueError: 

1408 distribution = 0 

1409 if is_null(sales_tax): 

1410 sales_tax = 0 

1411 else: 

1412 try: 

1413 sales_tax = float(sales_tax) 

1414 except ValueError: 

1415 sales_tax = 0 

1416 return DataRow.validate([ 

1417 date, 

1418 category_name, 

1419 description, 

1420 tags, 

1421 comment, 

1422 source, 

1423 target, 

1424 row_type, 

1425 distribution, 

1426 amount, 

1427 sales_tax, 

1428 tips, 

1429 classification], False, True) 

1430 
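    # Usage sketch with hypothetical values; the positional order is date, category, source, target, amount:
    #   DataRow.create(pd.to_datetime("2023-01-15"), "Groceries", "Checking", "Groceries", "84.12",
    #                  description="Weekly shop")
    # returns a validated 13-element list, coercing the amount to a float and defaulting the
    # classification to "Uncategorized".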

1431 @staticmethod 

1432 def tag_matches(row_tags, search_tags): 

1433 # Tag logic 

1434 this_exploded_tags = None 

1435 this_tag_matches = None 

1436 this_store_matches = False 

1437 if search_tags and not is_empty(search_tags) and not is_empty(row_tags): 

1438 this_exploded_tags = [i.strip() for i in row_tags.split(',')] # TODO: Do this case insensitively? 

1439 if this_exploded_tags: 

1440 return [i for i in set(search_tags).intersection(set(this_exploded_tags))] 

1441 return None 
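    # For example (hypothetical tags): DataRow.tag_matches("grocery, vacation", ["vacation", "gift"])
    # returns ["vacation"], while empty row tags or empty search tags return None rather than a list.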

1442 

1443# WIP: Move utils into separate classes 

1444 

1445 

1446class AuditUtils: 

1447 pass 

1448 

1449 

1450class DiagramUtils: 

1451 """ 

1452 Static class, holds utility functions used for all diagram types. 

1453 """ 

1454 pass 

1455 

1456 

1457class SankeyUtils(DiagramUtils): 

1458 """ 

1459 Static class, holds utility functions used for generating Sankey diagrams. 

1460 """ 

1461 @staticmethod 

1462 def update_title(transaction_obj): 

1463 # Mutate passed object or return a value? 

1464 # TODO: add flag information to title 

1465 transaction_obj.title = f"{transaction_obj._app_settings.base_title} ({transaction_obj.earliest_date.month}/\ 

1466 {transaction_obj.earliest_date.day}/{transaction_obj.earliest_date.year} - \ 

1467 {transaction_obj.latest_date.month}/{transaction_obj.latest_date.day}/{transaction_obj.latest_date.year})" 

1468 if transaction_obj._app_settings.distribute_amounts: 

1469 transaction_obj.title += "<br> Multi-month transactions are being distributed" 

1470 if transaction_obj._app_settings.exclude_tags: 

1471 transaction_obj.title += f"<br> Tags being excluded: \ 

1472 {', '.join(transaction_obj._app_settings.exclude_tags)}" 

1473 if transaction_obj._app_settings.tags: 

1474 transaction_obj.title += f"<br> Tags being used: {', '.join(transaction_obj._app_settings.tags)}" 

1475 if transaction_obj._app_settings.recurring: 

1476 transaction_obj.title += f"<br> Recurring transactions are being split out" 

1477 

1478 @staticmethod 

1479 def build_dag(transaction_obj: Transactions): 

1480 """ 

1481 Create directed acyclic graph of all transactions 

1482 """ 

1483 if transaction_obj._app_settings.recurring: 

1484 if transaction_obj._app_settings.verbose: 

1485 logger.info("Recurring transactions to be split out") # TODO: precludes tag:recurring handling. 

1486 # Add edge from Income to Recurring 

1487 transaction_obj._labels_obj._digraph.add_edge("Income", "Recurring") 

1488 

1489 

1490class LineUtils(DiagramUtils): 

1491 """ 

1492 Static class, holds utility functions used for generating line charts. 

1493 """ 

1494 pass 

1495 

1496# Define some helper functions ======================================================================================= 

1497 

1498 

1499def is_null(obj): 

1500 # Just using numpy.isnan() will throw errors for types that cannot be coerced to float64. 

1501 # Could also use a try...catch 

1502 # use as a general purpose null/none/NaN catch 

1503 if obj is None: 

1504 return True 

1505 if type(obj) is float64 and isnan(obj): 

1506 return True 

1507 if type(obj) is nattype.NaTType: 

1508 return True 

1509 obj_as_str = None 

1510 try: 

1511 obj_as_str = str(obj) 

1512 except Exception as e: 

1513 pass 

1514 if obj_as_str in ["None", "none", "NaN", "nan", "Null", "null"]: 

1515 return True 

1516 return False 

1517 

1518 

1519def is_empty(obj, nonzero=False): 

1520 # Check for null, nan, none, etc as well as empty string. Optionally check for zero values. 

1521 # Swallow errors casting to values 

1522 if is_null(obj): 

1523 return True 

1524 obj_as_str = None 

1525 try: 

1526 obj_as_str = str(obj) 

1527 except Exception as e: 

1528 pass 

1529 if obj_as_str == "": 

1530 return True 

1531 if nonzero: 

1532 obj_as_float = None # int() truncates values like 0.25 to 0 

1533 try: 

1534 obj_as_float = float(obj) 

1535 except Exception as e: 

1536 pass 

1537 if obj_as_float == 0: 

1538 return True 

1539 return False 
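# A few illustrative cases: is_null(None), is_null(float("nan")) and is_null("NaN") are all True;
# is_empty("") is True, while is_empty(0) is False unless nonzero=True, in which case zero (or "0.00")
# also counts as empty.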

1540 

1541 

1542def df_date_filter(df, start_date, end_date): 

1543 # Filter a dataframe by date 

1544 if is_empty(start_date) and is_empty(end_date): 

1545 return df 

1546 if is_empty(start_date): 

1547 df = df[df["Date"] <= end_date] 

1548 elif is_empty(end_date): 

1549 df = df[df["Date"] >= start_date] 

1550 else: 

1551 df = df[(df["Date"] >= start_date) & (df["Date"] <= end_date)] 

1552    df = df.reset_index(drop=True)

1553 if len(df) == 0: 

1554 # TODO: this will error if start_date or end_date are not dates 

1555 raise Exception(f"Supplied date range ({start_date.date()} - {end_date.date()}) does not contain any \ 

1556 transactions!") 

1557 earliest_date = df["Date"].sort_values().iloc[0] 

1558 latest_date = df["Date"].sort_values().iloc[len(df) - 1] 

1559 return df 
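# Usage sketch (dates are illustrative): df_date_filter(df, pd.to_datetime("2023-01-01"),
# pd.to_datetime("2023-03-31")) keeps only rows whose "Date" falls inside the inclusive range and raises
# if nothing remains; passing an empty start or end date leaves that side of the range open.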

1560 

1561 

1562def save_report(report_data, basename): 

1563 dtnow = pd.Timestamp.today() 

1564 fname = f"{basename}-{dtnow}.txt" 

1565 if path.isfile(fname): 

1566 raise Exception(f"File named {fname} already exists!") 

1567 with open(fname, 'wt') as f: 

1568 f.write(report_data) 

1569 

1570 

1571def validate_date_string(input, allow_empty=False): 

1572 # Pandas will accept YYYY-MM-DD or MM/DD/YYYY 

1573 if is_empty(input, True) and allow_empty: 

1574 return True 

1575 match_obj = re.match(r"^([\d]{4})-([\d]{1,2})-([\d]{1,2})$", input) 

1576 if match_obj: 

1577 match_year = int(match_obj.groups()[0]) 

1578 match_month = int(match_obj.groups()[1]) 

1579 match_day = int(match_obj.groups()[2]) 

1580 else: 

1581 match_obj = re.match(r"^([\d]{1,2})/([\d]{1,2})/([\d]{4})$", input) 

1582 if match_obj: 

1583 match_year = int(match_obj.groups()[2]) 

1584 match_month = int(match_obj.groups()[0]) 

1585 match_day = int(match_obj.groups()[1]) 

1586 if match_obj: 

1587 if not (1900 < match_year < 2100): # Update if doing historical work! 

1588            logger.warn(f"Supplied year doesn't look right: {match_year}")

1589 return False 

1590 if not (1 <= match_month <= 12): 

1591 logger.warn(f"Invalid month value: {match_month}") 

1592 return False 

1593 if not (1 <= match_day <= 31): 

1594 logger.warn(f"Invalid day value: {match_day}") 

1595 return False 

1596 return True 

1597 return False 
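# Examples: validate_date_string("2023-01-15") and validate_date_string("1/15/2023") both return True;
# validate_date_string("15/1/2023") returns False because 15 is parsed as the month.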

1598 

1599 

1600def func_Convert_Gsheet_dates(g_timestamp, default_date): 

1601 # Note: currently unused 

1602 if g_timestamp: 

1603 try: 

1604 g_timestamp_as_int = int(g_timestamp) 

1605 # Likely means we got an unformatted timestamp from gsheets. 

1606 # See also: https://developers.google.com/sheets/api/guides/formats for information about 

1607 # Google sheets timestamp format 

1608 # See: http://www.cpearson.com/excel/datetime.htm for why Dec 30, 1899 

1609 return pd.to_datetime(g_timestamp_as_int * 86400 * 1000, unit='ms', origin="1899-12-30") 

1610 except ValueError: 

1611 # We'll assume this means we got a formatted date string 

1612 return pd.to_datetime(g_timestamp) 

1613 else: 

1614 if not default_date: 

1615 return pd.Timestamp.today() 

1616 return pd.to_datetime(default_date) # default_date 
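# Illustrative conversions: a raw serial of 44927 resolves to Timestamp("2023-01-01") via the 1899-12-30
# origin, while an already formatted string such as "1/1/2023" falls through to pd.to_datetime().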

1617 

1618 

1619def fetch_data(app_settings_obj): # source_spreadsheet, source_worksheet, csv_src_target, service_account_credentials): 

1620 # app_settings_obj.data_source,app_settings_obj.data_sheet,app_settings_obj.labels_source,app_settings_obj.g_creds 

1621    # WIP: Handle source data in multiple sheets, e.g. 1 sheet per year...

1622 

1623 """ 

1624 self.data_source = args.source # A csv file or a google Sheets document containing transactions data. 

1625 (In the case of the latter, a sheet name must be provided as well) 

1626 self.data_sheet = args.sheet or "Transactions_*" # The name (or prefix plus wildcard) of the sheet containing 

1627 the transactions data (if data_source is a google Sheets document 

1628 self._labels_source = args.srcmap or "Sources-Targets" #A csv file or sheet name containing sources and targets 

1629 """ 

1630 def data_source_router( 

1631 filename: str, 

1632 file_kind: str, 

1633 gcreds, 

1634 sheetname: Union[str, None] = None, 

1635 gcreds_obj=None, 

1636 gsheets_obj=None): 

1637 """ 

1638 Look for data sources either locally or in Google Sheets. Handle wildcards expressions for multiple sheets. 

1639 Return a list of filenames. 

1640 :param filename: A filename or wildcard expression, or None (Note: only csv files will use the wildcard 

1641 expression for this value) 

1642 :param file_kind: "sources-targets" or "transactions" 

1643 :param gcreds: Google credentials object 

1644 :param sheetname: A sheet name or wildcard expression, or None (only applicable to Google Sheets) 

1645        :param gcreds_obj / gsheets_obj: previously created Google client / spreadsheet objects (optional)

1646 

1647 Returns: { 

1648 "filename" -> list of strings, 

1649 "sheetname" -> list of strings, 

1650 "filetype" -> ["csv", "gsheet"], 

1651 "file_kind" -> ["sources-targets","transactions"], 

1652            "gcreds_obj" -> Google credentials object,

1653 "gsheets_obj" -> Google sheets object 

1654 } 

1655 """ 

1656 file_kind = file_kind.lower() 

1657 if file_kind not in ["sources-targets", "transactions"]: 

1658 raise Exception(f"Invalid file kind: {file_kind}") 

1659 if filename is None: 

1660 filename = input(f"Enter location for {file_kind} data: ") 

1661 return data_source_router(filename, file_kind, gcreds, sheetname, gcreds_obj, gsheets_obj) 

1662 if filename.endswith(".csv"): 

1663 if path.isfile(filename): # Switch to pathlib? 

1664 return { 

1665 "filename": [filename], 

1666 "sheetname": [None], 

1667 "filetype": "csv", 

1668 "file_kind": file_kind, 

1669 "gcreds_obj": gcreds_obj, 

1670 "gsheets_obj": gsheets_obj} 

1671 logger.warn(f"File not found: {filename}") 

1672 return data_source_router(None, file_kind, gcreds, sheetname, gcreds_obj, gsheets_obj) 

1673 if filename.endswith("*"): 

1674 # Wildcards on filenames only supported for csvs, Gsheets would use sheetnames for wildcard. 

1675 fileparts = filename.split("/") 

1676 filepattern = f"{fileparts[-1]}.csv" 

1677 if len(fileparts[0]) == 0: 

1678 # This must be an absolute path 

1679 this_dir = "/" + "/".join(fileparts[1:-1]) 

1680 else: 

1681 # Possibly a relative path 

1682 if len(fileparts) > 1: 

1683 this_dir = "./" + "/".join(fileparts[:-1]) 

1684 else: 

1685 this_dir = "." 

1686 dir_obj = Path(this_dir) 

1687 if dir_obj.is_dir(): 

1688 files = list(dir_obj.glob(filepattern)) 

1689 if len(files) > 0: 

1690 return {"filename": [str(f) for f in files], 

1691 "sheetname": [None], 

1692 "filetype": "csv", 

1693 "file_kind": file_kind, 

1694 "gcreds_obj": gcreds_obj, 

1695 "gsheets_obj": gsheets_obj} 

1696            logger.warn(f"No files found at: {filename}")

1697 return data_source_router(None, file_kind, gcreds, sheetname, gcreds_obj, gsheets_obj) 

1698 # If we've gotten this far, we must be looking for a Google Sheets file (late binding of the 

1699 # gc object to reduce calls to authorize) 

1700 if not gcreds_obj: 

1701 gcreds_obj = pygsheets.authorize(service_file=gcreds) # trying to avoid calling this multiple times 

1702 if filename not in gcreds_obj.spreadsheet_titles(): 

1703 logger.warn(f"Spreadsheet not found: {filename}") 

1704 return data_source_router(None, file_kind, gcreds, sheetname, gcreds_obj, gsheets_obj) 

1705 gsheets_obj = gcreds_obj.open(filename) # TODO: error handling. 

1706 if not sheetname: 

1707 sheetname = input(f"Enter worksheet title for spreadsheet {filename}: ") 

1708 return data_source_router(filename, file_kind, gcreds, sheetname, gcreds_obj, gsheets_obj) 

1709 gsheet_titles = [i.title for i in gsheets_obj.worksheets()] 

1710 if sheetname.endswith("*"): 

1711 # Wildcard handling for sheetnames 

1712 results = [] 

1713 for sheet in gsheet_titles: 

1714 if sheet.startswith(sheetname[:-1]): 

1715 results.append(sheet) 

1716 if len(results) > 0: 

1717 return { 

1718 "filename": [filename], 

1719 "sheetname": results, 

1720 "filetype": "gsheet", 

1721 "file_kind": file_kind, 

1722 "gcreds_obj": gcreds_obj, 

1723 "gsheets_obj": gsheets_obj} 

1724            logger.warn(f"No worksheets found matching pattern: \"{sheetname}\" in spreadsheet: \"{filename}\"")

1725 return data_source_router(filename, file_kind, gcreds, None, gcreds_obj, gsheets_obj) 

1726 if sheetname in gsheet_titles: 

1727 return {"filename": [filename], 

1728 "sheetname": [sheetname], 

1729 "filetype": "gsheet", 

1730 "file_kind": file_kind, 

1731 "gcreds_obj": gcreds_obj, 

1732 "gsheets_obj": gsheets_obj} 

1733        logger.warn(f"Worksheet \"{sheetname}\" not found in spreadsheet: \"{filename}\"")

1734 return data_source_router(filename, file_kind, gcreds, None, gcreds_obj, gsheets_obj) 

1735 

1736 try: 

1737 # Get sources-targets file location data 

1738 # See sample_data/expenses.csv and labels.csv for examples of data format. 

1739 # TODO: validation & error handling 

1740 # TODO: there should only ever be one sources-targets file, so data_source_router should handle that 

1741 src_target = None 

1742 if app_settings_obj.labels_source.endswith(".csv"): 

1743 src_target_file = data_source_router(app_settings_obj.labels_source, 

1744 "sources-targets", 

1745 app_settings_obj.g_creds, None, None, None) 

1746 else: 

1747 src_target_file = data_source_router(app_settings_obj.data_source, 

1748 "sources-targets", 

1749 app_settings_obj.g_creds, 

1750 app_settings_obj.labels_source, None, None) 

1751 if src_target_file["filetype"] == "csv": 

1752 # Sources-targets data is returned differently than transactions data, so we need to handle it differently. 

1753 with open(src_target_file["filename"][0], 'r') as f: 

1754 dr = DictReader(f) 

1755 src_target = list(dr) 

1756 else: 

1757        # sh = src_target_file["gcreds_obj"].open(src_target_file["filename"][0])  # TODO: error handling

1758 # Fetch source-target info and colors. 

1759 src_target = src_target_file["gsheets_obj"].\ 

1760 worksheet_by_title(src_target_file["sheetname"][0]).get_all_records() 

1761 

1762 # Get transactions file(s) location data 

1763 df = None 

1764 transaction_data_file = data_source_router(app_settings_obj.data_source, 

1765 "transactions", 

1766 app_settings_obj.g_creds, 

1767 app_settings_obj.data_sheet, None, None) 

1768 if transaction_data_file["filetype"] == "csv": 

1769 # open one or multiple csv files and return a pandas dataframe 

1770 df = read_csv_as_df(transaction_data_file["filename"]) 

1771 else: 

1772 # Open one or multiple gsheet worksheets and return a pandas dataframe 

1773 df = read_gsheet_as_df(transaction_data_file["sheetname"], transaction_data_file["gsheets_obj"]) 

1774 

1775 except Exception as e: 

1776 logger.error(f"Unable to open data source: {app_settings_obj.data_source}. \ 

1777 Please check your names and try again. Error was {e}") 

1778 raise SystemExit 

1779 

1780 # Clean up value formatting 

1781 df.reset_index(inplace=True, drop=True) 

1782 df = df.transform(normalize_amounts, axis=1) 

1783 

1784 is_valid = DataRow.validate(df.columns.to_list(), True) 

1785 if not is_valid[0]: 

1786 raise Exception(f"Source data is not in correct format! Message was: {is_valid[1]}") 

1787 

1788 return src_target, df 
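# For a local CSV such as sample_data/expenses.csv, the inner data_source_router() resolves to something like
#   {"filename": ["sample_data/expenses.csv"], "sheetname": [None], "filetype": "csv",
#    "file_kind": "transactions", "gcreds_obj": None, "gsheets_obj": None}
# whereas Google Sheets sources carry the authorized client and opened spreadsheet objects instead.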

1789 

1790 

1791def read_csv_as_df(csv_file): 

1792 if type(csv_file) is list: 

1793 main_csv = csv_file[0] 

1794 addl_csvs = csv_file[1:] 

1795 else: 

1796 main_csv = csv_file 

1797 addl_csvs = [] 

1798 if not main_csv.endswith('.csv') or not path.isfile(main_csv): 

1799 raise Exception(f"Supplied CSV file ({main_csv}) was not found or is the wrong format.") 

1800 main_df = pd.DataFrame(pd.read_csv(main_csv)) 

1801 for i in addl_csvs: 

1802 if not i.endswith('.csv') or not path.isfile(i): 

1803 raise Exception(f"Supplied CSV file ({i}) was not found or is the wrong format.") 

1804 # df = df.append(pd.DataFrame(pd.read_csv(i)), ignore_index=True) 

1805 add_df = pd.DataFrame(pd.read_csv(i)) 

1806 main_df = pd.concat([main_df, add_df], axis=0) 

1807 return main_df 
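# Usage sketch with hypothetical filenames: read_csv_as_df(["transactions_2022.csv", "transactions_2023.csv"])
# reads the first file and concatenates the remaining files row-wise into a single DataFrame.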

1808 

1809 

1810def read_gsheet_as_df(gsheet_file, gsheet_obj): 

1811 # .get_as_df(value_render=pygsheets.ValueRenderOption.UNFORMATTED_VALUE) 

1812    # # <-- Using UNFORMATTED_VALUE rounded decimal amounts (reason unclear), so it is not used here

1813 if type(gsheet_file) is list: 

1814 main_sheet = gsheet_file[0] 

1815 addl_sheets = gsheet_file[1:] 

1816 else: 

1817 main_sheet = gsheet_file 

1818 addl_sheets = [] 

1819 main_df = gsheet_obj.worksheet_by_title(main_sheet).get_as_df() 

1820 for i in addl_sheets: 

1821 add_df = gsheet_obj.worksheet_by_title(i).get_as_df() 

1822 main_df = pd.concat([main_df, add_df], axis=0) 

1823 return main_df 

1824 

1825 

1826def normalize_amounts(df_row): 

1827 for atype in ["Amount", "Sales Tax", "Tips"]: 

1828 val = df_row[atype] 

1829 if is_empty(val): 

1830 continue 

1831 try: 

1832 val = float(val) 

1833 except ValueError: 

1834 if '$' in val or ',' in val: 

1835 val = float(val.replace('$', '').replace(',', '')) 

1836 df_row[atype] = val 

1837 return df_row
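# Example: an Amount of "$1,234.56" (a string with currency formatting) is normalized to the float 1234.56;
# values that are already numeric are simply cast to float, and empty values are skipped.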