Source code for pcapgraph.draw_graph

# -*- coding: utf-8 -*-
# Copyright 2018 Ross Jacobs All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Draw graph will draw a text or image graph."""

import datetime
import os
import subprocess as sp

import matplotlib.pyplot as plt
import numpy as np

import pcapgraph.manipulate_framehex as mfh
import pcapgraph.manipulate_framebytes as mfb


def draw_graph(pcap_packets, input_files, output_fmts, graph_opts):
    """Produce every requested output (graph, text, pcap) for the packets.

    Args:
        pcap_packets (dict): All packets, where key is pcap filename/operation.
            Entries named in ``input_files`` must hold 'frames' and
            'timestamps' sequences; they are replaced IN PLACE by a
            ``{frame: timestamp}`` mapping.
        input_files (list): List of input files. Every other key of
            ``pcap_packets`` is treated as a file generated by PcapGraph.
        output_fmts (list): The save file types. Supported image formats are
            dependent on the capabilities of the system: [png, pdf, ps, eps,
            and svg]. See https://matplotlib.org/api/pyplot_api.html for
            more information. The pseudo-format 'wireshark' opens each
            generated file in wireshark instead of saving anything. An empty
            value falls back to showing the graph. This list is not modified.
        graph_opts (dict): Graph options, forwarded to output_file().
    """
    # Reformat main pcap dictionary to meet expectations. Make consistent
    # at some point.
    for file in input_files:
        packets = pcap_packets[file]
        pcap_packets[file] = dict(
            zip(packets['frames'], packets['timestamps']))

    # So that if no save format is specified, print to screen and stdout
    if not output_fmts:
        print('No output formats selected. Showing graph.')
        output_fmts = ['show']

    # Build a filtered copy rather than calling .remove() on the argument:
    # the caller's list stays intact and repeated 'wireshark' entries cannot
    # leak through to output_file() as a bogus save format.
    open_in_wireshark = 'wireshark' in output_fmts
    save_formats = [fmt for fmt in output_fmts if fmt != 'wireshark']

    new_files = sorted(set(pcap_packets) - set(input_files))
    for save_format in save_formats:
        output_file(save_format, pcap_packets, new_files, graph_opts)

    if open_in_wireshark:
        for file in new_files:
            print(
                "Opening", file, "in wireshark (you must close this " +
                "wireshark window to look at the next).")
            # NOTE(review): Popen returns immediately, so all files are
            # opened at once despite the message above -- confirm intent.
            sp.Popen(['wireshark', file])
def output_file(save_format, pcap_packets, new_files, graph_opts):
    """Save (or show) the output for one requested format.

    Args:
        save_format (str): Extension of file to be saved. 'txt' writes a
            text summary to pcap_graph.txt, anything containing 'pcap'
            writes the generated packet captures, 'show' displays the graph
            on screen, and any other value is passed to matplotlib as the
            image format.
        pcap_packets (dict): All packets, where key is pcap filename/operation
            and value is a ``{frame: timestamp}`` mapping.
        new_files (list): Files generated by PcapGraph.
        graph_opts (dict): All user CLI options. Only membership is tested:
            '--exclude-empty': collect the names of empty pcaps.
            '--anonymize': replace pcap names with anonymous ones.
            '--show-packets': draw one vertical line per packet instead of
                one horizontal bar per pcap.
    """
    if save_format == 'txt':
        # generate_text() reads 'pcap_start'/'pcap_end' for every pcap, so it
        # needs the start/stop summary, not the raw frame->timestamp dicts.
        output_text = generate_text(get_graph_vars(pcap_packets, new_files))
        print(output_text)
        # The with-block closes the file; no explicit close() is needed.
        with open('pcap_graph.txt', 'w') as file:
            file.write(output_text)
        print("Text file successfully created!")
    elif 'pcap' in save_format:
        # The substring test also matches 'pcapng'.
        # Output files will only have packets from the first file
        # so use that link type
        for file in new_files:
            # todo refactor this data structure
            # output_link_type = pcap_packets[file]['link_type']
            output_link_type = 1
            if pcap_packets[file]:
                print('Saving ' + file + ' as ' + save_format + '...')
                mfb.write_pcap(
                    file, list(pcap_packets[file]),
                    list(pcap_packets[file].values()), output_link_type)
            else:
                print('=> Excluding empty ' + file)
    else:
        graph_startstop_dict = get_graph_vars(pcap_packets, new_files)
        empty_files = []
        if '--exclude-empty' in graph_opts:
            for pcap in pcap_packets:
                pcap_name = os.path.basename(os.path.splitext(pcap)[0])
                if not pcap_packets[pcap] \
                        or not graph_startstop_dict[pcap_name]['packet_count']:
                    empty_files.append(pcap_name)
        generate_graph(pcap_packets, graph_startstop_dict, empty_files,
                       '--anonymize' in graph_opts,
                       '--show-packets' in graph_opts)
        if save_format != 'show':
            export_graph(list(pcap_packets), save_format)
        else:
            # Print text version because it's possible.
            print(generate_text(graph_startstop_dict))
            plt.show()
def get_graph_vars(pcap_packets, new_files):
    """Collect packet count and first/last timestamp for every pcap.

    Pcaps that are not listed in ``new_files`` are summarised by
    mfh.get_pcap_info(); the ones in ``new_files`` are summarised here from
    their timestamps and stored under their filename minus the extension.
    Pcaps in ``new_files`` that hold no packets get no entry.

    Args:
        pcap_packets (dict): Frames and timestamps by pcap
        new_files (list): Keys of pcap_packets to summarise from their
            packets rather than through mfh.get_pcap_info()

    Returns:
        (dict): {<name>: {'packet_count': ..., 'pcap_start': ...,
            'pcap_end': ...}, ...}
    """
    remaining_files = sorted(set(pcap_packets) - set(new_files))
    summary = mfh.get_pcap_info(remaining_files)

    for filename in new_files:
        packets = pcap_packets[filename]
        # Timestamps are converted to floats so they can be compared.
        if packets:
            times = [mfb.get_ts_as_float(stamp) for stamp in packets.values()]
        else:
            times = []
        name = os.path.splitext(filename)[0]
        if not times:
            continue
        summary[name] = {
            'packet_count': len(packets),
            'pcap_start': min(times),
            'pcap_end': max(times),
        }

    return summary
def generate_graph(pcap_packets, pcap_vars, empty_files, anonymize_names,
                   show_packets):
    """Build the matplotlib figure for the given pcaps.

    Args:
        pcap_packets (dict): Dict returned by get_pcap_frame_dict()
            {<pcap>: {'FRAME': 'TIMESTAMP', ...}, ...}
        pcap_vars (dict): Contains all data required for the graph.
            {<pcap>: {'pcap_start': <timestamp>, 'pcap_end': <timestamp>}, ...}
        empty_files (list): List of filenames of empty files; they are
            appended to the y-axis labels.
        anonymize_names (bool): Whether to change filenames to random values.
        show_packets (bool): Whether to show each packet or the entire pcap.
    """
    labels = list(pcap_vars)
    if anonymize_names:
        labels = mfh.anonymous_pcap_names(len(labels))

    # Y tick text is 12 point high (72 point = 1 inch), so the labels of a
    # default-height graph begin to overlap at about 18 lines. Beyond that
    # the names are blanked out rather than drawn on top of each other.
    label_limit = 18
    if len(labels) > label_limit:
        labels = [''] * len(labels)

    spans = pcap_vars.values()
    start_times = np.array([span['pcap_start'] for span in spans])
    end_times = np.array([span['pcap_end'] for span in spans])
    labels += empty_files

    x_min, x_max = get_x_minmax(start_times, end_times)
    fig, axes = plt.subplots()
    if not show_packets:
        # Default is one horizontal bar per pcap, from start to end time.
        durations = end_times - start_times
        bars = plt.barh(range(len(start_times)), durations, left=start_times)
        set_horiz_bars(bars)
    else:
        print("Loading packets as lines...")
        plt.xlim(x_min, x_max)
        set_horiz_barlines(pcap_packets)
        print("Done loading packets!")

    set_graph_vars(x_min, x_max, labels, fig, axes)
def get_x_minmax(start_times, end_times):
    """Determine the horizontal (x) min and max values.

    This function adds 1% of the total time span to either side for padding.
    When the span is zero (every pcap starts and ends at the same instant)
    one second is added to either side instead, so that the two limits never
    coincide and the x-axis keeps a usable width.

    Args:
        start_times (np.array): First packet unix timestamps of all pcaps.
        end_times (np.array): Last packet unix timestamps of all pcaps.

    Returns:
        (tuple): min_x, max_x to be used for graph

    Raises:
        ValueError: If either argument is empty.
    """
    first_time = min(start_times)
    last_time = max(end_times)
    span = last_time - first_time

    # Force padding on left and right sides
    padding = span / 100 if span else 1.0

    return first_time - padding, last_time + padding
def set_horiz_bars(barlist):
    """Color each horizontal bar from a fixed palette.

    The palette is Metro UI themed, with an emphasis on darker colors. Bars
    beyond the end of the palette start over at its first color.

    Args:
        barlist (list): List of the horizontal bars.
    """
    palette = (
        '#2d89ef',
        '#603cba',
        '#2b5797',
        '#008B8B',
        '#3145b4',
        '#36648B',
        '#38b0de',
        '#4d4dff',
        '#3299cc',
        '#7f00ff',
        '#03b4c8',
        '#5959ab',
    )
    for position, bar in enumerate(barlist):
        # Modulo wraps the position back into the palette.
        bar.set_color(palette[position % len(palette)])
def set_horiz_barlines(pcap_packets):
    """Set horizontal bar vertical lines instead of a fully-colored bar.

    Every pcap gets its own horizontal band of the plot, and every packet is
    drawn as one vertical line inside that band at the packet's timestamp.
    Colors wrap around the palette when there are more pcaps than colors,
    the same way set_horiz_bars() does. Nothing is drawn for empty input.

    Args:
        pcap_packets (dict): {<pcap>: {<frame>: <timestamp>, ...}, ...}.
            Each timestamp must be convertible with float().
    """
    colors = [
        '#2d89ef',
        '#603cba',
        '#2b5797',
        '#008B8B',
        '#3145b4',
        '#36648B',
        '#38b0de',
        '#4d4dff',
        '#3299cc',
        '#7f00ff',
        '#03b4c8',
        '#5959ab',
    ]
    if not pcap_packets:
        # Nothing to draw, and the band height below would divide by zero.
        return

    color_count = len(colors)
    # axvline takes ymin/ymax as fractions of the axes height (0 to 1), so
    # the height is split evenly between the pcaps.
    hbar_height = 1 / len(pcap_packets)
    for index, pcap in enumerate(pcap_packets):
        # Create a line in a bar graph with 10% pad on each side
        ymin = index * hbar_height + .1 * hbar_height
        ymax = ymin + hbar_height - .2 * hbar_height
        color = colors[index % color_count]
        for timestamp in pcap_packets[pcap].values():
            plt.axvline(
                x=float(timestamp),
                ymin=ymin,
                ymax=ymax,
                color=color,
                linewidth='1')
def set_xticks(first, last):
    """Build the x-axis tick labels and the x-axis title.

    Ten evenly spaced instants between ``first`` and ``last`` are formatted
    in local time. The year goes into the axis title when both ends share
    it, and into every tick label when they do not.

    Args:
        first (float): Earliest timestamp of pcaps.
        last (float): Latest timestamp of pcaps.

    Returns:
        (tuple):
            x_ticks (list(str)): Formatted date/time text for each tick.
            x_label (string): Text to be used to label X-axis.
    """
    # 10 x ticks chosen for aesthetic reasons.
    tick_total = 10
    interval = (last - first) / (tick_total - 1)
    to_datetime = datetime.datetime.fromtimestamp

    year_of_first = to_datetime(first).strftime('%Y')
    year_of_last = to_datetime(last).strftime('%Y')
    if year_of_first == year_of_last:
        tick_format = '%b-%d %H:%M:%S'
        xlabel = 'Time (' + year_of_first + ')'
    else:
        tick_format = '%Y-' + '%b-%d %H:%M:%S'
        xlabel = 'Time'

    x_ticks = []
    moment = first
    for _ in range(tick_total):
        x_ticks.append(to_datetime(moment).strftime(tick_format))
        moment += interval

    return x_ticks, xlabel
def set_graph_vars(x_min, x_max, pcap_names, fig, axes):
    """Set matplotlib's plt object with appropriate graph parameters.

    Args:
        x_min (float): Left limit of the x-axis as a unix timestamp.
        x_max (float): Right limit of the x-axis as a unix timestamp.
        pcap_names (list): Y tick labels, one per row of the graph.
        fig: Matplotlib figure that receives the title.
        axes: Matplotlib axes that receive ticks, labels and limits.
    """
    # xticks will look like 'Dec-31 23:59:59'
    x_ticks, xlabel = set_xticks(x_min, x_max)

    plt.xticks(rotation=45)
    # The labels describe evenly spaced instants between x_min and x_max, so
    # the ticks go at exactly those instants. Rounding the positions to
    # whole seconds would collapse several ticks onto the same spot for
    # captures that last only a few seconds.
    axes.set_xticks(np.linspace(x_min, x_max, len(x_ticks)))
    axes.tick_params(axis='y', labelsize=12)  # Set ytick fontsize to 12
    axes.set_xticklabels(x_ticks)
    # Right-align so the end of each rotated label sits at its tick.
    for tick in axes.xaxis.get_majorticklabels():
        tick.set_horizontalalignment("right")

    # Pcap names as y ticks. Position them halfway up the bar.
    plt.yticks(np.arange(len(pcap_names), step=1), pcap_names)
    axes.set_ylim(-0.5, len(pcap_names) - 0.5)

    # xlabel will be 'Time' if different years, and 'Time (YYYY)' if same year.
    axes.set_xlabel(xlabel, fontsize=16)
    axes.set_ylabel('Pcap Name', fontsize=16)
    fig.suptitle('Pcap Time Analysis', fontsize=20)
    # Use 0.95 for top because tight_layout does not consider suptitle
    plt.tight_layout(rect=[0, 0, 1, 0.95])
def export_graph(pcap_names, save_fmt):
    """Save the current matplotlib figure to a file in the working directory.

    The file is named 'pcap_graph-<name>.<save_fmt>', where <name> is the
    last entry of ``pcap_names`` cut off at its first '.pcap' and then at
    its first space.

    Args:
        pcap_names (list): List of pcap_names
        save_fmt (str): File extension of output file
    """
    this_folder = os.getcwd()
    without_extension = pcap_names[-1].partition('.pcap')[0]
    first_word = without_extension.partition(' ')[0]
    target = 'pcap_graph-' + first_word + '.' + save_fmt
    plt.savefig(target, format=save_fmt, transparent=True)
    print(save_fmt, "file successfully created in ", this_folder, "!")
def generate_text(pcap_times):
    """Make useful text given pcap times.

    The result is a table with one header line and one line per pcap, sorted
    by pcap name. Dates and times are rendered in local time. In the header,
    '0' marks the start of a pcap and '$' marks its end.

    Args:
        pcap_times (dict): Packet capture names and start/stop timestamps.
            {<pcap>: {'pcap_start': <timestamp>, 'pcap_end': <timestamp>}, ...}

    Returns:
        (str): Full textstring of text to be written to file/stdout
    """
    # Formatter creates a bunch of columns aligned left with num spacing.
    # The header goes through the same format as the rows so that every
    # heading sits directly above its column.
    format_string = "{: <19} {: <5} {: <7} " \
                    "{: <10} {: <9} {: <12} {: <18} {: <18}"
    header = format_string.format('PCAP NAME', 'YEAR', 'DATE 0', 'DATE $',
                                  'TIME 0', 'TIME $', 'UTC 0',
                                  'UTC $').rstrip()

    # The leading empty string makes the text start with a newline.
    lines = ['', header]
    for pcap in sorted(pcap_times):
        start_timestamp = pcap_times[pcap]['pcap_start']
        end_timestamp = pcap_times[pcap]['pcap_end']
        start = datetime.datetime.fromtimestamp(start_timestamp)
        end = datetime.datetime.fromtimestamp(end_timestamp)
        lines.append(
            format_string.format(
                pcap[:17],  # Truncate if too long
                start.strftime('%Y'),
                start.strftime('%b-%d'),
                end.strftime('%b-%d'),
                start.strftime('%H:%M:%S'),
                end.strftime('%H:%M:%S'),
                start_timestamp,
                end_timestamp,
            ))

    return '\n'.join(lines)